You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:41:51 UTC
svn commit: r1131861 - /incubator/mesos/trunk/src/tests/test_master.cpp
Author: benh
Date: Sun Jun 5 05:41:51 2011
New Revision: 1131861
URL: http://svn.apache.org/viewvc?rev=1131861&view=rev
Log:
A new test that ensures that when a scheduler fails over, the status updates are reliably sent to the new (reregistered) scheduler.
Modified:
incubator/mesos/trunk/src/tests/test_master.cpp
Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131861&r1=1131860&r2=1131861&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun 5 05:41:51 2011
@@ -376,13 +376,13 @@ public:
class FailingScheduler : public Scheduler
{
public:
- FailoverScheduler *failoverSched;
+ Scheduler *failover;
PID master;
NexusSchedulerDriver *driver;
string errorMessage;
- FailingScheduler(FailoverScheduler *_failoverSched, const PID &_master)
- : failoverSched(_failoverSched), master(_master) {}
+ FailingScheduler(Scheduler *_failover, const PID &_master)
+ : failover(_failover), master(_master) {}
virtual ~FailingScheduler() {
delete driver;
@@ -394,7 +394,7 @@ public:
virtual void registered(SchedulerDriver*, FrameworkID fid) {
LOG(INFO) << "FailingScheduler registered";
- driver = new NexusSchedulerDriver(failoverSched, master, fid);
+ driver = new NexusSchedulerDriver(failover, master, fid);
driver->start();
}
@@ -569,7 +569,9 @@ TEST(MasterTest, SlavePartitioned)
class TaskRunningScheduler : public Scheduler
{
public:
+ FrameworkID fid;
bool statusUpdateCalled;
+ string errorMessage;
TaskRunningScheduler()
: statusUpdateCalled(false) {}
@@ -580,6 +582,11 @@ public:
return ExecutorInfo("noexecutor", "");
}
+ virtual void registered(SchedulerDriver*, FrameworkID fid) {
+ LOG(INFO) << "TaskRunningScheduler registered";
+ this->fid = fid;
+ }
+
virtual void resourceOffer(SchedulerDriver* d,
OfferID id,
const vector<SlaveOffer>& offers) {
@@ -593,26 +600,34 @@ public:
}
virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
- EXPECT_EQ(status.state, TASK_RUNNING);
+ EXPECT_EQ(TASK_RUNNING, status.state);
statusUpdateCalled = true;
d->stop();
}
+
+ virtual void error(SchedulerDriver* d,
+ int code,
+ const std::string& message) {
+ errorMessage = message;
+ d->stop();
+ }
};
// Executor with an empty body: it declares nothing beyond what Executor
// provides and is handed to LocalIsolationModule by the MasterTest tests in
// this file (presumably relying on Executor's default callbacks -- confirm).
class TaskRunningExecutor : public Executor {};
-class TaskRunningIsolationModule : public IsolationModule
+class LocalIsolationModule : public IsolationModule
{
public:
- TaskRunningExecutor *executor;
+ Executor *executor;
NexusExecutorDriver *driver;
string pid;
- TaskRunningIsolationModule() : executor(NULL), driver(NULL) {}
+ LocalIsolationModule(Executor *_executor)
+ : executor(_executor), driver(NULL) {}
- virtual ~TaskRunningIsolationModule() {}
+ virtual ~LocalIsolationModule() {}
virtual void initialize(Slave *slave) {
pid = slave->getPID();
@@ -624,7 +639,6 @@ public:
setenv("NEXUS_SLAVE_PID", pid.c_str(), 1);
setenv("NEXUS_FRAMEWORK_ID", "0-0", 1);
- executor = new TaskRunningExecutor();
driver = new NexusExecutorDriver(executor);
driver->start();
}
@@ -633,7 +647,6 @@ public:
driver->stop();
driver->join();
delete driver;
- delete executor;
// TODO(benh): Cleanup the way we launch local drivers!
unsetenv("NEXUS_LOCAL");
@@ -650,7 +663,9 @@ TEST(MasterTest, TaskRunning)
Master m;
PID master = Process::spawn(&m);
- TaskRunningIsolationModule isolationModule;
+ TaskRunningExecutor exec;
+ LocalIsolationModule isolationModule(&exec);
+
Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
PID slave = Process::spawn(&s);
@@ -662,6 +677,7 @@ TEST(MasterTest, TaskRunning)
driver.run();
EXPECT_TRUE(sched.statusUpdateCalled);
+ EXPECT_EQ("", sched.errorMessage);
Process::post(slave, S2S_SHUTDOWN);
Process::wait(slave);
@@ -669,3 +685,98 @@ TEST(MasterTest, TaskRunning)
Process::post(master, M2M_SHUTDOWN);
Process::wait(master);
}
+
+
+// The scheduler that takes over the framework in the
+// SchedulerFailoverStatusUpdate test. It inherits statusUpdate()/error()
+// bookkeeping from TaskRunningScheduler and only customizes registration.
+class SchedulerFailoverStatusUpdateScheduler : public TaskRunningScheduler
+{
+public:
+  // Called when the failover driver (started by StatusUpdateFilter with the
+  // failing scheduler's framework ID) has registered. The order matters:
+  // first uninstall the message filter so that no further messages are
+  // intercepted, then move the paused clock forward by RELIABLE_TIMEOUT,
+  // presumably so the blocked status update is retransmitted to us.
+  virtual void registered(SchedulerDriver*, FrameworkID) {
+    Process::filter(NULL);
+    ProcessClock::advance(RELIABLE_TIMEOUT);
+  }
+};
+
+
+// Message filter that forces a scheduler failover in the middle of a task
+// status update: it swallows the first S2M_FT_STATUS_UPDATE that is not
+// addressed to the master and, at that moment, starts a driver for the
+// failover scheduler under the failing scheduler's framework ID.
+class StatusUpdateFilter : public MessageFilter
+{
+public:
+  // Scheduler that takes over the framework.
+  TaskRunningScheduler *failover;
+
+  // Scheduler being replaced; its 'fid' is reused by the failover driver.
+  TaskRunningScheduler *failing;
+
+  const PID master;
+
+  // Driver for 'failover'; stays NULL until filter() intercepts a status
+  // update, and is owned (joined and deleted) by this object afterwards.
+  NexusSchedulerDriver *driver;
+
+  StatusUpdateFilter(TaskRunningScheduler *_failover,
+                     TaskRunningScheduler *_failing,
+                     const PID &_master)
+    : failover(_failover), failing(_failing), master(_master),
+      driver(NULL) {}
+
+  // Waits for the failover driver, if one was ever started, then frees it.
+  ~StatusUpdateFilter() {
+    if (driver == NULL) {
+      return;
+    }
+
+    driver->join();
+    delete driver;
+    driver = NULL;
+  }
+
+  // Returns true to block 'msg', false to let it through. Only the first
+  // matching status update is blocked; once 'driver' exists every message
+  // passes.
+  virtual bool filter(struct msg *msg) {
+    // TODO(benh): Fix the brokenness of this test due to blocking
+    // S2M_FT_STATUS_UPDATE!
+    if (driver != NULL ||
+        msg->id != S2M_FT_STATUS_UPDATE ||
+        msg->to == master) {
+      return false;
+    }
+
+    // Fail over: bring up the replacement scheduler with the same
+    // framework ID while the original status update is held back.
+    driver = new NexusSchedulerDriver(failover, master, failing->fid);
+    driver->start();
+    return true;
+  }
+};
+
+
+// Verifies that a status update blocked while a scheduler fails over is
+// eventually delivered to the new (reregistered) scheduler rather than the
+// failing one.
+TEST(MasterTest, SchedulerFailoverStatusUpdate)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Paused clock: SchedulerFailoverStatusUpdateScheduler advances it by
+  // RELIABLE_TIMEOUT once the failover scheduler has registered.
+  ProcessClock::pause();
+
+  Master m;
+  PID master = Process::spawn(&m);
+
+  TaskRunningExecutor exec;
+  LocalIsolationModule isolationModule(&exec);
+
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  SchedulerFailoverStatusUpdateScheduler failoverSched;
+  TaskRunningScheduler failingSched;
+
+  StatusUpdateFilter filter(&failoverSched, &failingSched, master);
+  Process::filter(&filter);
+
+  NexusSchedulerDriver driver(&failingSched, master);
+
+  driver.run();
+
+  // The failing scheduler must not see the (blocked) status update and
+  // must be told that its framework failed over.
+  EXPECT_FALSE(failingSched.statusUpdateCalled);
+  EXPECT_EQ("Framework failover", failingSched.errorMessage);
+
+  // The filter only creates its driver when it intercepts a status update.
+  // If that never happened, report a failure instead of dereferencing NULL.
+  // EXPECT (not ASSERT) is deliberate: an early return would skip the
+  // cleanup below and leave 'filter', a stack object, installed.
+  EXPECT_TRUE(filter.driver != NULL);
+  if (filter.driver != NULL) {
+    filter.driver->join();
+  }
+
+  EXPECT_TRUE(failoverSched.statusUpdateCalled);
+  EXPECT_EQ("", failoverSched.errorMessage);
+
+  // Normally already removed by the failover scheduler; repeat here so the
+  // filter is never left installed past this test.
+  Process::filter(NULL);
+
+  Process::post(slave, S2S_SHUTDOWN);
+  Process::wait(slave);
+
+  Process::post(master, M2M_SHUTDOWN);
+  Process::wait(master);
+
+  ProcessClock::resume();
+}