You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:52:42 UTC
svn commit: r1132162 - /incubator/mesos/trunk/src/tests/test_master.cpp
Author: benh
Date: Sun Jun 5 08:52:41 2011
New Revision: 1132162
URL: http://svn.apache.org/viewvc?rev=1132162&view=rev
Log:
More updates to testing.
Modified:
incubator/mesos/trunk/src/tests/test_master.cpp
Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1132162&r1=1132161&r2=1132162&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun 5 08:52:41 2011
@@ -1,13 +1,9 @@
-#include <gtest/gtest.h>
-
-#include <boost/lexical_cast.hpp>
+#include <gmock/gmock.h>
#include <mesos_exec.hpp>
#include <mesos_sched.hpp>
-#include "common/date_utils.hpp"
-
-#include "event_history/event_logger.hpp"
+#include <boost/lexical_cast.hpp>
#include "local/local.hpp"
@@ -17,275 +13,182 @@
#include "slave/process_based_isolation_module.hpp"
#include "slave/slave.hpp"
-using std::string;
-using std::vector;
-
-using boost::lexical_cast;
+#include "testing_gmock.hpp"
using namespace mesos;
using namespace mesos::internal;
+using namespace mesos::internal::test;
+
+using boost::lexical_cast;
-using mesos::internal::eventhistory::EventLogger;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::internal::slave::Framework;
using mesos::internal::slave::IsolationModule;
using mesos::internal::slave::ProcessBasedIsolationModule;
-class NoopScheduler : public Scheduler
+using std::string;
+using std::map;
+using std::vector;
+
+using testing::_;
+using testing::A;
+using testing::An;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Eq;
+using testing::ElementsAre;
+using testing::Ne;
+using testing::Return;
+using testing::SaveArg;
+using testing::Sequence;
+using testing::StrEq;
+
+
+class LocalIsolationModule : public IsolationModule
{
public:
- bool registeredCalled;
- int offersGotten;
- int slavesExpected;
+ Executor *executor;
+ MesosExecutorDriver *driver;
+ string pid;
-public:
- NoopScheduler(int _slavesExpected)
- : slavesExpected(_slavesExpected),
- registeredCalled(false),
- offersGotten(0)
- {}
+ LocalIsolationModule(Executor *_executor)
+ : executor(_executor), driver(NULL) {}
- virtual ~NoopScheduler() {}
+ virtual ~LocalIsolationModule() {}
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
+ virtual void initialize(Slave *slave) {
+ pid = slave->self();
}
- virtual void registered(SchedulerDriver*, FrameworkID fid) {
- LOG(INFO) << "NoopScheduler registered with id " << fid;
- registeredCalled = true;
- }
+ virtual void startExecutor(Framework *framework) {
+ // TODO(benh): Cleanup the way we launch local drivers!
+ setenv("MESOS_LOCAL", "1", 1);
+ setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
+ setenv("MESOS_FRAMEWORK_ID", framework->id.c_str(), 1);
- virtual void resourceOffer(SchedulerDriver *d,
- OfferID id,
- const vector<SlaveOffer>& offers) {
- LOG(INFO) << "NoopScheduler got a slot offer";
- offersGotten++;
- EXPECT_EQ(slavesExpected, offers.size());
- foreach (const SlaveOffer& offer, offers) {
- string cpus = offer.params.find("cpus")->second;
- string mem = offer.params.find("mem")->second;
- EXPECT_EQ("2", cpus);
- EXPECT_EQ(lexical_cast<string>(1 * Gigabyte), mem);
- }
- vector<TaskDescription> tasks;
- d->replyToOffer(id, tasks, map<string, string>());
- d->stop();
+ driver = new MesosExecutorDriver(executor);
+ driver->start();
}
-};
+ virtual void killExecutor(Framework* framework) {
+ driver->stop();
+ driver->join();
+ delete driver;
-TEST(MasterTest, NoopFrameworkWithOneSlave)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
- NoopScheduler sched(1);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_TRUE(sched.registeredCalled);
- EXPECT_EQ(1, sched.offersGotten);
- local::shutdown();
-}
+ // TODO(benh): Cleanup the way we launch local drivers!
+ unsetenv("MESOS_LOCAL");
+ unsetenv("MESOS_SLAVE_PID");
+ unsetenv("MESOS_FRAMEWORK_ID");
+ }
+};
-TEST(MasterTest, NoopFrameworkWithMultipleSlaves)
+TEST(MasterTest, ResourceOfferWithMultipleSlaves)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
- NoopScheduler sched(10);
+
+ MockScheduler sched;
MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_TRUE(sched.registeredCalled);
- EXPECT_EQ(1, sched.offersGotten);
- local::shutdown();
-}
+ vector<SlaveOffer> offers;
-class FixedResponseScheduler : public Scheduler
-{
-public:
- vector<TaskDescription> response;
- string errorMessage;
-
- FixedResponseScheduler(vector<TaskDescription> _response)
- : response(_response) {}
+ trigger resourceOfferCall;
- virtual ~FixedResponseScheduler() {}
+ EXPECT_CALL(sched, getFrameworkName(&driver))
+ .WillOnce(Return(""));
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ EXPECT_CALL(sched, getExecutorInfo(&driver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- virtual void resourceOffer(SchedulerDriver* d,
- OfferID id,
- const vector<SlaveOffer>& offers) {
- LOG(INFO) << "FixedResponseScheduler got a slot offer";
- d->replyToOffer(id, response, map<string, string>());
- }
-
- virtual void error(SchedulerDriver* d,
- int code,
- const std::string& message) {
- errorMessage = message;
- d->stop();
- }
-};
+ EXPECT_CALL(sched, registered(&driver, _))
+ .Times(1);
+ EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+ .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)));
-TEST(MasterTest, DuplicateTaskIdsInResponse)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "1";
- params["mem"] = lexical_cast<string>(1 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- tasks.push_back(TaskDescription(2, "200102030405-0-0", "", params, ""));
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Duplicate task ID: 1", sched.errorMessage);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .Times(AtMost(1));
+ driver.start();
-TEST(MasterTest, TooMuchMemoryInTask)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "1";
- params["mem"] = lexical_cast<string>(4 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ WAIT_UNTIL(resourceOfferCall);
+ EXPECT_NE(0, offers.size());
+ EXPECT_GE(10, offers.size());
-TEST(MasterTest, TooMuchCpuInTask)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "4";
- params["mem"] = lexical_cast<string>(1 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ EXPECT_EQ("2", offers[0].params["cpus"]);
+ EXPECT_EQ("1024", offers[0].params["mem"]);
+ driver.stop();
+ driver.join();
-TEST(MasterTest, TooLittleCpuInTask)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "0";
- params["mem"] = lexical_cast<string>(1 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Invalid task size: <0 CPUs, 1024 MEM>", sched.errorMessage);
local::shutdown();
- DateUtils::clearMockDate();
}
-TEST(MasterTest, TooLittleMemoryInTask)
+TEST(MasterTest, ResourcesReofferedAfterReject)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "1";
- params["mem"] = "1";
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Invalid task size: <1 CPUs, 1 MEM>", sched.errorMessage);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
-TEST(MasterTest, TooMuchMemoryAcrossTasks)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "1";
- params["mem"] = lexical_cast<string>(2 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- tasks.push_back(TaskDescription(2, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, master);
+ OfferID offerId;
-TEST(MasterTest, TooMuchCpuAcrossTasks)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
- map<string, string> params;
- params["cpus"] = "2";
- params["mem"] = lexical_cast<string>(1 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- tasks.push_back(TaskDescription(2, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched(tasks);
- MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ trigger sched1ResourceOfferCall;
+ EXPECT_CALL(sched1, getFrameworkName(&driver1))
+ .WillOnce(Return(""));
-TEST(MasterTest, ResourcesReofferedAfterReject)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
+ EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- NoopScheduler sched1(10);
- MesosSchedulerDriver driver1(&sched1, master);
- driver1.run();
- EXPECT_TRUE(sched1.registeredCalled);
- EXPECT_EQ(1, sched1.offersGotten);
+ EXPECT_CALL(sched1, registered(&driver1, _))
+ .Times(1);
+
+ EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
+ .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)));
+
+ driver1.start();
+
+ WAIT_UNTIL(sched1ResourceOfferCall);
+
+ driver1.replyToOffer(offerId, vector<TaskDescription>(), map<string, string>());
- NoopScheduler sched2(10);
+ driver1.stop();
+ driver1.join();
+
+ MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, master);
- driver2.run();
- EXPECT_TRUE(sched2.registeredCalled);
- EXPECT_EQ(1, sched2.offersGotten);
+
+ trigger sched2ResourceOfferCall;
+
+ EXPECT_CALL(sched2, getFrameworkName(&driver2))
+ .WillOnce(Return(""));
+
+ EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
+
+ EXPECT_CALL(sched2, registered(&driver2, _))
+ .Times(1);
+
+ EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+ .WillOnce(Trigger(&sched2ResourceOfferCall));
+
+ EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+ .Times(AtMost(1));
+
+ driver2.start();
+
+ WAIT_UNTIL(sched2ResourceOfferCall);
+
+ driver2.stop();
+ driver2.join();
local::shutdown();
}
@@ -294,65 +197,94 @@ TEST(MasterTest, ResourcesReofferedAfter
TEST(MasterTest, ResourcesReofferedAfterBadResponse)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- DateUtils::setMockDate("200102030405");
+
PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
- vector<TaskDescription> tasks;
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, master);
+
+ OfferID offerId;
+ vector<SlaveOffer> offers;
+
+ trigger sched1ResourceOfferCall;
+
+ EXPECT_CALL(sched1, getFrameworkName(&driver1))
+ .WillOnce(Return(""));
+
+ EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
+
+ EXPECT_CALL(sched1, registered(&driver1, _))
+ .Times(1);
+
+ EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
+ .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+ Trigger(&sched1ResourceOfferCall)));
+
+ driver1.start();
+
+ WAIT_UNTIL(sched1ResourceOfferCall);
+
+ EXPECT_EQ(1, offers.size());
+
map<string, string> params;
params["cpus"] = "0";
params["mem"] = lexical_cast<string>(1 * Gigabyte);
- tasks.push_back(TaskDescription(1, "200102030405-0-0", "", params, ""));
- FixedResponseScheduler sched1(tasks);
- MesosSchedulerDriver driver1(&sched1, master);
- driver1.run();
- EXPECT_EQ("Invalid task size: <0 CPUs, 1024 MEM>", sched1.errorMessage);
- NoopScheduler sched2(1);
+ vector<TaskDescription> tasks;
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", params, bytes()));
+
+ trigger errorCall;
+
+ EXPECT_CALL(sched1, error(&driver1, _, "Invalid task size: <0 CPUs, 1024 MEM>"))
+ .WillOnce(Trigger(&errorCall));
+
+ EXPECT_CALL(sched1, offerRescinded(&driver1, offerId))
+ .Times(AtMost(1));
+
+ driver1.replyToOffer(offerId, tasks, map<string, string>());
+
+ WAIT_UNTIL(errorCall);
+
+ driver1.stop();
+ driver1.join();
+
+ MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, master);
- driver2.run();
- EXPECT_TRUE(sched2.registeredCalled);
- EXPECT_EQ(1, sched2.offersGotten);
- local::shutdown();
- DateUtils::clearMockDate();
-}
+ trigger sched2ResourceOfferCall;
+ EXPECT_CALL(sched2, getFrameworkName(&driver2))
+ .WillOnce(Return(""));
-class SlaveLostScheduler : public Scheduler
-{
-public:
- PID slave;
- bool slaveLostCalled;
-
- SlaveLostScheduler(const PID &_slave)
- : slave(_slave), slaveLostCalled(false) {}
+ EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- virtual ~SlaveLostScheduler() {}
+ EXPECT_CALL(sched2, registered(&driver2, _))
+ .Times(1);
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+ .WillOnce(Trigger(&sched2ResourceOfferCall));
- virtual void resourceOffer(SchedulerDriver* d,
- OfferID id,
- const vector<SlaveOffer>& offers) {
- LOG(INFO) << "SlaveLostScheduler got a slot offer";
- MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
- }
-
- virtual void slaveLost(SchedulerDriver* d, SlaveID slaveId) {
- slaveLostCalled = true;
- d->stop();
- }
-};
+ EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+ .Times(AtMost(1));
+
+ driver2.start();
+
+ WAIT_UNTIL(sched2ResourceOfferCall);
+
+ driver2.stop();
+ driver2.join();
+
+ local::shutdown();
+}
TEST(MasterTest, SlaveLost)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- EventLogger el;
- Master m(&el);
+ Master m;
PID master = Process::spawn(&m);
ProcessBasedIsolationModule isolationModule;
@@ -361,74 +293,54 @@ TEST(MasterTest, SlaveLost)
BasicMasterDetector detector(master, slave, true);
- SlaveLostScheduler sched(slave);
-
+ MockScheduler sched;
MesosSchedulerDriver driver(&sched, master);
- driver.run();
- EXPECT_TRUE(sched.slaveLostCalled);
+ OfferID offerId;
+ vector<SlaveOffer> offers;
- Process::wait(slave);
+ trigger resourceOfferCall;
- MesosProcess::post(master, pack<M2M_SHUTDOWN>());
- Process::wait(master);
-}
+ EXPECT_CALL(sched, getFrameworkName(&driver))
+ .WillOnce(Return(""));
+ EXPECT_CALL(sched, getExecutorInfo(&driver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
+ EXPECT_CALL(sched, registered(&driver, _))
+ .Times(1);
-class FailoverScheduler : public Scheduler
-{
-public:
- bool registeredCalled;
-
- FailoverScheduler() : registeredCalled(false) {}
+ EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+ .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+ Trigger(&resourceOfferCall)));
- virtual ~FailoverScheduler() {}
+ driver.start();
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ WAIT_UNTIL(resourceOfferCall);
- virtual void registered(SchedulerDriver *d, FrameworkID fid) {
- LOG(INFO) << "FailoverScheduler registered";
- registeredCalled = true;
- d->stop();
- }
-};
+ EXPECT_EQ(1, offers.size());
+ trigger offerRescindedCall, slaveLostCall;
-class FailingScheduler : public Scheduler
-{
-public:
- Scheduler *failover;
- PID master;
- MesosSchedulerDriver *driver;
- string errorMessage;
+ EXPECT_CALL(sched, offerRescinded(&driver, offerId))
+ .WillOnce(Trigger(&offerRescindedCall));
- FailingScheduler(Scheduler *_failover, const PID &_master)
- : failover(_failover), master(_master) {}
+ EXPECT_CALL(sched, slaveLost(&driver, offers[0].slaveId))
+ .WillOnce(Trigger(&slaveLostCall));
- virtual ~FailingScheduler() {
- delete driver;
- }
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ WAIT_UNTIL(offerRescindedCall);
+ WAIT_UNTIL(slaveLostCall);
- virtual void registered(SchedulerDriver*, FrameworkID fid) {
- LOG(INFO) << "FailingScheduler registered";
- driver = new MesosSchedulerDriver(failover, master, fid);
- driver->start();
- }
+ driver.stop();
+ driver.join();
- virtual void error(SchedulerDriver* d,
- int code,
- const std::string& message) {
- errorMessage = message;
- d->stop();
- }
-};
+ Process::wait(slave);
+
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
+}
TEST(MasterTest, SchedulerFailover)
@@ -437,273 +349,201 @@ TEST(MasterTest, SchedulerFailover)
PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
- FailoverScheduler failoverSched;
- FailingScheduler failingSched(&failoverSched, master);
+ // Launch the first (i.e., failing) scheduler and wait until
+ // registered gets called to launch the second (i.e., failover)
+ // scheduler.
- MesosSchedulerDriver driver(&failingSched, master);
- driver.run();
+ MockScheduler failingSched;
+ MesosSchedulerDriver failingDriver(&failingSched, master);
- EXPECT_EQ("Framework failover", failingSched.errorMessage);
+ FrameworkID frameworkId;
- failingSched.driver->join();
+ trigger failingRegisteredCall;
- EXPECT_TRUE(failoverSched.registeredCalled);
-
- local::shutdown();
-}
+ EXPECT_CALL(failingSched, getFrameworkName(&failingDriver))
+ .WillOnce(Return(""));
+ EXPECT_CALL(failingSched, getExecutorInfo(&failingDriver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
-class OfferRescindedScheduler : public Scheduler
-{
-public:
- const PID slave;
- bool offerRescindedCalled;
-
- OfferRescindedScheduler(const PID &_slave)
- : slave(_slave), offerRescindedCalled(false) {}
+ EXPECT_CALL(failingSched, registered(&failingDriver, _))
+ .WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&failingRegisteredCall)));
- virtual ~OfferRescindedScheduler() {}
+ EXPECT_CALL(failingSched, resourceOffer(&failingDriver, _, _))
+ .Times(AtMost(1));
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ EXPECT_CALL(failingSched, offerRescinded(&failingDriver, _))
+ .Times(AtMost(1));
- virtual void resourceOffer(SchedulerDriver* d,
- OfferID id,
- const vector<SlaveOffer>& offers) {
- LOG(INFO) << "OfferRescindedScheduler got a slot offer";
- vector<TaskDescription> tasks;
- ASSERT_TRUE(offers.size() == 1);
- const SlaveOffer &offer = offers[0];
- TaskDescription desc(0, offer.slaveId, "", offer.params, "");
- tasks.push_back(desc);
- d->replyToOffer(id, tasks, map<string, string>());
- MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
- }
+ EXPECT_CALL(failingSched, error(&failingDriver, _, "Framework failover"))
+ .Times(1);
- virtual void offerRescinded(SchedulerDriver* d, OfferID)
- {
- offerRescindedCalled = true;
- d->stop();
- }
-};
+ failingDriver.start();
+ WAIT_UNTIL(failingRegisteredCall);
-class OfferReplyMessageFilter : public MessageFilter
-{
-public:
- virtual bool filter(struct msg *msg) {
- return msg->id == F2M_SLOT_OFFER_REPLY;
- }
-};
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler and wait until it
+ // gets a registered callback.
+ MockScheduler failoverSched;
+ MesosSchedulerDriver failoverDriver(&failoverSched, master, frameworkId);
-TEST(MasterTest, OfferRescinded)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
+ trigger failoverRegisteredCall;
- OfferReplyMessageFilter filter;
- Process::filter(&filter);
+ EXPECT_CALL(failoverSched, getFrameworkName(&failoverDriver))
+ .WillOnce(Return(""));
- EventLogger el;
- Master m(&el);
- PID master = Process::spawn(&m);
+ EXPECT_CALL(failoverSched, getExecutorInfo(&failoverDriver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- ProcessBasedIsolationModule isolationModule;
- Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
- PID slave = Process::spawn(&s);
+ EXPECT_CALL(failoverSched, registered(&failoverDriver, frameworkId))
+ .WillOnce(Trigger(&failoverRegisteredCall));
- BasicMasterDetector detector(master, slave, true);
+ EXPECT_CALL(failoverSched, resourceOffer(&failoverDriver, _, _))
+ .Times(AtMost(1));
- OfferRescindedScheduler sched(slave);
- MesosSchedulerDriver driver(&sched, master);
+ EXPECT_CALL(failoverSched, offerRescinded(&failoverDriver, _))
+ .Times(AtMost(1));
- driver.run();
+ failoverDriver.start();
- EXPECT_TRUE(sched.offerRescindedCalled);
+ WAIT_UNTIL(failoverRegisteredCall);
- Process::wait(slave);
+ failingDriver.stop();
+ failoverDriver.stop();
- MesosProcess::post(master, pack<M2M_SHUTDOWN>());
- Process::wait(master);
+ failingDriver.join();
+ failoverDriver.join();
- Process::filter(NULL);
+ local::shutdown();
}
-class SlavePartitionedScheduler : public Scheduler
+TEST(MasterTest, SlavePartitioned)
{
-public:
- bool slaveLostCalled;
-
- SlavePartitionedScheduler()
- : slaveLostCalled(false) {}
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
- virtual ~SlavePartitionedScheduler() {}
+ ProcessClock::pause();
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ MockFilter filter;
+ Process::filter(&filter);
- virtual void slaveLost(SchedulerDriver* d, SlaveID slaveId) {
- slaveLostCalled = true;
- d->stop();
- }
-};
+ EXPECT_MSG(filter, _, _, _)
+ .WillRepeatedly(Return(false));
+ PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
-class HeartbeatMessageFilter : public MessageFilter
-{
-public:
- virtual bool filter(struct msg *msg) {
- return msg->id == SH2M_HEARTBEAT;
- }
-};
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, master);
+ trigger slaveLostCall;
-TEST(MasterTest, SlavePartitioned)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
+ EXPECT_CALL(sched, getFrameworkName(&driver))
+ .WillOnce(Return(""));
- HeartbeatMessageFilter filter;
- Process::filter(&filter);
+ EXPECT_CALL(sched, getExecutorInfo(&driver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- ProcessClock::pause();
+ EXPECT_CALL(sched, registered(&driver, _))
+ .Times(1);
- PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+ .Times(AtMost(1));
- SlavePartitionedScheduler sched;
- MesosSchedulerDriver driver(&sched, master);
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(Trigger(&slaveLostCall));
+
+ EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
+ .WillRepeatedly(Return(true));
driver.start();
ProcessClock::advance(master::HEARTBEAT_TIMEOUT);
- driver.join();
+ WAIT_UNTIL(slaveLostCall);
- EXPECT_TRUE(sched.slaveLostCalled);
+ driver.stop();
+ driver.join();
local::shutdown();
- ProcessClock::resume();
-
Process::filter(NULL);
+
+ ProcessClock::resume();
}
-class TaskRunningScheduler : public Scheduler
+TEST(MasterTest, TaskRunning)
{
-public:
- FrameworkID fid;
- bool statusUpdateCalled;
- string errorMessage;
-
- TaskRunningScheduler()
- : statusUpdateCalled(false) {}
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
- virtual ~TaskRunningScheduler() {}
+ Master m;
+ PID master = Process::spawn(&m);
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ MockExecutor exec;
- virtual void registered(SchedulerDriver*, FrameworkID fid) {
- LOG(INFO) << "TaskRunningScheduler registered";
- this->fid = fid;
- }
+ EXPECT_CALL(exec, init(_, _))
+ .Times(1);
- virtual void resourceOffer(SchedulerDriver* d,
- OfferID id,
- const vector<SlaveOffer>& offers) {
- LOG(INFO) << "TaskRunningScheduler got a slot offer";
- vector<TaskDescription> tasks;
- ASSERT_TRUE(offers.size() == 1);
- const SlaveOffer &offer = offers[0];
- TaskDescription desc(0, offer.slaveId, "", offer.params, "");
- tasks.push_back(desc);
- d->replyToOffer(id, tasks, map<string, string>());
- }
+ EXPECT_CALL(exec, launchTask(_, _))
+ .Times(1);
- virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
- EXPECT_EQ(TASK_RUNNING, status.state);
- statusUpdateCalled = true;
- d->stop();
- }
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(1);
- virtual void error(SchedulerDriver* d,
- int code,
- const std::string& message) {
- errorMessage = message;
- d->stop();
- }
-};
+ LocalIsolationModule isolationModule(&exec);
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
-class TaskRunningExecutor : public Executor {};
+ BasicMasterDetector detector(master, slave, true);
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, master);
-class LocalIsolationModule : public IsolationModule
-{
-public:
- Executor *executor;
- MesosExecutorDriver *driver;
- string pid;
+ OfferID offerId;
+ vector<SlaveOffer> offers;
+ TaskStatus status;
- LocalIsolationModule(Executor *_executor)
- : executor(_executor), driver(NULL) {}
+ trigger resourceOfferCall, statusUpdateCall;
- virtual ~LocalIsolationModule() {}
+ EXPECT_CALL(sched, getFrameworkName(&driver))
+ .WillOnce(Return(""));
- virtual void initialize(Slave *slave) {
- pid = slave->self();
- }
+ EXPECT_CALL(sched, getExecutorInfo(&driver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- virtual void startExecutor(Framework *framework) {
- // TODO(benh): Cleanup the way we launch local drivers!
- setenv("MESOS_LOCAL", "1", 1);
- setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
- setenv("MESOS_FRAMEWORK_ID", framework->id.c_str(), 1);
+ EXPECT_CALL(sched, registered(&driver, _))
+ .Times(1);
- driver = new MesosExecutorDriver(executor);
- driver->start();
- }
+ EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+ .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+ Trigger(&resourceOfferCall)));
- virtual void killExecutor(Framework* framework) {
- driver->stop();
- driver->join();
- delete driver;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
- // TODO(benh): Cleanup the way we launch local drivers!
- unsetenv("MESOS_LOCAL");
- unsetenv("MESOS_SLAVE_PID");
- unsetenv("MESOS_FRAMEWORK_ID");
- }
-};
+ driver.start();
+ WAIT_UNTIL(resourceOfferCall);
-TEST(MasterTest, TaskRunning)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
+ EXPECT_EQ(1, offers.size());
- EventLogger el;
- Master m(&el);
- PID master = Process::spawn(&m);
+ vector<TaskDescription> tasks;
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- TaskRunningExecutor exec;
- LocalIsolationModule isolationModule(&exec);
+ driver.replyToOffer(offerId, tasks, map<string, string>());
- Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
- PID slave = Process::spawn(&s);
+ WAIT_UNTIL(statusUpdateCall);
- BasicMasterDetector detector(master, slave, true);
+ EXPECT_EQ(TASK_RUNNING, status.state);
- TaskRunningScheduler sched;
- MesosSchedulerDriver driver(&sched, master);
-
- driver.run();
-
- EXPECT_TRUE(sched.statusUpdateCalled);
- EXPECT_EQ("", sched.errorMessage);
+ driver.stop();
+ driver.join();
MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
Process::wait(slave);
@@ -713,317 +553,238 @@ TEST(MasterTest, TaskRunning)
}
-class SchedulerFailoverStatusUpdateScheduler : public TaskRunningScheduler
+TEST(MasterTest, SchedulerFailoverStatusUpdate)
{
- public:
- virtual void registered(SchedulerDriver*, FrameworkID fid) {
- Process::filter(NULL);
- ProcessClock::advance(RELIABLE_TIMEOUT);
- }
-};
-
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
-class StatusUpdateFilter : public MessageFilter
-{
-public:
- TaskRunningScheduler *failover;
- TaskRunningScheduler *failing;
- const PID master;
- MesosSchedulerDriver *driver;
-
- StatusUpdateFilter(TaskRunningScheduler *_failover,
- TaskRunningScheduler *_failing,
- const PID &_master)
- : failover(_failover), failing(_failing), master(_master),
- driver(NULL) {}
-
- ~StatusUpdateFilter() {
- if (driver != NULL) {
- driver->join();
- delete driver;
- driver = NULL;
- }
- }
+ ProcessClock::pause();
- virtual bool filter(struct msg *msg) {
- // TODO(benh): Fix the brokenness of this test due to blocking
- // S2M_FT_STATUS_UPDATE!
- if (driver == NULL &&
- msg->id == S2M_FT_STATUS_UPDATE &&
- !(msg->to == master)) {
- driver = new MesosSchedulerDriver(failover, master, failing->fid);
- driver->start();
- return true;
- }
+ MockFilter filter;
+ Process::filter(&filter);
- return false;
- }
-};
+ EXPECT_MSG(filter, _, _, _)
+ .WillRepeatedly(Return(false));
+ MockExecutor exec;
-TEST(MasterTest, SchedulerFailoverStatusUpdate)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
+ EXPECT_CALL(exec, init(_, _))
+ .Times(1);
- ProcessClock::pause();
+ EXPECT_CALL(exec, launchTask(_, _))
+ .Times(1);
- EventLogger el;
- Master m(&el);
- PID master = Process::spawn(&m);
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(1);
- TaskRunningExecutor exec;
LocalIsolationModule isolationModule(&exec);
+ Master m;
+ PID master = Process::spawn(&m);
+
Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
- SchedulerFailoverStatusUpdateScheduler failoverSched;
- TaskRunningScheduler failingSched;
+ // Launch the first (i.e., failing) scheduler and wait until the
+ // first status update message is sent to it (drop the message).
- StatusUpdateFilter filter(&failoverSched, &failingSched, master);
- Process::filter(&filter);
+ MockScheduler failingSched;
+ MesosSchedulerDriver failingDriver(&failingSched, master);
- MesosSchedulerDriver driver(&failingSched, master);
+ FrameworkID frameworkId;
+ OfferID offerId;
+ vector<SlaveOffer> offers;
- driver.run();
+ trigger resourceOfferCall;
- EXPECT_FALSE(failingSched.statusUpdateCalled);
- EXPECT_EQ("Framework failover", failingSched.errorMessage);
+ EXPECT_CALL(failingSched, getFrameworkName(&failingDriver))
+ .WillOnce(Return(""));
- filter.driver->join();
+ EXPECT_CALL(failingSched, getExecutorInfo(&failingDriver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- EXPECT_TRUE(failoverSched.statusUpdateCalled);
- EXPECT_EQ("", failoverSched.errorMessage);
+ EXPECT_CALL(failingSched, registered(&failingDriver, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
- Process::filter(NULL);
+ EXPECT_CALL(failingSched, resourceOffer(&failingDriver, _, _))
+ .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+ Trigger(&resourceOfferCall)));
- MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
- Process::wait(slave);
+ EXPECT_CALL(failingSched, error(&failingDriver, _, "Framework failover"))
+ .Times(1);
- MesosProcess::post(master, pack<M2M_SHUTDOWN>());
- Process::wait(master);
+ EXPECT_CALL(failingSched, statusUpdate(&failingDriver, _))
+ .Times(0);
- ProcessClock::resume();
-}
+ trigger statusUpdateMsg;
+ EXPECT_MSG(filter, Eq(S2M_FT_STATUS_UPDATE), _, Ne(master))
+ .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
+ .RetiresOnSaturation();
-// An executor used in the framework message test that just sends a reply
-// to each message received and logs the last message.
-class FrameworkMessageExecutor : public Executor
-{
-public:
- bool messageReceived;
- string messageData;
- SlaveID mySlaveId;
+ failingDriver.start();
- FrameworkMessageExecutor(): messageReceived(false) {}
+ WAIT_UNTIL(resourceOfferCall);
- virtual ~FrameworkMessageExecutor() {}
+ EXPECT_EQ(1, offers.size());
- virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
- mySlaveId = args.slaveId;
- }
+ vector<TaskDescription> tasks;
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- virtual void frameworkMessage(ExecutorDriver* d, const FrameworkMessage& m) {
- LOG(INFO) << "FrameworkMessageExecutor got a message";
- messageReceived = true;
- messageData = m.data;
- // Send a message back to the scheduler, which will cause it to exit
- FrameworkMessage reply(mySlaveId, 0, "reply");
- d->sendFrameworkMessage(reply);
- LOG(INFO) << "Sent the reply back";
- }
-};
+ failingDriver.replyToOffer(offerId, tasks, map<string, string>());
+ WAIT_UNTIL(statusUpdateMsg);
-// A scheduler used in the framework message test that launches a task, waits
-// for it to start, sends it a framework message, and waits for a reply.
-class FrameworkMessageScheduler : public Scheduler
-{
-public:
- FrameworkID fid;
- string errorMessage;
- bool messageReceived;
- string messageData;
- SlaveID slaveIdOfTask;
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler and wait until it
+ // registers, at which point advance time enough for the reliable
+ // timeout to kick in and another status update message is sent.
- FrameworkMessageScheduler() {}
+ MockScheduler failoverSched;
+ MesosSchedulerDriver failoverDriver(&failoverSched, master, frameworkId);
- virtual ~FrameworkMessageScheduler() {}
+ trigger registeredCall, statusUpdateCall;
- virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- return ExecutorInfo("noexecutor", "");
- }
+ EXPECT_CALL(failoverSched, getFrameworkName(&failoverDriver))
+ .WillOnce(Return(""));
- virtual void registered(SchedulerDriver*, FrameworkID fid) {
- LOG(INFO) << "FrameworkMessageScheduler registered";
- this->fid = fid;
- }
+ EXPECT_CALL(failoverSched, getExecutorInfo(&failoverDriver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
- virtual void resourceOffer(SchedulerDriver* d,
- OfferID id,
- const vector<SlaveOffer>& offers) {
- LOG(INFO) << "FrameworkMessageScheduler got a slot offer";
- vector<TaskDescription> tasks;
- ASSERT_TRUE(offers.size() == 1);
- const SlaveOffer &offer = offers[0];
- TaskDescription desc(0, offer.slaveId, "", offer.params, "");
- tasks.push_back(desc);
- slaveIdOfTask = offer.slaveId;
- d->replyToOffer(id, tasks, map<string, string>());
- }
+ EXPECT_CALL(failoverSched, registered(&failoverDriver, frameworkId))
+ .WillOnce(Trigger(®isteredCall));
+ EXPECT_CALL(failoverSched, statusUpdate(&failoverDriver, _))
+ .WillOnce(Trigger(&statusUpdateCall));
- virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
- EXPECT_EQ(TASK_RUNNING, status.state);
- LOG(INFO) << "Task is running; sending it a framework message";
- FrameworkMessage message(slaveIdOfTask, 0, "hello");
- d->sendFrameworkMessage(message);
- }
+ failoverDriver.start();
+ WAIT_UNTIL(registeredCall);
+
+ ProcessClock::advance(RELIABLE_TIMEOUT);
+
+ WAIT_UNTIL(statusUpdateCall);
+
+ failingDriver.stop();
+ failoverDriver.stop();
+
+ failingDriver.join();
+ failoverDriver.join();
+
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+ Process::wait(slave);
+
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
+
+ Process::filter(NULL);
+
+ ProcessClock::resume();
+}
- virtual void frameworkMessage(SchedulerDriver* d, const FrameworkMessage& m) {
- LOG(INFO) << "FrameworkMessageScheduler got a message";
- messageReceived = true;
- messageData = m.data;
- // Stop our driver because the test is complete
- d->stop();
- }
- virtual void error(SchedulerDriver* d,
- int code,
- const std::string& message) {
- errorMessage = message;
- d->stop();
- }
-};
-// Tests that framework messages are sent correctly both in both the
-// scheduler->executor direction and the executor->scheduler direction.
TEST(MasterTest, FrameworkMessages)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- EventLogger el;
- Master m(&el);
- PID master = Process::spawn(&m);
+ MockExecutor exec;
- FrameworkMessageExecutor exec;
+ ExecutorDriver *execDriver;
+ ExecutorArgs args;
+ FrameworkMessage execMessage;
- LocalIsolationModule isolationModule(&exec);
+ trigger execFrameworkMessageCall;
- Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
- PID slave = Process::spawn(&s);
+ EXPECT_CALL(exec, init(_, _))
+ .WillOnce(DoAll(SaveArg<0>(&execDriver), SaveArg<1>(&args)));
- BasicMasterDetector detector(master, slave, true);
+ EXPECT_CALL(exec, launchTask(_, _))
+ .Times(1);
- FrameworkMessageScheduler sched;
- MesosSchedulerDriver driver(&sched, master);
+ EXPECT_CALL(exec, frameworkMessage(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&execMessage),
+ Trigger(&execFrameworkMessageCall)));
- driver.run();
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(1);
- EXPECT_EQ("", sched.errorMessage);
- EXPECT_TRUE(exec.messageReceived);
- EXPECT_EQ("hello", exec.messageData);
- EXPECT_TRUE(sched.messageReceived);
- EXPECT_EQ("reply", sched.messageData);
+ LocalIsolationModule isolationModule(&exec);
- MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
- Process::wait(slave);
+ Master m;
+ PID master = Process::spawn(&m);
- MesosProcess::post(master, pack<M2M_SHUTDOWN>());
- Process::wait(master);
-}
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
+ BasicMasterDetector detector(master, slave, true);
-class FailoverTaskRunningScheduler : public TaskRunningScheduler
-{
-public:
- TaskRunningScheduler *failover;
- const PID master;
- MesosSchedulerDriver *driver;
-
- FailoverTaskRunningScheduler(TaskRunningScheduler *_failover,
- const PID &_master)
- : failover(_failover), master(_master), driver(NULL) {}
-
- virtual ~FailoverTaskRunningScheduler() {
- if (driver != NULL) {
- driver->join();
- delete driver;
- driver = NULL;
- }
- }
+ // Launch the first (i.e., failing) scheduler and wait until the
+ // first status update message is sent to it (drop the message).
- virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
- EXPECT_EQ(TASK_RUNNING, status.state);
- statusUpdateCalled = true;
- driver = new MesosSchedulerDriver(failover, master, fid);
- driver->start();
- }
-};
+ MockScheduler sched;
+ MesosSchedulerDriver schedDriver(&sched, master);
+ OfferID offerId;
+ vector<SlaveOffer> offers;
+ TaskStatus status;
+ FrameworkMessage schedMessage;
-class SchedulerFailoverFrameworkMessageScheduler : public TaskRunningScheduler
-{
-public:
- bool frameworkMessageCalled;
+ trigger resourceOfferCall, statusUpdateCall, schedFrameworkMessageCall;
- SchedulerFailoverFrameworkMessageScheduler()
- : frameworkMessageCalled(false) {}
+ EXPECT_CALL(sched, getFrameworkName(&schedDriver))
+ .WillOnce(Return(""));
- virtual void frameworkMessage(SchedulerDriver* d,
- const FrameworkMessage& message) {
- frameworkMessageCalled = true;
- d->stop();
- }
-};
+ EXPECT_CALL(sched, getExecutorInfo(&schedDriver))
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
+ EXPECT_CALL(sched, registered(&schedDriver, _))
+ .Times(1);
-class SchedulerFailoverFrameworkMessageExecutor : public Executor
-{
-public:
- ExecutorDriver *driver;
+ EXPECT_CALL(sched, resourceOffer(&schedDriver, _, _))
+ .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+ Trigger(&resourceOfferCall)));
- virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
- driver = d;
- }
-};
+ EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
+ .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
+ EXPECT_CALL(sched, frameworkMessage(&schedDriver, _))
+ .WillOnce(DoAll(SaveArg<1>(&schedMessage),
+ Trigger(&schedFrameworkMessageCall)));
-TEST(MasterTest, SchedulerFailoverFrameworkMessage)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
+ schedDriver.start();
- EventLogger el;
- Master m(&el);
- PID master = Process::spawn(&m);
+ WAIT_UNTIL(resourceOfferCall);
- SchedulerFailoverFrameworkMessageExecutor exec;
- LocalIsolationModule isolationModule(&exec);
+ EXPECT_EQ(1, offers.size());
- Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
- PID slave = Process::spawn(&s);
+ vector<TaskDescription> tasks;
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- BasicMasterDetector detector(master, slave, true);
+ schedDriver.replyToOffer(offerId, tasks, map<string, string>());
+
+ WAIT_UNTIL(statusUpdateCall);
- SchedulerFailoverFrameworkMessageScheduler failoverSched;
- FailoverTaskRunningScheduler failingSched(&failoverSched, master);
+ EXPECT_EQ(TASK_RUNNING, status.state);
- MesosSchedulerDriver driver(&failingSched, master);
+ FrameworkMessage hello(offers[0].slaveId, 1, "hello");
+ schedDriver.sendFrameworkMessage(hello);
- driver.run();
+ WAIT_UNTIL(execFrameworkMessageCall);
- EXPECT_EQ("Framework failover", failingSched.errorMessage);
+ EXPECT_EQ("hello", execMessage.data);
- exec.driver->sendFrameworkMessage(FrameworkMessage());
+ FrameworkMessage reply(args.slaveId, 1, "reply");
+ execDriver->sendFrameworkMessage(reply);
- failingSched.driver->join();
+ WAIT_UNTIL(schedFrameworkMessageCall);
- EXPECT_TRUE(failoverSched.frameworkMessageCalled);
+ EXPECT_EQ("reply", schedMessage.data);
+
+ schedDriver.stop();
+ schedDriver.join();
MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
Process::wait(slave);
@@ -1031,3 +792,5 @@ TEST(MasterTest, SchedulerFailoverFramew
MesosProcess::post(master, pack<M2M_SHUTDOWN>());
Process::wait(master);
}
+
+