You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/04/12 19:38:20 UTC

svn commit: r1467371 - /incubator/mesos/trunk/src/tests/resource_offers_tests.cpp

Author: benh
Date: Fri Apr 12 17:38:20 2013
New Revision: 1467371

URL: http://svn.apache.org/r1467371
Log:
Updated resource offers tests to use new testing abstractions.

Review: https://reviews.apache.org/r/10306

Modified:
    incubator/mesos/trunk/src/tests/resource_offers_tests.cpp

Modified: incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/resource_offers_tests.cpp?rev=1467371&r1=1467370&r2=1467371&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/resource_offers_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/resource_offers_tests.cpp Fri Apr 12 17:38:20 2013
@@ -18,18 +18,13 @@
 
 #include <gmock/gmock.h>
 
-#include <map>
-#include <string>
 #include <vector>
 
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
-#include "detector/detector.hpp"
-
 #include "local/local.hpp"
 
-#include "master/allocator.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
@@ -41,24 +36,19 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
-using mesos::internal::master::Allocator;
 using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
 
+using process::Future;
 using process::PID;
 
-using std::map;
-using std::string;
 using std::vector;
 
 using testing::_;
 using testing::AtMost;
-using testing::DoAll;
-using testing::ElementsAre;
 using testing::Return;
-using testing::SaveArg;
 
 
 TEST(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
@@ -70,29 +60,21 @@ TEST(ResourceOffersTest, ResourceOfferWi
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
-  vector<Offer> offers;
-
-  trigger resourceOffersCall;
-
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&resourceOffersCall)))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(sched, offerRescinded(&driver, _))
-    .Times(AtMost(1));
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // All 10 slaves might not be in first offer.
 
   driver.start();
 
-  WAIT_UNTIL(resourceOffersCall);
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
+  EXPECT_GE(10u, offers.get().size());
 
-  EXPECT_NE(0u, offers.size());
-  EXPECT_GE(10u, offers.size());
-
-  Resources resources(offers[0].resources());
+  Resources resources(offers.get()[0].resources());
   EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value());
   EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value());
 
@@ -112,49 +94,39 @@ TEST(ResourceOffersTest, TaskUsesNoResou
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
-  vector<Offer> offers;
-
-  trigger resourceOffersCall;
-
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&resourceOffersCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
-  WAIT_UNTIL(resourceOffersCall);
-
-  EXPECT_NE(0u, offers.size());
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
 
   TaskInfo task;
   task.set_name("");
   task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
 
   vector<TaskInfo> tasks;
   tasks.push_back(task);
 
-  TaskStatus status;
-
-  trigger statusUpdateCall;
-
+  Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status),
-                    Trigger(&statusUpdateCall)));
+    .WillOnce(FutureArg<1>(&status));
 
-  driver.launchTasks(offers[0].id(), tasks);
+  driver.launchTasks(offers.get()[0].id(), tasks);
 
-  WAIT_UNTIL(statusUpdateCall);
-
-  EXPECT_EQ(task.task_id(), status.task_id());
-  EXPECT_EQ(TASK_LOST, status.state());
-  EXPECT_TRUE(status.has_message());
-  EXPECT_EQ("Task uses no resources", status.message());
+  AWAIT_UNTIL(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_EQ("Task uses no resources", status.get().message());
 
   driver.stop();
   driver.join();
@@ -172,28 +144,23 @@ TEST(ResourceOffersTest, TaskUsesInvalid
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
-  vector<Offer> offers;
-
-  trigger resourceOffersCall;
-
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&resourceOffersCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
-  WAIT_UNTIL(resourceOffersCall);
-
-  EXPECT_NE(0u, offers.size());
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
 
   TaskInfo task;
   task.set_name("");
   task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
 
   Resource* cpus = task.add_resources();
@@ -204,22 +171,17 @@ TEST(ResourceOffersTest, TaskUsesInvalid
   vector<TaskInfo> tasks;
   tasks.push_back(task);
 
-  TaskStatus status;
-
-  trigger statusUpdateCall;
-
+  Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status),
-                    Trigger(&statusUpdateCall)));
-
-  driver.launchTasks(offers[0].id(), tasks);
+    .WillOnce(FutureArg<1>(&status));
 
-  WAIT_UNTIL(statusUpdateCall);
+  driver.launchTasks(offers.get()[0].id(), tasks);
 
-  EXPECT_EQ(task.task_id(), status.task_id());
-  EXPECT_EQ(TASK_LOST, status.state());
-  EXPECT_TRUE(status.has_message());
-  EXPECT_EQ("Task uses invalid resources", status.message());
+  AWAIT_UNTIL(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_EQ("Task uses invalid resources", status.get().message());
 
   driver.stop();
   driver.join();
@@ -237,28 +199,23 @@ TEST(ResourceOffersTest, TaskUsesMoreRes
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
-  vector<Offer> offers;
-
-  trigger resourceOffersCall;
-
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&resourceOffersCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
-  WAIT_UNTIL(resourceOffersCall);
-
-  EXPECT_NE(0u, offers.size());
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
 
   TaskInfo task;
   task.set_name("");
   task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
 
   Resource* cpus = task.add_resources();
@@ -269,22 +226,18 @@ TEST(ResourceOffersTest, TaskUsesMoreRes
   vector<TaskInfo> tasks;
   tasks.push_back(task);
 
-  TaskStatus status;
-
-  trigger statusUpdateCall;
-
+  Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status),
-                    Trigger(&statusUpdateCall)));
+    .WillOnce(FutureArg<1>(&status));
 
-  driver.launchTasks(offers[0].id(), tasks);
+  driver.launchTasks(offers.get()[0].id(), tasks);
 
-  WAIT_UNTIL(statusUpdateCall);
+  AWAIT_UNTIL(status);
 
-  EXPECT_EQ(task.task_id(), status.task_id());
-  EXPECT_EQ(TASK_LOST, status.state());
-  EXPECT_TRUE(status.has_message());
-  EXPECT_EQ("Task uses more resources than offered", status.message());
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_EQ("Task uses more resources than offered", status.get().message());
 
   driver.stop();
   driver.join();
@@ -293,7 +246,7 @@ TEST(ResourceOffersTest, TaskUsesMoreRes
 }
 
 
-TEST(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
+TEST(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
@@ -302,27 +255,17 @@ TEST(ResourceOffersTest, ResourcesGetReo
   MockScheduler sched1;
   MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
 
-  vector<Offer> offers;
-
-  trigger sched1ResourceOfferCall;
-
   EXPECT_CALL(sched1, registered(&driver1, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&sched1ResourceOfferCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(FutureArg<1>(&offers));
 
   driver1.start();
 
-  WAIT_UNTIL(sched1ResourceOfferCall);
-
-  EXPECT_NE(0u, offers.size());
-
-  vector<TaskInfo> tasks; // Use nothing!
-
-  driver1.launchTasks(offers[0].id(), tasks);
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
 
   driver1.stop();
   driver1.join();
@@ -330,21 +273,15 @@ TEST(ResourceOffersTest, ResourcesGetReo
   MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
 
-  trigger sched2ResourceOfferCall;
-
   EXPECT_CALL(sched2, registered(&driver2, _, _))
     .Times(1);
 
   EXPECT_CALL(sched2, resourceOffers(&driver2, _))
-    .WillOnce(Trigger(&sched2ResourceOfferCall))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
-    .Times(AtMost(1));
+    .WillOnce(FutureArg<1>(&offers));
 
   driver2.start();
 
-  WAIT_UNTIL(sched2ResourceOfferCall);
+  AWAIT_UNTIL(offers);
 
   driver2.stop();
   driver2.join();
@@ -353,7 +290,7 @@ TEST(ResourceOffersTest, ResourcesGetReo
 }
 
 
-TEST(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
+TEST(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
@@ -362,28 +299,71 @@ TEST(ResourceOffersTest, ResourcesGetReo
   MockScheduler sched1;
   MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
 
-  vector<Offer> offers;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers));
 
-  trigger sched1ResourceOffersCall;
+  driver1.start();
+
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  vector<TaskInfo> tasks; // Use nothing!
+  driver1.launchTasks(offers.get()[0].id(), tasks);
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched2, registered(&driver2, _, _))
+    .Times(1);
+
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver2.start();
+
+  AWAIT_UNTIL(offers);
+
+  // Stop first framework before second so no offers are sent.
+  driver1.stop();
+  driver1.join();
+
+  driver2.stop();
+  driver2.join();
+
+  local::shutdown();
+}
+
+
+TEST(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, 1 * Gigabyte, false);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
 
   EXPECT_CALL(sched1, registered(&driver1, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&sched1ResourceOffersCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver1.start();
 
-  WAIT_UNTIL(sched1ResourceOffersCall);
-
-  EXPECT_NE(0u, offers.size());
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
 
   TaskInfo task;
   task.set_name("");
   task.mutable_task_id()->set_value("1");
-  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
 
   Resource* cpus = task.add_resources();
@@ -399,44 +379,34 @@ TEST(ResourceOffersTest, ResourcesGetReo
   vector<TaskInfo> tasks;
   tasks.push_back(task);
 
-  TaskStatus status;
-
-  trigger sched1StatusUpdateCall;
-
+  Future<TaskStatus> status;
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
-    .WillOnce(DoAll(SaveArg<1>(&status),
-                    Trigger(&sched1StatusUpdateCall)));
-
-  driver1.launchTasks(offers[0].id(), tasks);
+    .WillOnce(FutureArg<1>(&status));
 
-  WAIT_UNTIL(sched1StatusUpdateCall);
+  driver1.launchTasks(offers.get()[0].id(), tasks);
 
-  EXPECT_EQ(task.task_id(), status.task_id());
-  EXPECT_EQ(TASK_LOST, status.state());
-  EXPECT_TRUE(status.has_message());
-  EXPECT_EQ("Task uses invalid resources", status.message());
-
-  driver1.stop();
-  driver1.join();
+  AWAIT_UNTIL(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(status.get().has_message());
+  EXPECT_EQ("Task uses invalid resources", status.get().message());
 
   MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
 
-  trigger sched2ResourceOffersCall;
-
   EXPECT_CALL(sched2, registered(&driver2, _, _))
     .Times(1);
 
   EXPECT_CALL(sched2, resourceOffers(&driver2, _))
-    .WillOnce(Trigger(&sched2ResourceOffersCall))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
-    .Times(AtMost(1));
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver2.start();
 
-  WAIT_UNTIL(sched2ResourceOffersCall);
+  AWAIT_UNTIL(offers);
+
+  driver1.stop();
+  driver1.join();
 
   driver2.stop();
   driver2.join();
@@ -456,120 +426,89 @@ TEST(ResourceOffersTest, Request)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  MockScheduler sched;
+  Cluster cluster;
 
   MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _));
-
-  EXPECT_CALL(allocator, frameworkAdded(_, _, _));
-
-  EXPECT_CALL(allocator, frameworkDeactivated(_));
-
-  EXPECT_CALL(allocator, frameworkRemoved(_));
-
-  EXPECT_CALL(allocator, slaveAdded(_, _, _))
-    .WillRepeatedly(Return()); // Test may finish before slave registers.
-
-  EXPECT_CALL(allocator, slaveRemoved(_))
-    .WillRepeatedly(Return()); // Test may finish before slave registers.
-
-  Allocator a(&allocator);
+  EXPECT_CALL(allocator, initialize(_, _))
+    .Times(1);
 
-  PID<Master> master = local::launch(
-      1, 2, 1 * Gigabyte, 1 * Gigabyte, false, &a);
+  Try<PID<Master> > master = cluster.masters.start(&allocator);
+  ASSERT_SOME(master);
 
-  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
 
-  trigger registeredCall;
+  EXPECT_CALL(allocator, frameworkAdded(_, _, _))
+    .Times(1);
 
+  Future<Nothing> registered;
   EXPECT_CALL(sched, registered(&driver, _, _))
-    .WillOnce(Trigger(&registeredCall));
+    .WillOnce(FutureSatisfy(&registered));
 
   driver.start();
 
-  WAIT_UNTIL(registeredCall);
+  AWAIT_UNTIL(registered);
 
-  vector<Request> requestsSent;
-  vector<Request> requestsReceived;
+  vector<Request> sent;
   Request request;
   request.mutable_slave_id()->set_value("test");
-  requestsSent.push_back(request);
+  sent.push_back(request);
 
-  trigger resourcesRequestedCall;
+  Future<vector<Request> > received;
   EXPECT_CALL(allocator, resourcesRequested(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&requestsReceived),
-                    Trigger(&resourcesRequestedCall)));
+    .WillOnce(FutureArg<1>(&received));
 
-  driver.requestResources(requestsSent);
+  driver.requestResources(sent);
 
-  WAIT_UNTIL(resourcesRequestedCall);
+  AWAIT_UNTIL(received);
+  EXPECT_EQ(sent.size(), received.get().size());
+  EXPECT_NE(0u, received.get().size());
+  EXPECT_EQ(request.slave_id(), received.get()[0].slave_id());
 
-  EXPECT_EQ(requestsSent.size(), requestsReceived.size());
-  EXPECT_NE(0u, requestsReceived.size());
-  EXPECT_EQ(request.slave_id(), requestsReceived[0].slave_id());
+  EXPECT_CALL(allocator, frameworkDeactivated(_))
+    .Times(AtMost(1)); // Races with shutting down the cluster.
+
+  EXPECT_CALL(allocator, frameworkRemoved(_))
+    .Times(AtMost(1)); // Races with shutting down the cluster.
 
   driver.stop();
   driver.join();
 
-  local::shutdown();
+  cluster.shutdown();
 }
 
 
-class MultipleExecutorsTest : public MesosTest
-{};
+class MultipleExecutorsTest : public MesosClusterTest {};
 
 
 TEST_F(MultipleExecutorsTest, TasksExecutorInfoDiffers)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  HierarchicalDRFAllocatorProcess allocator;
-  Allocator a(&allocator);
-  Files files;
-  Master m(&a, &files);
-  PID<Master> master = process::spawn(&m);
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
 
   MockExecutor exec;
 
-  trigger shutdownCall;
-
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .Times(AtMost(1));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillRepeatedly(Return()); // Test expects we won't send any status updates!
-
-  EXPECT_CALL(exec, shutdown(_))
-    .WillOnce(Trigger(&shutdownCall));
-
-  TestingIsolator isolator(DEFAULT_EXECUTOR_ID, &exec);
-
-  Slave s(this->slaveFlags, true, &isolator, &files);
-  PID<Slave> slave = process::spawn(&s);
-
-  BasicMasterDetector detector(master, slave, true);
+  Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+  ASSERT_SOME(slave);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
-
-  vector<Offer> offers;
-
-  trigger resourceOffersCall;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
 
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
 
+  Future<vector<Offer> > offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-                    Trigger(&resourceOffersCall)))
-    .WillRepeatedly(Return());
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
-  WAIT_UNTIL(resourceOffersCall);
-
-  EXPECT_NE(0u, offers.size());
+  AWAIT_UNTIL(offers);
+  EXPECT_NE(0u, offers.get().size());
 
   ExecutorInfo executor;
   executor.mutable_executor_id()->set_value("default");
@@ -578,7 +517,7 @@ TEST_F(MultipleExecutorsTest, TasksExecu
   TaskInfo task1;
   task1.set_name("");
   task1.mutable_task_id()->set_value("1");
-  task1.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
   task1.mutable_executor()->MergeFrom(executor);
 
@@ -587,7 +526,7 @@ TEST_F(MultipleExecutorsTest, TasksExecu
   TaskInfo task2;
   task2.set_name("");
   task2.mutable_task_id()->set_value("2");
-  task2.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
   task2.mutable_executor()->MergeFrom(executor);
 
@@ -595,32 +534,39 @@ TEST_F(MultipleExecutorsTest, TasksExecu
   tasks.push_back(task1);
   tasks.push_back(task2);
 
-  TaskStatus status;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
 
-  trigger statusUpdateCall;
+  // Grab the "good" task but don't send a status update.
+  Future<TaskInfo> task;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureArg<1>(&task));
 
+  Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status),
-                    Trigger(&statusUpdateCall)));
+    .WillOnce(FutureArg<1>(&status));
 
-  driver.launchTasks(offers[0].id(), tasks);
+  driver.launchTasks(offers.get()[0].id(), tasks);
 
-  WAIT_UNTIL(statusUpdateCall);
+  AWAIT_UNTIL(task);
+  EXPECT_EQ(task1.task_id(), task.get().task_id());
 
-  EXPECT_EQ(task2.task_id(), status.task_id());
-  EXPECT_EQ(TASK_LOST, status.state());
-  EXPECT_TRUE(status.has_message());
+  AWAIT_UNTIL(status);
+  EXPECT_EQ(task2.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(status.get().has_message());
   EXPECT_EQ("Task has invalid ExecutorInfo (existing ExecutorInfo"
-            " with same ExecutorID is not compatible)", status.message());
+            " with same ExecutorID is not compatible)",
+            status.get().message());
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
 
   driver.stop();
   driver.join();
 
-  WAIT_UNTIL(shutdownCall); // To ensure can deallocate MockExecutor.
-
-  process::terminate(slave);
-  process::wait(slave);
+  AWAIT_UNTIL(shutdown); // To ensure we can deallocate MockExecutor.
 
-  process::terminate(master);
-  process::wait(master);
+  cluster.shutdown();
 }