Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:52:42 UTC

svn commit: r1132162 - /incubator/mesos/trunk/src/tests/test_master.cpp

Author: benh
Date: Sun Jun  5 08:52:41 2011
New Revision: 1132162

URL: http://svn.apache.org/viewvc?rev=1132162&view=rev
Log:
More updates to testing: replace the hand-written test schedulers and executors (NoopScheduler, FixedResponseScheduler, etc.) with gmock-based MockScheduler/MockExecutor mocks, using trigger/WAIT_UNTIL for synchronization instead of driver.run().
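
The tests in this diff all follow the same gmock pattern: set expectations on a
MockScheduler (or MockExecutor), start the driver, and block on a trigger until
the asynchronous callback fires, instead of relying on driver.run() returning.
Below is a minimal sketch of that pattern, condensed from the tests in the diff
and assuming the MockScheduler, trigger, Trigger, and WAIT_UNTIL helpers declared
in testing_gmock.hpp behave as they are used here; the test name is illustrative
only and not part of the commit.

    // Sketch only: condensed from the tests in this diff.
    TEST(MasterTest, GmockPatternSketch)
    {
      ASSERT_TRUE(GTEST_IS_THREADSAFE);

      // Start an in-process master with one slave (2 CPUs, 1 GB).
      PID master = local::launch(1, 2, 1 * Gigabyte, false, false);

      MockScheduler sched;
      MesosSchedulerDriver driver(&sched, master);

      trigger resourceOfferCall;

      EXPECT_CALL(sched, getFrameworkName(&driver))
        .WillOnce(Return(""));

      EXPECT_CALL(sched, getExecutorInfo(&driver))
        .WillOnce(Return(ExecutorInfo("noexecutor", "")));

      EXPECT_CALL(sched, registered(&driver, _))
        .Times(1);

      // Fire the trigger once the master sends a resource offer.
      EXPECT_CALL(sched, resourceOffer(&driver, _, _))
        .WillOnce(Trigger(&resourceOfferCall));

      // The driver may rescind the offer while shutting down.
      EXPECT_CALL(sched, offerRescinded(&driver, _))
        .Times(AtMost(1));

      driver.start();

      WAIT_UNTIL(resourceOfferCall);  // Block until the callback has run.

      driver.stop();
      driver.join();

      local::shutdown();
    }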

Modified:
    incubator/mesos/trunk/src/tests/test_master.cpp

Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1132162&r1=1132161&r2=1132162&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun  5 08:52:41 2011
@@ -1,13 +1,9 @@
-#include <gtest/gtest.h>
-
-#include <boost/lexical_cast.hpp>
+#include <gmock/gmock.h>
 
 #include <mesos_exec.hpp>
 #include <mesos_sched.hpp>
 
-#include "common/date_utils.hpp"
-
-#include "event_history/event_logger.hpp"
+#include <boost/lexical_cast.hpp>
 
 #include "local/local.hpp"
 
@@ -17,275 +13,182 @@
 #include "slave/process_based_isolation_module.hpp"
 #include "slave/slave.hpp"
 
-using std::string;
-using std::vector;
-
-using boost::lexical_cast;
+#include "testing_gmock.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
+using namespace mesos::internal::test;
+
+using boost::lexical_cast;
 
-using mesos::internal::eventhistory::EventLogger;
 using mesos::internal::master::Master;
 using mesos::internal::slave::Slave;
 using mesos::internal::slave::Framework;
 using mesos::internal::slave::IsolationModule;
 using mesos::internal::slave::ProcessBasedIsolationModule;
 
-class NoopScheduler : public Scheduler
+using std::string;
+using std::map;
+using std::vector;
+
+using testing::_;
+using testing::A;
+using testing::An;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Eq;
+using testing::ElementsAre;
+using testing::Ne;
+using testing::Return;
+using testing::SaveArg;
+using testing::Sequence;
+using testing::StrEq;
+
+
+class LocalIsolationModule : public IsolationModule
 {
 public:
-  bool registeredCalled;
-  int offersGotten;
-  int slavesExpected;
+  Executor *executor;
+  MesosExecutorDriver *driver;
+  string pid;
 
-public:
-  NoopScheduler(int _slavesExpected)
-    : slavesExpected(_slavesExpected),
-      registeredCalled(false),
-      offersGotten(0)
-  {}
+  LocalIsolationModule(Executor *_executor)
+    : executor(_executor), driver(NULL) {}
 
-  virtual ~NoopScheduler() {}
+  virtual ~LocalIsolationModule() {}
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
+  virtual void initialize(Slave *slave) {
+    pid = slave->self();
   }
 
-  virtual void registered(SchedulerDriver*, FrameworkID fid) {
-    LOG(INFO) << "NoopScheduler registered with id " << fid;
-    registeredCalled = true;
-  }
+  virtual void startExecutor(Framework *framework) {
+    // TODO(benh): Cleanup the way we launch local drivers!
+    setenv("MESOS_LOCAL", "1", 1);
+    setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
+    setenv("MESOS_FRAMEWORK_ID", framework->id.c_str(), 1);
 
-  virtual void resourceOffer(SchedulerDriver *d,
-                             OfferID id,
-                             const vector<SlaveOffer>& offers) {
-    LOG(INFO) << "NoopScheduler got a slot offer";
-    offersGotten++;
-    EXPECT_EQ(slavesExpected, offers.size());
-    foreach (const SlaveOffer& offer, offers) {
-      string cpus = offer.params.find("cpus")->second;
-      string mem = offer.params.find("mem")->second;
-      EXPECT_EQ("2", cpus);
-      EXPECT_EQ(lexical_cast<string>(1 * Gigabyte), mem);
-    }
-    vector<TaskDescription> tasks;
-    d->replyToOffer(id, tasks, map<string, string>());
-    d->stop();
+    driver = new MesosExecutorDriver(executor);
+    driver->start();
   }
-}; 
 
+  virtual void killExecutor(Framework* framework) {
+    driver->stop();
+    driver->join();
+    delete driver;
 
-TEST(MasterTest, NoopFrameworkWithOneSlave)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
-  NoopScheduler sched(1);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_TRUE(sched.registeredCalled);
-  EXPECT_EQ(1, sched.offersGotten);
-  local::shutdown();
-}
+    // TODO(benh): Cleanup the way we launch local drivers!
+    unsetenv("MESOS_LOCAL");
+    unsetenv("MESOS_SLAVE_PID");
+    unsetenv("MESOS_FRAMEWORK_ID");
+  }
+};
 
 
-TEST(MasterTest, NoopFrameworkWithMultipleSlaves)
+TEST(MasterTest, ResourceOfferWithMultipleSlaves)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
   PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
-  NoopScheduler sched(10);
+
+  MockScheduler sched;
   MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_TRUE(sched.registeredCalled);
-  EXPECT_EQ(1, sched.offersGotten);
-  local::shutdown();
-}
 
+  vector<SlaveOffer> offers;
 
-class FixedResponseScheduler : public Scheduler
-{
-public:
-  vector<TaskDescription> response;
-  string errorMessage;
-  
-  FixedResponseScheduler(vector<TaskDescription> _response)
-    : response(_response) {}
+  trigger resourceOfferCall;
 
-  virtual ~FixedResponseScheduler() {}
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  virtual void resourceOffer(SchedulerDriver* d,
-                             OfferID id,
-                             const vector<SlaveOffer>& offers) {
-    LOG(INFO) << "FixedResponseScheduler got a slot offer";
-    d->replyToOffer(id, response, map<string, string>());
-  }
-  
-  virtual void error(SchedulerDriver* d,
-                     int code,
-                     const std::string& message) {
-    errorMessage = message;
-    d->stop();
-  }
-};
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
 
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)));
 
-TEST(MasterTest, DuplicateTaskIdsInResponse)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "1";
-  params["mem"] = lexical_cast<string>(1 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  tasks.push_back(TaskDescription(2, "200102030405-0-0", "", params, ""));
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Duplicate task ID: 1", sched.errorMessage);
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(AtMost(1));
 
+  driver.start();
 
-TEST(MasterTest, TooMuchMemoryInTask)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "1";
-  params["mem"] = lexical_cast<string>(4 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
+  WAIT_UNTIL(resourceOfferCall);
 
+  EXPECT_NE(0, offers.size());
+  EXPECT_GE(10, offers.size());
 
-TEST(MasterTest, TooMuchCpuInTask)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "4";
-  params["mem"] = lexical_cast<string>(1 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
+  EXPECT_EQ("2", offers[0].params["cpus"]);
+  EXPECT_EQ("1024", offers[0].params["mem"]);
 
+  driver.stop();
+  driver.join();
 
-TEST(MasterTest, TooLittleCpuInTask)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "0";
-  params["mem"] = lexical_cast<string>(1 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Invalid task size: <0 CPUs, 1024 MEM>", sched.errorMessage);
   local::shutdown();
-  DateUtils::clearMockDate();
 }
 
 
-TEST(MasterTest, TooLittleMemoryInTask)
+TEST(MasterTest, ResourcesReofferedAfterReject)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "1";
-  params["mem"] = "1";
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Invalid task size: <1 CPUs, 1 MEM>", sched.errorMessage);
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
 
+  PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
 
-TEST(MasterTest, TooMuchMemoryAcrossTasks)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "1";
-  params["mem"] = lexical_cast<string>(2 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  tasks.push_back(TaskDescription(2, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
 
+  OfferID offerId;
 
-TEST(MasterTest, TooMuchCpuAcrossTasks)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
-  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
-  vector<TaskDescription> tasks;
-  map<string, string> params;
-  params["cpus"] = "2";
-  params["mem"] = lexical_cast<string>(1 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  tasks.push_back(TaskDescription(2, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched(tasks);
-  MesosSchedulerDriver driver(&sched, master);
-  driver.run();
-  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
+  trigger sched1ResourceOfferCall;
 
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
 
-TEST(MasterTest, ResourcesReofferedAfterReject)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  NoopScheduler sched1(10);
-  MesosSchedulerDriver driver1(&sched1, master);
-  driver1.run();
-  EXPECT_TRUE(sched1.registeredCalled);
-  EXPECT_EQ(1, sched1.offersGotten);
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .Times(1);
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)));
+
+  driver1.start();
+
+  WAIT_UNTIL(sched1ResourceOfferCall);
+
+  driver1.replyToOffer(offerId, vector<TaskDescription>(), map<string, string>());
 
-  NoopScheduler sched2(10);
+  driver1.stop();
+  driver1.join();
+
+  MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, master);
-  driver2.run();
-  EXPECT_TRUE(sched2.registeredCalled);
-  EXPECT_EQ(1, sched2.offersGotten);
+
+  trigger sched2ResourceOfferCall;
+
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
+
+  EXPECT_CALL(sched2, registered(&driver2, _))
+    .Times(1);
+
+  EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+    .WillOnce(Trigger(&sched2ResourceOfferCall));
+
+  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+    .Times(AtMost(1));
+
+  driver2.start();
+
+  WAIT_UNTIL(sched2ResourceOfferCall);
+
+  driver2.stop();
+  driver2.join();
 
   local::shutdown();
 }
@@ -294,65 +197,94 @@ TEST(MasterTest, ResourcesReofferedAfter
 TEST(MasterTest, ResourcesReofferedAfterBadResponse)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  DateUtils::setMockDate("200102030405");
+
   PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
-  vector<TaskDescription> tasks;
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+
+  trigger sched1ResourceOfferCall;
+
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
+
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .Times(1);
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&sched1ResourceOfferCall)));
+
+  driver1.start();
+
+  WAIT_UNTIL(sched1ResourceOfferCall);
+
+  EXPECT_EQ(1, offers.size());
+
   map<string, string> params;
   params["cpus"] = "0";
   params["mem"] = lexical_cast<string>(1 * Gigabyte);
-  tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
-  FixedResponseScheduler sched1(tasks);
-  MesosSchedulerDriver driver1(&sched1, master);
-  driver1.run();
-  EXPECT_EQ("Invalid task size: <0 CPUs, 1024 MEM>", sched1.errorMessage);
 
-  NoopScheduler sched2(1);
+  vector<TaskDescription> tasks;
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", params, bytes()));
+
+  trigger errorCall;
+
+  EXPECT_CALL(sched1, error(&driver1, _, "Invalid task size: <0 CPUs, 1024 MEM>"))
+    .WillOnce(Trigger(&errorCall));
+
+  EXPECT_CALL(sched1, offerRescinded(&driver1, offerId))
+    .Times(AtMost(1));
+
+  driver1.replyToOffer(offerId, tasks, map<string, string>());
+
+  WAIT_UNTIL(errorCall);
+
+  driver1.stop();
+  driver1.join();
+
+  MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, master);
-  driver2.run();
-  EXPECT_TRUE(sched2.registeredCalled);
-  EXPECT_EQ(1, sched2.offersGotten);
 
-  local::shutdown();
-  DateUtils::clearMockDate();
-}
+  trigger sched2ResourceOfferCall;
 
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
 
-class SlaveLostScheduler : public Scheduler
-{
-public:
-  PID slave;
-  bool slaveLostCalled;
-  
-  SlaveLostScheduler(const PID &_slave)
-    : slave(_slave), slaveLostCalled(false) {}
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  virtual ~SlaveLostScheduler() {}
+  EXPECT_CALL(sched2, registered(&driver2, _))
+    .Times(1);
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+    .WillOnce(Trigger(&sched2ResourceOfferCall));
 
-  virtual void resourceOffer(SchedulerDriver* d,
-                             OfferID id,
-                             const vector<SlaveOffer>& offers) {
-    LOG(INFO) << "SlaveLostScheduler got a slot offer";
-    MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
-  }
-  
-  virtual void slaveLost(SchedulerDriver* d, SlaveID slaveId) {
-    slaveLostCalled = true;
-    d->stop();
-  }
-};
+  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+    .Times(AtMost(1));
+
+  driver2.start();
+
+  WAIT_UNTIL(sched2ResourceOfferCall);
+
+  driver2.stop();
+  driver2.join();
+
+  local::shutdown();
+}
 
 
 TEST(MasterTest, SlaveLost)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  EventLogger el;
-  Master m(&el);
+  Master m;
   PID master = Process::spawn(&m);
 
   ProcessBasedIsolationModule isolationModule;
@@ -361,74 +293,54 @@ TEST(MasterTest, SlaveLost)
 
   BasicMasterDetector detector(master, slave, true);
 
-  SlaveLostScheduler sched(slave);
-
+  MockScheduler sched;
   MesosSchedulerDriver driver(&sched, master);
-  driver.run();
 
-  EXPECT_TRUE(sched.slaveLostCalled);
+  OfferID offerId;
+  vector<SlaveOffer> offers;
 
-  Process::wait(slave);
+  trigger resourceOfferCall;
 
-  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
-  Process::wait(master);
-}
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
 
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
 
-class FailoverScheduler : public Scheduler
-{
-public:
-  bool registeredCalled;
-  
-  FailoverScheduler() : registeredCalled(false) {}
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)));
 
-  virtual ~FailoverScheduler() {}
+  driver.start();
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  WAIT_UNTIL(resourceOfferCall);
 
-  virtual void registered(SchedulerDriver *d, FrameworkID fid) {
-    LOG(INFO) << "FailoverScheduler registered";
-    registeredCalled = true;
-    d->stop();
-  }
-};
+  EXPECT_EQ(1, offers.size());
 
+  trigger offerRescindedCall, slaveLostCall;
 
-class FailingScheduler : public Scheduler
-{
-public:
-  Scheduler *failover;
-  PID master;
-  MesosSchedulerDriver *driver;
-  string errorMessage;
+  EXPECT_CALL(sched, offerRescinded(&driver, offerId))
+    .WillOnce(Trigger(&offerRescindedCall));
 
-  FailingScheduler(Scheduler *_failover, const PID &_master)
-    : failover(_failover), master(_master) {}
+  EXPECT_CALL(sched, slaveLost(&driver, offers[0].slaveId))
+    .WillOnce(Trigger(&slaveLostCall));
 
-  virtual ~FailingScheduler() {
-    delete driver;
-  }
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  WAIT_UNTIL(offerRescindedCall);
+  WAIT_UNTIL(slaveLostCall);
 
-  virtual void registered(SchedulerDriver*, FrameworkID fid) {
-    LOG(INFO) << "FailingScheduler registered";
-    driver = new MesosSchedulerDriver(failover, master, fid);
-    driver->start();
-  }
+  driver.stop();
+  driver.join();
 
-  virtual void error(SchedulerDriver* d,
-                     int code,
-                     const std::string& message) {
-    errorMessage = message;
-    d->stop();
-  }
-};
+  Process::wait(slave);
+
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
+}
 
 
 TEST(MasterTest, SchedulerFailover)
@@ -437,273 +349,201 @@ TEST(MasterTest, SchedulerFailover)
 
   PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
-  FailoverScheduler failoverSched;
-  FailingScheduler failingSched(&failoverSched, master);
+  // Launch the first (i.e., failing) scheduler and wait until
+  // registered gets called to launch the second (i.e., failover)
+  // scheduler.
 
-  MesosSchedulerDriver driver(&failingSched, master);
-  driver.run();
+  MockScheduler failingSched;
+  MesosSchedulerDriver failingDriver(&failingSched, master);
 
-  EXPECT_EQ("Framework failover", failingSched.errorMessage);
+  FrameworkID frameworkId;
 
-  failingSched.driver->join();
+  trigger failingRegisteredCall;
 
-  EXPECT_TRUE(failoverSched.registeredCalled);
-
-  local::shutdown();
-}
+  EXPECT_CALL(failingSched, getFrameworkName(&failingDriver))
+    .WillOnce(Return(""));
 
+  EXPECT_CALL(failingSched, getExecutorInfo(&failingDriver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-class OfferRescindedScheduler : public Scheduler
-{
-public:
-  const PID slave;
-  bool offerRescindedCalled;
-  
-  OfferRescindedScheduler(const PID &_slave)
-    : slave(_slave), offerRescindedCalled(false) {}
+  EXPECT_CALL(failingSched, registered(&failingDriver, _))
+    .WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&failingRegisteredCall)));
 
-  virtual ~OfferRescindedScheduler() {}
+  EXPECT_CALL(failingSched, resourceOffer(&failingDriver, _, _))
+    .Times(AtMost(1));
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  EXPECT_CALL(failingSched, offerRescinded(&failingDriver, _))
+    .Times(AtMost(1));
 
-  virtual void resourceOffer(SchedulerDriver* d,
-                             OfferID id,
-                             const vector<SlaveOffer>& offers) {
-    LOG(INFO) << "OfferRescindedScheduler got a slot offer";
-    vector<TaskDescription> tasks;
-    ASSERT_TRUE(offers.size() == 1);
-    const SlaveOffer &offer = offers[0];
-    TaskDescription desc(0, offer.slaveId, "", offer.params, "");
-    tasks.push_back(desc);
-    d->replyToOffer(id, tasks, map<string, string>());
-    MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
-  }
+  EXPECT_CALL(failingSched, error(&failingDriver, _, "Framework failover"))
+    .Times(1);
 
-  virtual void offerRescinded(SchedulerDriver* d, OfferID)
-  {
-    offerRescindedCalled = true;
-    d->stop();
-  }
-};
+  failingDriver.start();
 
+  WAIT_UNTIL(failingRegisteredCall);
 
-class OfferReplyMessageFilter : public MessageFilter
-{
-public:
-  virtual bool filter(struct msg *msg) {
-    return msg->id == F2M_SLOT_OFFER_REPLY;
-  }
-};
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and wait until it
+  // gets a registered callback.
 
+  MockScheduler failoverSched;
+  MesosSchedulerDriver failoverDriver(&failoverSched, master, frameworkId);
 
-TEST(MasterTest, OfferRescinded)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+  trigger failoverRegisteredCall;
 
-  OfferReplyMessageFilter filter;
-  Process::filter(&filter);
+  EXPECT_CALL(failoverSched, getFrameworkName(&failoverDriver))
+    .WillOnce(Return(""));
 
-  EventLogger el;
-  Master m(&el);
-  PID master = Process::spawn(&m);
+  EXPECT_CALL(failoverSched, getExecutorInfo(&failoverDriver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  ProcessBasedIsolationModule isolationModule;
-  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
-  PID slave = Process::spawn(&s);
+  EXPECT_CALL(failoverSched, registered(&failoverDriver, frameworkId))
+    .WillOnce(Trigger(&failoverRegisteredCall));
 
-  BasicMasterDetector detector(master, slave, true);
+  EXPECT_CALL(failoverSched, resourceOffer(&failoverDriver, _, _))
+    .Times(AtMost(1));
 
-  OfferRescindedScheduler sched(slave);
-  MesosSchedulerDriver driver(&sched, master);
+  EXPECT_CALL(failoverSched, offerRescinded(&failoverDriver, _))
+    .Times(AtMost(1));
 
-  driver.run();
+  failoverDriver.start();
 
-  EXPECT_TRUE(sched.offerRescindedCalled);
+  WAIT_UNTIL(failoverRegisteredCall);
 
-  Process::wait(slave);
+  failingDriver.stop();
+  failoverDriver.stop();
 
-  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
-  Process::wait(master);
+  failingDriver.join();
+  failoverDriver.join();
 
-  Process::filter(NULL);
+  local::shutdown();
 }
 
 
-class SlavePartitionedScheduler : public Scheduler
+TEST(MasterTest, SlavePartitioned)
 {
-public:
-  bool slaveLostCalled;
-  
-  SlavePartitionedScheduler()
-    : slaveLostCalled(false) {}
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  virtual ~SlavePartitionedScheduler() {}
+  ProcessClock::pause();
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  MockFilter filter;
+  Process::filter(&filter);
 
-  virtual void slaveLost(SchedulerDriver* d, SlaveID slaveId) {
-    slaveLostCalled = true;
-    d->stop();
-  }
-};
+  EXPECT_MSG(filter, _, _, _)
+    .WillRepeatedly(Return(false));
 
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
-class HeartbeatMessageFilter : public MessageFilter
-{
-public:
-  virtual bool filter(struct msg *msg) {
-    return msg->id == SH2M_HEARTBEAT;
-  }
-};
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
 
+  trigger slaveLostCall;
 
-TEST(MasterTest, SlavePartitioned)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
 
-  HeartbeatMessageFilter filter;
-  Process::filter(&filter);
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  ProcessClock::pause();
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
 
-  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .Times(AtMost(1));
 
-  SlavePartitionedScheduler sched;
-  MesosSchedulerDriver driver(&sched, master);
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(Trigger(&slaveLostCall));
+
+  EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
+    .WillRepeatedly(Return(true));
 
   driver.start();
 
   ProcessClock::advance(master::HEARTBEAT_TIMEOUT);
 
-  driver.join();
+  WAIT_UNTIL(slaveLostCall);
 
-  EXPECT_TRUE(sched.slaveLostCalled);
+  driver.stop();
+  driver.join();
 
   local::shutdown();
 
-  ProcessClock::resume();
-
   Process::filter(NULL);
+
+  ProcessClock::resume();
 }
 
 
-class TaskRunningScheduler : public Scheduler
+TEST(MasterTest, TaskRunning)
 {
-public:
-  FrameworkID fid;
-  bool statusUpdateCalled;
-  string errorMessage;
-  
-  TaskRunningScheduler()
-    : statusUpdateCalled(false) {}
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  virtual ~TaskRunningScheduler() {}
+  Master m;
+  PID master = Process::spawn(&m);
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  MockExecutor exec;
 
-  virtual void registered(SchedulerDriver*, FrameworkID fid) {
-    LOG(INFO) << "TaskRunningScheduler registered";
-    this->fid = fid;
-  }
+  EXPECT_CALL(exec, init(_, _))
+    .Times(1);
 
-  virtual void resourceOffer(SchedulerDriver* d,
-                             OfferID id,
-                             const vector<SlaveOffer>& offers) {
-    LOG(INFO) << "TaskRunningScheduler got a slot offer";
-    vector<TaskDescription> tasks;
-    ASSERT_TRUE(offers.size() == 1);
-    const SlaveOffer &offer = offers[0];
-    TaskDescription desc(0, offer.slaveId, "", offer.params, "");
-    tasks.push_back(desc);
-    d->replyToOffer(id, tasks, map<string, string>());
-  }
+  EXPECT_CALL(exec, launchTask(_, _))
+    .Times(1);
 
-  virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
-    EXPECT_EQ(TASK_RUNNING, status.state);
-    statusUpdateCalled = true;
-    d->stop();
-  }
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(1);
 
-  virtual void error(SchedulerDriver* d,
-                     int code,
-                     const std::string& message) {
-    errorMessage = message;
-    d->stop();
-  }
-};
+  LocalIsolationModule isolationModule(&exec);
 
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
-class TaskRunningExecutor : public Executor {};
+  BasicMasterDetector detector(master, slave, true);
 
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
 
-class LocalIsolationModule : public IsolationModule
-{
-public:
-  Executor *executor;
-  MesosExecutorDriver *driver;
-  string pid;
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status;
 
-  LocalIsolationModule(Executor *_executor)
-    : executor(_executor), driver(NULL) {}
+  trigger resourceOfferCall, statusUpdateCall;
 
-  virtual ~LocalIsolationModule() {}
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
 
-  virtual void initialize(Slave *slave) {
-    pid = slave->self();
-  }
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  virtual void startExecutor(Framework *framework) {
-    // TODO(benh): Cleanup the way we launch local drivers!
-    setenv("MESOS_LOCAL", "1", 1);
-    setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
-    setenv("MESOS_FRAMEWORK_ID", framework->id.c_str(), 1);
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
 
-    driver = new MesosExecutorDriver(executor);
-    driver->start();
-  }
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)));
 
-  virtual void killExecutor(Framework* framework) {
-    driver->stop();
-    driver->join();
-    delete driver;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
 
-    // TODO(benh): Cleanup the way we launch local drivers!
-    unsetenv("MESOS_LOCAL");
-    unsetenv("MESOS_SLAVE_PID");
-    unsetenv("MESOS_FRAMEWORK_ID");
-  }
-};
+  driver.start();
 
+  WAIT_UNTIL(resourceOfferCall);
 
-TEST(MasterTest, TaskRunning)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+  EXPECT_EQ(1, offers.size());
 
-  EventLogger el;
-  Master m(&el);
-  PID master = Process::spawn(&m);
+  vector<TaskDescription> tasks;
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  TaskRunningExecutor exec;
-  LocalIsolationModule isolationModule(&exec);
+  driver.replyToOffer(offerId, tasks, map<string, string>());
 
-  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
-  PID slave = Process::spawn(&s);
+  WAIT_UNTIL(statusUpdateCall);
 
-  BasicMasterDetector detector(master, slave, true);
+  EXPECT_EQ(TASK_RUNNING, status.state);
 
-  TaskRunningScheduler sched;
-  MesosSchedulerDriver driver(&sched, master);
-
-  driver.run();
-
-  EXPECT_TRUE(sched.statusUpdateCalled);
-  EXPECT_EQ("", sched.errorMessage);
+  driver.stop();
+  driver.join();
 
   MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
   Process::wait(slave);
@@ -713,317 +553,238 @@ TEST(MasterTest, TaskRunning)
 }
 
 
-class SchedulerFailoverStatusUpdateScheduler : public TaskRunningScheduler
+TEST(MasterTest, SchedulerFailoverStatusUpdate)
 {
- public:
-  virtual void registered(SchedulerDriver*, FrameworkID fid) {
-    Process::filter(NULL);
-    ProcessClock::advance(RELIABLE_TIMEOUT);
-  }
-};
-
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-class StatusUpdateFilter : public MessageFilter
-{
-public:
-  TaskRunningScheduler *failover;
-  TaskRunningScheduler *failing;
-  const PID master;
-  MesosSchedulerDriver *driver;
-
-  StatusUpdateFilter(TaskRunningScheduler *_failover,
-                     TaskRunningScheduler *_failing,
-                     const PID &_master)
-    : failover(_failover), failing(_failing), master(_master),
-      driver(NULL) {}
-
-  ~StatusUpdateFilter() {
-    if (driver != NULL) {
-      driver->join();
-      delete driver;
-      driver = NULL;
-    }
-  }
+  ProcessClock::pause();
 
-  virtual bool filter(struct msg *msg) {
-    // TODO(benh): Fix the brokenness of this test due to blocking
-    // S2M_FT_STATUS_UPDATE!
-    if (driver == NULL &&
-        msg->id == S2M_FT_STATUS_UPDATE &&
-        !(msg->to == master)) {
-      driver = new MesosSchedulerDriver(failover, master, failing->fid);
-      driver->start();
-      return true;
-    }
+  MockFilter filter;
+  Process::filter(&filter);
 
-    return false;
-  }
-};
+  EXPECT_MSG(filter, _, _, _)
+    .WillRepeatedly(Return(false));
 
+  MockExecutor exec;
 
-TEST(MasterTest, SchedulerFailoverStatusUpdate)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+  EXPECT_CALL(exec, init(_, _))
+    .Times(1);
 
-  ProcessClock::pause();
+  EXPECT_CALL(exec, launchTask(_, _))
+    .Times(1);
 
-  EventLogger el;
-  Master m(&el);
-  PID master = Process::spawn(&m);
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(1);
 
-  TaskRunningExecutor exec;
   LocalIsolationModule isolationModule(&exec);
 
+  Master m;
+  PID master = Process::spawn(&m);
+
   Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
   PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
 
-  SchedulerFailoverStatusUpdateScheduler failoverSched;
-  TaskRunningScheduler failingSched;
+  // Launch the first (i.e., failing) scheduler and wait until the
+  // first status update message is sent to it (drop the message).
 
-  StatusUpdateFilter filter(&failoverSched, &failingSched, master);
-  Process::filter(&filter);
+  MockScheduler failingSched;
+  MesosSchedulerDriver failingDriver(&failingSched, master);
 
-  MesosSchedulerDriver driver(&failingSched, master);
+  FrameworkID frameworkId;
+  OfferID offerId;
+  vector<SlaveOffer> offers;
 
-  driver.run();
+  trigger resourceOfferCall;
 
-  EXPECT_FALSE(failingSched.statusUpdateCalled);
-  EXPECT_EQ("Framework failover", failingSched.errorMessage);
+  EXPECT_CALL(failingSched, getFrameworkName(&failingDriver))
+    .WillOnce(Return(""));
 
-  filter.driver->join();
+  EXPECT_CALL(failingSched, getExecutorInfo(&failingDriver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  EXPECT_TRUE(failoverSched.statusUpdateCalled);
-  EXPECT_EQ("", failoverSched.errorMessage);
+  EXPECT_CALL(failingSched, registered(&failingDriver, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
 
-  Process::filter(NULL);
+  EXPECT_CALL(failingSched, resourceOffer(&failingDriver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)));
 
-  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
-  Process::wait(slave);
+  EXPECT_CALL(failingSched, error(&failingDriver, _, "Framework failover"))
+    .Times(1);
 
-  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
-  Process::wait(master);
+  EXPECT_CALL(failingSched, statusUpdate(&failingDriver, _))
+    .Times(0);
 
-  ProcessClock::resume();
-}
+  trigger statusUpdateMsg;
 
+  EXPECT_MSG(filter, Eq(S2M_FT_STATUS_UPDATE), _, Ne(master))
+    .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
+    .RetiresOnSaturation();
 
-// An executor used in the framework message test that just sends a reply
-// to each message received and logs the last message.
-class FrameworkMessageExecutor : public Executor
-{
-public:
-  bool messageReceived;
-  string messageData;
-  SlaveID mySlaveId;
+  failingDriver.start();
 
-  FrameworkMessageExecutor(): messageReceived(false) {}
+  WAIT_UNTIL(resourceOfferCall);
 
-  virtual ~FrameworkMessageExecutor() {}
+  EXPECT_EQ(1, offers.size());
 
-  virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
-    mySlaveId = args.slaveId;
-  }
+  vector<TaskDescription> tasks;
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  virtual void frameworkMessage(ExecutorDriver* d, const FrameworkMessage& m) {
-    LOG(INFO) << "FrameworkMessageExecutor got a message";
-    messageReceived = true;
-    messageData = m.data;
-    // Send a message back to the scheduler, which will cause it to exit
-    FrameworkMessage reply(mySlaveId, 0, "reply");
-    d->sendFrameworkMessage(reply);
-    LOG(INFO) << "Sent the reply back";
-  }
-};
+  failingDriver.replyToOffer(offerId, tasks, map<string, string>());
 
+  WAIT_UNTIL(statusUpdateMsg);
 
-// A scheduler used in the framework message test that launches a task, waits
-// for it to start, sends it a framework message, and waits for a reply.
-class FrameworkMessageScheduler : public Scheduler
-{
-public:
-  FrameworkID fid;
-  string errorMessage;
-  bool messageReceived;
-  string messageData;
-  SlaveID slaveIdOfTask;
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and wait until it
+  // registers, then advance time enough for the reliable timeout to
+  // kick in so that another status update message gets sent.
 
-  FrameworkMessageScheduler() {}
+  MockScheduler failoverSched;
+  MesosSchedulerDriver failoverDriver(&failoverSched, master, frameworkId);
 
-  virtual ~FrameworkMessageScheduler() {}
+  trigger registeredCall, statusUpdateCall;
 
-  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    return ExecutorInfo("noexecutor", "");
-  }
+  EXPECT_CALL(failoverSched, getFrameworkName(&failoverDriver))
+    .WillOnce(Return(""));
 
-  virtual void registered(SchedulerDriver*, FrameworkID fid) {
-    LOG(INFO) << "FrameworkMessageScheduler registered";
-    this->fid = fid;
-  }
+  EXPECT_CALL(failoverSched, getExecutorInfo(&failoverDriver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
-  virtual void resourceOffer(SchedulerDriver* d,
-                             OfferID id,
-                             const vector<SlaveOffer>& offers) {
-    LOG(INFO) << "FrameworkMessageScheduler got a slot offer";
-    vector<TaskDescription> tasks;
-    ASSERT_TRUE(offers.size() == 1);
-    const SlaveOffer &offer = offers[0];
-    TaskDescription desc(0, offer.slaveId, "", offer.params, "");
-    tasks.push_back(desc);
-    slaveIdOfTask = offer.slaveId;
-    d->replyToOffer(id, tasks, map<string, string>());
-  }
+  EXPECT_CALL(failoverSched, registered(&failoverDriver, frameworkId))
+    .WillOnce(Trigger(&registeredCall));
 
+  EXPECT_CALL(failoverSched, statusUpdate(&failoverDriver, _))
+    .WillOnce(Trigger(&statusUpdateCall));
 
-  virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
-    EXPECT_EQ(TASK_RUNNING, status.state);
-    LOG(INFO) << "Task is running; sending it a framework message";
-    FrameworkMessage message(slaveIdOfTask, 0, "hello");
-    d->sendFrameworkMessage(message);
-  }
+  failoverDriver.start();
 
+  WAIT_UNTIL(registeredCall);
+
+  ProcessClock::advance(RELIABLE_TIMEOUT);
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  failingDriver.stop();
+  failoverDriver.stop();
+
+  failingDriver.join();
+  failoverDriver.join();
+
+  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+  Process::wait(slave);
+
+  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+  Process::wait(master);
+
+  Process::filter(NULL);
+
+  ProcessClock::resume();
+}
 
-  virtual void frameworkMessage(SchedulerDriver* d, const FrameworkMessage& m) {
-    LOG(INFO) << "FrameworkMessageScheduler got a message";
-    messageReceived = true;
-    messageData = m.data;
-    // Stop our driver because the test is complete
-    d->stop();
-  }
 
-  virtual void error(SchedulerDriver* d,
-                     int code,
-                     const std::string& message) {
-    errorMessage = message;
-    d->stop();
-  }
-};
 
 
-// Tests that framework messages are sent correctly both in both the
-// scheduler->executor direction and the executor->scheduler direction.
 TEST(MasterTest, FrameworkMessages)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  EventLogger el;
-  Master m(&el);
-  PID master = Process::spawn(&m);
+  MockExecutor exec;
 
-  FrameworkMessageExecutor exec;
+  ExecutorDriver *execDriver;
+  ExecutorArgs args;
+  FrameworkMessage execMessage;
 
-  LocalIsolationModule isolationModule(&exec);
+  trigger execFrameworkMessageCall;
 
-  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
-  PID slave = Process::spawn(&s);
+  EXPECT_CALL(exec, init(_, _))
+    .WillOnce(DoAll(SaveArg<0>(&execDriver), SaveArg<1>(&args)));
 
-  BasicMasterDetector detector(master, slave, true);
+  EXPECT_CALL(exec, launchTask(_, _))
+    .Times(1);
 
-  FrameworkMessageScheduler sched;
-  MesosSchedulerDriver driver(&sched, master);
+  EXPECT_CALL(exec, frameworkMessage(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&execMessage),
+                    Trigger(&execFrameworkMessageCall)));
 
-  driver.run();
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(1);
 
-  EXPECT_EQ("", sched.errorMessage);
-  EXPECT_TRUE(exec.messageReceived);
-  EXPECT_EQ("hello", exec.messageData);
-  EXPECT_TRUE(sched.messageReceived);
-  EXPECT_EQ("reply", sched.messageData);
+  LocalIsolationModule isolationModule(&exec);
 
-  MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
-  Process::wait(slave);
+  Master m;
+  PID master = Process::spawn(&m);
 
-  MesosProcess::post(master, pack<M2M_SHUTDOWN>());
-  Process::wait(master);
-}
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
 
+  BasicMasterDetector detector(master, slave, true);
 
-class FailoverTaskRunningScheduler : public TaskRunningScheduler
-{
-public:
-  TaskRunningScheduler *failover;
-  const PID master;
-  MesosSchedulerDriver *driver;
-
-  FailoverTaskRunningScheduler(TaskRunningScheduler *_failover,
-			       const PID &_master)
-    : failover(_failover), master(_master), driver(NULL) {}
-
-  virtual ~FailoverTaskRunningScheduler() {
-    if (driver != NULL) {
-      driver->join();
-      delete driver;
-      driver = NULL;
-    }
-  }
+  // Launch the scheduler, run a task, and wait for the TASK_RUNNING
+  // status update before exchanging framework messages both ways.
 
-  virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
-    EXPECT_EQ(TASK_RUNNING, status.state);
-    statusUpdateCalled = true;
-    driver = new MesosSchedulerDriver(failover, master, fid);
-    driver->start();
-  }
-};
+  MockScheduler sched;
+  MesosSchedulerDriver schedDriver(&sched, master);
 
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status;
+  FrameworkMessage schedMessage;
 
-class SchedulerFailoverFrameworkMessageScheduler : public TaskRunningScheduler
-{
-public:
-  bool frameworkMessageCalled;
+  trigger resourceOfferCall, statusUpdateCall, schedFrameworkMessageCall;
 
-  SchedulerFailoverFrameworkMessageScheduler()
-    : frameworkMessageCalled(false) {}
+  EXPECT_CALL(sched, getFrameworkName(&schedDriver))
+    .WillOnce(Return(""));
 
-  virtual void frameworkMessage(SchedulerDriver* d,
-				const FrameworkMessage& message) {
-    frameworkMessageCalled = true;
-    d->stop();
-  }
-};
+  EXPECT_CALL(sched, getExecutorInfo(&schedDriver))
+    .WillOnce(Return(ExecutorInfo("noexecutor", "")));
 
+  EXPECT_CALL(sched, registered(&schedDriver, _))
+    .Times(1);
 
-class SchedulerFailoverFrameworkMessageExecutor : public Executor
-{
-public:
-  ExecutorDriver *driver;
+  EXPECT_CALL(sched, resourceOffer(&schedDriver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)));
 
-  virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
-    driver = d;
-  }
-};
+  EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
 
+  EXPECT_CALL(sched, frameworkMessage(&schedDriver, _))
+    .WillOnce(DoAll(SaveArg<1>(&schedMessage),
+                    Trigger(&schedFrameworkMessageCall)));
 
-TEST(MasterTest, SchedulerFailoverFrameworkMessage)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+  schedDriver.start();
 
-  EventLogger el;
-  Master m(&el);
-  PID master = Process::spawn(&m);
+  WAIT_UNTIL(resourceOfferCall);
 
-  SchedulerFailoverFrameworkMessageExecutor exec;
-  LocalIsolationModule isolationModule(&exec);
+  EXPECT_EQ(1, offers.size());
 
-  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
-  PID slave = Process::spawn(&s);
+  vector<TaskDescription> tasks;
+  tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
 
-  BasicMasterDetector detector(master, slave, true);
+  schedDriver.replyToOffer(offerId, tasks, map<string, string>());
+
+  WAIT_UNTIL(statusUpdateCall);
 
-  SchedulerFailoverFrameworkMessageScheduler failoverSched;
-  FailoverTaskRunningScheduler failingSched(&failoverSched, master);
+  EXPECT_EQ(TASK_RUNNING, status.state);
 
-  MesosSchedulerDriver driver(&failingSched, master);
+  FrameworkMessage hello(offers[0].slaveId, 1, "hello");
+  schedDriver.sendFrameworkMessage(hello);
 
-  driver.run();
+  WAIT_UNTIL(execFrameworkMessageCall);
 
-  EXPECT_EQ("Framework failover", failingSched.errorMessage);
+  EXPECT_EQ("hello", execMessage.data);
 
-  exec.driver->sendFrameworkMessage(FrameworkMessage());
+  FrameworkMessage reply(args.slaveId, 1, "reply");
+  execDriver->sendFrameworkMessage(reply);
 
-  failingSched.driver->join();
+  WAIT_UNTIL(schedFrameworkMessageCall);
 
-  EXPECT_TRUE(failoverSched.frameworkMessageCalled);
+  EXPECT_EQ("reply", schedMessage.data);
+
+  schedDriver.stop();
+  schedDriver.join();
 
   MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
   Process::wait(slave);
@@ -1031,3 +792,5 @@ TEST(MasterTest, SchedulerFailoverFramew
   MesosProcess::post(master, pack<M2M_SHUTDOWN>());
   Process::wait(master);
 }
+
+