You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:42:23 UTC

svn commit: r1132107 - in /incubator/mesos/trunk/src: exec/exec.cpp slave/slave.cpp tests/test_master.cpp

Author: benh
Date: Sun Jun  5 08:42:23 2011
New Revision: 1132107

URL: http://svn.apache.org/viewvc?rev=1132107&view=rev
Log:
Added a test case for framework messages.

Modified:
    incubator/mesos/trunk/src/exec/exec.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/tests/test_master.cpp

Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132107&r1=1132106&r2=1132107&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun  5 08:42:23 2011
@@ -122,11 +122,11 @@ protected:
           // TODO: Pass an argument to shutdown to tell it this is abnormal?
           invoke(bind(&Executor::shutdown, executor, driver));
 
-	  // This is a pretty bad state ... no slave is left. Rather
-	  // than exit lets kill our process group (which includes
-	  // ourself) hoping to clean up any processes this executor
-	  // launched itself.
-	  // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
+          // This is a pretty bad state ... no slave is left. Rather
+          // than exit, let's kill our process group (which includes
+          // ourselves) hoping to clean up any processes this executor
+          // launched itself.
+          // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
           if (!local)
             killpg(0, SIGKILL);
           else

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132107&r1=1132106&r2=1132107&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 08:42:23 2011
@@ -289,7 +289,11 @@ void Slave::operator () ()
         FrameworkMessage message;
         tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
         if (Executor *ex = getExecutor(fid)) {
+          VLOG(1) << "Relaying framework message for framework " << fid;
           send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+        } else {
+          VLOG(1) << "Dropping framework message for framework " << fid
+                  << " because its executor is not running";
         }
         // TODO(*): If executor is not started, queue framework message?
         // (It's probably okay to just drop it since frameworks can have
@@ -380,6 +384,8 @@ void Slave::operator () ()
 
           // Set slave ID in case framework omitted it.
           message.slaveId = this->id;
+          VLOG(1) << "Sending framework message to framework " << fid
+                  << " with PID " << framework->pid;
           send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
         }
         break;

Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1132107&r1=1132106&r2=1132107&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun  5 08:42:23 2011
@@ -784,68 +784,109 @@ TEST(MasterTest, SchedulerFailoverStatus
   ProcessClock::resume();
 }
 
-class FailoverTaskRunningScheduler : public TaskRunningScheduler
+
+// An executor used in the framework message test that just sends a reply
+// to each message received and logs the last message.
+class FrameworkMessageExecutor : public Executor
 {
 public:
-  TaskRunningScheduler *failover;
-  const PID master;
-  MesosSchedulerDriver *driver;
-
-  FailoverTaskRunningScheduler(TaskRunningScheduler *_failover,
-			       const PID &_master)
-    : failover(_failover), master(_master), driver(NULL) {}
-
-  virtual ~FailoverTaskRunningScheduler() {
-    if (driver != NULL) {
-      driver->join();
-      delete driver;
-      driver = NULL;
-    }
+  bool messageReceived;
+  string messageData;
+  SlaveID mySlaveId;
+
+  FrameworkMessageExecutor(): messageReceived(false) {}
+
+  virtual ~FrameworkMessageExecutor() {}
+
+  virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
+    mySlaveId = args.slaveId;
   }
 
-  virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
-    EXPECT_EQ(TASK_RUNNING, status.state);
-    statusUpdateCalled = true;
-    driver = new MesosSchedulerDriver(failover, master, fid);
-    driver->start();
+  virtual void frameworkMessage(ExecutorDriver* d, const FrameworkMessage& m) {
+    LOG(INFO) << "FrameworkMessageExecutor got a message";
+    messageReceived = true;
+    messageData = m.data;
+    // Send a message back to the scheduler, which will cause it to exit
+    FrameworkMessage reply(mySlaveId, 0, "reply");
+    d->sendFrameworkMessage(reply);
+    LOG(INFO) << "Sent the reply back";
   }
 };
 
 
-class SchedulerFailoverFrameworkMessageScheduler : public TaskRunningScheduler
+// A scheduler used in the framework message test that launches a task, waits
+// for it to start, sends it a framework message, and waits for a reply.
+class FrameworkMessageScheduler : public Scheduler
 {
 public:
-  bool frameworkMessageCalled;
+  FrameworkID fid;
+  string errorMessage;
+  bool messageReceived;
+  string messageData;
+  SlaveID slaveIdOfTask;
 
-  SchedulerFailoverFrameworkMessageScheduler()
-    : frameworkMessageCalled(false) {}
+  FrameworkMessageScheduler() {}
 
-  virtual void frameworkMessage(SchedulerDriver* d,
-				const FrameworkMessage& message) {
-    frameworkMessageCalled = true;
-    d->stop();
+  virtual ~FrameworkMessageScheduler() {}
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+    return ExecutorInfo("noexecutor", "");
   }
-};
 
-class SchedulerFailoverFrameworkMessageExecutor : public Executor
-{
-public:
-  ExecutorDriver *driver;
+  virtual void registered(SchedulerDriver*, FrameworkID fid) {
+    LOG(INFO) << "FrameworkMessageScheduler registered";
+    this->fid = fid;
+  }
 
-  virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
-    driver = d;
+  virtual void resourceOffer(SchedulerDriver* d,
+                             OfferID id,
+                             const vector<SlaveOffer>& offers) {
+    LOG(INFO) << "FrameworkMessageScheduler got a slot offer";
+    vector<TaskDescription> tasks;
+    ASSERT_TRUE(offers.size() == 1);
+    const SlaveOffer &offer = offers[0];
+    TaskDescription desc(0, offer.slaveId, "", offer.params, "");
+    tasks.push_back(desc);
+    slaveIdOfTask = offer.slaveId;
+    d->replyToOffer(id, tasks, map<string, string>());
+  }
+
+
+  virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
+    EXPECT_EQ(TASK_RUNNING, status.state);
+    LOG(INFO) << "Task is running; sending it a framework message";
+    FrameworkMessage message(slaveIdOfTask, 0, "hello");
+    d->sendFrameworkMessage(message);
+  }
+
+
+  virtual void frameworkMessage(SchedulerDriver* d, const FrameworkMessage& m) {
+    LOG(INFO) << "FrameworkMessageScheduler got a message";
+    messageReceived = true;
+    messageData = m.data;
+    // Stop our driver because the test is complete
+    d->stop();
+  }
+
+  virtual void error(SchedulerDriver* d,
+                     int code,
+                     const std::string& message) {
+    errorMessage = message;
+    d->stop();
   }
 };
 
 
-TEST(MasterTest, SchedulerFailoverFrameworkMessage)
+// Tests that framework messages are sent correctly in both the
+// scheduler->executor direction and the executor->scheduler direction.
+TEST(MasterTest, FrameworkMessages)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
   Master m;
   PID master = Process::spawn(&m);
 
-  SchedulerFailoverFrameworkMessageExecutor exec;
+  FrameworkMessageExecutor exec;
   LocalIsolationModule isolationModule(&exec);
 
   Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
@@ -853,20 +894,16 @@ TEST(MasterTest, SchedulerFailoverFramew
 
   BasicMasterDetector detector(master, slave, true);
 
-  SchedulerFailoverFrameworkMessageScheduler failoverSched;
-  FailoverTaskRunningScheduler failingSched(&failoverSched, master);
-
-  MesosSchedulerDriver driver(&failingSched, master);
+  FrameworkMessageScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
 
   driver.run();
 
-  EXPECT_EQ("Framework failover", failingSched.errorMessage);
-
-  exec.driver->sendFrameworkMessage(FrameworkMessage());
-
-  failingSched.driver->join();
-
-  EXPECT_TRUE(failoverSched.frameworkMessageCalled);
+  EXPECT_EQ("", sched.errorMessage);
+  EXPECT_TRUE(exec.messageReceived);
+  EXPECT_EQ("hello", exec.messageData);
+  EXPECT_TRUE(sched.messageReceived);
+  EXPECT_EQ("reply", sched.messageData);
 
   MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
   Process::wait(slave);