You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:42:23 UTC
svn commit: r1132107 - in /incubator/mesos/trunk/src: exec/exec.cpp
slave/slave.cpp tests/test_master.cpp
Author: benh
Date: Sun Jun 5 08:42:23 2011
New Revision: 1132107
URL: http://svn.apache.org/viewvc?rev=1132107&view=rev
Log:
Added a test case for framework messages.
Modified:
incubator/mesos/trunk/src/exec/exec.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/tests/test_master.cpp
Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132107&r1=1132106&r2=1132107&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun 5 08:42:23 2011
@@ -122,11 +122,11 @@ protected:
// TODO: Pass an argument to shutdown to tell it this is abnormal?
invoke(bind(&Executor::shutdown, executor, driver));
- // This is a pretty bad state ... no slave is left. Rather
- // than exit lets kill our process group (which includes
- // ourself) hoping to clean up any processes this executor
- // launched itself.
- // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
+ // This is a pretty bad state ... no slave is left. Rather
+ // than exit, let's kill our process group (which includes
+ // ourselves) hoping to clean up any processes this executor
+ // launched itself.
+ // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
if (!local)
killpg(0, SIGKILL);
else
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132107&r1=1132106&r2=1132107&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 08:42:23 2011
@@ -289,7 +289,11 @@ void Slave::operator () ()
FrameworkMessage message;
tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
if (Executor *ex = getExecutor(fid)) {
+ VLOG(1) << "Relaying framework message for framework " << fid;
send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+ } else {
+ VLOG(1) << "Dropping framework message for framework " << fid
+ << " because its executor is not running";
}
// TODO(*): If executor is not started, queue framework message?
// (It's probably okay to just drop it since frameworks can have
@@ -380,6 +384,8 @@ void Slave::operator () ()
// Set slave ID in case framework omitted it.
message.slaveId = this->id;
+ VLOG(1) << "Sending framework message to framework " << fid
+ << " with PID " << framework->pid;
send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
}
break;
Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1132107&r1=1132106&r2=1132107&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun 5 08:42:23 2011
@@ -784,68 +784,109 @@ TEST(MasterTest, SchedulerFailoverStatus
ProcessClock::resume();
}
-class FailoverTaskRunningScheduler : public TaskRunningScheduler
+
+// An executor used in the framework message test that just sends a reply
+// to each message received and logs the last message.
+class FrameworkMessageExecutor : public Executor
{
public:
- TaskRunningScheduler *failover;
- const PID master;
- MesosSchedulerDriver *driver;
-
- FailoverTaskRunningScheduler(TaskRunningScheduler *_failover,
- const PID &_master)
- : failover(_failover), master(_master), driver(NULL) {}
-
- virtual ~FailoverTaskRunningScheduler() {
- if (driver != NULL) {
- driver->join();
- delete driver;
- driver = NULL;
- }
+ bool messageReceived;
+ string messageData;
+ SlaveID mySlaveId;
+
+ FrameworkMessageExecutor(): messageReceived(false) {}
+
+ virtual ~FrameworkMessageExecutor() {}
+
+ virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
+ mySlaveId = args.slaveId;
}
- virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
- EXPECT_EQ(TASK_RUNNING, status.state);
- statusUpdateCalled = true;
- driver = new MesosSchedulerDriver(failover, master, fid);
- driver->start();
+ virtual void frameworkMessage(ExecutorDriver* d, const FrameworkMessage& m) {
+ LOG(INFO) << "FrameworkMessageExecutor got a message";
+ messageReceived = true;
+ messageData = m.data;
+ // Send a message back to the scheduler, which will cause it to exit
+ FrameworkMessage reply(mySlaveId, 0, "reply");
+ d->sendFrameworkMessage(reply);
+ LOG(INFO) << "Sent the reply back";
}
};
-class SchedulerFailoverFrameworkMessageScheduler : public TaskRunningScheduler
+// A scheduler used in the framework message test that launches a task, waits
+// for it to start, sends it a framework message, and waits for a reply.
+class FrameworkMessageScheduler : public Scheduler
{
public:
- bool frameworkMessageCalled;
+ FrameworkID fid;
+ string errorMessage;
+ bool messageReceived;
+ string messageData;
+ SlaveID slaveIdOfTask;
- SchedulerFailoverFrameworkMessageScheduler()
- : frameworkMessageCalled(false) {}
+ FrameworkMessageScheduler() {}
- virtual void frameworkMessage(SchedulerDriver* d,
- const FrameworkMessage& message) {
- frameworkMessageCalled = true;
- d->stop();
+ virtual ~FrameworkMessageScheduler() {}
+
+ virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+ return ExecutorInfo("noexecutor", "");
}
-};
-class SchedulerFailoverFrameworkMessageExecutor : public Executor
-{
-public:
- ExecutorDriver *driver;
+ virtual void registered(SchedulerDriver*, FrameworkID fid) {
+ LOG(INFO) << "FrameworkMessageScheduler registered";
+ this->fid = fid;
+ }
- virtual void init(ExecutorDriver* d, const ExecutorArgs& args) {
- driver = d;
+ virtual void resourceOffer(SchedulerDriver* d,
+ OfferID id,
+ const vector<SlaveOffer>& offers) {
+ LOG(INFO) << "FrameworkMessageScheduler got a slot offer";
+ vector<TaskDescription> tasks;
+ ASSERT_TRUE(offers.size() == 1);
+ const SlaveOffer &offer = offers[0];
+ TaskDescription desc(0, offer.slaveId, "", offer.params, "");
+ tasks.push_back(desc);
+ slaveIdOfTask = offer.slaveId;
+ d->replyToOffer(id, tasks, map<string, string>());
+ }
+
+
+ virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
+ EXPECT_EQ(TASK_RUNNING, status.state);
+ LOG(INFO) << "Task is running; sending it a framework message";
+ FrameworkMessage message(slaveIdOfTask, 0, "hello");
+ d->sendFrameworkMessage(message);
+ }
+
+
+ virtual void frameworkMessage(SchedulerDriver* d, const FrameworkMessage& m) {
+ LOG(INFO) << "FrameworkMessageScheduler got a message";
+ messageReceived = true;
+ messageData = m.data;
+ // Stop our driver because the test is complete
+ d->stop();
+ }
+
+ virtual void error(SchedulerDriver* d,
+ int code,
+ const std::string& message) {
+ errorMessage = message;
+ d->stop();
}
};
-TEST(MasterTest, SchedulerFailoverFrameworkMessage)
+// Tests that framework messages are sent correctly in both the
+// scheduler->executor direction and the executor->scheduler direction.
+TEST(MasterTest, FrameworkMessages)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
PID master = Process::spawn(&m);
- SchedulerFailoverFrameworkMessageExecutor exec;
+ FrameworkMessageExecutor exec;
LocalIsolationModule isolationModule(&exec);
Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
@@ -853,20 +894,16 @@ TEST(MasterTest, SchedulerFailoverFramew
BasicMasterDetector detector(master, slave, true);
- SchedulerFailoverFrameworkMessageScheduler failoverSched;
- FailoverTaskRunningScheduler failingSched(&failoverSched, master);
-
- MesosSchedulerDriver driver(&failingSched, master);
+ FrameworkMessageScheduler sched;
+ MesosSchedulerDriver driver(&sched, master);
driver.run();
- EXPECT_EQ("Framework failover", failingSched.errorMessage);
-
- exec.driver->sendFrameworkMessage(FrameworkMessage());
-
- failingSched.driver->join();
-
- EXPECT_TRUE(failoverSched.frameworkMessageCalled);
+ EXPECT_EQ("", sched.errorMessage);
+ EXPECT_TRUE(exec.messageReceived);
+ EXPECT_EQ("hello", exec.messageData);
+ EXPECT_TRUE(sched.messageReceived);
+ EXPECT_EQ("reply", sched.messageData);
MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
Process::wait(slave);