You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/04/12 19:38:20 UTC
svn commit: r1467371 -
/incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
Author: benh
Date: Fri Apr 12 17:38:20 2013
New Revision: 1467371
URL: http://svn.apache.org/r1467371
Log:
Updated resource offers tests to use new testing abstractions.
Review: https://reviews.apache.org/r/10306
Modified:
incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
Modified: incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/resource_offers_tests.cpp?rev=1467371&r1=1467370&r2=1467371&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/resource_offers_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/resource_offers_tests.cpp Fri Apr 12 17:38:20 2013
@@ -18,18 +18,13 @@
#include <gmock/gmock.h>
-#include <map>
-#include <string>
#include <vector>
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
-#include "detector/detector.hpp"
-
#include "local/local.hpp"
-#include "master/allocator.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
@@ -41,24 +36,19 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
-using mesos::internal::master::Allocator;
using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
+using process::Future;
using process::PID;
-using std::map;
-using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
-using testing::DoAll;
-using testing::ElementsAre;
using testing::Return;
-using testing::SaveArg;
TEST(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
@@ -70,29 +60,21 @@ TEST(ResourceOffersTest, ResourceOfferWi
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
- vector<Offer> offers;
-
- trigger resourceOffersCall;
-
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffersCall)))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(sched, offerRescinded(&driver, _))
- .Times(AtMost(1));
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // All 10 slaves might not be in first offer.
driver.start();
- WAIT_UNTIL(resourceOffersCall);
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
+ EXPECT_GE(10u, offers.get().size());
- EXPECT_NE(0u, offers.size());
- EXPECT_GE(10u, offers.size());
-
- Resources resources(offers[0].resources());
+ Resources resources(offers.get()[0].resources());
EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value());
EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value());
@@ -112,49 +94,39 @@ TEST(ResourceOffersTest, TaskUsesNoResou
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
- vector<Offer> offers;
-
- trigger resourceOffersCall;
-
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffersCall)))
- .WillRepeatedly(Return());
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
- WAIT_UNTIL(resourceOffersCall);
-
- EXPECT_NE(0u, offers.size());
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
- TaskStatus status;
-
- trigger statusUpdateCall;
-
+ Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&statusUpdateCall)));
+ .WillOnce(FutureArg<1>(&status));
- driver.launchTasks(offers[0].id(), tasks);
+ driver.launchTasks(offers.get()[0].id(), tasks);
- WAIT_UNTIL(statusUpdateCall);
-
- EXPECT_EQ(task.task_id(), status.task_id());
- EXPECT_EQ(TASK_LOST, status.state());
- EXPECT_TRUE(status.has_message());
- EXPECT_EQ("Task uses no resources", status.message());
+ AWAIT_UNTIL(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(status.get().has_message());
+ EXPECT_EQ("Task uses no resources", status.get().message());
driver.stop();
driver.join();
@@ -172,28 +144,23 @@ TEST(ResourceOffersTest, TaskUsesInvalid
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
- vector<Offer> offers;
-
- trigger resourceOffersCall;
-
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffersCall)))
- .WillRepeatedly(Return());
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
- WAIT_UNTIL(resourceOffersCall);
-
- EXPECT_NE(0u, offers.size());
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Resource* cpus = task.add_resources();
@@ -204,22 +171,17 @@ TEST(ResourceOffersTest, TaskUsesInvalid
vector<TaskInfo> tasks;
tasks.push_back(task);
- TaskStatus status;
-
- trigger statusUpdateCall;
-
+ Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&statusUpdateCall)));
-
- driver.launchTasks(offers[0].id(), tasks);
+ .WillOnce(FutureArg<1>(&status));
- WAIT_UNTIL(statusUpdateCall);
+ driver.launchTasks(offers.get()[0].id(), tasks);
- EXPECT_EQ(task.task_id(), status.task_id());
- EXPECT_EQ(TASK_LOST, status.state());
- EXPECT_TRUE(status.has_message());
- EXPECT_EQ("Task uses invalid resources", status.message());
+ AWAIT_UNTIL(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(status.get().has_message());
+ EXPECT_EQ("Task uses invalid resources", status.get().message());
driver.stop();
driver.join();
@@ -237,28 +199,23 @@ TEST(ResourceOffersTest, TaskUsesMoreRes
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
- vector<Offer> offers;
-
- trigger resourceOffersCall;
-
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffersCall)))
- .WillRepeatedly(Return());
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
- WAIT_UNTIL(resourceOffersCall);
-
- EXPECT_NE(0u, offers.size());
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Resource* cpus = task.add_resources();
@@ -269,22 +226,18 @@ TEST(ResourceOffersTest, TaskUsesMoreRes
vector<TaskInfo> tasks;
tasks.push_back(task);
- TaskStatus status;
-
- trigger statusUpdateCall;
-
+ Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&statusUpdateCall)));
+ .WillOnce(FutureArg<1>(&status));
- driver.launchTasks(offers[0].id(), tasks);
+ driver.launchTasks(offers.get()[0].id(), tasks);
- WAIT_UNTIL(statusUpdateCall);
+ AWAIT_UNTIL(status);
- EXPECT_EQ(task.task_id(), status.task_id());
- EXPECT_EQ(TASK_LOST, status.state());
- EXPECT_TRUE(status.has_message());
- EXPECT_EQ("Task uses more resources than offered", status.message());
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(status.get().has_message());
+ EXPECT_EQ("Task uses more resources than offered", status.get().message());
driver.stop();
driver.join();
@@ -293,7 +246,7 @@ TEST(ResourceOffersTest, TaskUsesMoreRes
}
-TEST(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
+TEST(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
@@ -302,27 +255,17 @@ TEST(ResourceOffersTest, ResourcesGetReo
MockScheduler sched1;
MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
- vector<Offer> offers;
-
- trigger sched1ResourceOfferCall;
-
EXPECT_CALL(sched1, registered(&driver1, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&sched1ResourceOfferCall)))
- .WillRepeatedly(Return());
+ .WillOnce(FutureArg<1>(&offers));
driver1.start();
- WAIT_UNTIL(sched1ResourceOfferCall);
-
- EXPECT_NE(0u, offers.size());
-
- vector<TaskInfo> tasks; // Use nothing!
-
- driver1.launchTasks(offers[0].id(), tasks);
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
driver1.stop();
driver1.join();
@@ -330,21 +273,15 @@ TEST(ResourceOffersTest, ResourcesGetReo
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
- trigger sched2ResourceOfferCall;
-
EXPECT_CALL(sched2, registered(&driver2, _, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffers(&driver2, _))
- .WillOnce(Trigger(&sched2ResourceOfferCall))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(sched2, offerRescinded(&driver2, _))
- .Times(AtMost(1));
+ .WillOnce(FutureArg<1>(&offers));
driver2.start();
- WAIT_UNTIL(sched2ResourceOfferCall);
+ AWAIT_UNTIL(offers);
driver2.stop();
driver2.join();
@@ -353,7 +290,7 @@ TEST(ResourceOffersTest, ResourcesGetReo
}
-TEST(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
+TEST(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
@@ -362,28 +299,71 @@ TEST(ResourceOffersTest, ResourcesGetReo
MockScheduler sched1;
MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
- vector<Offer> offers;
+ EXPECT_CALL(sched1, registered(&driver1, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+ .WillOnce(FutureArg<1>(&offers));
- trigger sched1ResourceOffersCall;
+ driver1.start();
+
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ vector<TaskInfo> tasks; // Use nothing!
+ driver1.launchTasks(offers.get()[0].id(), tasks);
+
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .Times(1);
+
+ EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver2.start();
+
+ AWAIT_UNTIL(offers);
+
+ // Stop first framework before second so no offers are sent.
+ driver1.stop();
+ driver1.join();
+
+ driver2.stop();
+ driver2.join();
+
+ local::shutdown();
+}
+
+
+TEST(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ PID<Master> master = local::launch(1, 2, 1 * Gigabyte, 1 * Gigabyte, false);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
EXPECT_CALL(sched1, registered(&driver1, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&sched1ResourceOffersCall)))
- .WillRepeatedly(Return());
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver1.start();
- WAIT_UNTIL(sched1ResourceOffersCall);
-
- EXPECT_NE(0u, offers.size());
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Resource* cpus = task.add_resources();
@@ -399,44 +379,34 @@ TEST(ResourceOffersTest, ResourcesGetReo
vector<TaskInfo> tasks;
tasks.push_back(task);
- TaskStatus status;
-
- trigger sched1StatusUpdateCall;
-
+ Future<TaskStatus> status;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
- .WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&sched1StatusUpdateCall)));
-
- driver1.launchTasks(offers[0].id(), tasks);
+ .WillOnce(FutureArg<1>(&status));
- WAIT_UNTIL(sched1StatusUpdateCall);
+ driver1.launchTasks(offers.get()[0].id(), tasks);
- EXPECT_EQ(task.task_id(), status.task_id());
- EXPECT_EQ(TASK_LOST, status.state());
- EXPECT_TRUE(status.has_message());
- EXPECT_EQ("Task uses invalid resources", status.message());
-
- driver1.stop();
- driver1.join();
+ AWAIT_UNTIL(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(status.get().has_message());
+ EXPECT_EQ("Task uses invalid resources", status.get().message());
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
- trigger sched2ResourceOffersCall;
-
EXPECT_CALL(sched2, registered(&driver2, _, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffers(&driver2, _))
- .WillOnce(Trigger(&sched2ResourceOffersCall))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(sched2, offerRescinded(&driver2, _))
- .Times(AtMost(1));
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver2.start();
- WAIT_UNTIL(sched2ResourceOffersCall);
+ AWAIT_UNTIL(offers);
+
+ driver1.stop();
+ driver1.join();
driver2.stop();
driver2.join();
@@ -456,120 +426,89 @@ TEST(ResourceOffersTest, Request)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- MockScheduler sched;
+ Cluster cluster;
MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
- EXPECT_CALL(allocator, initialize(_, _));
-
- EXPECT_CALL(allocator, frameworkAdded(_, _, _));
-
- EXPECT_CALL(allocator, frameworkDeactivated(_));
-
- EXPECT_CALL(allocator, frameworkRemoved(_));
-
- EXPECT_CALL(allocator, slaveAdded(_, _, _))
- .WillRepeatedly(Return()); // Test may finish before slave registers.
-
- EXPECT_CALL(allocator, slaveRemoved(_))
- .WillRepeatedly(Return()); // Test may finish before slave registers.
-
- Allocator a(&allocator);
+ EXPECT_CALL(allocator, initialize(_, _))
+ .Times(1);
- PID<Master> master = local::launch(
- 1, 2, 1 * Gigabyte, 1 * Gigabyte, false, &a);
+ Try<PID<Master> > master = cluster.masters.start(&allocator);
+ ASSERT_SOME(master);
- MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
- trigger registeredCall;
+ EXPECT_CALL(allocator, frameworkAdded(_, _, _))
+ .Times(1);
+ Future<Nothing> registered;
EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(Trigger(&registeredCall));
+ .WillOnce(FutureSatisfy(&registered));
driver.start();
- WAIT_UNTIL(registeredCall);
+ AWAIT_UNTIL(registered);
- vector<Request> requestsSent;
- vector<Request> requestsReceived;
+ vector<Request> sent;
Request request;
request.mutable_slave_id()->set_value("test");
- requestsSent.push_back(request);
+ sent.push_back(request);
- trigger resourcesRequestedCall;
+ Future<vector<Request> > received;
EXPECT_CALL(allocator, resourcesRequested(_, _))
- .WillOnce(DoAll(SaveArg<1>(&requestsReceived),
- Trigger(&resourcesRequestedCall)));
+ .WillOnce(FutureArg<1>(&received));
- driver.requestResources(requestsSent);
+ driver.requestResources(sent);
- WAIT_UNTIL(resourcesRequestedCall);
+ AWAIT_UNTIL(received);
+ EXPECT_EQ(sent.size(), received.get().size());
+ EXPECT_NE(0u, received.get().size());
+ EXPECT_EQ(request.slave_id(), received.get()[0].slave_id());
- EXPECT_EQ(requestsSent.size(), requestsReceived.size());
- EXPECT_NE(0u, requestsReceived.size());
- EXPECT_EQ(request.slave_id(), requestsReceived[0].slave_id());
+ EXPECT_CALL(allocator, frameworkDeactivated(_))
+ .Times(AtMost(1)); // Races with shutting down the cluster.
+
+ EXPECT_CALL(allocator, frameworkRemoved(_))
+ .Times(AtMost(1)); // Races with shutting down the cluster.
driver.stop();
driver.join();
- local::shutdown();
+ cluster.shutdown();
}
-class MultipleExecutorsTest : public MesosTest
-{};
+class MultipleExecutorsTest : public MesosClusterTest {};
TEST_F(MultipleExecutorsTest, TasksExecutorInfoDiffers)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- HierarchicalDRFAllocatorProcess allocator;
- Allocator a(&allocator);
- Files files;
- Master m(&a, &files);
- PID<Master> master = process::spawn(&m);
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
MockExecutor exec;
- trigger shutdownCall;
-
- EXPECT_CALL(exec, registered(_, _, _, _))
- .Times(AtMost(1));
-
- EXPECT_CALL(exec, launchTask(_, _))
- .WillRepeatedly(Return()); // Test expects we won't send any status updates!
-
- EXPECT_CALL(exec, shutdown(_))
- .WillOnce(Trigger(&shutdownCall));
-
- TestingIsolator isolator(DEFAULT_EXECUTOR_ID, &exec);
-
- Slave s(this->slaveFlags, true, &isolator, &files);
- PID<Slave> slave = process::spawn(&s);
-
- BasicMasterDetector detector(master, slave, true);
+ Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+ ASSERT_SOME(master);
MockScheduler sched;
- MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
-
- vector<Offer> offers;
-
- trigger resourceOffersCall;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
+ Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffersCall)))
- .WillRepeatedly(Return());
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
- WAIT_UNTIL(resourceOffersCall);
-
- EXPECT_NE(0u, offers.size());
+ AWAIT_UNTIL(offers);
+ EXPECT_NE(0u, offers.get().size());
ExecutorInfo executor;
executor.mutable_executor_id()->set_value("default");
@@ -578,7 +517,7 @@ TEST_F(MultipleExecutorsTest, TasksExecu
TaskInfo task1;
task1.set_name("");
task1.mutable_task_id()->set_value("1");
- task1.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
task1.mutable_executor()->MergeFrom(executor);
@@ -587,7 +526,7 @@ TEST_F(MultipleExecutorsTest, TasksExecu
TaskInfo task2;
task2.set_name("");
task2.mutable_task_id()->set_value("2");
- task2.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
task2.mutable_executor()->MergeFrom(executor);
@@ -595,32 +534,39 @@ TEST_F(MultipleExecutorsTest, TasksExecu
tasks.push_back(task1);
tasks.push_back(task2);
- TaskStatus status;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
- trigger statusUpdateCall;
+ // Grab the "good" task but don't send a status update.
+ Future<TaskInfo> task;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(FutureArg<1>(&task));
+ Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&statusUpdateCall)));
+ .WillOnce(FutureArg<1>(&status));
- driver.launchTasks(offers[0].id(), tasks);
+ driver.launchTasks(offers.get()[0].id(), tasks);
- WAIT_UNTIL(statusUpdateCall);
+ AWAIT_UNTIL(task);
+ EXPECT_EQ(task1.task_id(), task.get().task_id());
- EXPECT_EQ(task2.task_id(), status.task_id());
- EXPECT_EQ(TASK_LOST, status.state());
- EXPECT_TRUE(status.has_message());
+ AWAIT_UNTIL(status);
+ EXPECT_EQ(task2.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(status.get().has_message());
EXPECT_EQ("Task has invalid ExecutorInfo (existing ExecutorInfo"
- " with same ExecutorID is not compatible)", status.message());
+ " with same ExecutorID is not compatible)",
+ status.get().message());
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdown));
driver.stop();
driver.join();
- WAIT_UNTIL(shutdownCall); // To ensure can deallocate MockExecutor.
-
- process::terminate(slave);
- process::wait(slave);
+ AWAIT_UNTIL(shutdown); // To ensure can deallocate MockExecutor.
- process::terminate(master);
- process::wait(master);
+ cluster.shutdown();
}