You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:41:51 UTC

svn commit: r1131861 - /incubator/mesos/trunk/src/tests/test_master.cpp

Author: benh
Date: Sun Jun  5 05:41:51 2011
New Revision: 1131861

URL: http://svn.apache.org/viewvc?rev=1131861&view=rev
Log:
A new test that ensures that, when a scheduler fails over, the status updates are reliably sent to the new (reregistered) scheduler.

Modified:
    incubator/mesos/trunk/src/tests/test_master.cpp

Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131861&r1=1131860&r2=1131861&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun  5 05:41:51 2011
@@ -376,13 +376,13 @@ public:
 class FailingScheduler : public Scheduler
 {
 public:
-  FailoverScheduler *failoverSched;
+  Scheduler *failover;
   PID master;
   NexusSchedulerDriver *driver;
   string errorMessage;
 
-  FailingScheduler(FailoverScheduler *_failoverSched, const PID &_master)
-    : failoverSched(_failoverSched), master(_master) {}
+  FailingScheduler(Scheduler *_failover, const PID &_master)
+    : failover(_failover), master(_master) {}
 
   virtual ~FailingScheduler() {
     delete driver;
@@ -394,7 +394,7 @@ public:
 
   virtual void registered(SchedulerDriver*, FrameworkID fid) {
     LOG(INFO) << "FailingScheduler registered";
-    driver = new NexusSchedulerDriver(failoverSched, master, fid);
+    driver = new NexusSchedulerDriver(failover, master, fid);
     driver->start();
   }
 
@@ -569,7 +569,9 @@ TEST(MasterTest, SlavePartitioned)
 class TaskRunningScheduler : public Scheduler
 {
 public:
+  FrameworkID fid;
   bool statusUpdateCalled;
+  string errorMessage;
   
   TaskRunningScheduler()
     : statusUpdateCalled(false) {}
@@ -580,6 +582,11 @@ public:
     return ExecutorInfo("noexecutor", "");
   }
 
+  virtual void registered(SchedulerDriver*, FrameworkID fid) {
+    LOG(INFO) << "TaskRunningScheduler registered";
+    this->fid = fid;
+  }
+
   virtual void resourceOffer(SchedulerDriver* d,
                              OfferID id,
                              const vector<SlaveOffer>& offers) {
@@ -593,26 +600,34 @@ public:
   }
 
   virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
-    EXPECT_EQ(status.state, TASK_RUNNING);
+    EXPECT_EQ(TASK_RUNNING, status.state);
     statusUpdateCalled = true;
     d->stop();
   }
+
+  virtual void error(SchedulerDriver* d,
+                     int code,
+                     const std::string& message) {
+    errorMessage = message;
+    d->stop();
+  }
 };
 
 
 class TaskRunningExecutor : public Executor {};
 
 
-class TaskRunningIsolationModule : public IsolationModule
+class LocalIsolationModule : public IsolationModule
 {
 public:
-  TaskRunningExecutor *executor;
+  Executor *executor;
   NexusExecutorDriver *driver;
   string pid;
 
-  TaskRunningIsolationModule() : executor(NULL), driver(NULL) {}
+  LocalIsolationModule(Executor *_executor)
+    : executor(_executor), driver(NULL) {}
 
-  virtual ~TaskRunningIsolationModule() {}
+  virtual ~LocalIsolationModule() {}
 
   virtual void initialize(Slave *slave) {
     pid = slave->getPID();
@@ -624,7 +639,6 @@ public:
     setenv("NEXUS_SLAVE_PID", pid.c_str(), 1);
     setenv("NEXUS_FRAMEWORK_ID", "0-0", 1);
 
-    executor = new TaskRunningExecutor();
     driver = new NexusExecutorDriver(executor);
     driver->start();
   }
@@ -633,7 +647,6 @@ public:
     driver->stop();
     driver->join();
     delete driver;
-    delete executor;
 
     // TODO(benh): Cleanup the way we launch local drivers!
     unsetenv("NEXUS_LOCAL");
@@ -650,7 +663,9 @@ TEST(MasterTest, TaskRunning)
   Master m;
   PID master = Process::spawn(&m);
 
-  TaskRunningIsolationModule isolationModule;
+  TaskRunningExecutor exec;
+  LocalIsolationModule isolationModule(&exec);
+
   Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
   PID slave = Process::spawn(&s);
 
@@ -662,6 +677,7 @@ TEST(MasterTest, TaskRunning)
   driver.run();
 
   EXPECT_TRUE(sched.statusUpdateCalled);
+  EXPECT_EQ("", sched.errorMessage);
 
   Process::post(slave, S2S_SHUTDOWN);
   Process::wait(slave);
@@ -669,3 +685,98 @@ TEST(MasterTest, TaskRunning)
   Process::post(master, M2M_SHUTDOWN);
   Process::wait(master);
 }
+
+
+class SchedulerFailoverStatusUpdateScheduler : public TaskRunningScheduler
+{
+ public:
+  virtual void registered(SchedulerDriver*, FrameworkID fid) {
+    Process::filter(NULL);
+    ProcessClock::advance(RELIABLE_TIMEOUT);
+  }
+};
+
+
+class StatusUpdateFilter : public MessageFilter
+{
+public:
+  TaskRunningScheduler *failover;
+  TaskRunningScheduler *failing;
+  const PID master;
+  NexusSchedulerDriver *driver;
+
+  StatusUpdateFilter(TaskRunningScheduler *_failover,
+                     TaskRunningScheduler *_failing,
+                     const PID &_master)
+    : failover(_failover), failing(_failing), master(_master),
+      driver(NULL) {}
+
+  ~StatusUpdateFilter() {
+    if (driver != NULL) {
+      driver->join();
+      delete driver;
+      driver = NULL;
+    }
+  }
+
+  virtual bool filter(struct msg *msg) {
+    // TODO(benh): Fix the brokenness of this test due to blocking
+    // S2M_FT_STATUS_UPDATE!
+    if (driver == NULL &&
+        msg->id == S2M_FT_STATUS_UPDATE &&
+        !(msg->to == master)) {
+      driver = new NexusSchedulerDriver(failover, master, failing->fid);
+      driver->start();
+      return true;
+    }
+
+    return false;
+  }
+};
+
+
+TEST(MasterTest, SchedulerFailoverStatusUpdate)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  ProcessClock::pause();
+
+  Master m;
+  PID master = Process::spawn(&m);
+
+  TaskRunningExecutor exec;
+  LocalIsolationModule isolationModule(&exec);
+
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  SchedulerFailoverStatusUpdateScheduler failoverSched;
+  TaskRunningScheduler failingSched;
+
+  StatusUpdateFilter filter(&failoverSched, &failingSched, master);
+  Process::filter(&filter);
+
+  NexusSchedulerDriver driver(&failingSched, master);
+
+  driver.run();
+
+  EXPECT_FALSE(failingSched.statusUpdateCalled);
+  EXPECT_EQ("Framework failover", failingSched.errorMessage);
+
+  filter.driver->join();
+
+  EXPECT_TRUE(failoverSched.statusUpdateCalled);
+  EXPECT_EQ("", failoverSched.errorMessage);
+
+  Process::filter(NULL);
+
+  Process::post(slave, S2S_SHUTDOWN);
+  Process::wait(slave);
+
+  Process::post(master, M2M_SHUTDOWN);
+  Process::wait(master);
+
+  ProcessClock::resume();
+}