You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:18:22 UTC
svn commit: r1132277 - in /incubator/mesos/trunk/src: detector/ exec/
master/ messaging/ sched/ slave/ tests/
Author: benh
Date: Sun Jun 5 09:18:22 2011
New Revision: 1132277
URL: http://svn.apache.org/viewvc?rev=1132277&view=rev
Log:
Finished changes to accommodate the latest libprocess updates, and eliminated the switch-style receive loops in Slave and ExecutorProcess.
Modified:
incubator/mesos/trunk/src/detector/detector.cpp
incubator/mesos/trunk/src/exec/exec.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/messaging/messages.cpp
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/master_test.cpp
Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun 5 09:18:22 2011
@@ -46,7 +46,7 @@ protected:
if (name() == process::TIMEOUT) {
LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
<< "(automagically) reconnect";
- MesosProcess<class T>::post(pid, MASTER_DETECTION_FAILURE);
+ process::post(pid, MASTER_DETECTION_FAILURE);
}
}
@@ -475,7 +475,7 @@ void ZooKeeperMasterDetector::detectMast
// No master present (lost or possibly hasn't come up yet).
if (masterSeq.empty()) {
- MesosProcess<class T>::post(pid, NO_MASTER_DETECTED);
+ process::post(pid, NO_MASTER_DETECTED);
} else if (masterSeq != currentMasterSeq) {
currentMasterSeq = masterSeq;
currentMasterPID = lookupMasterPID(masterSeq);
@@ -483,7 +483,7 @@ void ZooKeeperMasterDetector::detectMast
// While trying to get the master PID, master might have crashed,
// so PID might be empty.
if (currentMasterPID == UPID()) {
- MesosProcess<class T>::post(pid, NO_MASTER_DETECTED);
+ process::post(pid, NO_MASTER_DETECTED);
} else {
MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(currentMasterPID);
Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun 5 09:18:22 2011
@@ -41,9 +41,26 @@ public:
const ExecutorID& _executorId, bool _local)
: slave(_slave), driver(_driver), executor(_executor),
frameworkId(_frameworkId), executorId(_executorId),
- local(_local), terminate(false) {}
+ local(_local), terminate(false)
+ {
+ install(S2E_REGISTER_REPLY, &ExecutorProcess::registerReply,
+ &ExecutorRegisteredMessage::args);
+
+ install(S2E_RUN_TASK, &ExecutorProcess::runTask,
+ &RunTaskMessage::task);
+
+ install(S2E_KILL_TASK, &ExecutorProcess::killTask,
+ &KillTaskMessage::task_id);
+
+ install(S2E_FRAMEWORK_MESSAGE, &ExecutorProcess::frameworkMessage,
+ &FrameworkMessageMessage::message);
+
+ install(S2E_KILL_EXECUTOR, &ExecutorProcess::killExecutor);
- ~ExecutorProcess() {}
+ install(process::EXITED, &ExecutorProcess::exited);
+ }
+
+ virtual ~ExecutorProcess() {}
protected:
virtual void operator () ()
@@ -59,108 +76,76 @@ protected:
send(slave, out);
while(true) {
- // TODO(benh): Is there a better way to architect this code? In
- // particular, if the executor blocks in a callback, we can't
- // process any other messages. This is especially tricky if a
- // slave dies since we won't handle the PROCESS_EXIT message in
- // a timely manner (if at all).
-
// Check for terminate in the same way as SchedulerProcess. See
// comments there for an explanation of why this is necessary.
- if (terminate)
- return;
+ if (terminate) return;
- switch (receive(2)) {
- case S2E_REGISTER_REPLY: {
- const MSG<S2E_REGISTER_REPLY>& msg = message();
-
- slaveId = msg.args().slave_id();
-
- VLOG(1) << "Executor registered on slave " << slaveId;
-
- process::invoke(bind(&Executor::init, executor, driver,
- cref(msg.args())));
- break;
- }
-
- case S2E_RUN_TASK: {
- const MSG<S2E_RUN_TASK>& msg = message();
-
- const TaskDescription& task = msg.task();
-
- VLOG(1) << "Executor asked to run a task " << task.task_id();
-
- MSG<E2S_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus* status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(task.task_id());
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_RUNNING);
- send(slave, out);
-
- process::invoke(bind(&Executor::launchTask, executor, driver,
- cref(task)));
- break;
- }
-
- case S2E_KILL_TASK: {
- const MSG<S2E_KILL_TASK>& msg = message();
-
- VLOG(1) << "Executor asked to kill task " << msg.task_id();
-
- process::invoke(bind(&Executor::killTask, executor, driver,
- cref(msg.task_id())));
- break;
- }
-
- case S2E_FRAMEWORK_MESSAGE: {
- const MSG<S2E_FRAMEWORK_MESSAGE>& msg = message();
-
- VLOG(1) << "Executor passed message";
-
- const FrameworkMessage& message = msg.message();
- process::invoke(bind(&Executor::frameworkMessage, executor, driver,
- cref(message)));
- break;
- }
-
- case S2E_KILL_EXECUTOR: {
- VLOG(1) << "Executor asked to shutdown";
- process::invoke(bind(&Executor::shutdown, executor, driver));
- if (!local)
- exit(0);
- else
- return;
- }
-
- case PROCESS_EXIT: {
- VLOG(1) << "Slave exited, trying to shutdown";
-
- // TODO: Pass an argument to shutdown to tell it this is abnormal?
- process::invoke(bind(&Executor::shutdown, executor, driver));
-
- // This is a pretty bad state ... no slave is left. Rather
- // than exit lets kill our process group (which includes
- // ourself) hoping to clean up any processes this executor
- // launched itself.
- // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
- if (!local)
- killpg(0, SIGKILL);
- else
- return;
- }
-
- case PROCESS_TIMEOUT: {
- break;
- }
-
- default: {
- VLOG(1) << "Received unknown message ID " << msgid()
- << " from " << from();
- // TODO: Is this serious enough to exit?
- break;
- }
- }
+ serve(0, true);
+ }
+ }
+
+ void registerReply(const ExecutorArgs& args)
+ {
+ VLOG(1) << "Executor registered on slave " << args.slave_id();
+ slaveId = args.slave_id();
+ process::invoke(bind(&Executor::init, executor, driver, cref(args)));
+ }
+
+ void runTask(const TaskDescription& task)
+ {
+ VLOG(1) << "Executor asked to run a task " << task.task_id();
+
+ MSG<E2S_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(task.task_id());
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_RUNNING);
+ send(slave, out);
+
+ process::invoke(bind(&Executor::launchTask, executor, driver, cref(task)));
+ }
+
+ void killTask(const TaskID& taskId)
+ {
+ VLOG(1) << "Executor asked to kill task " << taskId;
+ process::invoke(bind(&Executor::killTask, executor, driver, cref(taskId)));
+ }
+
+ void frameworkMessage(const FrameworkMessage& message)
+ {
+ VLOG(1) << "Executor received message";
+ process::invoke(bind(&Executor::frameworkMessage, executor, driver,
+ cref(message)));
+ }
+
+ void killExecutor()
+ {
+ VLOG(1) << "Executor asked to shutdown";
+ process::invoke(bind(&Executor::shutdown, executor, driver));
+ if (!local) {
+ exit(0);
+ } else {
+ return;
+ }
+ }
+
+ void exited()
+ {
+ VLOG(1) << "Slave exited, trying to shutdown";
+
+ // TODO: Pass an argument to shutdown to tell it this is abnormal?
+ process::invoke(bind(&Executor::shutdown, executor, driver));
+
+ // This is a pretty bad state ... no slave is left. Rather
+ // than exit, let's kill our process group (which includes
+ // ourselves), hoping to clean up any processes this executor
+ // launched itself.
+ // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
+ if (!local) {
+ killpg(0, SIGKILL);
+ } else {
+ terminate = true;
}
}
@@ -206,6 +191,8 @@ MesosExecutorDriver::MesosExecutorDriver
MesosExecutorDriver::~MesosExecutorDriver()
{
+ // Just as in SchedulerProcess, we might wait here indefinitely if
+ // MesosExecutorDriver::stop has not been invoked.
process::wait(process->self());
delete process;
@@ -295,7 +282,10 @@ int MesosExecutorDriver::stop()
return -1;
}
+ CHECK(process != NULL);
+
process->terminate = true;
+ process::post(process->self(), process::TERMINATE);
running = false;
@@ -308,8 +298,10 @@ int MesosExecutorDriver::stop()
int MesosExecutorDriver::join()
{
Lock lock(&mutex);
- while (running)
+
+ while (running) {
pthread_cond_wait(&cond, &mutex);
+ }
return 0;
}
@@ -331,6 +323,8 @@ int MesosExecutorDriver::sendStatusUpdat
return -1;
}
+ CHECK(process != NULL);
+
// Validate that they set the correct slave ID.
if (!(process->slaveId == status.slave_id())) {
return -1;
@@ -355,6 +349,8 @@ int MesosExecutorDriver::sendFrameworkMe
return -1;
}
+ CHECK(process != NULL);
+
// Validate that they set the correct slave ID and executor ID.
if (!(process->slaveId == message.slave_id())) {
return -1;
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 09:18:22 2011
@@ -55,7 +55,7 @@ protected:
receive(1);
if (name() == process::TIMEOUT) {
dispatch(master, &Master::timerTick);
- } else if (name() == process::EXIT) {
+ } else if (name() == process::EXITED) {
return;
}
}
@@ -323,7 +323,8 @@ void Master::operator () ()
<< "we haven't received an identifier yet!";
}
- const MSG<GOT_MASTER_TOKEN>& msg = message();
+ MSG<GOT_MASTER_TOKEN> msg;
+ msg.ParseFromString(body());
// The master ID is comprised of the current date and some ephemeral
// token (e.g., determined by ZooKeeper).
@@ -342,15 +343,15 @@ void Master::operator () ()
while (true) {
serve();
- if (msgid() == PROCESS_TERMINATE) {
+ if (name() == process::TERMINATE) {
LOG(INFO) << "Asked to terminate by " << from();
foreachpair (_, Slave* slave, slaves) {
send(slave->pid, process::TERMINATE);
}
break;
} else {
- LOG(INFO) << "Unhandled message " << name()
- << " from " << from();
+ LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+ << " from: " << from();
}
}
}
@@ -427,7 +428,7 @@ void Master::initialize()
install(SH2M_HEARTBEAT, &Master::slaveHeartbeat,
&HeartbeatMessage::slave_id);
- install(PROCESS_EXIT, &Master::processExited);
+ install(process::EXITED, &Master::exited);
// Install HTTP request handlers.
Process<Master>::install("vars", &Master::vars);
@@ -983,7 +984,7 @@ void Master::frameworkExpired(const Fram
}
-void Master::processExited()
+void Master::exited()
{
// TODO(benh): Could we get PROCESS_EXIT from a network partition?
LOG(INFO) << "Process exited: " << from();
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun 5 09:18:22 2011
@@ -162,7 +162,7 @@ public:
void slaveHeartbeat(const SlaveID& slaveId);
void timerTick();
void frameworkExpired(const FrameworkID& frameworkId);
- void processExited();
+ void exited();
process::Promise<process::HttpResponse> vars(const process::HttpRequest& request);
@@ -263,7 +263,7 @@ protected:
if (name() == process::TIMEOUT) {
process::dispatch(master, &Master::frameworkExpired, frameworkId);
return;
- } else if (name() == process::EXIT || name() == process::TERMINATE) {
+ } else if (name() == process::EXITED || name() == process::TERMINATE) {
return;
}
}
Modified: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun 5 09:18:22 2011
@@ -1,99 +1,76 @@
#include "messaging/messages.hpp"
+#define DEFINE_MESSAGE(name) \
+ char name[] = #name
namespace mesos { namespace internal {
-boost::unordered_map<std::string, MSGID> ids;
-boost::unordered_map<MSGID, std::string> names;
-
-
-static struct Initialization
-{
- Initialization()
- {
- ids[process::EXIT] = PROCESS_EXIT;
- names[PROCESS_EXIT] = process::EXIT;
-
- ids[process::TIMEOUT] = PROCESS_TIMEOUT;
- names[PROCESS_TIMEOUT] = process::TIMEOUT;
-
- ids[process::TERMINATE] = PROCESS_TERMINATE;
- names[PROCESS_TERMINATE] = process::TERMINATE;
- }
-} __initialization__;
-
-
-struct InitializeMessage
-{
- InitializeMessage(const std::string& name, MSGID id)
- {
- ids[name] = id;
- names[id] = name;
- }
-};
-
-
-#define INITIALIZE_MESSAGE(ID) \
- static InitializeMessage __ ## ID(#ID, ID)
-
-
-INITIALIZE_MESSAGE(F2M_REGISTER_FRAMEWORK);
-INITIALIZE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
-INITIALIZE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
-INITIALIZE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
-INITIALIZE_MESSAGE(F2M_REVIVE_OFFERS);
-INITIALIZE_MESSAGE(F2M_KILL_TASK);
-INITIALIZE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(F2M_STATUS_UPDATE_ACK);
-
-INITIALIZE_MESSAGE(M2F_REGISTER_REPLY);
-INITIALIZE_MESSAGE(M2F_RESOURCE_OFFER);
-INITIALIZE_MESSAGE(M2F_RESCIND_OFFER);
-INITIALIZE_MESSAGE(M2F_STATUS_UPDATE);
-INITIALIZE_MESSAGE(M2F_LOST_SLAVE);
-INITIALIZE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(M2F_ERROR);
-
-INITIALIZE_MESSAGE(S2M_REGISTER_SLAVE);
-INITIALIZE_MESSAGE(S2M_REREGISTER_SLAVE);
-INITIALIZE_MESSAGE(S2M_UNREGISTER_SLAVE);
-INITIALIZE_MESSAGE(S2M_STATUS_UPDATE);
-INITIALIZE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(S2M_EXITED_EXECUTOR);
-
-INITIALIZE_MESSAGE(SH2M_HEARTBEAT);
-
-INITIALIZE_MESSAGE(M2S_REGISTER_REPLY);
-INITIALIZE_MESSAGE(M2S_REREGISTER_REPLY);
-INITIALIZE_MESSAGE(M2S_RUN_TASK);
-INITIALIZE_MESSAGE(M2S_KILL_TASK);
-INITIALIZE_MESSAGE(M2S_KILL_FRAMEWORK);
-INITIALIZE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(M2S_UPDATE_FRAMEWORK);
-INITIALIZE_MESSAGE(M2S_STATUS_UPDATE_ACK);
-
-INITIALIZE_MESSAGE(E2S_REGISTER_EXECUTOR);
-INITIALIZE_MESSAGE(E2S_STATUS_UPDATE);
-INITIALIZE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
-
-INITIALIZE_MESSAGE(S2E_REGISTER_REPLY);
-INITIALIZE_MESSAGE(S2E_RUN_TASK);
-INITIALIZE_MESSAGE(S2E_KILL_TASK);
-INITIALIZE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(S2E_KILL_EXECUTOR);
+// From framework to master.
+DEFINE_MESSAGE(F2M_REGISTER_FRAMEWORK);
+DEFINE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
+DEFINE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
+DEFINE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
+DEFINE_MESSAGE(F2M_REVIVE_OFFERS);
+DEFINE_MESSAGE(F2M_KILL_TASK);
+DEFINE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(F2M_STATUS_UPDATE_ACK);
+
+// From master to framework.
+DEFINE_MESSAGE(M2F_REGISTER_REPLY);
+DEFINE_MESSAGE(M2F_RESOURCE_OFFER);
+DEFINE_MESSAGE(M2F_RESCIND_OFFER);
+DEFINE_MESSAGE(M2F_STATUS_UPDATE);
+DEFINE_MESSAGE(M2F_LOST_SLAVE);
+DEFINE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(M2F_ERROR);
+
+// From slave to master.
+DEFINE_MESSAGE(S2M_REGISTER_SLAVE);
+DEFINE_MESSAGE(S2M_REREGISTER_SLAVE);
+DEFINE_MESSAGE(S2M_UNREGISTER_SLAVE);
+DEFINE_MESSAGE(S2M_STATUS_UPDATE);
+DEFINE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(S2M_EXITED_EXECUTOR);
+
+// From slave heart to master.
+DEFINE_MESSAGE(SH2M_HEARTBEAT);
+
+// From master to slave.
+DEFINE_MESSAGE(M2S_REGISTER_REPLY);
+DEFINE_MESSAGE(M2S_REREGISTER_REPLY);
+DEFINE_MESSAGE(M2S_RUN_TASK);
+DEFINE_MESSAGE(M2S_KILL_TASK);
+DEFINE_MESSAGE(M2S_KILL_FRAMEWORK);
+DEFINE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(M2S_UPDATE_FRAMEWORK);
+DEFINE_MESSAGE(M2S_STATUS_UPDATE_ACK);
+
+// From executor to slave.
+DEFINE_MESSAGE(E2S_REGISTER_EXECUTOR);
+DEFINE_MESSAGE(E2S_STATUS_UPDATE);
+DEFINE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
+
+// From slave to executor.
+DEFINE_MESSAGE(S2E_REGISTER_REPLY);
+DEFINE_MESSAGE(S2E_RUN_TASK);
+DEFINE_MESSAGE(S2E_KILL_TASK);
+DEFINE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(S2E_KILL_EXECUTOR);
#ifdef __sun__
-INITIALIZE_MESSAGE(PD2S_REGISTER_PROJD);
-INITIALIZE_MESSAGE(PD2S_PROJD_READY);
-INITIALIZE_MESSAGE(S2PD_UPDATE_RESOURCES);
-INITIALIZE_MESSAGE(S2PD_KILL_ALL);
+// From projd to slave.
+DEFINE_MESSAGE(PD2S_REGISTER_PROJD);
+DEFINE_MESSAGE(PD2S_PROJD_READY);
+
+// From slave to projd.
+DEFINE_MESSAGE(S2PD_UPDATE_RESOURCES);
+DEFINE_MESSAGE(S2PD_KILL_ALL);
#endif // __sun__
-INITIALIZE_MESSAGE(GOT_MASTER_TOKEN);
-INITIALIZE_MESSAGE(NEW_MASTER_DETECTED);
-INITIALIZE_MESSAGE(NO_MASTER_DETECTED);
-INITIALIZE_MESSAGE(MASTER_DETECTION_FAILURE);
-
-INITIALIZE_MESSAGE(MESOS_MSGID);
+// From master detector to processes.
+DEFINE_MESSAGE(GOT_MASTER_TOKEN);
+DEFINE_MESSAGE(NEW_MASTER_DETECTED);
+DEFINE_MESSAGE(NO_MASTER_DETECTED);
+DEFINE_MESSAGE(MASTER_DETECTION_FAILURE);
}} // namespace mesos { namespace internal {
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 09:18:22 2011
@@ -15,124 +15,106 @@
#include <boost/unordered_map.hpp>
+#include "common/utils.hpp"
+
#include "messaging/messages.pb.h"
namespace mesos { namespace internal {
-enum MSGID {
- // Artifacts from libprocess.
- PROCESS_TIMEOUT,
- PROCESS_EXIT,
- PROCESS_TERMINATE,
-
- // From framework to master.
- F2M_REGISTER_FRAMEWORK,
- F2M_REREGISTER_FRAMEWORK,
- F2M_UNREGISTER_FRAMEWORK,
- F2M_RESOURCE_OFFER_REPLY,
- F2M_REVIVE_OFFERS,
- F2M_KILL_TASK,
- F2M_FRAMEWORK_MESSAGE,
- F2M_STATUS_UPDATE_ACK,
-
- // From master to framework.
- M2F_REGISTER_REPLY,
- M2F_RESOURCE_OFFER,
- M2F_RESCIND_OFFER,
- M2F_STATUS_UPDATE,
- M2F_LOST_SLAVE,
- M2F_FRAMEWORK_MESSAGE,
- M2F_ERROR,
-
- // From slave to master.
- S2M_REGISTER_SLAVE,
- S2M_REREGISTER_SLAVE,
- S2M_UNREGISTER_SLAVE,
- S2M_STATUS_UPDATE,
- S2M_FRAMEWORK_MESSAGE,
- S2M_EXITED_EXECUTOR,
-
- // From slave heart to master.
- SH2M_HEARTBEAT,
-
- // From master to slave.
- M2S_REGISTER_REPLY,
- M2S_REREGISTER_REPLY,
- M2S_RUN_TASK,
- M2S_KILL_TASK,
- M2S_KILL_FRAMEWORK,
- M2S_FRAMEWORK_MESSAGE,
- M2S_UPDATE_FRAMEWORK,
- M2S_STATUS_UPDATE_ACK,
-
- // From executor to slave.
- E2S_REGISTER_EXECUTOR,
- E2S_STATUS_UPDATE,
- E2S_FRAMEWORK_MESSAGE,
-
- // From slave to executor.
- S2E_REGISTER_REPLY,
- S2E_RUN_TASK,
- S2E_KILL_TASK,
- S2E_FRAMEWORK_MESSAGE,
- S2E_KILL_EXECUTOR,
-
-#ifdef __sun__
- // From projd to slave.
- PD2S_REGISTER_PROJD,
- PD2S_PROJECT_READY,
-
- // From slave to projd.
- S2PD_UPDATE_RESOURCES,
- S2PD_KILL_ALL,
-#endif // __sun__
-
- // From master detector to processes.
- GOT_MASTER_TOKEN,
- NEW_MASTER_DETECTED,
- NO_MASTER_DETECTED,
- MASTER_DETECTION_FAILURE,
-
- MESOS_MSGID
-};
-
-
-// To couple a MSGID with a protocol buffer we use a templated class
-// that extends the necessary protocol buffer type (this also allows
-// the code to be better isolated from protocol buffer naming). While
-// protocol buffers are allegedly not meant to be inherited, we
-// decided this was an acceptable option since we don't add any new
-// functionality (or do any thing with the existing functionality).
+// To couple a message name with a protocol buffer we use a templated
+// class that extends the necessary protocol buffer type (this also
+// allows the code to be better isolated from protocol buffer
+// naming). While protocol buffers are allegedly not meant to be
+// inherited, we decided this was an acceptable option since we don't
+// add any new functionality (or do anything with the existing
+// functionality).
//
// To add another message that uses a protocol buffer you need to
// provide a specialization of the Message class (i.e., using the
// MESSAGE macro defined below).
-template <MSGID ID>
+template <const char* name>
class MSG;
-#define MESSAGE(ID, T) \
- template <> \
- class MSG<ID> : public T {}
+#define MESSAGE1(name) \
+ extern char name[]
+#define MESSAGE2(name, T) \
+ extern char name[]; \
+ template <> \
+ class MSG<name> : public T {}
+
+#define MESSAGE(...) \
+ CONCAT(MESSAGE, VA_NUM_ARGS(__VA_ARGS__))(__VA_ARGS__)
-class AnyMessage
-{
-public:
- AnyMessage(const std::string& data_)
- : data(data_) {}
- template <MSGID ID>
- operator MSG<ID> () const
- {
- MSG<ID> msg;
- msg.ParseFromString(data);
- return msg;
- }
+// From framework to master.
+MESSAGE(F2M_REGISTER_FRAMEWORK, RegisterFrameworkMessage);
+MESSAGE(F2M_REREGISTER_FRAMEWORK, ReregisterFrameworkMessage);
+MESSAGE(F2M_UNREGISTER_FRAMEWORK, UnregisterFrameworkMessage);
+MESSAGE(F2M_RESOURCE_OFFER_REPLY, ResourceOfferReplyMessage);
+MESSAGE(F2M_REVIVE_OFFERS, ReviveOffersMessage);
+MESSAGE(F2M_KILL_TASK, KillTaskMessage);
+MESSAGE(F2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(F2M_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-private:
- std::string data;
-};
+// From master to framework.
+MESSAGE(M2F_REGISTER_REPLY, FrameworkRegisteredMessage);
+MESSAGE(M2F_RESOURCE_OFFER, ResourceOfferMessage);
+MESSAGE(M2F_RESCIND_OFFER, RescindResourceOfferMessage);
+MESSAGE(M2F_STATUS_UPDATE, StatusUpdateMessage);
+MESSAGE(M2F_LOST_SLAVE, LostSlaveMessage);
+MESSAGE(M2F_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(M2F_ERROR, FrameworkErrorMessage);
+
+// From slave to master.
+MESSAGE(S2M_REGISTER_SLAVE, RegisterSlaveMessage);
+MESSAGE(S2M_REREGISTER_SLAVE, ReregisterSlaveMessage);
+MESSAGE(S2M_UNREGISTER_SLAVE, UnregisterSlaveMessage);
+MESSAGE(S2M_STATUS_UPDATE, StatusUpdateMessage);
+MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
+
+// From slave heart to master.
+MESSAGE(SH2M_HEARTBEAT, HeartbeatMessage);
+
+// From master to slave.
+MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
+MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
+MESSAGE(M2S_RUN_TASK, RunTaskMessage);
+MESSAGE(M2S_KILL_TASK, KillTaskMessage);
+MESSAGE(M2S_KILL_FRAMEWORK, KillFrameworkMessage);
+MESSAGE(M2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(M2S_UPDATE_FRAMEWORK, UpdateFrameworkMessage);
+MESSAGE(M2S_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
+
+// From executor to slave.
+MESSAGE(E2S_REGISTER_EXECUTOR, RegisterExecutorMessage);
+MESSAGE(E2S_STATUS_UPDATE, StatusUpdateMessage);
+MESSAGE(E2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+
+// From slave to executor.
+MESSAGE(S2E_REGISTER_REPLY, ExecutorRegisteredMessage);
+MESSAGE(S2E_RUN_TASK, RunTaskMessage);
+MESSAGE(S2E_KILL_TASK, KillTaskMessage);
+MESSAGE(S2E_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(S2E_KILL_EXECUTOR);
+
+#ifdef __sun__
+// From projd to slave.
+MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
+MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
+
+// From slave to projd.
+MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
+MESSAGE(S2PD_KILL_ALL);
+#endif // __sun__
+
+// From master detector to processes.
+MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
+MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
+MESSAGE(NO_MASTER_DETECTED);
+MESSAGE(MASTER_DETECTION_FAILURE);
// Type conversions helpful for changing between protocol buffer types
@@ -156,11 +138,6 @@ std::vector<T> convert(const google::pro
}
-// Mapping between message names to message ids.
-extern boost::unordered_map<std::string, MSGID> ids;
-extern boost::unordered_map<MSGID, std::string> names;
-
-
template <typename T>
class MesosProcess : public process::Process<T>
{
@@ -170,99 +147,44 @@ public:
virtual ~MesosProcess() {}
- static void post(const process::UPID& to, const std::string& name)
- {
- process::post(to, name);
- }
-
- static void post(const process::UPID& to, MSGID id)
+ template <const char *name>
+ static void post(const process::UPID& to, const MSG<name>& msg)
{
- CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
- process::post(to, names[id]);
- }
-
- template <MSGID ID>
- static void post(const process::UPID& to, const MSG<ID>& msg)
- {
- CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
std::string data;
msg.SerializeToString(&data);
- process::post(to, names[ID], data.data(), data.size());
+ process::post(to, name, data.data(), data.size());
}
protected:
- AnyMessage message() const
- {
- return AnyMessage(process::Process<T>::body());
- }
-
- MSGID msgid() const
- {
- CHECK(ids.count(process::Process<T>::name()) > 0)
- << "Missing MSGID for '" << process::Process<T>::name() << "'";
- return ids[process::Process<T>::name()];
- }
-
void send(const process::UPID& to, const std::string& name)
{
process::Process<T>::send(to, name);
}
- void send(const process::UPID& to, MSGID id)
+ template <const char* name>
+ void send(const process::UPID& to, const MSG<name>& msg)
{
- CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
- process::Process<T>::send(to, names[id]);
- }
-
- template <MSGID ID>
- void send(const process::UPID& to, const MSG<ID>& msg)
- {
- CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
std::string data;
msg.SerializeToString(&data);
- process::Process<T>::send(to, names[ID], data.data(), data.size());
- }
-
- MSGID receive(double secs = 0)
- {
- while (true) {
- process::Process<T>::receive(secs);
- if (ids.count(process::Process<T>::name()) > 0) {
- return ids[process::Process<T>::name()];
- } else {
- LOG(WARNING) << "Dropping unknown message '"
- << process::Process<T>::name() << "'"
- << " from: " << process::Process<T>::from()
- << " to: " << process::Process<T>::self();
- }
- }
+ process::Process<T>::send(to, name, data.data(), data.size());
}
- MSGID serve(double secs = 0)
+ const std::string& serve(double secs = 0, bool once = false)
{
- while (true) {
- process::Process<T>::serve(secs);
- if (ids.count(process::Process<T>::name()) > 0) {
- // Check if this has been bound and invoke the handler.
- if (handlers.count(process::Process<T>::name()) > 0) {
- handlers[process::Process<T>::name()](process::Process<T>::body());
- } else {
- return ids[process::Process<T>::name()];
- }
+ do {
+ process::Process<T>::serve(secs, once);
+ if (handlers.count(process::Process<T>::name()) > 0) {
+ handlers[process::Process<T>::name()](process::Process<T>::body());
} else {
- LOG(WARNING) << "Dropping unknown message '"
- << process::Process<T>::name() << "'"
- << " from: " << process::Process<T>::from()
- << " to: " << process::Process<T>::self();
+ return process::Process<T>::name();
}
- }
+ } while (!once);
}
- void install(MSGID id, void (T::*method)())
+ void install(const std::string& name, void (T::*method)())
{
T* t = static_cast<T*>(this);
- CHECK(names.count(id) > 0);
- handlers[names[id]] =
+ handlers[name] =
std::tr1::bind(&MesosProcess<T>::handler0, t,
method,
std::tr1::placeholders::_1);
@@ -270,12 +192,11 @@ protected:
template <typename PB,
typename P1, typename P1C>
- void install(MSGID id, void (T::*method)(P1C),
+ void install(const std::string& name, void (T::*method)(P1C),
P1 (PB::*param1)() const)
{
T* t = static_cast<T*>(this);
- CHECK(names.count(id) > 0);
- handlers[names[id]] =
+ handlers[name] =
std::tr1::bind(&handler1<PB, P1, P1C>, t,
method, param1,
std::tr1::placeholders::_1);
@@ -284,13 +205,12 @@ protected:
template <typename PB,
typename P1, typename P1C,
typename P2, typename P2C>
- void install(MSGID id, void (T::*method)(P1C, P2C),
+ void install(const std::string& name, void (T::*method)(P1C, P2C),
P1 (PB::*p1)() const,
P2 (PB::*p2)() const)
{
T* t = static_cast<T*>(this);
- CHECK(names.count(id) > 0);
- handlers[names[id]] =
+ handlers[name] =
std::tr1::bind(&handler2<PB, P1, P1C, P2, P2C>, t,
method, p1, p2,
std::tr1::placeholders::_1);
@@ -300,15 +220,14 @@ protected:
typename P1, typename P1C,
typename P2, typename P2C,
typename P3, typename P3C>
- void install(MSGID id,
+ void install(const std::string& name,
void (T::*method)(P1C, P2C, P3C),
P1 (PB::*p1)() const,
P2 (PB::*p2)() const,
P3 (PB::*p3)() const)
{
T* t = static_cast<T*>(this);
- CHECK(names.count(id) > 0);
- handlers[names[id]] =
+ handlers[name] =
std::tr1::bind(&handler3<PB, P1, P1C, P2, P2C, P3, P3C>, t,
method, p1, p2, p3,
std::tr1::placeholders::_1);
@@ -319,7 +238,7 @@ protected:
typename P2, typename P2C,
typename P3, typename P3C,
typename P4, typename P4C>
- void install(MSGID id,
+ void install(const std::string& name,
void (T::*method)(P1C, P2C, P3C, P4C),
P1 (PB::*p1)() const,
P2 (PB::*p2)() const,
@@ -327,8 +246,7 @@ protected:
P4 (PB::*p4)() const)
{
T* t = static_cast<T*>(this);
- CHECK(names.count(id) > 0);
- handlers[names[id]] =
+ handlers[name] =
std::tr1::bind(&handler4<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C>, t,
method, p1, p2, p3, p4,
std::tr1::placeholders::_1);
@@ -340,7 +258,7 @@ protected:
typename P3, typename P3C,
typename P4, typename P4C,
typename P5, typename P5C>
- void install(MSGID id,
+ void install(const std::string& name,
void (T::*method)(P1C, P2C, P3C, P4C, P5C),
P1 (PB::*p1)() const,
P2 (PB::*p2)() const,
@@ -349,8 +267,7 @@ protected:
P5 (PB::*p5)() const)
{
T* t = static_cast<T*>(this);
- CHECK(names.count(id) > 0);
- handlers[names[id]] =
+ handlers[name] =
std::tr1::bind(&handler5<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C, P5, P5C>, t,
method, p1, p2, p3, p4, p5,
std::tr1::placeholders::_1);
@@ -370,7 +287,7 @@ private:
const std::string& data)
{
PB pb;
- pb.ParseFromArray(data.data(), data.size());
+ pb.ParseFromString(data);
if (pb.IsInitialized()) {
(t->*method)(convert((&pb->*p1)()));
} else {
@@ -388,7 +305,7 @@ private:
const std::string& data)
{
PB pb;
- pb.ParseFromArray(data.data(), data.size());
+ pb.ParseFromString(data);
if (pb.IsInitialized()) {
(t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()));
} else {
@@ -408,7 +325,7 @@ private:
const std::string& data)
{
PB pb;
- pb.ParseFromArray(data.data(), data.size());
+ pb.ParseFromString(data);
if (pb.IsInitialized()) {
(t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
convert((&pb->*p3)()));
@@ -431,7 +348,7 @@ private:
const std::string& data)
{
PB pb;
- pb.ParseFromArray(data.data(), data.size());
+ pb.ParseFromString(data);
if (pb.IsInitialized()) {
(t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
convert((&pb->*p3)()), convert((&pb->*p4)()));
@@ -456,7 +373,7 @@ private:
const std::string& data)
{
PB pb;
- pb.ParseFromArray(data.data(), data.size());
+ pb.ParseFromString(data);
if (pb.IsInitialized()) {
(t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
convert((&pb->*p3)()), convert((&pb->*p4)()),
@@ -470,60 +387,6 @@ private:
boost::unordered_map<std::string, std::tr1::function<void (const std::string&)> > handlers;
};
-
-MESSAGE(F2M_REGISTER_FRAMEWORK, RegisterFrameworkMessage);
-MESSAGE(F2M_REREGISTER_FRAMEWORK, ReregisterFrameworkMessage);
-MESSAGE(F2M_UNREGISTER_FRAMEWORK, UnregisterFrameworkMessage);
-MESSAGE(F2M_RESOURCE_OFFER_REPLY, ResourceOfferReplyMessage);
-MESSAGE(F2M_REVIVE_OFFERS, ReviveOffersMessage);
-MESSAGE(F2M_KILL_TASK, KillTaskMessage);
-MESSAGE(F2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(F2M_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-MESSAGE(M2F_REGISTER_REPLY, FrameworkRegisteredMessage);
-MESSAGE(M2F_RESOURCE_OFFER, ResourceOfferMessage);
-MESSAGE(M2F_RESCIND_OFFER, RescindResourceOfferMessage);
-MESSAGE(M2F_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(M2F_LOST_SLAVE, LostSlaveMessage);
-MESSAGE(M2F_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2F_ERROR, FrameworkErrorMessage);
-
-MESSAGE(S2M_REGISTER_SLAVE, RegisterSlaveMessage);
-MESSAGE(S2M_REREGISTER_SLAVE, ReregisterSlaveMessage);
-MESSAGE(S2M_UNREGISTER_SLAVE, UnregisterSlaveMessage);
-MESSAGE(S2M_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
-
-MESSAGE(SH2M_HEARTBEAT, HeartbeatMessage);
-
-MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_RUN_TASK, RunTaskMessage);
-MESSAGE(M2S_KILL_TASK, KillTaskMessage);
-MESSAGE(M2S_KILL_FRAMEWORK, KillFrameworkMessage);
-MESSAGE(M2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2S_UPDATE_FRAMEWORK, UpdateFrameworkMessage);
-MESSAGE(M2S_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-MESSAGE(E2S_REGISTER_EXECUTOR, RegisterExecutorMessage);
-MESSAGE(E2S_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(E2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-
-MESSAGE(S2E_REGISTER_REPLY, ExecutorRegisteredMessage);
-MESSAGE(S2E_RUN_TASK, RunTaskMessage);
-MESSAGE(S2E_KILL_TASK, KillTaskMessage);
-MESSAGE(S2E_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-
-#ifdef __sun__
-MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
-MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
-MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
-#endif // __sun__
-
-MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
-MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
-
}} // namespace mesos { namespace internal {
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 09:18:22 2011
@@ -104,6 +104,8 @@ public:
install(M2F_ERROR, &SchedulerProcess::error,
&FrameworkErrorMessage::code,
&FrameworkErrorMessage::message);
+
+ install(process::EXITED, &SchedulerProcess::exited);
}
virtual ~SchedulerProcess() {}
@@ -112,44 +114,26 @@ protected:
virtual void operator () ()
{
while (true) {
- // Rather than send a message to this process when it is time to
- // terminate, we set a flag that gets re-read. Sending a message
- // requires some sort of matching or priority reads that
- // libprocess currently doesn't support. Note that this field is
- // only read by this process, so we don't need to protect it in
- // any way. In fact, using a lock to protect it (or for
- // providing atomicity for cleanup, for example), might lead to
- // deadlock with the client code because we already use a lock
- // in SchedulerDriver. That being said, for now we make
- // terminate 'volatile' to guarantee that each read is getting a
- // fresh copy.
- // TODO(benh): Do a coherent read so as to avoid using 'volatile'.
- if (terminate)
- return;
-
- // TODO(benh): We need to break the receive every so often to
- // check if 'terminate' has been set. It would be better to just
- // send a message rather than have a timeout (see the comment
- // above for why sending a message will still require us to use
- // the terminate flag).
- switch (serve(2)) {
- case PROCESS_EXIT: {
- // TODO(benh): Don't wait for a new master forever.
- if (from() == master)
- VLOG(1) << "Connection to master lost .. waiting for new master";
- break;
- }
-
- case PROCESS_TIMEOUT: {
- break;
- }
-
- default: {
- VLOG(1) << "Received unknown message " << msgid()
- << " from " << from();
- break;
- }
- }
+ // Sending a message to terminate this process is insufficient
+ // because that message might get queued behind a bunch of other
+ // messages. So, when it is time to terminate, we set a flag that
+ // gets re-read by this process after every message. In order to
+ // get this correct we must return from each invocation of
+ // 'serve', to check and see if terminate has been set. In
+ // addition, we need to send a dummy message right after we set
+ // terminate just in case there aren't any messages in the
+ // queue. Note that the terminate field is only read by this
+ // process, so we don't need to protect it in any way. In fact,
+ // using a lock to protect it (or for providing atomicity for
+ // cleanup, for example), might lead to deadlock with the client
+ // code because we already use a lock in SchedulerDriver. That
+ // being said, for now we make terminate 'volatile' to guarantee
+ // that each read is getting a fresh copy.
+ // TODO(benh): Do a coherent read so as to avoid using
+ // 'volatile'.
+ if (terminate) return;
+
+ serve(0, true);
}
}
@@ -285,6 +269,14 @@ protected:
cref(message)));
}
+ void exited()
+ {
+ // TODO(benh): Don't wait for a new master forever.
+ if (from() == master) {
+ VLOG(1) << "Connection to master lost .. waiting for new master";
+ }
+ }
+
void stop()
{
if (!active)
@@ -514,11 +506,12 @@ MesosSchedulerDriver::~MesosSchedulerDri
// ultimately invokes this destructor). This deadlock is actually a
// bug in the client code: provided that the SchedulerProcess class
// _only_ makes calls into instances of Scheduler, then such a
- // deadlock implies that the destructor got called from within a method
- // of the Scheduler instance that is being destructed! Note
+ // deadlock implies that the destructor got called from within a
+ // method of the Scheduler instance that is being destructed! Note
// that we could add a method to libprocess that told us whether or
// not this was about to be deadlock, and possibly report this back
- // to the user somehow.
+ // to the user somehow. Note that we will also wait forever if
+ // MesosSchedulerDriver::stop was never called.
if (process != NULL) {
process::wait(process->self());
delete process;
@@ -549,14 +542,21 @@ int MesosSchedulerDriver::start()
return -1;
}
+ // We might have been running before, but have since stopped. Don't
+ // allow this driver to be used again (for now)!
+ if (process != NULL) {
+ return -1;
+ }
+
// Set running here so we can recognize an exception from calls into
// Java (via getFrameworkName or getExecutorInfo).
running = true;
// Get username of current user.
passwd* passwd;
- if ((passwd = getpwuid(getuid())) == NULL)
+ if ((passwd = getpwuid(getuid())) == NULL) {
fatal("failed to get username information");
+ }
// Set up framework info.
FrameworkInfo framework;
@@ -565,8 +565,9 @@ int MesosSchedulerDriver::start()
framework.mutable_executor()->MergeFrom(sched->getExecutorInfo(this));
// Something invoked stop while we were in the scheduler, bail.
- if (!running)
+ if (!running) {
return -1;
+ }
process = new SchedulerProcess(this, sched, frameworkId, framework);
@@ -604,7 +605,7 @@ int MesosSchedulerDriver::stop()
if (process != NULL) {
process::dispatch(process->self(), &SchedulerProcess::stop);
process->terminate = true;
- process = NULL;
+ process::post(process->self(), process::TERMINATE);
}
running = false;
@@ -624,8 +625,9 @@ int MesosSchedulerDriver::join()
{
Lock lock(&mutex);
- while (running)
+ while (running) {
pthread_cond_wait(&cond, &mutex);
+ }
return 0;
}
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun 5 09:18:22 2011
@@ -172,7 +172,7 @@ void ProcessBasedIsolationModule::Reaper
}
}
}
- } else if (name() == process::TERMINATE || name() == process::EXIT) {
+ } else if (name() == process::TERMINATE || name() == process::EXITED) {
return;
}
}
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:18:22 2011
@@ -38,7 +38,10 @@ using std::vector;
Slave::Slave(const Resources& _resources, bool _local,
IsolationModule *_isolationModule)
: resources(_resources), local(_local),
- isolationModule(_isolationModule), heart(NULL) {}
+ isolationModule(_isolationModule), heart(NULL)
+{
+ initialize();
+}
Slave::Slave(const Configuration& _conf, bool _local,
@@ -48,6 +51,8 @@ Slave::Slave(const Configuration& _conf,
{
resources =
Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+
+ initialize();
}
@@ -158,15 +163,16 @@ void Slave::operator () ()
hostent* he = gethostbyname2(buf, AF_INET);
string hostname = he->h_name;
- // Get our public DNS name. Normally this is our hostname, but on EC2
- // we look for the MESOS_PUBLIC_DNS environment variable. This allows
- // the master to display our public name in its web UI.
+ // Check and see if we have a different public DNS name. Normally
+ // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
+ // environment variable. This allows the master to display our
+ // public name in its web UI.
string public_hostname = hostname;
if (getenv("MESOS_PUBLIC_DNS") != NULL) {
public_hostname = getenv("MESOS_PUBLIC_DNS");
}
- SlaveInfo slave;
+ // Initialize slave info.
slave.set_hostname(hostname);
slave.set_public_hostname(public_hostname);
slave.mutable_resources()->MergeFrom(resources);
@@ -175,428 +181,471 @@ void Slave::operator () ()
isolationModule->initialize(this);
while (true) {
- receive(1);
- if (msgid() == PROCESS_TERMINATE) {
+ serve(1);
+ if (name() == process::TERMINATE) {
LOG(INFO) << "Asked to shut down by " << from();
foreachpaircopy (_, Framework* framework, frameworks) {
killFramework(framework);
}
return;
}
+ }
+}
- // Otherwise, don't terminate and see what the message is.
- switch (msgid()) {
- case NEW_MASTER_DETECTED: {
- const MSG<NEW_MASTER_DETECTED>& msg = message();
-
- LOG(INFO) << "New master at " << msg.pid();
-
- master = msg.pid();
- link(master);
-
- if (slaveId == "") {
- // Slave started before master.
- MSG<S2M_REGISTER_SLAVE> out;
- out.mutable_slave()->MergeFrom(slave);
- send(master, out);
- } else {
- // Re-registering, so send tasks running.
- MSG<S2M_REREGISTER_SLAVE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_slave()->MergeFrom(slave);
-
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (_, Executor* executor, framework->executors) {
- foreachpair (_, Task* task, executor->tasks) {
- out.add_tasks()->MergeFrom(*task);
- }
- }
- }
-
- send(master, out);
- }
- break;
- }
-
- case NO_MASTER_DETECTED: {
- LOG(INFO) << "Lost master(s) ... waiting";
- break;
- }
- case MASTER_DETECTION_FAILURE: {
- LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
- break;
- }
+void Slave::initialize()
+{
+ install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
- case M2S_REGISTER_REPLY: {
- const MSG<M2S_REGISTER_REPLY>& msg = message();
- slaveId = msg.slave_id();
+ install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
- LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+ install(MASTER_DETECTION_FAILURE, &Slave::masterDetectionFailure);
- heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
- link(spawn(heart));
- break;
- }
-
- case M2S_REREGISTER_REPLY: {
- const MSG<M2S_REREGISTER_REPLY>& msg = message();
+ install(M2S_REGISTER_REPLY, &Slave::registerReply,
+ &SlaveRegisteredMessage::slave_id,
+ &SlaveRegisteredMessage::heartbeat_interval);
- LOG(INFO) << "Re-registered with master";
+ install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
+ &SlaveRegisteredMessage::slave_id,
+ &SlaveRegisteredMessage::heartbeat_interval);
- if (!(slaveId == msg.slave_id())) {
- LOG(FATAL) << "Slave re-registered but got wrong ID";
- }
+ install(M2S_RUN_TASK, &Slave::runTask,
+ &RunTaskMessage::framework,
+ &RunTaskMessage::framework_id,
+ &RunTaskMessage::pid,
+ &RunTaskMessage::task);
- if (heart != NULL) {
- send(heart->self(), MESOS_MSGID);
- wait(heart->self());
- delete heart;
- }
+ install(M2S_KILL_TASK, &Slave::killTask,
+ &KillTaskMessage::framework_id,
+ &KillTaskMessage::task_id);
- heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
- link(spawn(heart));
- break;
- }
-
- case M2S_RUN_TASK: {
- const MSG<M2S_RUN_TASK>& msg = message();
-
- const TaskDescription& task = msg.task();
-
- LOG(INFO) << "Got assigned task " << task.task_id()
- << " for framework " << msg.framework_id();
-
- Framework *framework = getFramework(msg.framework_id());
- if (framework == NULL) {
- framework =
- new Framework(msg.framework_id(), msg.framework(), msg.pid());
- frameworks[msg.framework_id()] = framework;
- }
+ install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
+ &KillFrameworkMessage::framework_id);
- // Either send the task to an executor or start a new executor
- // and queue the task until the executor has started.
- Executor* executor = task.has_executor()
- ? framework->getExecutor(task.executor().executor_id())
- : framework->getExecutor(framework->info.executor().executor_id());
-
- if (executor != NULL) {
- if (!executor->pid) {
- // Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
- } else {
- // Add the task to the executor.
- executor->addTask(task);
-
- MSG<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
- isolationModule->resourcesChanged(framework, executor);
- }
- } else {
- // Launch an executor for this task.
- if (task.has_executor()) {
- executor = framework->createExecutor(task.executor());
- } else {
- executor = framework->createExecutor(framework->info.executor());
- }
+ install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
+ &FrameworkMessageMessage::framework_id,
+ &FrameworkMessageMessage::message);
- // Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
+ install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
+ &UpdateFrameworkMessage::framework_id,
+ &UpdateFrameworkMessage::pid);
- // Tell the isolation module to launch the executor.
- isolationModule->launchExecutor(framework, executor);
- }
- break;
- }
+ install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
+ &StatusUpdateAckMessage::framework_id,
+ &StatusUpdateAckMessage::slave_id,
+ &StatusUpdateAckMessage::task_id);
- case M2S_KILL_TASK: {
- const MSG<M2S_KILL_TASK>& msg = message();
+ install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
+ &RegisterExecutorMessage::framework_id,
+ &RegisterExecutorMessage::executor_id);
- LOG(INFO) << "Asked to kill task " << msg.task_id()
- << " of framework " << msg.framework_id();
+ install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
+ &StatusUpdateMessage::framework_id,
+ &StatusUpdateMessage::status);
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- // Tell the executor to kill the task if it is up and
- // running, otherwise, consider the task lost.
- Executor* executor = framework->getExecutor(msg.task_id());
- if (executor == NULL || !executor->pid) {
- // Update the resources locally, if an executor comes up
- // after this then it just won't receive this task.
- executor->removeTask(msg.task_id());
- isolationModule->resourcesChanged(framework, executor);
-
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(msg.task_id());
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
- } else {
- // Otherwise, send a message to the executor and wait for
- // it to send us a status update.
- MSG<S2E_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_task_id()->MergeFrom(msg.task_id());
- send(executor->pid, out);
- }
- } else {
- LOG(WARNING) << "Cannot kill task " << msg.task_id()
- << " of framework " << msg.framework_id()
- << " because no such framework is running";
+ install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
+ &FrameworkMessageMessage::framework_id,
+ &FrameworkMessageMessage::message);
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(msg.task_id());
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
+ install(process::TIMEOUT, &Slave::timeout);
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
+ install(process::EXITED, &Slave::exited);
+}
+
+
+void Slave::newMasterDetected(const string& pid)
+{
+ LOG(INFO) << "New master at " << pid;
+
+ master = pid;
+ link(master);
+
+ if (slaveId == "") {
+ // Slave started before master.
+ MSG<S2M_REGISTER_SLAVE> out;
+ out.mutable_slave()->MergeFrom(slave);
+ send(master, out);
+ } else {
+ // Re-registering, so send tasks running.
+ MSG<S2M_REREGISTER_SLAVE> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_slave()->MergeFrom(slave);
+
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (_, Executor* executor, framework->executors) {
+ foreachpair (_, Task* task, executor->tasks) {
+ out.add_tasks()->MergeFrom(*task);
}
- break;
}
+ }
- case M2S_KILL_FRAMEWORK: {
- const MSG<M2S_KILL_FRAMEWORK>&msg = message();
+ send(master, out);
+ }
+}
- LOG(INFO) << "Asked to kill framework " << msg.framework_id();
- Framework *framework = getFramework(msg.framework_id());
- if (framework != NULL)
- killFramework(framework);
- break;
- }
+void Slave::noMasterDetected()
+{
+ LOG(INFO) << "Lost master(s) ... waiting";
+}
- case M2S_FRAMEWORK_MESSAGE: {
- const MSG<M2S_FRAMEWORK_MESSAGE>&msg = message();
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- const FrameworkMessage& message = msg.message();
-
- Executor* executor = framework->getExecutor(message.executor_id());
- if (executor == NULL) {
- LOG(WARNING) << "Dropping message for executor '"
- << message.executor_id() << "' of framework "
- << msg.framework_id()
- << " because executor does not exist";
- } else if (!executor->pid) {
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
- LOG(WARNING) << "Dropping message for executor '"
- << message.executor_id() << "' of framework "
- << msg.framework_id()
- << " because executor is not running";
- } else {
- MSG<S2E_FRAMEWORK_MESSAGE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_message()->MergeFrom(message);
- send(executor->pid, out);
- }
- } else {
- LOG(WARNING) << "Dropping message for framework "
- << msg.framework_id()
- << " because it does not exist";
- }
- break;
- }
+void Slave::masterDetectionFailure()
+{
+ LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
+}
- case M2S_UPDATE_FRAMEWORK: {
- const MSG<M2S_UPDATE_FRAMEWORK>&msg = message();
- Framework *framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- LOG(INFO) << "Updating framework " << msg.framework_id()
- << " pid to " << msg.pid();
- framework->pid = msg.pid();
- }
- break;
- }
+void Slave::registerReply(const SlaveID& slaveId, double heartbeat_interval)
+{
+ LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+ this->slaveId = slaveId;
+ heart = new Heart(master, self(), slaveId, heartbeat_interval);
+ link(spawn(heart));
+}
- case M2S_STATUS_UPDATE_ACK: {
- const MSG<M2S_STATUS_UPDATE_ACK>& msg = message();
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- foreachpair (double deadline, _, framework->statuses) {
- if (framework->statuses[deadline].count(msg.task_id()) > 0) {
- LOG(INFO) << "Got acknowledgement of status update"
- << " for task " << msg.task_id()
- << " of framework " << framework->frameworkId;
- framework->statuses[deadline].erase(msg.task_id());
- break;
- }
- }
- }
- break;
- }
+void Slave::reregisterReply(const SlaveID& slaveId, double heartbeat_interval)
+{
+ LOG(INFO) << "Re-registered with master";
- case E2S_REGISTER_EXECUTOR: {
- const MSG<E2S_REGISTER_EXECUTOR>& msg = message();
+ if (!(this->slaveId == slaveId)) {
+ LOG(FATAL) << "Slave re-registered but got wrong ID";
+ }
- LOG(INFO) << "Got registration for executor '"
- << msg.executor_id() << "' of framework "
- << msg.framework_id();
-
- Framework* framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(msg.executor_id());
-
- // Check the status of the executor.
- if (executor == NULL) {
- LOG(WARNING) << "Not expecting executor '" << msg.executor_id()
- << "' of framework " << msg.framework_id();
- send(from(), S2E_KILL_EXECUTOR);
- } else if (executor->pid != UPID()) {
- LOG(WARNING) << "Not good, executor '" << msg.executor_id()
- << "' of framework " << msg.framework_id()
- << " is already running";
- send(from(), S2E_KILL_EXECUTOR);
- } else {
- // Save the pid for the executor.
- executor->pid = from();
-
- // Now that the executor is up, set its resource limits.
- isolationModule->resourcesChanged(framework, executor);
-
- // Tell executor it's registered and give it any queued tasks.
- MSG<S2E_REGISTER_REPLY> out;
- ExecutorArgs* args = out.mutable_args();
- args->mutable_framework_id()->MergeFrom(framework->frameworkId);
- args->set_name(framework->info.name());
- args->mutable_slave_id()->MergeFrom(slaveId);
- args->set_hostname(hostname);
- args->set_data(framework->info.executor().data());
- send(executor->pid, out);
- sendQueuedTasks(framework, executor);
- }
- } else {
- // Framework is gone; tell the executor to exit.
- LOG(WARNING) << "Framework " << msg.framework_id()
- << " does not exist (it may have been killed),"
- << " telling executor to exit";
-
- // TODO(benh): Don't we also want to tell the isolation
- // module to shut this guy down!
- send(from(), S2E_KILL_EXECUTOR);
- }
- break;
- }
+ if (heart != NULL) {
+ send(heart->self(), process::TERMINATE);
+ wait(heart->self());
+ delete heart;
+ }
- case E2S_STATUS_UPDATE: {
- const MSG<E2S_STATUS_UPDATE>& msg = message();
+ heart = new Heart(master, self(), slaveId, heartbeat_interval);
+ link(spawn(heart));
+}
- const TaskStatus& status = msg.status();
- LOG(INFO) << "Status update: task " << status.task_id()
- << " of framework " << msg.framework_id()
- << " is now in state "
- << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
- Framework *framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(status.task_id());
- if (executor != NULL) {
- if (status.state() == TASK_FINISHED ||
- status.state() == TASK_FAILED ||
- status.state() == TASK_KILLED ||
- status.state() == TASK_LOST) {
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- }
-
- // Send message and record the status for possible resending.
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_status()->MergeFrom(status);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status.task_id()] = status;
- } else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "executor for framework " << msg.framework_id();
- }
- } else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "framework " << msg.framework_id();
- }
- break;
- }
+void Slave::runTask(const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const string& pid,
+ const TaskDescription& task)
+{
+ LOG(INFO) << "Got assigned task " << task.task_id()
+ << " for framework " << frameworkId;
- case E2S_FRAMEWORK_MESSAGE: {
- const MSG<E2S_FRAMEWORK_MESSAGE>& msg = message();
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ framework = new Framework(frameworkId, frameworkInfo, pid);
+ frameworks[frameworkId] = framework;
+ }
- const FrameworkMessage& message = msg.message();
+ // Either send the task to an executor or start a new executor
+ // and queue the task until the executor has started.
+ Executor* executor = task.has_executor()
+ ? framework->getExecutor(task.executor().executor_id())
+ : framework->getExecutor(framework->info.executor().executor_id());
+
+ if (executor != NULL) {
+ if (!executor->pid) {
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+ } else {
+ // Add the task to the executor.
+ executor->addTask(task);
- Framework *framework = getFramework(msg.framework_id());
- if (framework != NULL) {
- LOG(INFO) << "Sending message for framework "
- << framework->frameworkId
- << " to " << framework->pid;
-
- // TODO(benh): This is weird, sending an M2F message.
- MSG<M2F_FRAMEWORK_MESSAGE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_message()->MergeFrom(message);
- out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
- send(framework->pid, out);
- }
- break;
- }
+ MSG<S2E_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task);
+ send(executor->pid, out);
+ isolationModule->resourcesChanged(framework, executor);
+ }
+ } else {
+ // Launch an executor for this task.
+ if (task.has_executor()) {
+ executor = framework->createExecutor(task.executor());
+ } else {
+ executor = framework->createExecutor(framework->info.executor());
+ }
+
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+
+ // Tell the isolation module to launch the executor.
+ isolationModule->launchExecutor(framework, executor);
+ }
+}
+
+
+void Slave::killTask(const FrameworkID& frameworkId,
+ const TaskID& taskId)
+{
+ LOG(INFO) << "Asked to kill task " << taskId
+ << " of framework " << frameworkId;
- case PROCESS_EXIT: {
- LOG(INFO) << "Process exited: " << from();
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ // Tell the executor to kill the task if it is up and
+ // running, otherwise, consider the task lost.
+ Executor* executor = framework->getExecutor(taskId);
+ if (executor == NULL || !executor->pid) {
+ // Update the resources locally, if an executor comes up
+ // after this then it just won't receive this task.
+ executor->removeTask(taskId);
+ isolationModule->resourcesChanged(framework, executor);
+
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+ send(master, out);
- if (from() == master) {
- LOG(WARNING) << "Master disconnected! "
- << "Waiting for a new master to be elected.";
- // TODO(benh): After so long waiting for a master, commit suicide.
- }
+ double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status->task_id()] = *status;
+ } else {
+ // Otherwise, send a message to the executor and wait for
+ // it to send us a status update.
+ MSG<S2E_KILL_TASK> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_task_id()->MergeFrom(taskId);
+ send(executor->pid, out);
+ }
+ } else {
+ LOG(WARNING) << "Cannot kill task " << taskId
+ << " of framework " << frameworkId
+ << " because no such framework is running";
+
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+ send(master, out);
+
+ double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status->task_id()] = *status;
+ }
+}
+
+
+void Slave::killFramework(const FrameworkID& frameworkId)
+{
+ LOG(INFO) << "Asked to kill framework " << frameworkId;
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ killFramework(framework);
+ }
+}
+
+
+void Slave::schedulerMessage(const FrameworkID& frameworkId,
+ const FrameworkMessage& message)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(message.executor_id());
+ if (executor == NULL) {
+ LOG(WARNING) << "Dropping message for executor '"
+ << message.executor_id() << "' of framework " << frameworkId
+ << " because executor does not exist";
+ } else if (!executor->pid) {
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ LOG(WARNING) << "Dropping message for executor '"
+ << message.executor_id() << "' of framework " << frameworkId
+ << " because executor is not running";
+ } else {
+ MSG<S2E_FRAMEWORK_MESSAGE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_message()->MergeFrom(message);
+ send(executor->pid, out);
+ }
+ } else {
+ LOG(WARNING) << "Dropping message for framework "<< frameworkId
+ << " because it does not exist";
+ }
+}
+
+
+void Slave::updateFramework(const FrameworkID& frameworkId,
+ const string& pid)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ LOG(INFO) << "Updating framework " << frameworkId
+ << " pid to " <<pid;
+ framework->pid = pid;
+ }
+}
+
+
+void Slave::statusUpdateAck(const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const TaskID& taskId)
+{
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ foreachpair (double deadline, _, framework->statuses) {
+ if (framework->statuses[deadline].count(taskId) > 0) {
+ LOG(INFO) << "Got acknowledgement of status update"
+ << " for task " << taskId
+ << " of framework " << framework->frameworkId;
+ framework->statuses[deadline].erase(taskId);
break;
}
+ }
+ }
+}
- case PROCESS_TIMEOUT: {
- // Check and see if we should re-send any status updates.
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (double deadline, _, framework->statuses) {
- if (deadline <= elapsed()) {
- foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
- LOG(WARNING) << "Resending status update"
- << " for task " << status.task_id()
- << " of framework " << framework->frameworkId;
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(master, out);
- }
- }
- }
- }
- break;
+
+// Handles a registration request sent by an executor.
+//
+// frameworkId: framework the executor claims to belong to.
+// executorId: id the executor registers under.
+//
+// The sender (from()) is told to exit with S2E_KILL_EXECUTOR when the
+// framework is unknown, when the framework has no such executor, or
+// when that executor already has a pid recorded. Otherwise the
+// sender's pid is saved, the isolation module is notified, an
+// S2E_REGISTER_REPLY carrying the executor arguments is sent back and
+// any queued tasks are flushed to the executor.
+void Slave::registerExecutor(const FrameworkID& frameworkId,
+                             const ExecutorID& executorId)
+{
+  LOG(INFO) << "Got registration for executor '" << executorId
+            << "' of framework " << frameworkId;
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    // Framework is gone; tell the executor to exit.
+    LOG(WARNING) << "Framework " << frameworkId
+                 << " does not exist (it may have been killed),"
+                 << " telling executor to exit";
+
+    // TODO(benh): Don't we also want to tell the isolation
+    // module to shut this guy down!
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+
+  // An executor this framework never launched is rejected.
+  if (executor == NULL) {
+    LOG(WARNING) << "Not expecting executor '" << executorId
+                 << "' of framework " << frameworkId;
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  // A pid is already recorded, so this registration is a duplicate.
+  if (executor->pid != UPID()) {
+    LOG(WARNING) << "Not good, executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " is already running";
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  // Remember where to reach the executor from now on.
+  executor->pid = from();
+
+  // Now that the executor is up, set its resource limits.
+  isolationModule->resourcesChanged(framework, executor);
+
+  // Confirm the registration and hand over the executor arguments.
+  MSG<S2E_REGISTER_REPLY> reply;
+  ExecutorArgs* executorArgs = reply.mutable_args();
+  executorArgs->mutable_framework_id()->MergeFrom(framework->frameworkId);
+  executorArgs->set_name(framework->info.name());
+  executorArgs->mutable_slave_id()->MergeFrom(slaveId);
+  executorArgs->set_hostname(slave.hostname());
+  executorArgs->set_data(framework->info.executor().data());
+  send(executor->pid, reply);
+
+  // Deliver whatever tasks were queued while the executor started.
+  sendQueuedTasks(framework, executor);
+}
+
+
+// Handles a task status update: forwards it to the master and keeps a
+// copy so that it can be resent until it is acknowledged.
+//
+// frameworkId: framework the task belongs to.
+// status: the new task status; its task id is used to find the executor.
+//
+// When the framework or the executor cannot be looked up, a warning
+// is logged and the update is neither forwarded nor stored.
+void Slave::statusUpdate(const FrameworkID& frameworkId,
+ const TaskStatus& status)
+{
+ // NOTE(review): FindValueByNumber() presumably returns NULL for a
+ // number with no matching enum value, which ->name() would then
+ // dereference -- confirm against the protobuf documentation.
+ LOG(INFO) << "Status update: task " << status.task_id()
+ << " of framework " << frameworkId
+ << " is now in state "
+ << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(status.task_id());
+ if (executor != NULL) {
+ // For these four states the task is removed from its executor and
+ // the isolation module is told that resources changed.
+ if (status.state() == TASK_FINISHED ||
+ status.state() == TASK_FAILED ||
+ status.state() == TASK_KILLED ||
+ status.state() == TASK_LOST) {
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
}
- default: {
- LOG(ERROR) << "Received unknown message (" << msgid()
- << ") from " << from();
- break;
+ // Send message and record the status for possible resending.
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_status()->MergeFrom(status);
+ send(master, out);
+
+ // The copy is filed under the time at which a resend becomes due;
+ // timeout() compares these keys against elapsed().
+ double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status.task_id()] = status;
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "executor for framework " << frameworkId;
+ }
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "framework " << frameworkId;
+ }
+}
+
+
+// Relays a message from an executor to the scheduler of its framework.
+//
+// frameworkId: framework the sending executor belongs to.
+// message: payload to relay; a copy is stamped with this slave's id.
+//
+// The message is wrapped in an M2F_FRAMEWORK_MESSAGE and sent directly
+// to the pid recorded for the framework. When the framework is not
+// known to this slave the message is dropped and a warning is logged,
+// matching the other handlers in this file that warn when a lookup
+// fails.
+void Slave::executorMessage(const FrameworkID& frameworkId,
+                            const FrameworkMessage& message)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    // Leave a trace of the drop instead of discarding it silently.
+    LOG(WARNING) << "Dropping message from executor of framework "
+                 << frameworkId << " because framework does not exist";
+    return;
+  }
+
+  LOG(INFO) << "Sending message for framework "
+            << framework->frameworkId
+            << " to " << framework->pid;
+
+  // TODO(benh): This is weird, sending an M2F message.
+  MSG<M2F_FRAMEWORK_MESSAGE> out;
+  out.mutable_framework_id()->MergeFrom(frameworkId);
+  out.mutable_message()->MergeFrom(message);
+  // Let the scheduler see which slave the message came through.
+  out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
+  send(framework->pid, out);
+}
+
+
+// Resends every stored status update whose deadline has passed.
+//
+// Walks all frameworks and, within each, every group of pending
+// updates keyed by deadline. Each status in a group whose deadline is
+// at or before elapsed() is sent to the master again as an
+// S2M_STATUS_UPDATE.
+//
+// Nothing is removed or re-keyed here, so an overdue update is sent
+// again on every call until statusUpdateAck() erases it.
+void Slave::timeout()
+{
+ // Check and see if we should re-send any status updates.
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (double deadline, _, framework->statuses) {
+ if (deadline <= elapsed()) {
+ foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
+ LOG(WARNING) << "Resending status update"
+ << " for task " << status.task_id()
+ << " of framework " << framework->frameworkId;
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.mutable_status()->MergeFrom(status);
+ send(master, out);
+ }
}
}
}
}
+// Handles the notification that another process has exited.
+//
+// The exit is always logged. When the exited process is the current
+// master, a warning is logged as well; no other action is taken.
+void Slave::exited()
+{
+  LOG(INFO) << "Process exited: " << from();
+
+  // Exits of any process other than the master need nothing more.
+  if (from() != master) {
+    return;
+  }
+
+  LOG(WARNING) << "Master disconnected! "
+               << "Waiting for a new master to be elected.";
+  // TODO(benh): After so long waiting for a master, commit suicide.
+}
+
+
+
Framework* Slave::getFramework(const FrameworkID& frameworkId)
{
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun 5 09:18:22 2011
@@ -192,16 +192,13 @@ protected:
link(slave);
link(master);
do {
- switch (receive(interval)) {
- case PROCESS_TIMEOUT: {
- MSG<SH2M_HEARTBEAT> msg;
- msg.mutable_slave_id()->MergeFrom(slaveId);
- send(master, msg);
- break;
- }
- case PROCESS_EXIT:
- default:
- return;
+ serve(interval);
+ if (name() == process::TIMEOUT) {
+ MSG<SH2M_HEARTBEAT> msg;
+ msg.mutable_slave_id()->MergeFrom(slaveId);
+ send(master, msg);
+ } else {
+ return;
}
} while (true);
}
@@ -239,6 +236,34 @@ public:
const Configuration& getConfiguration();
+ // Handlers that replace the cases of the former switch-style
+ // receive loop. NOTE(review): presumably one per incoming message
+ // or event and wired up in initialize() -- confirm in slave.cpp.
+ void newMasterDetected(const std::string& pid);
+ void noMasterDetected();
+ void masterDetectionFailure();
+ void registerReply(const SlaveID& slaveId, double heartbeat_interval);
+ void reregisterReply(const SlaveID& slaveId, double heartbeat_interval);
+ void runTask(const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskDescription& task);
+ void killTask(const FrameworkID& frameworkId,
+ const TaskID& taskId);
+ void killFramework(const FrameworkID& frameworkId);
+ void schedulerMessage(const FrameworkID& frameworkId,
+ const FrameworkMessage& message);
+ // Stores a new pid for a framework this slave already tracks.
+ void updateFramework(const FrameworkID& frameworkId,
+ const std::string& pid);
+ // Drops the stored status update for the task; slaveId is not used
+ // by the definition in slave.cpp.
+ void statusUpdateAck(const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const TaskID& taskId);
+ void registerExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+ // Forwards the update to the master and keeps a copy for resending.
+ void statusUpdate(const FrameworkID& frameworkId,
+ const TaskStatus& status);
+ // Relays an executor's message to the framework's recorded pid.
+ void executorMessage(const FrameworkID& frameworkId,
+ const FrameworkMessage& message);
+ // Resends stored status updates whose deadline has passed.
+ void timeout();
+ // Logs the exit of a linked process; warns when it was the master.
+ void exited();
+
// TODO(...): Don't make these instance variables public! Hack for
// now because they are needed in the isolation modules.
bool local;
@@ -247,6 +272,8 @@ public:
protected:
virtual void operator () ();
+ void initialize();
+
Framework* getFramework(const FrameworkID& frameworkId);
// Send any tasks queued up for the given framework to its executor
@@ -256,6 +283,8 @@ protected:
private:
Configuration conf;
+ SlaveInfo slave;
+
process::UPID master;
Resources resources;
Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun 5 09:18:22 2011
@@ -494,7 +494,7 @@ TEST(MasterTest, SlavePartitioned)
EXPECT_CALL(sched, slaveLost(&driver, _))
.WillOnce(Trigger(&slaveLostCall));
- EXPECT_MSG(filter, Eq(names[SH2M_HEARTBEAT]), _, _)
+ EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
.WillRepeatedly(Return(true));
driver.start();
@@ -778,7 +778,7 @@ TEST(MasterTest, SchedulerFailoverStatus
EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
.Times(1);
- EXPECT_MSG(filter, Eq(names[M2F_STATUS_UPDATE]), _, Ne(master))
+ EXPECT_MSG(filter, Eq(M2F_STATUS_UPDATE), _, Ne(master))
.WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
.RetiresOnSaturation();