You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/10/18 19:41:51 UTC
[1/6] mesos git commit: Fixed flakiness in
'SlaveTest.ExecutorShutdownGracePeriod'.
Repository: mesos
Updated Branches:
refs/heads/master 7e6786e62 -> 37053061e
Fixed flakiness in 'SlaveTest.ExecutorShutdownGracePeriod'.
On slow systems, it could happen that an additional offer was
delivered to the scheduler before the test case finished.
Review: https://reviews.apache.org/r/63113/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d8950a84
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d8950a84
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d8950a84
Branch: refs/heads/master
Commit: d8950a84c3721b5ef8abdcb897d150df38fcb24e
Parents: 7e6786e
Author: Benno Evers <be...@mesosphere.com>
Authored: Wed Oct 18 12:13:45 2017 -0700
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Oct 18 12:13:45 2017 -0700
----------------------------------------------------------------------
src/tests/slave_tests.cpp | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d8950a84/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 91d97d1..76a157f 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -5669,7 +5669,8 @@ TEST_F(SlaveTest, ExecutorShutdownGracePeriod)
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers));
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
driver.start();
[2/6] mesos git commit: Removed a stray trailing parenthesis from a
validation error.
Posted by al...@apache.org.
Removed a stray trailing parenthesis from a validation error.
Review: https://reviews.apache.org/r/61855/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b6d848e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b6d848e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b6d848e
Branch: refs/heads/master
Commit: 4b6d848e8a4ba19c5e790f40655f5be000686941
Parents: d8950a8
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Oct 18 12:14:06 2017 -0700
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Oct 18 12:14:06 2017 -0700
----------------------------------------------------------------------
src/master/validation.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b6d848e/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 01bc2e0..42f5742 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -894,7 +894,7 @@ Option<Error> validateCompatibleExecutorInfo(
if (executorInfo.isSome() && executor != executorInfo.get()) {
return Error(
"ExecutorInfo is not compatible with existing ExecutorInfo"
- " with same ExecutorID).\n"
+ " with same ExecutorID.\n"
"------------------------------------------------------------\n"
"Existing ExecutorInfo:\n" +
stringify(executorInfo.get()) + "\n"
[5/6] mesos git commit: Fix unit tests that were broken by the
additional TASK_STARTING update.
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/docker_volume_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_volume_isolator_tests.cpp b/src/tests/containerizer/docker_volume_isolator_tests.cpp
index ed7d438..9123cbe 100644
--- a/src/tests/containerizer/docker_volume_isolator_tests.cpp
+++ b/src/tests/containerizer/docker_volume_isolator_tests.cpp
@@ -342,15 +342,20 @@ TEST_F(DockerVolumeIsolatorTest, ROOT_CommandTaskNoRootfsWithVolumes)
.WillOnce(DoAll(FutureArg<1>(&unmount2Name),
Return(Nothing())));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -589,21 +594,32 @@ TEST_F(DockerVolumeIsolatorTest, ROOT_CommandTaskNoRootfsSlaveRecovery)
.WillOnce(DoAll(FutureArg<1>(&mount2Name),
Return(mountPoint2)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ // Wait for the ACK to be checkpointed.
+ AWAIT_READY(ack1);
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack2);
// Stop the slave after TASK_RUNNING is received.
slave.get()->terminate();
@@ -769,12 +785,16 @@ TEST_F(DockerVolumeIsolatorTest,
EXPECT_CALL(*mockClient, unmount(driver1, _))
.WillOnce(Return(Nothing()));
+ Future<TaskStatus> statusStarting1;
Future<TaskStatus> statusRunning1;
Future<TaskStatus> statusKilled1;
+ Future<TaskStatus> statusStarting2;
Future<TaskStatus> statusRunning2;
Future<TaskStatus> statusKilled2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting1))
+ .WillOnce(FutureArg<1>(&statusStarting2))
.WillOnce(FutureArg<1>(&statusRunning1))
.WillOnce(FutureArg<1>(&statusRunning2))
.WillOnce(FutureArg<1>(&statusKilled1))
@@ -782,8 +802,10 @@ TEST_F(DockerVolumeIsolatorTest,
driver.launchTasks(offer.id(), {task1, task2});
- AWAIT_READY(statusRunning1);
- EXPECT_EQ(TASK_RUNNING, statusRunning1->state());
+ // TASK_STARTING and TASK_RUNNING updates might arrive interleaved,
+ // so we only check the first and the last where the values are known.
+ AWAIT_READY(statusStarting1);
+ EXPECT_EQ(TASK_STARTING, statusStarting1->state());
AWAIT_READY(statusRunning2);
EXPECT_EQ(TASK_RUNNING, statusRunning2->state());
@@ -910,15 +932,20 @@ TEST_F(DockerVolumeIsolatorTest,
.WillOnce(DoAll(FutureArg<1>(&unmountName),
Return(Nothing())));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1052,15 +1079,20 @@ TEST_F(DockerVolumeIsolatorTest,
.WillOnce(DoAll(FutureArg<1>(&unmountName),
Return(Nothing())));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/environment_secret_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/environment_secret_isolator_tests.cpp b/src/tests/containerizer/environment_secret_isolator_tests.cpp
index cf7b9eb..fd1cd46 100644
--- a/src/tests/containerizer/environment_secret_isolator_tests.cpp
+++ b/src/tests/containerizer/environment_secret_isolator_tests.cpp
@@ -107,15 +107,19 @@ TEST_F(EnvironmentSecretIsolatorTest, ResolveSecret)
Resources::parse("cpus:0.1;mem:32").get(),
command);
- // NOTE: Successful tasks will output two status updates.
+ // NOTE: Successful tasks will output three status updates.
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting.get().state());
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY(statusFinished);
@@ -212,9 +216,11 @@ TEST_F(EnvironmentSecretIsolatorTest, ResolveSecretDefaultExecutor)
command);
// NOTE: Successful tasks will output two status updates.
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
@@ -222,6 +228,8 @@ TEST_F(EnvironmentSecretIsolatorTest, ResolveSecretDefaultExecutor)
driver.acceptOffers({offer.id()}, {LAUNCH_GROUP(executorInfo, taskGroup)});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY(statusFinished);
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/linux_filesystem_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/linux_filesystem_isolator_tests.cpp b/src/tests/containerizer/linux_filesystem_isolator_tests.cpp
index a657a6f..4dfd90b 100644
--- a/src/tests/containerizer/linux_filesystem_isolator_tests.cpp
+++ b/src/tests/containerizer/linux_filesystem_isolator_tests.cpp
@@ -707,13 +707,18 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
driver.launchTasks(offer.id(), {task});
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -808,13 +813,18 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
{CREATE(persistentVolume), LAUNCH({task})},
filters);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -933,9 +943,11 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
"test_image",
{createVolumeHostPath("/tmp", dir1, Volume::RW)}));
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status))
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
Future<Nothing> ack =
@@ -946,8 +958,11 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
{offer.id()},
{CREATE(persistentVolume), LAUNCH({task})});
- AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Wait for the ACK to be checkpointed.
AWAIT_READY(ack);
@@ -1058,13 +1073,18 @@ TEST_F(LinuxFilesystemIsolatorMesosTest, ROOT_SandboxEnvironmentVariable)
driver.launchTasks(offer.id(), {task});
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1149,10 +1169,12 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
taskResources,
"dd if=/dev/zero of=volume_path/file bs=1048576 count=2 && sleep 1");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
@@ -1161,6 +1183,10 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1248,10 +1274,12 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
// The task fails to write to the volume since the task's resources
// intends to use the volume as read-only.
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFailed));
@@ -1260,6 +1288,10 @@ TEST_F(LinuxFilesystemIsolatorMesosTest,
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/memory_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/memory_isolator_tests.cpp b/src/tests/containerizer/memory_isolator_tests.cpp
index 14bd705..b8ea5d3 100644
--- a/src/tests/containerizer/memory_isolator_tests.cpp
+++ b/src/tests/containerizer/memory_isolator_tests.cpp
@@ -110,12 +110,17 @@ TEST_P(MemoryIsolatorTest, ROOT_MemUsage)
TaskInfo task = createTask(offers.get()[0], "sleep 120");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/memory_pressure_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/memory_pressure_tests.cpp b/src/tests/containerizer/memory_pressure_tests.cpp
index c4d8bfc..9ada46d 100644
--- a/src/tests/containerizer/memory_pressure_tests.cpp
+++ b/src/tests/containerizer/memory_pressure_tests.cpp
@@ -124,15 +124,21 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics)
Resources::parse("cpus:1;mem:256;disk:1024").get(),
"while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
+ Future<TaskStatus> starting;
Future<TaskStatus> running;
Future<TaskStatus> killed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&starting))
.WillOnce(FutureArg<1>(&running))
.WillOnce(FutureArg<1>(&killed))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(starting);
+ EXPECT_EQ(task.task_id(), starting->task_id());
+ EXPECT_EQ(TASK_STARTING, starting->state());
+
AWAIT_READY(running);
EXPECT_EQ(task.task_id(), running->task_id());
EXPECT_EQ(TASK_RUNNING, running->state());
@@ -245,22 +251,33 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
Resources::parse("cpus:1;mem:256;disk:1024").get(),
"while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
+ Future<TaskStatus> starting;
Future<TaskStatus> running;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&running));
+ .WillOnce(FutureArg<1>(&starting))
+ .WillOnce(FutureArg<1>(&running))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+ Future<Nothing> runningAck =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> startingAck =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(starting);
+ EXPECT_EQ(task.task_id(), starting->task_id());
+ EXPECT_EQ(TASK_STARTING, starting->state());
+
+ AWAIT_READY(startingAck);
+
AWAIT_READY(running);
EXPECT_EQ(task.task_id(), running->task_id());
EXPECT_EQ(TASK_RUNNING, running->state());
// Wait for the ACK to be checkpointed.
- AWAIT_READY_FOR(_statusUpdateAcknowledgement, Seconds(120));
+ AWAIT_READY_FOR(runningAck, Seconds(120));
// We restart the slave to let it recover.
slave.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/nested_mesos_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/nested_mesos_containerizer_tests.cpp b/src/tests/containerizer/nested_mesos_containerizer_tests.cpp
index f8b4423..fbd2887 100644
--- a/src/tests/containerizer/nested_mesos_containerizer_tests.cpp
+++ b/src/tests/containerizer/nested_mesos_containerizer_tests.cpp
@@ -1030,12 +1030,17 @@ TEST_F(NestedMesosContainerizerTest,
task.mutable_container()->CopyFrom(createContainerInfo("alpine"));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers->at(0).id(), {task});
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
// We wait up to 120 seconds
// to download the docker image.
AWAIT_READY_FOR(statusRunning, Seconds(120));
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/port_mapping_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/port_mapping_tests.cpp b/src/tests/containerizer/port_mapping_tests.cpp
index 56e193b..4b8282d 100644
--- a/src/tests/containerizer/port_mapping_tests.cpp
+++ b/src/tests/containerizer/port_mapping_tests.cpp
@@ -2314,17 +2314,23 @@ TEST_F(PortMappingMesosTest, CGROUPS_ROOT_RecoverMixedKnownAndUnKnownOrphans)
Future<TaskStatus> status1;
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task1, task2});
+ // Only check the first and the last status, as the other two might
+ // be interleaved between TASK_STARTING and TASK_RUNNING
AWAIT_READY(status1);
- ASSERT_EQ(TASK_RUNNING, status1->state());
+ ASSERT_EQ(TASK_STARTING, status1->state());
- AWAIT_READY(status2);
+ AWAIT_READY(status4);
ASSERT_EQ(TASK_RUNNING, status2->state());
// Obtain the container IDs.
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/posix_rlimits_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/posix_rlimits_isolator_tests.cpp b/src/tests/containerizer/posix_rlimits_isolator_tests.cpp
index 0030cd1..f639cac 100644
--- a/src/tests/containerizer/posix_rlimits_isolator_tests.cpp
+++ b/src/tests/containerizer/posix_rlimits_isolator_tests.cpp
@@ -179,14 +179,20 @@ TEST_F(PosixRLimitsIsolatorTest, UnsetLimits)
container->mutable_rlimit_info()->CopyFrom(rlimitInfo);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinal;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinal));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -321,14 +327,20 @@ TEST_F(PosixRLimitsIsolatorTest, TaskExceedingLimit)
container->mutable_rlimit_info()->CopyFrom(rlimitInfo);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFailed));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -476,14 +488,15 @@ TEST_F(PosixRLimitsIsolatorTest, NestedContainers)
// that they transition from TASK_RUNNING to TASK_FINISHED.
enum class Stage
{
+ STARTING,
INITIAL,
RUNNING,
FINISHED
};
hashmap<TaskID, Stage> taskStages;
- taskStages[task1.task_id()] = Stage::INITIAL;
- taskStages[task2.task_id()] = Stage::INITIAL;
+ taskStages[task1.task_id()] = Stage::STARTING;
+ taskStages[task2.task_id()] = Stage::STARTING;
foreach (const Future<TaskStatus>& taskStatus, taskStatuses) {
AWAIT_READY(taskStatus);
@@ -492,6 +505,13 @@ TEST_F(PosixRLimitsIsolatorTest, NestedContainers)
ASSERT_SOME(taskStage);
switch (taskStage.get()) {
+ case Stage::STARTING: {
+ ASSERT_EQ(TASK_STARTING, taskStatus->state())
+ << taskStatus->DebugString();
+
+ taskStages[taskStatus->task_id()] = Stage::INITIAL;
+ break;
+ }
case Stage::INITIAL: {
ASSERT_EQ(TASK_RUNNING, taskStatus->state())
<< taskStatus->DebugString();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/provisioner_appc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/provisioner_appc_tests.cpp b/src/tests/containerizer/provisioner_appc_tests.cpp
index 89fe4fc..9c4b685 100644
--- a/src/tests/containerizer/provisioner_appc_tests.cpp
+++ b/src/tests/containerizer/provisioner_appc_tests.cpp
@@ -1018,15 +1018,21 @@ TEST_F(AppcProvisionerIntegrationTest, ROOT_SimpleLinuxImageTest)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(120));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/provisioner_docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/provisioner_docker_tests.cpp b/src/tests/containerizer/provisioner_docker_tests.cpp
index 920be77..832c81f 100644
--- a/src/tests/containerizer/provisioner_docker_tests.cpp
+++ b/src/tests/containerizer/provisioner_docker_tests.cpp
@@ -432,14 +432,20 @@ TEST_F(ProvisionerDockerTest, ROOT_LocalPullerSimpleCommand)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -526,15 +532,21 @@ TEST_P(ProvisionerDockerTest, ROOT_INTERNET_CURL_SimpleCommand)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
- AWAIT_READY_FOR(statusRunning, Minutes(10));
+ AWAIT_READY_FOR(statusStarting, Minutes(10));
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -600,14 +612,20 @@ TEST_F(ProvisionerDockerTest, ROOT_INTERNET_CURL_ScratchImage)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -712,15 +730,21 @@ TEST_P(ProvisionerDockerBackendTest, ROOT_INTERNET_CURL_Whiteout)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
- AWAIT_READY_FOR(statusRunning, Seconds(60));
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -798,9 +822,11 @@ TEST_P(ProvisionerDockerBackendTest, ROOT_INTERNET_CURL_Overwrite)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
@@ -816,6 +842,10 @@ TEST_P(ProvisionerDockerBackendTest, ROOT_INTERNET_CURL_Overwrite)
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -894,14 +924,20 @@ TEST_F(ProvisionerDockerTest, ROOT_INTERNET_CURL_ImageDigest)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -979,14 +1015,20 @@ TEST_F(ProvisionerDockerTest, ROOT_INTERNET_CURL_CommandTaskUser)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/runtime_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/runtime_isolator_tests.cpp b/src/tests/containerizer/runtime_isolator_tests.cpp
index ea5d035..81c0d82 100644
--- a/src/tests/containerizer/runtime_isolator_tests.cpp
+++ b/src/tests/containerizer/runtime_isolator_tests.cpp
@@ -148,14 +148,20 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_DockerDefaultCmdLocalPuller)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -231,14 +237,20 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_DockerDefaultEntryptLocalPuller)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -308,14 +320,20 @@ TEST_F(DockerRuntimeIsolatorTest,
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -401,8 +419,13 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_INTERNET_CURL_NestedSimpleCommand)
taskInfo.mutable_container()->CopyFrom(
v1::createContainerInfo("library/alpine"));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -414,6 +437,10 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_INTERNET_CURL_NestedSimpleCommand)
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
@@ -520,8 +547,13 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_NestedDockerDefaultCmdLocalPuller)
taskInfo.mutable_container()->CopyFrom(
v1::createContainerInfo("alpine"));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -533,6 +565,10 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_NestedDockerDefaultCmdLocalPuller)
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
@@ -640,8 +676,13 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_NestedDockerDefaultEntryptLocalPuller)
taskInfo.mutable_container()->CopyFrom(
v1::createContainerInfo("alpine"));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -653,6 +694,10 @@ TEST_F(DockerRuntimeIsolatorTest, ROOT_NestedDockerDefaultEntryptLocalPuller)
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/volume_host_path_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/volume_host_path_isolator_tests.cpp b/src/tests/containerizer/volume_host_path_isolator_tests.cpp
index f692b87..1d00672 100644
--- a/src/tests/containerizer/volume_host_path_isolator_tests.cpp
+++ b/src/tests/containerizer/volume_host_path_isolator_tests.cpp
@@ -398,13 +398,18 @@ TEST_P(VolumeHostPathIsolatorMesosTest, ROOT_ChangeRootFilesystem)
FAIL() << "Unexpected executor type";
}
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 6831201..1669a85 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -176,9 +176,14 @@ TEST_P(DefaultExecutorTest, TaskRunning)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
- Future<v1::scheduler::Event::Update> update;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
+ .WillOnce(DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(FutureArg<1>(&runningUpdate))
+ .WillRepeatedly(Return());
v1::TaskInfo taskInfo =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
@@ -190,11 +195,13 @@ TEST_P(DefaultExecutorTest, TaskRunning)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
- AWAIT_READY(update);
+ AWAIT_READY(startingUpdate);
- ASSERT_EQ(TASK_RUNNING, update->status().state());
- EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
- EXPECT_TRUE(update->status().has_timestamp());
+ ASSERT_EQ(TASK_STARTING, startingUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+ EXPECT_TRUE(startingUpdate->status().has_timestamp());
+
+ AWAIT_READY(runningUpdate);
// Ensure that the task sandbox symbolic link is created.
EXPECT_TRUE(os::exists(path::join(
@@ -290,16 +297,26 @@ TEST_P(DefaultExecutorTest, KillTask)
const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()};
+ Future<v1::scheduler::Event::Update> startingUpdate1;
+ Future<v1::scheduler::Event::Update> startingOrRunningUpdate1;
+ Future<v1::scheduler::Event::Update> startingOrRunningUpdate2;
Future<v1::scheduler::Event::Update> runningUpdate1;
- Future<v1::scheduler::Event::Update> runningUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
- FutureArg<1>(&runningUpdate1),
+ FutureArg<1>(&startingUpdate1),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(
DoAll(
- FutureArg<1>(&runningUpdate2),
+ FutureArg<1>(&startingOrRunningUpdate1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&startingOrRunningUpdate2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate1),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
Future<v1::scheduler::Event::Offers> offers2;
@@ -320,17 +337,19 @@ TEST_P(DefaultExecutorTest, KillTask)
mesos.send(call);
}
+ AWAIT_READY(startingUpdate1);
+ ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
+
AWAIT_READY(runningUpdate1);
ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
- AWAIT_READY(runningUpdate2);
- ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
-
// When running a task, TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
- runningUpdate1->status().task_id(),
- runningUpdate2->status().task_id()};
+ startingUpdate1->status().task_id(),
+ startingOrRunningUpdate1->status().task_id(),
+ startingOrRunningUpdate2->status().task_id(),
+ runningUpdate1->status().task_id()};
ASSERT_EQ(tasks1, tasksRunning);
@@ -340,10 +359,15 @@ TEST_P(DefaultExecutorTest, KillTask)
v1::TaskInfo taskInfo3 =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+ Future<v1::scheduler::Event::Update> startingUpdate3;
Future<v1::scheduler::Event::Update> runningUpdate3;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&startingUpdate3),
+ v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id())))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&runningUpdate3),
v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id())));
@@ -355,6 +379,10 @@ TEST_P(DefaultExecutorTest, KillTask)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo3}))}));
+ AWAIT_READY(startingUpdate3);
+ ASSERT_EQ(TASK_STARTING, startingUpdate3->status().state());
+ ASSERT_EQ(taskInfo3.task_id(), startingUpdate3->status().task_id());
+
AWAIT_READY(runningUpdate3);
ASSERT_EQ(TASK_RUNNING, runningUpdate3->status().state());
ASSERT_EQ(taskInfo3.task_id(), runningUpdate3->status().task_id());
@@ -486,6 +514,22 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ auto acknowledge = [&](const Future<v1::scheduler::Event::Update>& update) {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+
+ acknowledge->mutable_task_id()->CopyFrom(
+ update->status().task_id());
+
+ acknowledge->mutable_agent_id()->CopyFrom(agentId);
+ acknowledge->set_uuid(update->status().uuid());
+
+ mesos.send(call);
+ };
+
// The first task exits with a non-zero status code.
v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 1");
@@ -494,9 +538,13 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
+ Future<v1::scheduler::Event::Update> startingUpdate1;
+ Future<v1::scheduler::Event::Update> startingUpdate2;
Future<v1::scheduler::Event::Update> runningUpdate1;
Future<v1::scheduler::Event::Update> runningUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&startingUpdate1))
+ .WillOnce(FutureArg<1>(&startingUpdate2))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2));
@@ -507,6 +555,15 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
+ AWAIT_READY(startingUpdate1);
+ ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
+
+ AWAIT_READY(startingUpdate2);
+ ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state());
+
+ acknowledge(startingUpdate1);
+ acknowledge(startingUpdate2);
+
AWAIT_READY(runningUpdate1);
ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
@@ -528,38 +585,8 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
.WillOnce(FutureArg<1>(&update2));
// Acknowledge the TASK_RUNNING updates to receive the next updates.
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate1->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(runningUpdate1->status().uuid());
-
- mesos.send(call);
- }
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate2->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(runningUpdate2->status().uuid());
-
- mesos.send(call);
- }
+ acknowledge(runningUpdate1);
+ acknowledge(runningUpdate2);
// Updates for the tasks in a task group can be received in any order.
set<pair<v1::TaskID, v1::TaskState>> taskStates;
@@ -657,7 +684,7 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
AWAIT_READY(update);
- ASSERT_EQ(TASK_RUNNING, update->status().state());
+ ASSERT_EQ(TASK_STARTING, update->status().state());
EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
EXPECT_TRUE(update->status().has_timestamp());
}
@@ -723,12 +750,22 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- Future<Event::Update> updateRunning1;
+ Future<Event::Update> updateStarting1;
+ Future<Event::Update> updateStartingOrRunning1;
+ Future<Event::Update> updateStartingOrRunning2;
Future<Event::Update> updateRunning2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
- FutureArg<1>(&updateRunning1),
+ FutureArg<1>(&updateStarting1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateStartingOrRunning1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateStartingOrRunning2),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(
DoAll(
@@ -742,17 +779,30 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({task1, task2}))}));
- AWAIT_READY(updateRunning1);
+
+ AWAIT_READY(updateStarting1);
+ AWAIT_READY(updateStartingOrRunning1);
+ AWAIT_READY(updateStartingOrRunning2);
AWAIT_READY(updateRunning2);
- ASSERT_EQ(TASK_RUNNING, updateRunning1->status().state());
+ ASSERT_EQ(TASK_STARTING, updateStarting1->status().state());
ASSERT_EQ(TASK_RUNNING, updateRunning2->status().state());
- ASSERT_TRUE(updateRunning1->status().has_container_status());
- ASSERT_TRUE(updateRunning2->status().has_container_status());
+ // Select the two TASK_RUNNING updates from the first four updates.
+ Event::Update update1 = updateStartingOrRunning1.get();
+ if (update1.status().state() == v1::TASK_STARTING) {
+ update1 = updateStartingOrRunning2.get();
+ }
+ Event::Update update2 = updateRunning2.get();
+
+ ASSERT_EQ(TASK_RUNNING, update1.status().state());
+ ASSERT_EQ(TASK_RUNNING, update2.status().state());
- v1::ContainerStatus status1 = updateRunning1->status().container_status();
- v1::ContainerStatus status2 = updateRunning2->status().container_status();
+ ASSERT_TRUE(update1.status().has_container_status());
+ ASSERT_TRUE(update2.status().has_container_status());
+
+ v1::ContainerStatus status1 = update1.status().container_status();
+ v1::ContainerStatus status2 = update2.status().container_status();
ASSERT_TRUE(status1.has_container_id());
ASSERT_TRUE(status2.has_container_id());
@@ -822,11 +872,16 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
// The task exits with a non-zero status code.
v1::TaskInfo taskInfo = v1::createTask(agentId, resources, "exit 1");
+ Future<v1::scheduler::Event::Update> startingUpdate;
Future<v1::scheduler::Event::Update> runningUpdate;
Future<v1::scheduler::Event::Update> failedUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&runningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(FutureArg<1>(&failedUpdate));
@@ -842,6 +897,9 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+ AWAIT_READY(startingUpdate);
+ ASSERT_EQ(TASK_STARTING, startingUpdate->status().state());
+
AWAIT_READY(runningUpdate);
ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
@@ -923,11 +981,21 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
+ Future<v1::scheduler::Event::Update> startingUpdate1;
+ Future<v1::scheduler::Event::Update> startingUpdate2;
Future<v1::scheduler::Event::Update> runningUpdate1;
Future<v1::scheduler::Event::Update> runningUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&startingUpdate1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&startingUpdate2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&runningUpdate1),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(
@@ -946,8 +1014,11 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
- AWAIT_READY(runningUpdate1);
- ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
+ AWAIT_READY(startingUpdate1);
+ ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
+
+ // We only check the first and last update, because the other two might
+ // arrive in a different order.
AWAIT_READY(runningUpdate2);
ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
@@ -955,6 +1026,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
// When running a task, TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
+ startingUpdate1->status().task_id(),
+ startingUpdate2->status().task_id(),
runningUpdate1->status().task_id(),
runningUpdate2->status().task_id()};
@@ -1071,15 +1144,15 @@ TEST_P(DefaultExecutorTest, ReservedResources)
v1::Offer::Operation launchGroup =
v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo}));
- Future<v1::scheduler::Event::Update> runningUpdate;
+ Future<v1::scheduler::Event::Update> startingUpdate;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&runningUpdate));
+ .WillOnce(FutureArg<1>(&startingUpdate));
mesos.send(v1::createCallAccept(frameworkId, offer, {reserve, launchGroup}));
- AWAIT_READY(runningUpdate);
- ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
- ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+ AWAIT_READY(startingUpdate);
+ ASSERT_EQ(TASK_STARTING, startingUpdate->status().state());
+ ASSERT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
}
@@ -1156,9 +1229,17 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
"sleep 1000");
- Future<v1::scheduler::Event::Update> update;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
+ .WillOnce(DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(DoAll(
+ FutureArg<1>(&runningUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillRepeatedly(Return());
+
v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
executorInfo,
@@ -1166,14 +1247,19 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
- AWAIT_READY(update);
+ AWAIT_READY(startingUpdate);
- ASSERT_EQ(TASK_RUNNING, update->status().state());
- EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
- EXPECT_TRUE(update->status().has_timestamp());
- ASSERT_TRUE(update->status().has_container_status());
+ ASSERT_EQ(TASK_STARTING, startingUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+
+ AWAIT_READY(runningUpdate);
+
+ ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+ EXPECT_TRUE(runningUpdate->status().has_timestamp());
+ ASSERT_TRUE(runningUpdate->status().has_container_status());
- v1::ContainerStatus status = update->status().container_status();
+ v1::ContainerStatus status = runningUpdate->status().container_status();
ASSERT_TRUE(status.has_container_id());
EXPECT_TRUE(status.container_id().has_parent());
@@ -1263,8 +1349,13 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
containerInfo->set_type(mesos::v1::ContainerInfo::MESOS);
containerInfo->mutable_linux_info()->set_share_pid_namespace(true);
+ Future<v1::scheduler::Event::Update> update0;
Future<v1::scheduler::Event::Update> update1;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&update0),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(FutureArg<1>(&update1));
Future<v1::scheduler::Event::Offers> offers2;
@@ -1301,8 +1392,13 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
containerInfo->mutable_linux_info()->set_share_pid_namespace(true);
Future<v1::scheduler::Event::Update> update2;
+ Future<v1::scheduler::Event::Update> update3;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update2));
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&update2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(FutureArg<1>(&update3));
// Launch the second task group.
launchGroup = v1::LAUNCH_GROUP(
@@ -1313,9 +1409,14 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
AWAIT_READY(update2);
- ASSERT_EQ(TASK_RUNNING, update2->status().state());
+ ASSERT_EQ(TASK_STARTING, update2->status().state());
EXPECT_EQ(taskInfo2.task_id(), update2->status().task_id());
- EXPECT_TRUE(update2->status().has_timestamp());
+
+ AWAIT_READY(update3);
+
+ ASSERT_EQ(TASK_RUNNING, update3->status().state());
+ EXPECT_EQ(taskInfo2.task_id(), update3->status().task_id());
+ EXPECT_TRUE(update3->status().has_timestamp());
string executorSandbox = slave::paths::getExecutorLatestRunPath(
flags.work_dir,
@@ -1422,11 +1523,16 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> starting;
Future<v1::scheduler::Event::Update> running;
Future<v1::scheduler::Event::Update> failed;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&starting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&running),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(
@@ -1448,11 +1554,12 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
- Future<Nothing> ack =
- FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+ AWAIT_READY(starting);
+
+ EXPECT_EQ(TASK_STARTING, starting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), starting->status().task_id());
AWAIT_READY(running);
- AWAIT_READY(ack);
EXPECT_EQ(TASK_RUNNING, running->status().state());
EXPECT_EQ(taskInfo.task_id(), running->status().task_id());
@@ -1559,11 +1666,16 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, TaskWithFileURI)
taskInfo.mutable_command()->add_uris()->set_value("file://" + testFilePath);
+ Future<v1::scheduler::Event::Update> startingUpdate;
Future<v1::scheduler::Event::Update> runningUpdate;
Future<v1::scheduler::Event::Update> finishedUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&runningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(
@@ -1578,6 +1690,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, TaskWithFileURI)
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+ AWAIT_READY(startingUpdate);
+ ASSERT_EQ(TASK_STARTING, startingUpdate->status().state());
+ ASSERT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+
AWAIT_READY(runningUpdate);
ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
@@ -1674,11 +1790,16 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
container->set_type(mesos::v1::ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<v1::scheduler::Event::Update> startingUpdate;
Future<v1::scheduler::Event::Update> runningUpdate;
Future<v1::scheduler::Event::Update> finishedUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&runningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(
@@ -1693,6 +1814,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
+ AWAIT_READY(startingUpdate);
+ ASSERT_EQ(TASK_STARTING, startingUpdate->status().state());
+ ASSERT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+
AWAIT_READY(runningUpdate);
ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
@@ -1845,9 +1970,14 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
executorInfo,
v1::createTaskGroupInfo({taskInfo}));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateFinished;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -1859,6 +1989,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
offer,
{reserve, create, launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(TASK_STARTING, updateStarting->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(TASK_RUNNING, updateRunning->status().state());
ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
@@ -1963,9 +2097,14 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
executorInfo,
v1::createTaskGroupInfo({taskInfo}));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateFinished;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -1977,6 +2116,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
offer,
{reserve, create, launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(TASK_STARTING, updateStarting->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(TASK_RUNNING, updateRunning->status().state());
ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
@@ -2125,7 +2268,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
"consumer");
consumerInfo.mutable_container()->CopyFrom(containerInfo);
- vector<Future<v1::scheduler::Event::Update>> updates(4);
+ vector<Future<v1::scheduler::Event::Update>> updates(6);
{
// This variable doesn't have to be used explicitly. We need it so that the
@@ -2157,6 +2300,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
enum class Stage
{
INITIAL,
+ STARTING,
RUNNING,
FINISHED
};
@@ -2175,6 +2319,13 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
switch (taskStage.get()) {
case Stage::INITIAL: {
+ ASSERT_EQ(TASK_STARTING, taskStatus.state());
+
+ taskStages[taskStatus.task_id()] = Stage::STARTING;
+
+ break;
+ }
+ case Stage::STARTING: {
ASSERT_EQ(TASK_RUNNING, taskStatus.state());
taskStages[taskStatus.task_id()] = Stage::RUNNING;
@@ -2334,7 +2485,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
"consumer");
consumerInfo.mutable_container()->CopyFrom(containerInfo);
- vector<Future<v1::scheduler::Event::Update>> updates(4);
+ vector<Future<v1::scheduler::Event::Update>> updates(6);
{
// This variable doesn't have to be used explicitly. We need it so that the
@@ -2369,6 +2520,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
enum class Stage
{
INITIAL,
+ STARTING,
RUNNING,
FINISHED
};
@@ -2387,6 +2539,13 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
switch (taskStage.get()) {
case Stage::INITIAL: {
+ ASSERT_EQ(TASK_STARTING, taskStatus.state());
+
+ taskStages[taskStatus.task_id()] = Stage::STARTING;
+
+ break;
+ }
+ case Stage::STARTING: {
ASSERT_EQ(TASK_RUNNING, taskStatus.state());
taskStages[taskStatus.task_id()] = Stage::RUNNING;
@@ -2525,11 +2684,16 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
executorInfo,
v1::createTaskGroupInfo({taskInfo}));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateHealthy;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(
DoAll(
+ FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id())))
+ .WillOnce(
+ DoAll(
FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id())))
.WillOnce(
@@ -2542,6 +2706,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
offer,
{reserve, create, launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(TASK_STARTING, updateStarting->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(TASK_RUNNING, updateRunning->status().state());
ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/disk_quota_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/disk_quota_tests.cpp b/src/tests/disk_quota_tests.cpp
index 742b6e1..fc29799 100644
--- a/src/tests/disk_quota_tests.cpp
+++ b/src/tests/disk_quota_tests.cpp
@@ -219,14 +219,20 @@ TEST_F(DiskQuotaTest, DiskUsageExceedsQuota)
Resources::parse("cpus:1;mem:128;disk:1").get(),
"dd if=/dev/zero of=file bs=1048576 count=2 && sleep 1000");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -313,9 +319,11 @@ TEST_F(DiskQuotaTest, VolumeUsageExceedsQuota)
taskResources,
"dd if=/dev/zero of=volume_path/file bs=1048576 count=2 && sleep 1000");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -325,6 +333,10 @@ TEST_F(DiskQuotaTest, VolumeUsageExceedsQuota)
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -395,16 +407,22 @@ TEST_F(DiskQuotaTest, NoQuotaEnforcement)
Resources::parse("cpus:1;mem:128;disk:1").get(),
"dd if=/dev/zero of=file bs=1048576 count=2 && sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status))
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offer.id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(task.task_id(), statusRunning->task_id());
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
Future<hashset<ContainerID>> containers = containerizer->containers();
@@ -523,9 +541,11 @@ TEST_F(DiskQuotaTest, ResourceStatistics)
"dd if=/dev/zero of=path1/file bs=1048576 count=2 && "
"sleep 1000");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -535,6 +555,10 @@ TEST_F(DiskQuotaTest, ResourceStatistics)
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -654,16 +678,22 @@ TEST_F(DiskQuotaTest, SlaveRecovery)
Resources::parse("cpus:1;mem:128;disk:3").get(),
"dd if=/dev/zero of=file bs=1048576 count=2 && sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status))
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offer.id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(task.task_id(), statusRunning->task_id());
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
Future<hashset<ContainerID>> containers = containerizer->containers();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index c34850a..33a2220 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -2209,8 +2209,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -2218,6 +2220,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver1.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/gc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index 37d3eac..6f055b5 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -954,9 +954,11 @@ TEST_F(GarbageCollectorIntegrationTest, ROOT_BusyMountPoint)
"test-task123",
"test-task123");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -965,6 +967,10 @@ TEST_F(GarbageCollectorIntegrationTest, ROOT_BusyMountPoint)
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index f4b50b1..c0dcba2 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -330,15 +330,20 @@ TEST_F(HealthCheckTest, HealthyTask)
vector<TaskInfo> tasks =
populateTasks(SLEEP_COMMAND(120), "exit 0", offers.get()[0]);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -400,7 +405,7 @@ TEST_F(HealthCheckTest, HealthyTask)
ASSERT_SOME(parse);
Result<JSON::Value> find = parse->find<JSON::Value>(
- "frameworks[0].tasks[0].statuses[0].healthy");
+ "frameworks[0].tasks[0].statuses[1].healthy");
EXPECT_SOME_TRUE(find);
}
@@ -418,7 +423,7 @@ TEST_F(HealthCheckTest, HealthyTask)
ASSERT_SOME(parse);
Result<JSON::Value> find = parse->find<JSON::Value>(
- "frameworks[0].executors[0].tasks[0].statuses[0].healthy");
+ "frameworks[0].executors[0].tasks[0].statuses[1].healthy");
EXPECT_SOME_TRUE(find);
}
@@ -488,15 +493,20 @@ TEST_F(HealthCheckTest, ROOT_HealthyTaskWithContainerImage)
health->set_type(HealthCheck::COMMAND);
health->mutable_command()->set_value("exit 0");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -522,7 +532,7 @@ TEST_F(HealthCheckTest, ROOT_HealthyTaskWithContainerImage)
ASSERT_SOME(parse);
Result<JSON::Value> find = parse->find<JSON::Value>(
- "frameworks[0].tasks[0].statuses[0].healthy");
+ "frameworks[0].tasks[0].statuses[1].healthy");
EXPECT_SOME_TRUE(find);
}
@@ -540,7 +550,7 @@ TEST_F(HealthCheckTest, ROOT_HealthyTaskWithContainerImage)
ASSERT_SOME(parse);
Result<JSON::Value> find = parse->find<JSON::Value>(
- "frameworks[0].executors[0].tasks[0].statuses[0].healthy");
+ "frameworks[0].executors[0].tasks[0].statuses[1].healthy");
EXPECT_SOME_TRUE(find);
}
@@ -628,10 +638,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HealthCheckTest, ROOT_DOCKER_DockerHealthyTask)
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
@@ -639,6 +651,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HealthCheckTest, ROOT_DOCKER_DockerHealthyTask)
AWAIT_READY(containerId);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -715,15 +730,20 @@ TEST_F(HealthCheckTest, HealthyTaskNonShell)
vector<TaskInfo> tasks =
populateTasks(SLEEP_COMMAND(120), command, offers.get()[0]);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -778,12 +798,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HealthCheckTest, HealthStatusChange)
0,
3);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
Future<TaskStatus> statusUnhealthy;
Future<TaskStatus> statusHealthyAgain;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillOnce(FutureArg<1>(&statusUnhealthy))
@@ -792,6 +814,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HealthCheckTest, HealthStatusChange)
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -920,12 +945,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusUnhealthy;
Future<TaskStatus> statusHealthy;
Future<TaskStatus> statusUnhealthyAgain;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusUnhealthy))
.WillOnce(FutureArg<1>(&statusHealthy))
@@ -934,6 +961,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1018,6 +1048,7 @@ TEST_F(HealthCheckTest, ConsecutiveFailures)
SLEEP_COMMAND(120), "exit 1", offers.get()[0], 0, 4);
// Expecting four unhealthy updates and one final kill update.
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
@@ -1026,6 +1057,7 @@ TEST_F(HealthCheckTest, ConsecutiveFailures)
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
@@ -1035,6 +1067,9 @@ TEST_F(HealthCheckTest, ConsecutiveFailures)
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1109,15 +1144,20 @@ TEST_F(HealthCheckTest, EnvironmentSetup)
vector<TaskInfo> tasks = populateTasks(
SLEEP_COMMAND(120), "exit $STATUS", offers.get()[0], 0, None(), env);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1170,10 +1210,12 @@ TEST_F(HealthCheckTest, GracePeriod)
vector<TaskInfo> tasks = populateTasks(
SLEEP_COMMAND(2), falseCommand, offers.get()[0], 9999);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(Return());
@@ -1232,17 +1274,22 @@ TEST_F(HealthCheckTest, CheckCommandTimeout)
1);
// Expecting one unhealthy update and one final kill update.
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusUnhealthy;
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusUnhealthy))
.WillOnce(FutureArg<1>(&statusKilled));
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1303,11 +1350,13 @@ TEST_F(HealthCheckTest, HealthyToUnhealthyTransitionWithinGracePeriod)
9999,
0);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
Future<TaskStatus> statusUnhealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillOnce(FutureArg<1>(&statusUnhealthy))
@@ -1315,6 +1364,9 @@ TEST_F(HealthCheckTest, HealthyToUnhealthyTransitionWithinGracePeriod)
driver.launchTasks(offers.get()[0].id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1395,16 +1447,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HealthCheckTest, HealthyTaskViaHTTP)
task.mutable_health_check()->CopyFrom(healthCheck);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1483,16 +1540,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HealthCheckTest, HealthyTaskViaHTTPWithoutType)
task.mutable_health_check()->CopyFrom(healthCheck);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1563,16 +1625,21 @@ TEST_F(HealthCheckTest, HealthyTaskViaTCP)
task.mutable_health_check()->CopyFrom(healthCheck);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1651,16 +1718,21 @@ TEST_F(HealthCheckTest, ROOT_INTERNET_CURL_HealthyTaskViaHTTPWithContainerImage)
task.mutable_health_check()->CopyFrom(healthCheck);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1741,16 +1813,21 @@ TEST_F(HealthCheckTest,
task.mutable_health_check()->CopyFrom(healthCheck);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
// Increase time here to wait for pulling image finish.
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1834,16 +1911,21 @@ TEST_F(HealthCheckTest, ROOT_INTERNET_CURL_HealthyTaskViaTCPWithContainerImage)
task.mutable_health_check()->CopyFrom(healthCheck);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1943,10 +2025,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -1955,6 +2039,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(containerId);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -2076,10 +2163,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -2088,6 +2177,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(containerId);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
// Increase time here to wait for pulling image finish.
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -2210,10 +2302,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -2222,6 +2316,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(containerId);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -2317,10 +2414,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(offers);
ASSERT_FALSE(offers->empty());
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
@@ -2360,6 +2459,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
{offers->front().id()}, {LAUNCH_GROUP(executor, taskGroup)});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting.get().state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY(statusHealthy);
@@ -2446,10 +2548,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(offers);
ASSERT_FALSE(offers->empty());
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy));
@@ -2495,6 +2599,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver.acceptOffers(
{offers->front().id()}, {LAUNCH_GROUP(executor, taskGroup)});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting.get().state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
[3/6] mesos git commit: Send TASK_STARTING from the built-in
executors.
Posted by al...@apache.org.
Send TASK_STARTING from the built-in executors.
This gives schedulers more information about a task's status;
in particular, it gives a better estimate of a task's start time
and helps differentiate between tasks stuck in TASK_STAGING
and tasks stuck in TASK_STARTING.
Review: https://reviews.apache.org/r/62212/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/de11f0ff
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/de11f0ff
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/de11f0ff
Branch: refs/heads/master
Commit: de11f0ffed36fbabc8fb4167859fd23b75f43f10
Parents: 4b6d848
Author: Benno Evers <be...@mesosphere.com>
Authored: Wed Oct 18 12:15:13 2017 -0700
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Oct 18 12:15:13 2017 -0700
----------------------------------------------------------------------
docs/high-availability-framework-guide.md | 9 +++--
src/docker/executor.cpp | 8 ++++
src/launcher/default_executor.cpp | 52 +++++++++++++++++---------
src/launcher/executor.cpp | 5 ++-
4 files changed, 51 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/de11f0ff/docs/high-availability-framework-guide.md
----------------------------------------------------------------------
diff --git a/docs/high-availability-framework-guide.md b/docs/high-availability-framework-guide.md
index 73743ab..f246773 100644
--- a/docs/high-availability-framework-guide.md
+++ b/docs/high-availability-framework-guide.md
@@ -189,10 +189,11 @@ initial state and several possible terminal states:
has not yet started to run. In this state, the task's dependencies are
fetched---for example, using the [Mesos fetcher cache](fetcher.md).
-* The `TASK_STARTING` state is optional and intended primarily for use by
- custom executors. It can be used to describe the fact that a custom executor
- has learned about the task (and maybe started fetching its dependencies) but has
- not yet started to run it.
+* The `TASK_STARTING` state is optional. It can be used to describe the fact
+ that an executor has learned about the task (and maybe started fetching its
+ dependencies) but has not yet started to run it. Custom executors are
+ encouraged to send it, to provide a more detailed description of the current
+ task state to outside observers.
* A task transitions to the `TASK_RUNNING` state after it has begun running
successfully (if the task fails to start, it transitions to one of the
http://git-wip-us.apache.org/repos/asf/mesos/blob/de11f0ff/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 3b0767f..5f6a0d0 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -159,6 +159,14 @@ public:
LOG(INFO) << "Starting task " << taskId.get();
+ // Send initial TASK_STARTING update.
+ // TODO(alexr): Use `protobuf::createTaskStatus()`
+ // instead of manually setting fields.
+ TaskStatus starting;
+ starting.mutable_task_id()->CopyFrom(task.task_id());
+ starting.set_state(TASK_STARTING);
+ driver->sendStatusUpdate(starting);
+
CHECK(task.has_container());
CHECK(task.has_command());
http://git-wip-us.apache.org/repos/asf/mesos/blob/de11f0ff/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index e58766f..cdb3c3e 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -108,6 +108,12 @@ private:
// `WAIT_NESTED_CONTAINER` call has not been established yet.
Option<Connection> waiting;
+ // TODO(bennoe): Create a real state machine instead of adding
+ // more and more ad-hoc boolean values.
+
+ // Indicates whether a container has been launched.
+ bool launched;
+
// Indicates whether a status update acknowledgement
// has been received for any status update.
bool acknowledged;
@@ -318,12 +324,14 @@ protected:
subscribe->add_unacknowledged_updates()->MergeFrom(update);
}
- // Send all unacknowledged tasks. We don't send unacknowledged terminated
- // (and hence already removed from `containers`) tasks, because for such
- // tasks `WAIT_NESTED_CONTAINER` call has already succeeded, meaning the
- // agent knows about the tasks and corresponding containers.
+ // Send all unacknowledged tasks. We don't send tasks whose container
+ // hasn't launched yet, because the agent will learn about them once it launches.
+ // We also don't send unacknowledged terminated (and hence already removed
+ // from `containers`) tasks, because for such tasks `WAIT_NESTED_CONTAINER`
+ // call has already succeeded, meaning the agent knows about the tasks and
+ // corresponding containers.
foreachvalue (const Owned<Container>& container, containers) {
- if (!container->acknowledged) {
+ if (container->launched && !container->acknowledged) {
subscribe->add_unacknowledged_tasks()->MergeFrom(container->taskInfo);
}
}
@@ -403,6 +411,23 @@ protected:
containerIds.push_back(containerId);
+ containers[task.task_id()] = Owned<Container>(new Container{
+ containerId,
+ task,
+ taskGroup,
+ None(),
+ None(),
+ None(),
+ None(),
+ false,
+ false,
+ false,
+ false});
+
+ // Send out the initial TASK_STARTING update.
+ const TaskStatus status = createTaskStatus(task.task_id(), TASK_STARTING);
+ forward(status);
+
agent::Call call;
call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER);
@@ -526,17 +551,8 @@ protected:
const TaskInfo& task = taskGroup.tasks().Get(index++);
const TaskID& taskId = task.task_id();
- containers[taskId] = Owned<Container>(new Container{
- containerId,
- task,
- taskGroup,
- None(),
- None(),
- None(),
- None(),
- false,
- false,
- false});
+ CHECK(containers.contains(taskId));
+ containers.at(taskId)->launched = true;
if (task.has_check()) {
Try<Owned<checks::Checker>> checker =
@@ -1410,7 +1426,7 @@ private:
CHECK_EQ(SUBSCRIBED, state);
CHECK_SOME(connectionId);
- CHECK(containers.contains(taskId));
+ CHECK(containers.contains(taskId) && containers.at(taskId)->launched);
const Owned<Container>& container = containers.at(taskId);
@@ -1469,7 +1485,7 @@ private:
LinkedHashMap<UUID, Call::Update> unacknowledgedUpdates;
- // Active child containers.
+ // Child containers.
LinkedHashMap<TaskID, Owned<Container>> containers;
// There can be multiple simulataneous ongoing (re-)connection attempts
http://git-wip-us.apache.org/repos/asf/mesos/blob/de11f0ff/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 0131577..34f6f7a 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -525,6 +525,10 @@ protected:
taskData = TaskData(task);
taskId = task.task_id();
+ // Send initial TASK_STARTING update.
+ TaskStatus starting = createTaskStatus(taskId.get(), TASK_STARTING);
+ forward(starting);
+
// Capture the kill policy.
if (task.has_kill_policy()) {
killPolicy = task.kill_policy();
@@ -1016,7 +1020,6 @@ private:
// If a check for the task has been defined, `check_status` field in each
// task status must be set to a valid `CheckStatusInfo` message even if
// there is no check status available yet.
- CHECK(taskData.isSome());
if (taskData->taskInfo.has_check()) {
CheckStatusInfo checkStatusInfo;
checkStatusInfo.set_type(taskData->taskInfo.check().type());
[6/6] mesos git commit: Fix unit tests that were broken by the
additional TASK_STARTING update.
Posted by al...@apache.org.
Fix unit tests that were broken by the additional TASK_STARTING update.
Review: https://reviews.apache.org/r/62213/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37053061
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37053061
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37053061
Branch: refs/heads/master
Commit: 37053061e4123009e4e062529fab930f476d55af
Parents: de11f0f
Author: Benno Evers <be...@mesosphere.com>
Authored: Wed Oct 18 12:15:27 2017 -0700
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Oct 18 12:15:27 2017 -0700
----------------------------------------------------------------------
src/tests/api_tests.cpp | 100 +++---
src/tests/check_tests.cpp | 155 ++++++++-
src/tests/command_executor_tests.cpp | 36 ++-
src/tests/container_logger_tests.cpp | 25 ++
.../containerizer/cgroups_isolator_tests.cpp | 95 +++++-
src/tests/containerizer/cni_isolator_tests.cpp | 99 +++++-
src/tests/containerizer/cpu_isolator_tests.cpp | 10 +
.../docker_containerizer_tests.cpp | 97 +++++-
.../docker_volume_isolator_tests.cpp | 40 ++-
.../environment_secret_isolator_tests.cpp | 10 +-
.../linux_filesystem_isolator_tests.cpp | 40 ++-
.../containerizer/memory_isolator_tests.cpp | 5 +
.../containerizer/memory_pressure_tests.cpp | 23 +-
.../nested_mesos_containerizer_tests.cpp | 5 +
src/tests/containerizer/port_mapping_tests.cpp | 10 +-
.../posix_rlimits_isolator_tests.cpp | 24 +-
.../containerizer/provisioner_appc_tests.cpp | 6 +
.../containerizer/provisioner_docker_tests.cpp | 46 ++-
.../containerizer/runtime_isolator_tests.cpp | 45 +++
.../volume_host_path_isolator_tests.cpp | 5 +
src/tests/default_executor_tests.cpp | 324 ++++++++++++++-----
src/tests/disk_quota_tests.cpp | 50 ++-
src/tests/fault_tolerance_tests.cpp | 6 +
src/tests/gc_tests.cpp | 6 +
src/tests/health_check_tests.cpp | 115 ++++++-
src/tests/hook_tests.cpp | 13 +
src/tests/master_tests.cpp | 92 +++++-
src/tests/master_validation_tests.cpp | 6 +
src/tests/oversubscription_tests.cpp | 12 +-
src/tests/partition_tests.cpp | 66 +++-
src/tests/persistent_volume_endpoints_tests.cpp | 13 +-
src/tests/persistent_volume_tests.cpp | 108 +++++--
src/tests/reconciliation_tests.cpp | 15 +-
src/tests/reservation_endpoints_tests.cpp | 44 ++-
src/tests/role_tests.cpp | 10 +-
src/tests/scheduler_tests.cpp | 52 ++-
src/tests/slave_authorization_tests.cpp | 19 +-
src/tests/slave_recovery_tests.cpp | 268 ++++++++++-----
src/tests/slave_tests.cpp | 103 +++---
src/tests/teardown_tests.cpp | 15 +-
40 files changed, 1826 insertions(+), 387 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 0e99e7b..ce3aafd 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -2275,7 +2275,8 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
Future<v1::scheduler::Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
- .WillOnce(FutureArg<1>(&offers1));
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
{
v1::scheduler::Call call;
@@ -2299,6 +2300,8 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
AWAIT_READY(offers1);
ASSERT_FALSE(offers1->offers().empty());
+ v1::AgentID agentId(offers1->offers()[0].agent_id());
+
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SUBSCRIBE);
@@ -2351,9 +2354,11 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
Future<Result<v1::master::Event>> event = decoder.read();
EXPECT_TRUE(event.isPending());
- Future<mesos::v1::scheduler::Event::Update> update;
+ Future<mesos::v1::scheduler::Event::Update> updateRunning1;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
+ .WillOnce(DoAll(
+ FutureArg<1>(&updateRunning1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(executor1, registered(_, _, _, _));
EXPECT_CALL(executor2, registered(_, _, _, _));
@@ -2399,21 +2404,8 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
ASSERT_EQ(task1.task_id(), event->get().task_added().task().task_id());
- AWAIT_READY(update);
-
- {
- v1::scheduler::Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(v1::scheduler::Call::ACKNOWLEDGE);
-
- v1::scheduler::Call::Acknowledge* acknowledge =
- call.mutable_acknowledge();
- acknowledge->mutable_task_id()->CopyFrom(task1.task_id());
- acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
- acknowledge->set_uuid(update->status().uuid());
-
- mesos.send(call);
- }
+ AWAIT_READY(updateRunning1);
+ EXPECT_EQ(TASK_RUNNING, updateRunning1->status().state());
event = decoder.read();
@@ -2446,8 +2438,9 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
const v1::Offer& offer2 = offers2->offers(0);
+ Future<mesos::v1::scheduler::Event::Update> updateRunning2;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update))
+ .WillOnce(FutureArg<1>(&updateRunning2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
Future<TaskInfo> execTask2;
@@ -2484,7 +2477,7 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
mesos.send(call);
}
- AWAIT_READY(update);
+ AWAIT_READY(updateRunning2);
event = decoder.read();
EXPECT_TRUE(event.isPending());
@@ -4179,9 +4172,9 @@ TEST_P(AgentAPITest, GetContainers)
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
- Future<TaskStatus> status;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusRunning));
// No tasks launched, we should expect zero containers in Response.
{
@@ -4201,8 +4194,8 @@ TEST_P(AgentAPITest, GetContainers)
driver.launchTasks(offer.id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ResourceStatistics statistics;
statistics.set_mem_limit_bytes(2048);
@@ -4431,8 +4424,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetFrameworks)
command.set_value("sleep 1000");
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
ContentType contentType = GetParam();
@@ -4454,6 +4449,9 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetFrameworks)
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -4534,8 +4532,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetExecutors)
command.set_value("sleep 1000");
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
ContentType contentType = GetParam();
@@ -4557,6 +4557,9 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetExecutors)
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -4641,8 +4644,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetTasks)
command.set_value("sleep 1000");
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
ContentType contentType = GetParam();
@@ -4667,6 +4672,9 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetTasks)
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -4804,8 +4812,10 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetState)
command.set_value("sleep 1000");
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
ContentType contentType = GetParam();
@@ -4831,6 +4841,9 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetState)
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -5540,14 +5553,15 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, LaunchNestedContainerSession)
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return());
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
// Launch a nested container session that runs a command
// that writes something to stdout and stderr and exits.
@@ -5661,14 +5675,15 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return());
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
// Attempt to launch a nested container which does nothing.
@@ -5757,14 +5772,15 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return());
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
// Launch a nested container session that runs a command
// that writes something to stdout and stderr and exits.
@@ -5874,14 +5890,15 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return());
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
// Launch a nested container session that runs `cat` so that it never exits.
@@ -6178,14 +6195,15 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return());
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
// Launch a nested container session which runs a shell.
@@ -6784,16 +6802,22 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
const Offer& offer = offers.get()[0];
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(Return());
TaskInfo taskInfo = createTask(offer, "sleep 1000");
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning->state());
Future<hashset<ContainerID>> containerIds = containerizer->containers();
AWAIT_READY(containerIds);
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index fd15a47..9a56c00 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -288,12 +288,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateExplicitReconciliation;
Future<Event::Update> updateImplicitReconciliation;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateExplicitReconciliation))
@@ -322,6 +324,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -437,12 +442,14 @@ TEST_F(CommandExecutorCheckTest, CommandCheckStatusChange)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateCheckResultChanged;
Future<Event::Update> updateCheckResultBack;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateCheckResultChanged))
@@ -464,6 +471,9 @@ TEST_F(CommandExecutorCheckTest, CommandCheckStatusChange)
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
@@ -557,10 +567,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -590,6 +602,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -660,10 +680,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -687,6 +709,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -781,11 +811,13 @@ TEST_F(CommandExecutorCheckTest, CommandCheckTimeout)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateCheckResultTimeout;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateCheckResultTimeout))
@@ -807,6 +839,12 @@ TEST_F(CommandExecutorCheckTest, CommandCheckTimeout)
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ ASSERT_EQ(TASK_STARTING, updateTaskStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateTaskStarting->status().task_id());
+
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
@@ -886,12 +924,14 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateHealthResult;
Future<Event::Update> updateImplicitReconciliation;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateHealthResult))
@@ -928,6 +968,9 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
@@ -1038,9 +1081,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> updateTaskStarting;
Future<v1::scheduler::Event::Update> updateTaskRunning;
Future<v1::scheduler::Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -1069,6 +1114,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -1171,9 +1219,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, TCPCheckDelivered)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> updateTaskStarting;
Future<v1::scheduler::Event::Update> updateTaskRunning;
Future<v1::scheduler::Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -1201,6 +1251,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, TCPCheckDelivered)
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -1381,12 +1439,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateExplicitReconciliation;
Future<Event::Update> updateImplicitReconciliation;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateExplicitReconciliation))
@@ -1415,6 +1475,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -1575,12 +1643,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateCheckResultChanged;
Future<Event::Update> updateCheckResultBack;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateCheckResultChanged))
@@ -1602,6 +1672,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ ASSERT_EQ(TASK_STARTING, updateTaskStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateTaskStarting->status().task_id());
+
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
@@ -1736,10 +1812,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -1769,6 +1847,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -1881,10 +1967,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -1936,6 +2024,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -2049,11 +2145,13 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateCheckResultTimeout;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateCheckResultTimeout))
@@ -2075,6 +2173,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ ASSERT_EQ(TASK_STARTING, updateTaskStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateTaskStarting->status().task_id());
+
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
@@ -2197,12 +2301,14 @@ TEST_F(DefaultExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<Event::Update> updateTaskStarting;
Future<Event::Update> updateTaskRunning;
Future<Event::Update> updateCheckResult;
Future<Event::Update> updateHealthResult;
Future<Event::Update> updateImplicitReconciliation;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillOnce(FutureArg<1>(&updateHealthResult))
@@ -2236,6 +2342,9 @@ TEST_F(DefaultExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
launchTask(&mesos, offer, taskInfo);
+ AWAIT_READY(updateTaskStarting);
+ acknowledge(&mesos, frameworkId, updateTaskStarting->status());
+
AWAIT_READY(updateTaskRunning);
ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
@@ -2370,11 +2479,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
- Future<v1::scheduler::Event::Update> updates[4];
+ constexpr int EXPECTED_UPDATE_COUNT = 5;
+ Future<v1::scheduler::Event::Update> updates[EXPECTED_UPDATE_COUNT];
{
testing::InSequence dummy;
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < EXPECTED_UPDATE_COUNT; i++) {
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&updates[i]));
}
@@ -2401,12 +2511,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
- enum class Stage { INITIAL, RUNNING, CHECKED };
+ enum class Stage { STARTING, INITIAL, RUNNING, CHECKED };
hashmap<v1::TaskID, Stage> taskStages;
- taskStages.put(taskInfo1.task_id(), Stage::INITIAL);
- taskStages.put(taskInfo2.task_id(), Stage::INITIAL);
+ taskStages.put(taskInfo1.task_id(), Stage::STARTING);
+ taskStages.put(taskInfo2.task_id(), Stage::STARTING);
- for (int i = 0; i < 4; i++ ) {
+ for (int i = 0; i < EXPECTED_UPDATE_COUNT; i++) {
AWAIT_READY(updates[i]);
const v1::TaskStatus& taskStatus = updates[i]->status();
@@ -2415,8 +2525,17 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
ASSERT_SOME(taskStage);
switch (taskStage.get()) {
+ case Stage::STARTING: {
+ v1::TaskState state = taskStatus.state();
+ ASSERT_TRUE(state == v1::TASK_STARTING);
+
+ taskStages.put(taskStatus.task_id(), Stage::INITIAL);
+
+ break;
+ }
case Stage::INITIAL: {
- ASSERT_EQ(TASK_RUNNING, taskStatus.state());
+ v1::TaskState state = taskStatus.state();
+ ASSERT_TRUE(state == v1::TASK_RUNNING);
ASSERT_TRUE(taskStatus.check_status().has_tcp());
ASSERT_FALSE(taskStatus.check_status().tcp().has_succeeded());
@@ -2513,9 +2632,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, HTTPCheckDelivered)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> updateTaskStarting;
Future<v1::scheduler::Event::Update> updateTaskRunning;
Future<v1::scheduler::Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -2544,6 +2665,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, HTTPCheckDelivered)
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+
+ // Acknowledge (to be able to get the next update).
+ acknowledge(&mesos, frameworkId, taskStarting);
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
@@ -2662,9 +2791,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, TCPCheckDelivered)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> updateTaskStarting;
Future<v1::scheduler::Event::Update> updateTaskRunning;
Future<v1::scheduler::Event::Update> updateCheckResult;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskStarting))
.WillOnce(FutureArg<1>(&updateTaskRunning))
.WillOnce(FutureArg<1>(&updateCheckResult))
.WillRepeatedly(Return()); // Ignore subsequent updates.
@@ -2692,6 +2823,16 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, TCPCheckDelivered)
launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+ AWAIT_READY(updateTaskStarting);
+ const v1::TaskStatus& taskStarting = updateTaskStarting->status();
+
+ ASSERT_EQ(TASK_STARTING, taskStarting.state());
+ EXPECT_EQ(taskInfo.task_id(), taskStarting.task_id());
+
+ // Acknowledge (to be able to get the next update).
+ acknowledge(&mesos, frameworkId, taskStarting);
+
+
AWAIT_READY(updateTaskRunning);
const v1::TaskStatus& taskRunning = updateTaskRunning->status();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/command_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/command_executor_tests.cpp b/src/tests/command_executor_tests.cpp
index 2c1d467..6d6f916 100644
--- a/src/tests/command_executor_tests.cpp
+++ b/src/tests/command_executor_tests.cpp
@@ -125,12 +125,17 @@ TEST_P(CommandExecutorTest, NoTaskKillingCapability)
offers->front().resources(),
SLEEP_COMMAND(1000));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers->front().id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -193,12 +198,17 @@ TEST_P(CommandExecutorTest, TaskKillingCapability)
offers->front().resources(),
SLEEP_COMMAND(1000));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers->front().id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -296,12 +306,14 @@ TEST_P(CommandExecutorTest, NoTransitionFromKillingToRunning)
vector<TaskInfo> tasks;
tasks.push_back(task);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
Future<TaskStatus> statusKilling;
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillOnce(FutureArg<1>(&statusKilling))
@@ -309,6 +321,9 @@ TEST_P(CommandExecutorTest, NoTransitionFromKillingToRunning)
driver.launchTasks(offers->front().id(), tasks);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -385,10 +400,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HTTPCommandExecutorTest, TerminateWithACK)
offers->front().resources(),
"sleep 1");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
@@ -398,7 +415,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HTTPCommandExecutorTest, TerminateWithACK)
driver.launchTasks(offers->front().id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by TASK_FINISHED.
+ // Scheduler should first receive TASK_STARTING, followed by TASK_RUNNING
+ // and TASK_FINISHED.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -461,9 +482,10 @@ TEST_F(HTTPCommandExecutorTest, ExplicitAcknowledgements)
offers->front().resources(),
SLEEP_COMMAND(1000));
- Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusStarting;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&statusRunning));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
// Ensure no status update acknowledgements are sent from the driver
// to the master until the explicit acknowledgement is sent.
@@ -475,9 +497,9 @@ TEST_F(HTTPCommandExecutorTest, ExplicitAcknowledgements)
driver.launchTasks(offers->front().id(), {task});
- AWAIT_READY(statusRunning);
- EXPECT_TRUE(statusRunning->has_slave_id());
- EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_TRUE(statusStarting->has_slave_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
// Now send the acknowledgement.
Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL(
@@ -486,7 +508,7 @@ TEST_F(HTTPCommandExecutorTest, ExplicitAcknowledgements)
_,
master.get()->pid);
- driver.acknowledgeStatusUpdate(statusRunning.get());
+ driver.acknowledgeStatusUpdate(statusStarting.get());
AWAIT_READY(acknowledgement);
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/container_logger_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/container_logger_tests.cpp b/src/tests/container_logger_tests.cpp
index fb8e441..b65cf6a 100644
--- a/src/tests/container_logger_tests.cpp
+++ b/src/tests/container_logger_tests.cpp
@@ -188,15 +188,20 @@ TEST_F(ContainerLoggerTest, DefaultToSandbox)
// We'll start a task that outputs to stdout.
TaskInfo task = createTask(offers.get()[0], "echo 'Hello World!'");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -296,15 +301,20 @@ TEST_F(ContainerLoggerTest, LOGROTATE_RotateInSandbox)
"i=0; while [ $i -lt 11264 ]; "
"do printf '%-1024d\\n' $i; i=$((i+1)); done");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -447,15 +457,20 @@ TEST_F(ContainerLoggerTest, LOGROTATE_CustomRotateOptions)
variable->set_name("CONTAINER_LOGGER_LOGROTATE_STDOUT_OPTIONS");
variable->set_value(customConfig);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -545,15 +560,20 @@ TEST_F(ContainerLoggerTest, LOGROTATE_ModuleFDOwnership)
// Start a task that will keep running until the end of the test.
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusKilled))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -680,15 +700,20 @@ TEST_P(UserContainerLoggerTest, ROOT_LOGROTATE_RotateWithSwitchUserTrueOrFalse)
// Start the task as a non-root user.
task.mutable_command()->set_user("nobody");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/cgroups_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/cgroups_isolator_tests.cpp b/src/tests/containerizer/cgroups_isolator_tests.cpp
index 3fc9341..ee1e5e6 100644
--- a/src/tests/containerizer/cgroups_isolator_tests.cpp
+++ b/src/tests/containerizer/cgroups_isolator_tests.cpp
@@ -179,12 +179,18 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_NET_CLS_UserCgroup)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&statusRunning));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(Return());
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -320,12 +326,17 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_RevocableCpu)
cpus,
"sleep 120");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers2.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -416,12 +427,17 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_EnableCfs)
Resources::parse("cpus:0.5").get(),
command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -530,8 +546,13 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_LimitSwap)
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
v1::createCommandInfo("ls", {"ls", "-al", "/"}));
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -543,6 +564,10 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_LimitSwap)
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
@@ -617,12 +642,17 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PidsAndTids)
offers.get()[0].resources(),
command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -801,15 +831,20 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_NET_CLS_Isolate)
// explicitly killing this task to perform the cleanup test.
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
// Capture the update to verify that the task has been launched.
- AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning->state());
// Task is ready. Make sure there is exactly 1 container in the hashset.
Future<hashset<ContainerID>> containers = containerizer->containers();
@@ -857,7 +892,7 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_NET_CLS_Isolate)
Future<Nothing> gcSchedule = FUTURE_DISPATCH(
_, &slave::GarbageCollectorProcess::schedule);
- driver.killTask(status->task_id());
+ driver.killTask(statusRunning->task_id());
AWAIT_READY(gcSchedule);
@@ -927,14 +962,19 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_NET_CLS_ContainerStatus)
// Create a task to be launched in the mesos-container.
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
- AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning->state());
// Task is ready. Verify `ContainerStatus` is present in slave state.
Future<Response> response = process::http::get(
@@ -1017,12 +1057,17 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_Sample)
TaskInfo task = createTask(offers.get()[0], "sleep 120");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1131,8 +1176,10 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_PerfForward)
AWAIT_READY(offers1);
ASSERT_FALSE(offers1->empty());
+ Future<TaskStatus> statusStarting1;
Future<TaskStatus> statusRunning1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting1))
.WillOnce(FutureArg<1>(&statusRunning1))
.WillRepeatedly(Return());
@@ -1147,6 +1194,9 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_PerfForward)
driver.launchTasks(offers1.get()[0].id(), {task1}, filters);
+ AWAIT_READY(statusStarting1);
+ EXPECT_EQ(TASK_STARTING, statusStarting1->state());
+
AWAIT_READY(statusRunning1);
EXPECT_EQ(TASK_RUNNING, statusRunning1->state());
@@ -1203,13 +1253,18 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_PerfForward)
// Start a new container which will start reporting perf statistics.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
+ Future<TaskStatus> statusStarting2;
Future<TaskStatus> statusRunning2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting2))
.WillOnce(FutureArg<1>(&statusRunning2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers2.get()[0].id(), {task2});
+ AWAIT_READY(statusStarting2);
+ EXPECT_EQ(TASK_STARTING, statusStarting2->state());
+
AWAIT_READY(statusRunning2);
EXPECT_EQ(TASK_RUNNING, statusRunning2->state());
@@ -1292,8 +1347,10 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryForward)
AWAIT_READY(offers1);
ASSERT_FALSE(offers1->empty());
+ Future<TaskStatus> statusStarting1;
Future<TaskStatus> statusRunning1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting1))
.WillOnce(FutureArg<1>(&statusRunning1))
.WillRepeatedly(Return());
@@ -1308,6 +1365,9 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryForward)
driver.launchTasks(offers1.get()[0].id(), {task1}, filters);
+ AWAIT_READY(statusStarting1);
+ EXPECT_EQ(TASK_STARTING, statusStarting1->state());
+
AWAIT_READY(statusRunning1);
EXPECT_EQ(TASK_RUNNING, statusRunning1->state());
@@ -1361,13 +1421,18 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryForward)
// Start a new container which will start reporting memory statistics.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
+ Future<TaskStatus> statusStarting2;
Future<TaskStatus> statusRunning2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting2))
.WillOnce(FutureArg<1>(&statusRunning2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers2.get()[0].id(), {task2});
+ AWAIT_READY(statusStarting2);
+ EXPECT_EQ(TASK_STARTING, statusStarting2->state());
+
AWAIT_READY(statusRunning2);
EXPECT_EQ(TASK_RUNNING, statusRunning2->state());
@@ -1448,8 +1513,10 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryBackward)
AWAIT_READY(offers1);
ASSERT_FALSE(offers1->empty());
+ Future<TaskStatus> statusStarting1;
Future<TaskStatus> statusRunning1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting1))
.WillOnce(FutureArg<1>(&statusRunning1))
.WillRepeatedly(Return());
@@ -1464,6 +1531,9 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryBackward)
driver.launchTasks(offers1.get()[0].id(), {task1}, filters);
+ AWAIT_READY(statusStarting1);
+ EXPECT_EQ(TASK_STARTING, statusStarting1->state());
+
AWAIT_READY(statusRunning1);
EXPECT_EQ(TASK_RUNNING, statusRunning1->state());
@@ -1517,13 +1587,18 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryBackward)
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
+ Future<TaskStatus> statusStarting2;
Future<TaskStatus> statusRunning2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting2))
.WillOnce(FutureArg<1>(&statusRunning2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers2.get()[0].id(), {task2});
+ AWAIT_READY(statusStarting2);
+ EXPECT_EQ(TASK_STARTING, statusStarting2->state());
+
AWAIT_READY(statusRunning2);
EXPECT_EQ(TASK_RUNNING, statusRunning2->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/cni_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/cni_isolator_tests.cpp b/src/tests/containerizer/cni_isolator_tests.cpp
index e673d91..5a2af59 100644
--- a/src/tests/containerizer/cni_isolator_tests.cpp
+++ b/src/tests/containerizer/cni_isolator_tests.cpp
@@ -264,14 +264,20 @@ TEST_F(CniIsolatorTest, ROOT_INTERNET_CURL_LaunchCommandTask)
// Make sure the container join the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -346,12 +352,18 @@ TEST_F(CniIsolatorTest, ROOT_VerifyCheckpointedInfo)
// Make sure the container join the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -544,26 +556,37 @@ TEST_F(CniIsolatorTest, ROOT_SlaveRecovery)
// Make sure the container join the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusKilled));
EXPECT_CALL(sched, offerRescinded(&driver, _))
.Times(AtMost(1));
- Future<Nothing> ack =
+ Future<Nothing> ackRunning =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ackStarting =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(ackStarting);
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ackRunning);
// Stop the slave after TASK_RUNNING is received.
slave.get()->terminate();
@@ -644,14 +667,20 @@ TEST_F(CniIsolatorTest, ROOT_EnvironmentLibprocessIP)
// Make sure the container joins the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -722,14 +751,20 @@ TEST_F(CniIsolatorTest, ROOT_INTERNET_CURL_LaunchContainerInHostNetwork)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -853,15 +888,26 @@ TEST_F(CniIsolatorTest, ROOT_DynamicAddDelofCniConfig)
// Make sure the container is able to join mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
- Future<Nothing> ack =
+ Future<Nothing> ackRunning =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ackStarting =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer2.id(), {task}, filters);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(ackStarting);
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -869,7 +915,7 @@ TEST_F(CniIsolatorTest, ROOT_DynamicAddDelofCniConfig)
// To avoid having the agent resending the `TASK_RUNNING` update, which can
// happen due to clock manipulation below, wait for the status update
// acknowledgement to reach the agent.
- AWAIT_READY(ack);
+ AWAIT_READY(ackRunning);
// Testing dynamic deletion of CNI networks.
rm = os::rm(path::join(cniConfigDir, "mockConfig"));
@@ -977,14 +1023,20 @@ TEST_F(CniIsolatorTest, ROOT_OverrideHostname)
// Make sure the container joins the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1095,14 +1147,20 @@ TEST_F(CniIsolatorTest, ROOT_VerifyResolverConfig)
// Make sure the container joins the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1212,14 +1270,20 @@ TEST_F(CniIsolatorTest, ROOT_INTERNET_VerifyResolverConfig)
// Make sure the container joins the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1299,14 +1363,20 @@ TEST_F(CniIsolatorTest, ROOT_INTERNET_CURL_ReadOnlyBindMounts)
container->set_type(ContainerInfo::MESOS);
container->mutable_mesos()->mutable_image()->CopyFrom(image);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1486,9 +1556,14 @@ TEST_P(DefaultExecutorCniTest, ROOT_VerifyContainerIP)
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
command);
+ Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateFinished;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(DoAll(FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(
+ frameworkId,
+ offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
@@ -1501,6 +1576,10 @@ TEST_P(DefaultExecutorCniTest, ROOT_VerifyContainerIP)
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+ AWAIT_READY(updateStarting);
+ ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
+
AWAIT_READY(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
@@ -1665,12 +1744,18 @@ TEST_F(CniIsolatorPortMapperTest, ROOT_INTERNET_CURL_PortMapper)
// Set the container for the task.
task.mutable_container()->CopyFrom(container);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(300));
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1898,14 +1983,20 @@ TEST_P(DefaultContainerDNSCniTest, ROOT_VerifyDefaultDNS)
// Make sure the container joins the mock CNI network.
container->add_network_infos()->set_name("__MESOS_TEST__");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(task.task_id(), statusRunning->task_id());
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/cpu_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/cpu_isolator_tests.cpp b/src/tests/containerizer/cpu_isolator_tests.cpp
index 153990d..846b2e2 100644
--- a/src/tests/containerizer/cpu_isolator_tests.cpp
+++ b/src/tests/containerizer/cpu_isolator_tests.cpp
@@ -114,12 +114,17 @@ TEST_P(CpuIsolatorTest, ROOT_UserCpuUsage)
offers.get()[0],
"while true ; do true ; done & sleep 60");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -206,12 +211,17 @@ TEST_P(CpuIsolatorTest, ROOT_SystemCpuUsage)
offers.get()[0],
"cat /dev/urandom > /dev/null & sleep 60");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 45f0d1d..419be5d 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -509,14 +509,18 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
@@ -659,13 +663,17 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -787,13 +795,17 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_TaskKillingCapability)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -913,14 +925,18 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
EXPECT_CALL(dockerContainerizer, update(_, _))
.WillRepeatedly(Return(Nothing()));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1054,8 +1070,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
@@ -1063,6 +1081,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
AWAIT_READY(containerId);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1656,9 +1677,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -1670,6 +1693,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1810,8 +1837,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes)
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
@@ -1822,6 +1851,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1974,8 +2005,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes)
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
@@ -1986,6 +2019,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -2138,9 +2173,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -2149,6 +2186,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -2272,9 +2311,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -2283,6 +2324,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -2407,9 +2450,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -2418,6 +2463,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -2546,9 +2593,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -2557,6 +2606,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -2721,11 +2772,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
- ASSERT_EQ(TASK_RUNNING, update.status().state());
+ ASSERT_EQ(TASK_STARTING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
ASSERT_TRUE(exists(docker, containerId.get()));
@@ -2906,11 +2957,11 @@ TEST_F(DockerContainerizerTest,
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
- ASSERT_EQ(TASK_RUNNING, update.status().state());
+ ASSERT_EQ(TASK_STARTING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ ASSERT_EQ(TASK_STARTING, status->state());
ASSERT_TRUE(exists(docker, containerId.get()));
@@ -3022,9 +3073,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -3033,6 +3086,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping)
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -3159,14 +3214,18 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -4031,12 +4090,14 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_NoTransitionFromKillingToRunning)
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
Future<TaskStatus> statusKilling;
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillOnce(FutureArg<1>(&statusKilling))
@@ -4045,6 +4106,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_NoTransitionFromKillingToRunning)
driver.launchTasks(offers->front().id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -4160,14 +4223,18 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_CGROUPS_CFS_CgroupsEnableCFS)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
@@ -4299,14 +4366,18 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Non_Root_Sandbox)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -4455,14 +4526,18 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_DefaultDNS)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
@@ -4593,7 +4668,7 @@ TEST_F(DockerContainerizerIPv6Test, ROOT_DOCKER_LaunchIPv6HostNetwork)
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
@@ -4625,14 +4700,18 @@ TEST_F(DockerContainerizerIPv6Test, ROOT_DOCKER_LaunchIPv6HostNetwork)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
@@ -4870,14 +4949,18 @@ TEST_F(
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
@@ -4921,14 +5004,14 @@ TEST_F(
// status as received in the status update message.
for (int i = 0; i < 2; i++) {
Result<JSON::String> protocol = parse->find<JSON::String>(
- "frameworks[0].executors[0].tasks[0].statuses[0]"
+ "frameworks[0].executors[0].tasks[0].statuses[1]"
".container_status.network_infos[0].ip_addresses[" +
stringify(i) + "].protocol");
ASSERT_SOME(protocol);
Result<JSON::String> ip = parse->find<JSON::String>(
- "frameworks[0].executors[0].tasks[0].statuses[0]"
+ "frameworks[0].executors[0].tasks[0].statuses[1]"
".container_status.network_infos[0].ip_addresses[" +
stringify(i) + "].ip_address");
[4/6] mesos git commit: Fix unit tests that were broken by the
additional TASK_STARTING update.
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/hook_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hook_tests.cpp b/src/tests/hook_tests.cpp
index c4fadbb..5428782 100644
--- a/src/tests/hook_tests.cpp
+++ b/src/tests/hook_tests.cpp
@@ -707,9 +707,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -717,6 +719,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -924,9 +928,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -934,6 +940,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -1039,14 +1047,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HookTest, ROOT_DOCKER_VerifySlavePostFetchHook)
ContainerInfo::DockerInfo* dockerInfo = containerInfo->mutable_docker();
dockerInfo->set_image("alpine");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5d96457..c6906a7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2786,22 +2786,34 @@ TEST_F(MasterTest, UnreachableTaskAfterFailover)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&runningStatus));
+ .WillOnce(FutureArg<1>(&startingStatus))
+ .WillOnce(FutureArg<1>(&runningStatus))
+ .WillRepeatedly(Return());
+
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ const SlaveID slaveId = startingStatus->slave_id();
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- const SlaveID slaveId = runningStatus->slave_id();
-
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Step 4: Simulate master failover. We leave the slave without a
// master so it does not attempt to re-register.
@@ -6828,20 +6840,31 @@ TEST_F(MasterTest, FailoverAgentReregisterFirst)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate master failover. We leave the scheduler without a master
// so it does not attempt to re-register yet.
@@ -7048,22 +7071,33 @@ TEST_F(MasterTest, AgentRestartNoReregister)
TaskInfo task = createTask(offer, "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
- slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
Clock::pause();
@@ -7416,12 +7450,18 @@ TEST_F(MasterTest, TaskWithTinyResources)
Resources::parse("cpus:0.00001;mem:1").get(),
SLEEP_COMMAND(1000));
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -7506,12 +7546,18 @@ TEST_F(MasterTest, MultiRoleSchedulerUnsubscribeFromRole)
TaskInfo task = createTask(offer.slave_id(), resources, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver1.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -7808,12 +7854,18 @@ TEST_F(MasterTest, AgentDomainDifferentRegion)
// Check that we can launch a task in a remote region.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -8409,9 +8461,11 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
v1::TaskGroupInfo taskGroup;
taskGroup.add_tasks()->CopyFrom(taskInfo);
- Future<v1::scheduler::Event::Update> update;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
+ .WillOnce(FutureArg<1>(&startingUpdate))
+ .WillOnce(FutureArg<1>(&runningUpdate));
{
Call call;
@@ -8433,11 +8487,17 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
mesos.send(call);
}
- AWAIT_READY(update);
+ AWAIT_READY(startingUpdate);
+
+ EXPECT_EQ(TASK_STARTING, startingUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+ EXPECT_TRUE(startingUpdate->status().has_timestamp());
+
+ AWAIT_READY(runningUpdate);
- EXPECT_EQ(TASK_RUNNING, update->status().state());
- EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
- EXPECT_TRUE(update->status().has_timestamp());
+ EXPECT_EQ(TASK_RUNNING, runningUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+ EXPECT_TRUE(runningUpdate->status().has_timestamp());
// Ensure that the task sandbox symbolic link is created.
EXPECT_TRUE(os::exists(path::join(
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index f00dd9b..7da1be5 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1889,8 +1889,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID)
Offer offer1 = offers1.get()[0];
TaskInfo task1 = createTask(offer1, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -1898,6 +1900,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID)
driver.launchTasks(offer1.id(), {task1});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task1.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task1.task_id(), runningStatus->task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index cd98b8f..d262bbe 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -757,7 +757,7 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
// Advance the clock for the slave to trigger the calculation of the
// total oversubscribed resources. As we described above, we don't
@@ -1023,15 +1023,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
TaskInfo task = createTask(offers.get()[0], "sleep 10");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(status0);
+ ASSERT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1->state());
@@ -1132,15 +1137,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
TaskInfo task = createTask(offers.get()[0], "sleep 10");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(status0);
+ ASSERT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 0597bd2..7b11264 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -215,22 +215,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- const SlaveID& slaveId = runningStatus->slave_id();
+ AWAIT_READY(statusUpdateAck2);
- AWAIT_READY(statusUpdateAck);
+ const SlaveID& slaveId = startingStatus->slave_id();
// Now, induce a partition of the slave by having the master
// timeout the slave.
@@ -572,8 +583,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -874,8 +887,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
// Launch `task1` using `sched1`.
TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60");
+ Future<TaskStatus> startingStatus1;
Future<TaskStatus> runningStatus1;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus1))
.WillOnce(FutureArg<1>(&runningStatus1));
Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
@@ -917,8 +932,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
// Launch the second task.
TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60");
+ Future<TaskStatus> startingStatus2;
Future<TaskStatus> runningStatus2;
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&startingStatus2))
.WillOnce(FutureArg<1>(&runningStatus2));
Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
@@ -1136,22 +1153,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, PartitionedSlaveOrphanedTask)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Now, induce a partition of the slave by having the master
// timeout the slave.
@@ -1404,22 +1432,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, DisconnectedFramework)
// Launch `task` using `sched1`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
driver1.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck1);
+ AWAIT_READY(statusUpdateAck2);
// Shutdown the master.
master->reset();
@@ -1573,22 +1612,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, SpuriousSlaveReregistration)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate a master loss event at the slave and then cause the
// slave to reregister with the master. From the master's
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 444737a..883192d 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1988,16 +1988,15 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
TaskInfo taskInfo = createTask(offer.slave_id(), taskResources, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(&driver, _));
-
- Future<Nothing> _statusUpdateAcknowledgement =
- FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+ Future<TaskStatus> starting;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&starting))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(starting);
+ EXPECT_EQ(TASK_STARTING, starting->state());
// Summon an offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 11fe432..acfeac1 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -806,9 +806,11 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
taskResources,
"echo abc > path1/file");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -823,6 +825,10 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -982,21 +988,25 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
taskResources2.get() + volume,
"echo task2 > path1/file2");
- // We should receive a TASK_RUNNING followed by a TASK_FINISHED for
- // each of the 2 tasks. We do not check for the actual task state
- // since it's not the primary objective of the test. We instead
- // verify that the paths are created by the tasks after we receive
- // enough status updates.
+ // We should receive a TASK_STARTING, followed by a TASK_RUNNING
+ // and a TASK_FINISHED for each of the 2 tasks.
+ // We do not check for the actual task state since it's not the
+ // primary objective of the test. We instead verify that the paths
+ // are created by the tasks after we receive enough status updates.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
Future<TaskStatus> status3;
Future<TaskStatus> status4;
+ Future<TaskStatus> status5;
+ Future<TaskStatus> status6;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillOnce(FutureArg<1>(&status3))
- .WillOnce(FutureArg<1>(&status4));
+ .WillOnce(FutureArg<1>(&status4))
+ .WillOnce(FutureArg<1>(&status5))
+ .WillOnce(FutureArg<1>(&status6));
driver.acceptOffers(
{offer.id()},
@@ -1009,6 +1019,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
AWAIT_READY(status2);
AWAIT_READY(status3);
AWAIT_READY(status4);
+ AWAIT_READY(status5);
+ AWAIT_READY(status6);
const string& volumePath = slave::paths::getPersistentVolumePath(
slaveFlags.work_dir,
@@ -1258,10 +1270,12 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
Resources::parse("cpus:1;mem:128").get() + volume,
"echo abc > path1/file1 && sleep 1000");
- // We should receive a TASK_RUNNING for the launched task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1));
// We use a filter of 0 seconds so the resources will be available
@@ -1275,6 +1289,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
LAUNCH({task1})},
filters);
+ AWAIT_READY(status0);
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -1331,11 +1348,13 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
Resources::parse("cpus:1;mem:256").get() + volume,
"echo abc > path1/file2 && sleep 1000");
- // We should receive a TASK_RUNNING for the launched task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3));
driver2.acceptOffers(
{offer2.id()},
@@ -1343,7 +1362,10 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
filters);
AWAIT_READY(status2);
- EXPECT_EQ(TASK_RUNNING, status2->state());
+ EXPECT_EQ(TASK_STARTING, status2->state());
+
+ AWAIT_READY(status3);
+ EXPECT_EQ(TASK_RUNNING, status3->state());
// Collect metrics based on both frameworks. Note that the `cpus_used` and
// `mem_used` is updated, but `disk_used` does not change since both tasks
@@ -1434,13 +1456,17 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
taskResources.get() + volume,
"sleep 1000");
- // We should receive a TASK_RUNNING for each of the tasks.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for each of the tasks.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4));
Future<CheckpointResourcesMessage> checkpointResources =
FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid);
@@ -1451,11 +1477,14 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
LAUNCH({task1, task2})});
AWAIT_READY(checkpointResources);
+
+ // We only check the first and the last status, because the two in between
+ // could arrive in any order.
AWAIT_READY(status1);
- EXPECT_EQ(TASK_RUNNING, status1->state());
+ EXPECT_EQ(TASK_STARTING, status1->state());
- AWAIT_READY(status2);
- EXPECT_EQ(TASK_RUNNING, status2->state());
+ AWAIT_READY(status4);
+ EXPECT_EQ(TASK_RUNNING, status4->state());
// This is to make sure CheckpointResourcesMessage is processed.
Clock::pause();
@@ -1598,16 +1627,20 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
- // We should receive a TASK_RUNNING each of the 2 tasks. We track task
- // termination by a TASK_FINISHED for the short-lived task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for each of the 2 tasks.
+ // We track task termination by a TASK_FINISHED for the short-lived task.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
+ Future<TaskStatus> status5;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
- .WillOnce(FutureArg<1>(&status3));
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4))
+ .WillOnce(FutureArg<1>(&status5));
driver.acceptOffers(
{offer.id()},
@@ -1616,18 +1649,23 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
LAUNCH({task1, task2})},
filters);
- // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for
- // the short-lived task.
+ // Wait for TASK_STARTING and TASK_RUNNING for both the tasks,
+ // and TASK_FINISHED for the short-lived task.
AWAIT_READY(status1);
AWAIT_READY(status2);
AWAIT_READY(status3);
+ AWAIT_READY(status4);
+ AWAIT_READY(status5);
hashset<TaskID> tasksRunning;
hashset<TaskID> tasksFinished;
- vector<Future<TaskStatus>> statuses{status1, status2, status3};
+ vector<Future<TaskStatus>> statuses {
+ status1, status2, status3, status4, status5};
foreach (const Future<TaskStatus>& status, statuses) {
- if (status->state() == TASK_RUNNING) {
+ if (status->state() == TASK_STARTING) {
+ // Ignore TASK_STARTING updates.
+ } else if (status->state() == TASK_RUNNING) {
tasksRunning.insert(status->task_id());
} else {
tasksFinished.insert(status->task_id());
@@ -1686,15 +1724,15 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
// We kill the long-lived task and wait for TASK_KILLED, so we can
// DESTROY the persistent volume once the task terminates.
- Future<TaskStatus> status4;
+ Future<TaskStatus> status6;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status4));
+ .WillOnce(FutureArg<1>(&status6));
driver.killTask(task1.task_id());
- AWAIT_READY(status4);
- EXPECT_EQ(task1.task_id(), status4->task_id());
- EXPECT_EQ(TASK_KILLED, status4->state());
+ AWAIT_READY(status6);
+ EXPECT_EQ(task1.task_id(), status6->task_id());
+ EXPECT_EQ(TASK_KILLED, status6->state());
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
@@ -1923,25 +1961,37 @@ TEST_P(PersistentVolumeTest, SlaveRecovery)
taskResources,
"while true; do test -d path1; done");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
+ // Wait for the ACK to be checkpointed.
+ AWAIT_READY(ack1);
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack2);
// Restart the slave.
slave.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 64a1d3d..8ae2860 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -1178,22 +1178,33 @@ TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Now, induce a partition of the slave by having the master
// timeout the slave.
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index e70dd0d..3645cd8 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -378,6 +378,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+ // Expect one TASK_STARTING and one TASK_RUNNING update.
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
@@ -407,16 +408,21 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
// recovers 'offered' resources portion.
TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _)).
+ WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for update acks.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
// Summon an offer to receive the 'offered' resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -550,16 +556,21 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
// recovers 'offered' resources portion.
TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for update acks from TASK_STARTING and TASK_RUNNING.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
// Summon an offer to receive the 'offered' resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1575,9 +1586,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
Offer offer = offers.get()[0];
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
Resources taskResources = Resources::parse(
"cpus(role):2;mem(role):512;cpus:2;mem:1024").get();
@@ -1586,8 +1599,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
driver.acceptOffers({offer.id()}, {LAUNCH({task})});
- AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning->state());
Future<Response> response = process::http::get(
agent.get()->pid,
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/role_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp
index 568ea90..084555a 100644
--- a/src/tests/role_tests.cpp
+++ b/src/tests/role_tests.cpp
@@ -979,9 +979,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
taskResources,
"! (ls -Av path | grep -q .)");
- // We expect two status updates for the task.
- Future<TaskStatus> status1, status2;
+ // We expect three status updates for the task.
+ Future<TaskStatus> status0, status1, status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -990,6 +991,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
{offer.id()},
{RESERVE(reservedDisk), CREATE(volume), LAUNCH({task})});
+ AWAIT_READY(status0);
+
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4eda96e..6df4d32 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -623,11 +623,15 @@ TEST_P(SchedulerTest, TaskGroupRunning)
taskGroup.add_tasks()->CopyFrom(task1);
taskGroup.add_tasks()->CopyFrom(task2);
+ Future<Event::Update> startingUpdate1;
+ Future<Event::Update> startingUpdate2;
Future<Event::Update> runningUpdate1;
Future<Event::Update> runningUpdate2;
Future<Event::Update> finishedUpdate1;
Future<Event::Update> finishedUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&startingUpdate1))
+ .WillOnce(FutureArg<1>(&startingUpdate2))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2))
.WillOnce(FutureArg<1>(&finishedUpdate1))
@@ -669,14 +673,58 @@ TEST_P(SchedulerTest, TaskGroupRunning)
EXPECT_EQ(devolve(task2.task_id()),
runTaskGroupMessage->task_group().tasks(1).task_id());
+ AWAIT_READY(startingUpdate1);
+ ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state());
+
+ AWAIT_READY(startingUpdate2);
+ ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state());
+
+ const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
+
+ // TASK_STARTING updates for the tasks in a
+ // task group can be received in any order.
+ const hashset<v1::TaskID> tasksStarting{
+ startingUpdate1->status().task_id(),
+ startingUpdate2->status().task_id()};
+
+ ASSERT_EQ(tasks, tasksStarting);
+
+ // Acknowledge the TASK_STARTING updates so
+ // that subsequent updates can be received.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(
+ startingUpdate1->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+ acknowledge->set_uuid(startingUpdate1->status().uuid());
+
+ mesos.send(call);
+ }
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(
+ startingUpdate2->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+ acknowledge->set_uuid(startingUpdate2->status().uuid());
+
+ mesos.send(call);
+ }
+
AWAIT_READY(runningUpdate1);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
AWAIT_READY(runningUpdate2);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
- const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
-
// TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 868e39e..2dcfd6c 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -627,10 +627,12 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
// The first task should fail since the task user `foo` is not an
// authorized user that can launch a task. However, the second task
// should succeed.
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -638,16 +640,19 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
{offer.id()},
{LAUNCH({task1, task2})});
- // Wait for TASK_FAILED for 1st task, and TASK_RUNNING for 2nd task.
+ // Wait for TASK_FAILED for 1st task, and TASK_STARTING followed by
+ // TASK_RUNNING for 2nd task.
+ AWAIT_READY(status0);
AWAIT_READY(status1);
AWAIT_READY(status2);
// Validate both the statuses. Note that the order of receiving the
- // status updates for the 2 tasks is not deterministic.
- hashmap<TaskID, TaskStatus> statuses {
- {status1->task_id(), status1.get()},
- {status2->task_id(), status2.get()}
- };
+ // status updates for the 2 tasks is not deterministic, but we know
+ // that task2's TASK_RUNNING arrives after TASK_STARTING.
+ hashmap<TaskID, TaskStatus> statuses;
+ statuses[status0->task_id()] = status0.get();
+ statuses[status1->task_id()] = status1.get();
+ statuses[status2->task_id()] = status2.get();
ASSERT_TRUE(statuses.contains(task1.task_id()));
EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
@@ -741,7 +746,7 @@ TEST_F(ExecutorAuthorizationTest, RunTaskGroup)
AWAIT_READY(status);
ASSERT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 30d8c23..c2d9cc8 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -252,7 +252,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
// Capture the update.
AWAIT_READY(update);
- EXPECT_EQ(TASK_RUNNING, update->update().status().state());
+ EXPECT_EQ(TASK_STARTING, update->update().status().state());
// Wait for the ACK to be checkpointed.
AWAIT_READY(_statusUpdateAcknowledgement);
@@ -314,7 +314,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
.info);
// Check status update and ack.
- ASSERT_EQ(
+ // (the number might be bigger than 1 because we might have
+ // received any number of additional TASK_RUNNING updates)
+ ASSERT_LE(
1U,
state
.frameworks[frameworkId]
@@ -423,7 +425,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
ASSERT_SOME(slave);
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -514,7 +516,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor)
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -762,13 +764,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
// Drop the first update from the executor.
- Future<StatusUpdateMessage> statusUpdate =
+ Future<StatusUpdateMessage> startingUpdate =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
// Stop the slave before the status update is received.
- AWAIT_READY(statusUpdate);
+ AWAIT_READY(startingUpdate);
slave.get()->terminate();
@@ -791,15 +793,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
// Ensure the executor re-registers.
AWAIT_READY(reregister);
- // Executor should inform about the unacknowledged update.
- ASSERT_EQ(1, reregister->updates_size());
+ // Executor should inform about the unacknowledged updates.
+ ASSERT_LE(1, reregister->updates_size());
const StatusUpdate& update = reregister->updates(0);
EXPECT_EQ(task.task_id(), update.status().task_id());
- EXPECT_EQ(TASK_RUNNING, update.status().state());
+ EXPECT_EQ(TASK_STARTING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -846,7 +848,8 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
Future<TaskStatus> statusUpdate;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&statusUpdate));
+ .WillOnce(FutureArg<1>(&statusUpdate))
+ .WillRepeatedly(Return()); // Ignore subsequent TASK_RUNNING updates.
driver.start();
@@ -863,7 +866,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(statusUpdate);
- EXPECT_EQ(TASK_RUNNING, statusUpdate->state());
+ EXPECT_EQ(TASK_STARTING, statusUpdate->state());
// Ensure the acknowledgement is checkpointed.
Clock::settle();
@@ -974,12 +977,19 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ Future<TaskStatus> statusUpdate0;
Future<TaskStatus> statusUpdate1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusUpdate0))
.WillOnce(FutureArg<1>(&statusUpdate1));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusUpdate0);
+ ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+ driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
AWAIT_READY(statusUpdate1);
ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
@@ -1442,7 +1452,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor)
Future<vector<Offer>> offers1;
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(FutureArg<1>(&offers1));
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
@@ -1451,7 +1462,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return()); // Allow any number of subsequent status updates.
Future<Nothing> ack =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
@@ -1583,7 +1595,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Future<vector<Offer>> offers1;
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(FutureArg<1>(&offers1));
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
@@ -1595,25 +1608,42 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Future<Message> registerExecutor =
FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
- EXPECT_CALL(sched, statusUpdate(_, _));
- Future<Nothing> ack =
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(Return()); // Ignore subsequent status updates.
+
+ Future<Nothing> ackRunning =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> ackStarting =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
+ // Wait for the TASK_STARTING update from the executor
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ // Wait for the TASK_RUNNING update from the executor
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
// Capture the executor pid.
AWAIT_READY(registerExecutor);
UPID executorPid = registerExecutor->from;
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ackStarting);
+ AWAIT_READY(ackRunning);
slave.get()->terminate();
- Future<TaskStatus> status;
+ Future<TaskStatus> statusLost;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status))
+ .WillOnce(FutureArg<1>(&statusLost))
.WillRepeatedly(Return()); // Ignore subsequent status updates.
// Now shut down the executor, when the slave is down.
@@ -1644,18 +1674,18 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Clock::advance(flags.executor_reregistration_timeout);
// Now advance time until the reaper reaps the executor.
- while (status.isPending()) {
+ while (statusLost.isPending()) {
Clock::advance(process::MAX_REAP_INTERVAL());
Clock::settle();
}
// Scheduler should receive the TASK_LOST update.
- AWAIT_READY(status);
+ AWAIT_READY(statusLost);
- EXPECT_EQ(TASK_LOST, status->state());
- EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+ EXPECT_EQ(TASK_LOST, statusLost->state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statusLost->source());
EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT,
- status->reason());
+ statusLost->reason());
while (offers2.isPending()) {
Clock::advance(Seconds(1));
@@ -1816,7 +1846,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
TaskInfo task = createTask(offers1.get()[0], "exit 0");
EXPECT_CALL(sched, statusUpdate(_, _))
- .Times(2); // TASK_RUNNING and TASK_FINISHED updates.
+ .Times(3); // TASK_STARTING, TASK_RUNNING and TASK_FINISHED updates.
EXPECT_CALL(sched, offerRescinded(_, _))
.Times(AtMost(1));
@@ -2010,15 +2040,22 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect TASK_STARTING and TASK_RUNNING updates
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2)
+ .WillRepeatedly(Return()); // Ignore subsequent updates
- Future<Nothing> ack =
+ Future<Nothing> ackRunning =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ackStarting =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ackStarting);
+ AWAIT_READY(ackRunning);
slave.get()->terminate();
@@ -2129,17 +2166,23 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
Resources(offer1.resources()) +
Resources(offer2.resources())));
+ Future<Nothing> update0;
Future<Nothing> update1;
Future<Nothing> update2;
+ Future<Nothing> update3;
EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureSatisfy(&update0))
.WillOnce(FutureSatisfy(&update1))
- .WillOnce(FutureSatisfy(&update2));
+ .WillOnce(FutureSatisfy(&update2))
+ .WillOnce(FutureSatisfy(&update3));
driver.launchTasks(offers.get()[0].id(), tasks);
- // Wait for TASK_RUNNING updates from the tasks.
+ // Wait for TASK_STARTING and TASK_RUNNING updates from the tasks.
+ AWAIT_READY(update0);
AWAIT_READY(update1);
AWAIT_READY(update2);
+ AWAIT_READY(update3);
// The master should generate TASK_LOST updates once the slave is stopped.
Future<TaskStatus> status1;
@@ -2439,15 +2482,21 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING update
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack1);
+ AWAIT_READY(ack2);
slave.get()->terminate();
@@ -3068,9 +3117,10 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- Future<Nothing> statusUpdate1;
+ Future<Nothing> statusUpdate1, statusUpdate2;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureSatisfy(&statusUpdate1))
+ .WillOnce(FutureSatisfy(&statusUpdate1)) // TASK_STARTING
+ .WillOnce(FutureSatisfy(&statusUpdate2)) // TASK_RUNNING
.WillOnce(Return()); // Ignore TASK_FAILED update.
Future<Message> registerExecutor =
@@ -3082,7 +3132,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
AWAIT_READY(registerExecutor);
UPID executorPid = registerExecutor->from;
- AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+ AWAIT_READY(statusUpdate2); // Wait for TASK_RUNNING update.
EXPECT_CALL(sched, offerRescinded(_, _))
.Times(AtMost(1));
@@ -3186,18 +3236,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting, statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
- Future<TaskStatus> status2;
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+ Future<TaskStatus> statusLost;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&statusLost));
Future<Nothing> slaveLost;
EXPECT_CALL(sched, slaveLost(_, _))
@@ -3223,11 +3277,11 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
AWAIT_READY(executorTerminated);
// The master should send a TASK_LOST and slaveLost.
- AWAIT_READY(status2);
+ AWAIT_READY(statusLost);
- EXPECT_EQ(TASK_LOST, status2->state());
- EXPECT_EQ(TaskStatus::SOURCE_MASTER, status2->source());
- EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status2->reason());
+ EXPECT_EQ(TASK_LOST, statusLost->state());
+ EXPECT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source());
+ EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, statusLost->reason());
AWAIT_READY(slaveLost);
@@ -3415,16 +3469,21 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
SlaveID slaveId = offers1.get()[0].slave_id();
FrameworkID frameworkId = offers1.get()[0].framework_id();
- // Expecting TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expecting TASK_STARTING and TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for TASK_RUNNING update to be acknowledged.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3513,18 +3572,24 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
// Capture the framework id.
FrameworkID frameworkId = offers.get()[0].framework_id();
- // Expecting TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expecting a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2)
+ .WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
- // Wait for TASK_RUNNING update to be acknowledged.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for the updates to be acknowledged.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3661,15 +3726,30 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
// re-registers by wiping the relevant meta directory.
TaskInfo task = createTask(offers1.get()[0], "sleep 10");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ Future<TaskStatus> starting;
+ Future<TaskStatus> running;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&starting))
+ .WillOnce(FutureArg<1>(&running))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> startingAck =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> runningAck =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(starting);
+ AWAIT_READY(startingAck);
+
+ AWAIT_READY(running);
+ AWAIT_READY(runningAck);
+
+ EXPECT_EQ(TASK_STARTING, starting->state());
+ EXPECT_EQ(TASK_RUNNING, running->state());
EXPECT_CALL(allocator, deactivateSlave(_));
@@ -3824,15 +3904,21 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
// Create a long running task.
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched1, statusUpdate(_, _));
+ // Expecting TASK_STARTING and TASK_RUNNING updates
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver1.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3975,15 +4061,20 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2); // TASK_STARTING and TASK_RUNNING
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
- // Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for both ACKs to be checkpointed.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -4117,15 +4208,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
// Framework 1 launches a task.
TaskInfo task1 = createTask(offer1, "sleep 1000");
- EXPECT_CALL(sched1, statusUpdate(_, _));
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement1 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement1 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver1.launchTasks(offer1.id(), {task1});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement1);
// Framework 2. Enable checkpointing.
FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
@@ -4150,14 +4246,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
// Framework 2 launches a task.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
- EXPECT_CALL(sched2, statusUpdate(_, _));
+ EXPECT_CALL(sched2, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement2 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
driver2.launchTasks(offers2.get()[0].id(), {task2});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement2);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -4281,15 +4383,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
// Launch a long running task in the first slave.
TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement1 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement1 =
FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task1});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement1);
Future<vector<Offer>> offers2;
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -4316,15 +4423,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
// Launch a long running task in each slave.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement2 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement2 =
FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers2.get()[0].id(), {task2});
// Wait for the ACKs to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement2);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement2);
slave1.get()->terminate();
slave2.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 76a157f..def64b5 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -680,16 +680,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, CommandTaskWithArguments)
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING, followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -746,12 +752,17 @@ TEST_F(SlaveTest, CommandTaskWithKillPolicy)
task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds(
gracePeriod.ns());
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1051,16 +1062,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -2381,15 +2398,21 @@ TEST_F(SlaveTest, StatisticsEndpointRunningExecutor)
Resources::parse("cpus:1;mem:32").get(),
SLEEP_COMMAND(1000));
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(task.task_id(), statusRunning->task_id());
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Hit the statistics endpoint and expect the response contains the
// resource statistics for the running container.
@@ -5207,16 +5230,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorEnvironmentVariables)
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING, followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -5562,7 +5590,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, HTTPSchedulerSlaveRestart)
UPID executorPid = registerExecutorMessage->from;
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
// Restart the slave.
slave.get()->terminate();
@@ -6829,14 +6857,23 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
- Future<v1::scheduler::Event::Update> update;
-
- EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
-
const v1::Offer offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> updateStarting;
+ Future<v1::scheduler::Event::Update> updateRunning;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateRunning),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
@@ -6878,28 +6915,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
mesos.send(call);
}
- AWAIT_READY(update);
-
- ASSERT_EQ(TASK_RUNNING, update->status().state());
- ASSERT_EQ(taskInfo.task_id(), update->status().task_id());
-
- Future<Nothing> _statusUpdateAcknowledgement =
- FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
+ AWAIT_READY(updateStarting);
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
- acknowledge->mutable_task_id()->CopyFrom(update->status().task_id());
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(update->status().uuid());
+ ASSERT_EQ(TASK_STARTING, updateStarting->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
- mesos.send(call);
- }
+ AWAIT_READY(updateRunning);
- AWAIT_READY(_statusUpdateAcknowledgement);
+ ASSERT_EQ(TASK_RUNNING, updateRunning->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
// Restart the agent.
slave.get()->terminate();
@@ -8177,12 +8201,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag)
TaskInfo task = createTask(offers->front(), "sleep 1000");
+ Future<TaskStatus> statusUpdate0;
Future<TaskStatus> statusUpdate1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusUpdate0))
.WillOnce(FutureArg<1>(&statusUpdate1));
driver.launchTasks(offers->front().id(), {task});
+ AWAIT_READY(statusUpdate0);
+ ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+ driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
AWAIT_READY(statusUpdate1);
ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/teardown_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/teardown_tests.cpp b/src/tests/teardown_tests.cpp
index 5eada4f..392cacf 100644
--- a/src/tests/teardown_tests.cpp
+++ b/src/tests/teardown_tests.cpp
@@ -353,20 +353,31 @@ TEST_F(TeardownTest, RecoveredFrameworkAfterMasterFailover)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate master failover. We leave the scheduler without a master
// so it does not attempt to re-register.