You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:08:03 UTC
svn commit: r1132253 [2/5] - in /incubator/mesos/trunk: src/ src/detector/
src/exec/ src/master/ src/messaging/ src/sched/ src/slave/ src/tests/
third_party/libprocess/
third_party/libprocess/third_party/ry-http-parser-1c3624a/
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 09:08:02 2011
@@ -1,34 +1,40 @@
-#ifndef MESSAGES_HPP
-#define MESSAGES_HPP
+#ifndef __MESSAGES_HPP__
+#define __MESSAGES_HPP__
#include <float.h>
+#include <glog/logging.h>
+
#include <string>
+#include <tr1/functional>
+
#include <mesos.hpp>
#include <process.hpp>
+#include <boost/unordered_map.hpp>
+
#include "messaging/messages.pb.h"
namespace mesos { namespace internal {
+enum MSGID {
+ // Artifacts from libprocess.
+ PROCESS_TIMEOUT,
+ PROCESS_EXIT,
-// TODO(benh): Eliminate versioning once message ids become strings.
-const std::string MESOS_MESSAGING_VERSION = "2";
-
-
-enum MessageType {
- /* From framework to master. */
- F2M_REGISTER_FRAMEWORK = PROCESS_MSGID,
+ // From framework to master.
+ F2M_REGISTER_FRAMEWORK,
F2M_REREGISTER_FRAMEWORK,
F2M_UNREGISTER_FRAMEWORK,
F2M_RESOURCE_OFFER_REPLY,
F2M_REVIVE_OFFERS,
F2M_KILL_TASK,
F2M_FRAMEWORK_MESSAGE,
+ F2M_STATUS_UPDATE_ACK,
- /* From master to framework. */
+ // From master to framework.
M2F_REGISTER_REPLY,
M2F_RESOURCE_OFFER,
M2F_RESCIND_OFFER,
@@ -37,7 +43,7 @@ enum MessageType {
M2F_FRAMEWORK_MESSAGE,
M2F_ERROR,
- /* From slave to master. */
+ // From slave to master.
S2M_REGISTER_SLAVE,
S2M_REREGISTER_SLAVE,
S2M_UNREGISTER_SLAVE,
@@ -45,15 +51,10 @@ enum MessageType {
S2M_FRAMEWORK_MESSAGE,
S2M_EXITED_EXECUTOR,
- /* From slave heart to master. */
+ // From slave heart to master.
SH2M_HEARTBEAT,
-
- /* From master detector to processes */
- GOT_MASTER_TOKEN,
- NEW_MASTER_DETECTED,
- NO_MASTER_DETECTED,
- /* From master to slave. */
+ // From master to slave.
M2S_REGISTER_REPLY,
M2S_REREGISTER_REPLY,
M2S_RUN_TASK,
@@ -61,14 +62,15 @@ enum MessageType {
M2S_KILL_FRAMEWORK,
M2S_FRAMEWORK_MESSAGE,
M2S_UPDATE_FRAMEWORK,
+ M2S_STATUS_UPDATE_ACK,
M2S_SHUTDOWN, // Used in unit tests to shut down cluster
- /* From executor to slave. */
+ // From executor to slave.
E2S_REGISTER_EXECUTOR,
E2S_STATUS_UPDATE,
E2S_FRAMEWORK_MESSAGE,
- /* From slave to executor. */
+ // From slave to executor.
S2E_REGISTER_REPLY,
S2E_RUN_TASK,
S2E_KILL_TASK,
@@ -76,62 +78,56 @@ enum MessageType {
S2E_KILL_EXECUTOR,
#ifdef __sun__
- /* From projd to slave. */
+ // From projd to slave.
PD2S_REGISTER_PROJD,
PD2S_PROJECT_READY,
- /* From slave to projd. */
+ // From slave to projd.
S2PD_UPDATE_RESOURCES,
S2PD_KILL_ALL,
-#endif /* __sun__ */
-
- /* Internal to framework. */
- F2F_RESOURCE_OFFER_REPLY,
- F2F_FRAMEWORK_MESSAGE,
+#endif // __sun__
- /* Internal to master. */
+ // Internal to master.
M2M_GET_STATE, // Used by web UI
M2M_GET_STATE_REPLY,
M2M_TIMER_TICK, // Timer for expiring filters etc
M2M_FRAMEWORK_EXPIRED, // Timer for expiring frameworks
M2M_SHUTDOWN, // Used in tests to shut down master
- /* Internal to slave. */
+ // Internal to slave.
S2S_GET_STATE, // Used by web UI
S2S_GET_STATE_REPLY,
S2S_SHUTDOWN, // Used in tests to shut down slave
- /* Generic. */
- TERMINATE,
+ // From master detector to processes.
+ GOT_MASTER_TOKEN,
+ NEW_MASTER_DETECTED,
+ NO_MASTER_DETECTED,
+ MASTER_DETECTION_FAILURE,
- // TODO(benh): Put these all in their right place.
- MASTER_DETECTION_FAILURE,
- F2M_STATUS_UPDATE_ACK,
- M2S_STATUS_UPDATE_ACK,
+ // HTTP messages.
+ vars,
- MESOS_MSGID,
+ MESOS_MSGID
};
-/**
- * To couple a MSGID with a protocol buffer we use a templated class
- * that extends the necessary protocol buffer type (this also allows
- * the code to be better isolated from protocol buffer naming). While
- * protocol buffers are allegedly not meant to be inherited, we
- * decided this was an acceptable option since we don't add any new
- * functionality (or do any thing with the existing functionality).
- *
- * To add another message that uses a protocol buffer you need to
- * provide a specialization of the Message class (i.e., using the
- * MESSAGE macro defined below).
- */
+// To couple a MSGID with a protocol buffer we use a templated class
+// that extends the necessary protocol buffer type (this also allows
+// the code to be better isolated from protocol buffer naming). While
+// protocol buffers are allegedly not meant to be inherited, we
+// decided this was an acceptable option since we don't add any new
+// functionality (or do anything with the existing functionality).
+//
+// To add another message that uses a protocol buffer you need to
+// provide a specialization of the Message class (i.e., using the
+// MESSAGE macro defined below).
template <MSGID ID>
-class Message;
-
+class MSG;
#define MESSAGE(ID, T) \
template <> \
- class Message<ID> : public T {}
+ class MSG<ID> : public T {}
class AnyMessage
@@ -141,9 +137,9 @@ public:
: data(data_) {}
template <MSGID ID>
- operator Message<ID> () const
+ operator MSG<ID> () const
{
- Message<ID> msg;
+ MSG<ID> msg;
msg.ParseFromString(data);
return msg;
}
@@ -156,76 +152,326 @@ private:
class MesosProcess : public Process
{
public:
+ MesosProcess(const std::string& id = "") : Process(id) {}
+
+ virtual ~MesosProcess() {}
+
static void post(const PID &to, MSGID id)
{
- const std::string &data = MESOS_MESSAGING_VERSION + "|";
- Process::post(to, id, data.data(), data.size());
+ CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
+ Process::post(to, names[id]);
}
template <MSGID ID>
- static void post(const PID &to, const Message<ID> &msg)
+ static void post(const PID &to, const MSG<ID> &msg)
{
+ CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
std::string data;
msg.SerializeToString(&data);
- data = MESOS_MESSAGING_VERSION + "|" + data;
- Process::post(to, ID, data.data(), data.size());
+ Process::post(to, names[ID], data.data(), data.size());
}
+ static boost::unordered_map<std::string, MSGID> ids;
+ static boost::unordered_map<MSGID, std::string> names;
+
protected:
AnyMessage message() const
{
return AnyMessage(body());
}
+ MSGID msgid() const
+ {
+ CHECK(ids.count(name()) > 0) << "Missing MSGID for '" << name() << "'";
+ return ids[name()];
+ }
+
std::string body() const
{
size_t size;
- const char *s = Process::body(&size);
- const std::string data(s, size);
- size_t index = data.find('|');
- CHECK(index != std::string::npos);
- return data.substr(index + 1);
+ const char *data = Process::body(&size);
+ return std::string(data, size);
}
- virtual void send(const PID &to, MSGID id)
+ void send(const PID &to, MSGID id)
{
- const std::string &data = MESOS_MESSAGING_VERSION + "|";
- Process::send(to, id, data.data(), data.size());
+ CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
+ Process::send(to, names[id]);
}
template <MSGID ID>
- void send(const PID &to, const Message<ID> &msg)
+ void send(const PID &to, const MSG<ID> &msg)
{
+ CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
std::string data;
msg.SerializeToString(&data);
- data = MESOS_MESSAGING_VERSION + "|" + data;
- Process::send(to, ID, data.data(), data.size());
+ Process::send(to, names[ID], data.data(), data.size());
+ }
+
+ MSGID receive(double secs = 0)
+ {
+ while (true) {
+ Process::receive(secs);
+ if (ids.count(name()) > 0) {
+ // Check if this has been bound and invoke the handler.
+ if (handlers.count(name()) > 0) {
+ size_t length;
+ const char* data = Process::body(&length);
+ handlers[name()](data, length);
+ } else {
+ return ids[name()];
+ }
+ } else {
+ LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+ << " from: " << from() << " to: " << self();
+ }
+ }
}
- virtual MSGID receive(double secs = 0)
+ MSGID serve(double secs = 0)
{
- bool indefinite = secs == 0;
- double now = elapsed();
- MSGID id = Process::receive(secs);
- if (PROCESS_MSGID < id && id < MESOS_MSGID) {
- size_t size;
- const char *s = Process::body(&size);
- const std::string data(s, size);
- size_t index = data.find('|');
- if (index == std::string::npos ||
- MESOS_MESSAGING_VERSION != data.substr(0, index)) {
- LOG(ERROR) << "Dropping message from " << from()
- << " with incorrect messaging version!";
- if (!indefinite) {
- double remaining = secs - (elapsed() - now);
- return receive(remaining <= 0 ? DBL_EPSILON : remaining);
+ while (true) {
+ Process::serve(secs);
+ if (ids.count(name()) > 0) {
+ // Check if this has been bound and invoke the handler.
+ if (handlers.count(name()) > 0) {
+ size_t length;
+ const char* data = Process::body(&length);
+ handlers[name()](data, length);
} else {
- return receive(0);
+ return ids[name()];
}
+ } else {
+ LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+ << " from: " << from() << " to: " << self();
}
}
- return id;
}
+
+ template <typename T>
+ void handle(MSGID id, void (T::*method)())
+ {
+ T* t = static_cast<T*>(this);
+ CHECK(names.count(id) > 0);
+ handlers[names[id]] =
+ std::tr1::bind(&MesosProcess::handler0<T>, t,
+ method, std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1>
+ void handle(MSGID id, void (T::*method)(P1),
+ P1 (PB::*param1)() const)
+ {
+ T* t = static_cast<T*>(this);
+ CHECK(names.count(id) > 0);
+ handlers[names[id]] =
+ std::tr1::bind(&MesosProcess::handler1<T, PB, P1>, t,
+ method, param1,
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2>
+ void handle(MSGID id, void (T::*method)(P1, P2),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const)
+ {
+ T* t = static_cast<T*>(this);
+ CHECK(names.count(id) > 0);
+ handlers[names[id]] =
+ std::tr1::bind(&MesosProcess::handler2<T, PB, P1, P2>, t,
+ method, p1, p2,
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2,
+ typename P3>
+ void handle(MSGID id,
+ void (T::*method)(P1, P2, P3),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ P3 (PB::*p3)() const)
+ {
+ T* t = static_cast<T*>(this);
+ CHECK(names.count(id) > 0);
+ handlers[names[id]] =
+ std::tr1::bind(&MesosProcess::handler3<T, PB, P1, P2, P3>, t,
+ method, p1, p2, p3,
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2,
+ typename P3,
+ typename P4>
+ void handle(MSGID id,
+ void (T::*method)(P1, P2, P3, P4),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ P3 (PB::*p3)() const,
+ P4 (PB::*p4)() const)
+ {
+ T* t = static_cast<T*>(this);
+ CHECK(names.count(id) > 0);
+ handlers[names[id]] =
+ std::tr1::bind(&MesosProcess::handler4<T, PB, P1, P2, P3, P4>, t,
+ method, p1, p2, p3, p4,
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2,
+ typename P3,
+ typename P4,
+ typename P5>
+ void handle(MSGID id,
+ void (T::*method)(P1, P2, P3, P4, P5),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ P3 (PB::*p3)() const,
+ P4 (PB::*p4)() const,
+ P5 (PB::*p5)() const)
+ {
+ T* t = static_cast<T*>(this);
+ CHECK(names.count(id) > 0);
+ handlers[names[id]] =
+ std::tr1::bind(&MesosProcess::handler5<T, PB, P1, P2, P3, P4, P5>, t,
+ method, p1, p2, p3, p4, p5,
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ }
+
+private:
+ template <typename T>
+ static void handler0(T* t, void (T::*method)(),
+ const char* data, size_t length)
+ {
+ (t->*method)();
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1>
+ static void handler1(T* t, void (T::*method)(P1),
+ P1 (PB::*p1)() const,
+ const char* data, size_t length)
+ {
+ PB pb;
+ pb.ParseFromArray(data, length);
+ if (pb.IsInitialized()) {
+ (t->*method)((&pb->*p1)());
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << pb.InitializationErrorString();
+ }
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2>
+ static void handler2(T* t, void (T::*method)(P1, P2),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ const char* data, size_t length)
+ {
+ PB pb;
+ pb.ParseFromArray(data, length);
+ if (pb.IsInitialized()) {
+ (t->*method)((&pb->*p1)(), (&pb->*p2)());
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << pb.InitializationErrorString();
+ }
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2,
+ typename P3>
+ static void handler3(T* t, void (T::*method)(P1, P2, P3),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ P3 (PB::*p3)() const,
+ const char* data, size_t length)
+ {
+ PB pb;
+ pb.ParseFromArray(data, length);
+ if (pb.IsInitialized()) {
+ (t->*method)((&pb->*p1)(), (&pb->*p2)(), (&pb->*p3)());
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << pb.InitializationErrorString();
+ }
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2,
+ typename P3,
+ typename P4>
+ static void handler4(T* t, void (T::*method)(P1, P2, P3, P4),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ P3 (PB::*p3)() const,
+ P4 (PB::*p4)() const,
+ const char* data, size_t length)
+ {
+ PB pb;
+ pb.ParseFromArray(data, length);
+ if (pb.IsInitialized()) {
+ (t->*method)((&pb->*p1)(), (&pb->*p2)(), (&pb->*p3)(), (&pb->*p4)());
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << pb.InitializationErrorString();
+ }
+ }
+
+ template <typename T,
+ typename PB,
+ typename P1,
+ typename P2,
+ typename P3,
+ typename P4,
+ typename P5>
+ static void handler5(T* t, void (T::*method)(P1, P2, P3, P4, P5),
+ P1 (PB::*p1)() const,
+ P2 (PB::*p2)() const,
+ P3 (PB::*p3)() const,
+ P4 (PB::*p4)() const,
+ P5 (PB::*p5)() const,
+ const char* data, size_t length)
+ {
+ PB pb;
+ pb.ParseFromArray(data, length);
+ if (pb.IsInitialized()) {
+ (t->*method)((&pb->*p1)(), (&pb->*p2)(), (&pb->*p3)(), (&pb->*p4)(),
+ (&pb->*p5)());
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << pb.InitializationErrorString();
+ }
+ }
+
+ boost::unordered_map<std::string, std::tr1::function<void (const char*, size_t)> > handlers;
};
@@ -277,7 +523,7 @@ MESSAGE(S2E_FRAMEWORK_MESSAGE, Framework
MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
-#endif /* __sun__ */
+#endif // __sun__
MESSAGE(M2M_GET_STATE_REPLY, StateMessage);
MESSAGE(M2M_FRAMEWORK_EXPIRED, FrameworkExpiredMessage);
@@ -287,7 +533,7 @@ MESSAGE(S2S_GET_STATE_REPLY, StateMessag
MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
-}} /* namespace mesos { namespace internal { */
+}} // namespace mesos { namespace internal {
-#endif /* MESSAGES_HPP */
+#endif // __MESSAGES_HPP__
Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun 5 09:08:02 2011
@@ -56,8 +56,8 @@ message UnregisterFrameworkMessage {
message ResourceOfferMessage {
required OfferID offer_id = 1;
- repeated SlaveOffer offer = 2;
- repeated string pid = 3;
+ repeated SlaveOffer offers = 2;
+ repeated string pids = 3;
}
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 09:08:02 2011
@@ -13,11 +13,12 @@
#include <string>
#include <sstream>
+#include <tr1/functional>
+
#include <mesos.hpp>
#include <mesos_sched.hpp>
-#include <reliable.hpp>
+#include <process.hpp>
-#include <boost/bind.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
@@ -37,14 +38,17 @@
using namespace mesos;
using namespace mesos::internal;
+using boost::cref;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using google::protobuf::RepeatedPtrField;
+
using std::map;
using std::string;
using std::vector;
-using boost::bind;
-using boost::cref;
-using boost::unordered_map;
-using boost::unordered_set;
+using std::tr1::bind;
namespace mesos { namespace internal {
@@ -78,7 +82,7 @@ protected:
VLOG(1) << "No status updates received for task ID: "
<< taskId << " after "
<< STATUS_UPDATE_TIMEOUT << ", assuming task was lost";
- Message<M2F_STATUS_UPDATE> out;
+ MSG<M2F_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
TaskStatus* status = out.mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
@@ -117,9 +121,42 @@ public:
const FrameworkID& _frameworkId,
const FrameworkInfo& _framework)
: driver(_driver), sched(_sched), frameworkId(_frameworkId),
- framework(_framework), generation(0), master(PID()), terminate(false) {}
+ framework(_framework), generation(0), master(PID()), terminate(false)
+ {
+ handle(NEW_MASTER_DETECTED, &SchedulerProcess::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
+
+ handle(NO_MASTER_DETECTED, &SchedulerProcess::noMasterDetected);
- ~SchedulerProcess()
+ handle(MASTER_DETECTION_FAILURE, &SchedulerProcess::masterDetectionFailure);
+
+ handle(M2F_REGISTER_REPLY, &SchedulerProcess::registerReply,
+ &FrameworkRegisteredMessage::framework_id);
+
+ handle(M2F_RESOURCE_OFFER, &SchedulerProcess::resourceOffer
+ &ResourceOfferMessage::offer_id,
+ &ResourceOfferMessage::offers,
+ &ResourceOfferMessage::pids);
+
+ handle(M2F_RESCIND_OFFER, &SchedulerProcess::rescindOffer,
+ &RescindResourceOfferMessage::offer_id);
+
+ handle(M2F_STATUS_UPDATE, &SchedulerProcess::statusUpdate,
+ &StatusUpdateMessage::framework_id,
+ &StatusUpdateMessage::status);
+
+ handle(M2F_LOST_SLAVE, &SchedulerProcess::lostSlave,
+ &LostSlaveMessage::slave_id);
+
+ handle(M2F_FRAMEWORK_MESSAGE, &SchedulerProcess::frameworkMessage,
+ &FrameworkMessageMessage::message);
+
+ handle(M2F_ERROR, &SchedulerProcess::error,
+ &FrameworkErrorMessage::code,
+ &FrameworkErrorMessage::message);
+ }
+
+ virtual ~SchedulerProcess()
{
// Cleanup any remaining timers.
foreachpair (const TaskID& taskId, StatusUpdateTimer* timer, timers) {
@@ -154,165 +191,148 @@ protected:
// above for why sending a message will still require us to use
// the terminate flag).
switch (serve(2)) {
-
- case NEW_MASTER_DETECTED: {
- const Message<NEW_MASTER_DETECTED>& msg = message();
-
- VLOG(1) << "New master at " << msg.pid();
-
- master = msg.pid();
- link(master);
-
- if (frameworkId == "") {
- // Touched for the very first time.
- Message<F2M_REGISTER_FRAMEWORK> out;
- out.mutable_framework()->MergeFrom(framework);
- send(master, out);
- } else {
- // Not the first time, or failing over.
- Message<F2M_REREGISTER_FRAMEWORK> out;
- out.mutable_framework()->MergeFrom(framework);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.set_generation(generation++);
- send(master, out);
+ case PROCESS_EXIT: {
+ // TODO(benh): Don't wait for a new master forever.
+ if (from() == master)
+ VLOG(1) << "Connection to master lost .. waiting for new master";
+ break;
}
- active = true;
-
- break;
- }
+ case PROCESS_TIMEOUT: {
+ break;
+ }
- case NO_MASTER_DETECTED: {
- // In this case, we don't actually invoke Scheduler::error
- // since we might get reconnected to a master imminently.
- active = false;
- VLOG(1) << "No master detected, waiting for another master";
- break;
+ default: {
+ VLOG(1) << "Received unknown message " << msgid()
+ << " from " << from();
+ break;
+ }
}
+ }
+ }
- case MASTER_DETECTION_FAILURE: {
- active = false;
- // TODO(benh): Better error codes/messages!
- int32_t code = 1;
- const string& message = "Failed to detect master(s)";
- invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
- break;
- }
+ void newMasterDetected(const string& pid)
+ {
+ VLOG(1) << "New master at " << pid;
- case M2F_REGISTER_REPLY: {
- const Message<M2F_REGISTER_REPLY>& msg = message();
- frameworkId = msg.framework_id();
- invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
- break;
- }
+ master = pid;
+ link(master);
- case M2F_RESOURCE_OFFER: {
- const Message<M2F_RESOURCE_OFFER>& msg = message();
+ if (frameworkId == "") {
+ // Touched for the very first time.
+ MSG<F2M_REGISTER_FRAMEWORK> out;
+ out.mutable_framework()->MergeFrom(framework);
+ send(master, out);
+ } else {
+ // Not the first time, or failing over.
+ MSG<F2M_REREGISTER_FRAMEWORK> out;
+ out.mutable_framework()->MergeFrom(framework);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.set_generation(generation++);
+ send(master, out);
+ }
- // Construct a vector for the offers. Also save the pid
- // associated with each slave (one per SlaveOffer) so later we
- // can send framework messages directly.
- vector<SlaveOffer> offers;
-
- for (int i = 0; i < msg.offer_size(); i++) {
- const SlaveOffer& offer = msg.offer(i);
- PID pid(msg.pid(i));
- CHECK(pid != PID());
- savedOffers[msg.offer_id()][offer.slave_id()] = pid;
- offers.push_back(offer);
- }
+ active = true;
+ }
- invoke(bind(&Scheduler::resourceOffer, sched, driver,
- cref(msg.offer_id()), cref(offers)));
- break;
- }
+ void noMasterDetected()
+ {
+ // In this case, we don't actually invoke Scheduler::error
+ // since we might get reconnected to a master imminently.
+ active = false;
+ VLOG(1) << "No master detected, waiting for another master";
+ }
- case M2F_RESCIND_OFFER: {
- const Message<M2F_RESCIND_OFFER>& msg = message();
- savedOffers.erase(msg.offer_id());
- invoke(bind(&Scheduler::offerRescinded, sched, driver,
- cref(msg.offer_id())));
- break;
- }
+ void masterDetectionFailure()
+ {
+ active = false;
+ // TODO(benh): Better error codes/messages!
+ int32_t code = 1;
+ const string& message = "Failed to detect master(s)";
+ invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
+ }
- case M2F_STATUS_UPDATE: {
- const Message<M2F_STATUS_UPDATE>& msg = message();
+ void registerReply(const FrameworkID& frameworkId)
+ {
+ this->frameworkId = frameworkId;
+ invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
+ }
- const TaskStatus &status = msg.status();
+ void resourceOffer(const OfferID& offerId,
+ const RepeatedPtrField<SlaveOffer>& offers,
+ const RepeatedPtrField<string>& pids)
+ {
+ // Construct a vector for the offers. Also save the pid
+ // associated with each slave (one per SlaveOffer) so later we
+ // can send framework messages directly.
+ vector<SlaveOffer> temp;
+
+ for (int i = 0; i < offers.size(); i++) {
+ PID pid(pids.Get(i));
+ CHECK(pid != PID());
+ savedOffers[offerId][offers.Get(i).slave_id()] = pid;
+ temp.push_back(offers.Get(i));
+ }
- // TODO(benh): Note that this maybe a duplicate status update!
- // Once we get support to try and have a more consistent view
- // of what's running in the cluster, we'll just let this one
- // slide. The alternative is possibly dealing with a scheduler
- // failover and not correctly giving the scheduler it's status
- // update, which seems worse than giving a status update
- // multiple times (of course, if a scheduler re-uses a TaskID,
- // that could be bad.
-
- // Stop any status update timers we might have had running.
- if (timers.count(status.task_id()) > 0) {
- StatusUpdateTimer* timer = timers[status.task_id()];
- timers.erase(status.task_id());
- send(timer->self(), MESOS_MSGID);
- wait(timer->self());
- delete timer;
- }
+ invoke(bind(&Scheduler::resourceOffer, sched, driver, cref(offerId),
+ cref(temp)));
+ }
- invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
+ void rescindOffer(const OfferID& offerId)
+ {
+ savedOffers.erase(offerId);
+ invoke(bind(&Scheduler::offerRescinded, sched, driver, cref(offerId)));
+ }
- // Acknowledge the message (we do this last, after we invoked
- // the scheduler, if we did at all, in case it causes a crash,
- // since this way the message might get resent/routed after
- // the scheduler comes back online).
- Message<F2M_STATUS_UPDATE_ACK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_slave_id()->MergeFrom(status.slave_id());
- out.mutable_task_id()->MergeFrom(status.task_id());
- send(master, out);
- break;
- }
+ void statusUpdate(const FrameworkID& frameworkId, const TaskStatus& status)
+ {
+ CHECK(this->frameworkId == frameworkId);
- case M2F_FRAMEWORK_MESSAGE: {
- const Message<M2F_FRAMEWORK_MESSAGE>& msg = message();
- invoke(bind(&Scheduler::frameworkMessage, sched, driver,
- cref(msg.message())));
- break;
- }
+ // TODO(benh): Note that this may be a duplicate status update!
+ // Once we get support to try and have a more consistent view
+ // of what's running in the cluster, we'll just let this one
+ // slide. The alternative is possibly dealing with a scheduler
+ // failover and not correctly giving the scheduler its status
+ // update, which seems worse than giving a status update
+ // multiple times (of course, if a scheduler re-uses a TaskID,
+ // that could be bad).
+
+ // Stop any status update timers we might have had running.
+ if (timers.count(status.task_id()) > 0) {
+ StatusUpdateTimer* timer = timers[status.task_id()];
+ timers.erase(status.task_id());
+ send(timer->self(), MESOS_MSGID);
+ wait(timer->self());
+ delete timer;
+ }
- case M2F_LOST_SLAVE: {
- const Message<M2F_LOST_SLAVE>& msg = message();
- savedSlavePids.erase(msg.slave_id());
- invoke(bind(&Scheduler::slaveLost, sched, driver,
- cref(msg.slave_id())));
- break;
- }
+ invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
- case M2F_ERROR: {
- const Message<M2F_ERROR>& msg = message();
- int32_t code = msg.code();
- const string& message = msg.message();
- invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
- break;
- }
+ // Acknowledge the message (we do this last, after we invoked
+ // the scheduler, if we did at all, in case it causes a crash,
+ // since this way the message might get resent/routed after
+ // the scheduler comes back online).
+ MSG<F2M_STATUS_UPDATE_ACK> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_slave_id()->MergeFrom(status.slave_id());
+ out.mutable_task_id()->MergeFrom(status.task_id());
+ send(master, out);
+ }
- case PROCESS_EXIT: {
- // TODO(benh): Don't wait for a new master forever.
- if (from() == master)
- VLOG(1) << "Connection to master lost .. waiting for new master";
- break;
- }
+ void lostSlave(const SlaveID& slaveId)
+ {
+ savedSlavePids.erase(slaveId);
+ invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
+ }
- case PROCESS_TIMEOUT: {
- break;
- }
+ void frameworkMessage(const FrameworkMessage& message)
+ {
+ invoke(bind(&Scheduler::frameworkMessage, sched, driver, cref(message)));
+ }
- default: {
- VLOG(1) << "Received unknown message " << msgid()
- << " from " << from();
- break;
- }
- }
- }
+ void error(int32_t code, const string& message)
+ {
+ invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
}
void stop()
@@ -320,7 +340,7 @@ protected:
if (!active)
return;
- Message<F2M_UNREGISTER_FRAMEWORK> out;
+ MSG<F2M_UNREGISTER_FRAMEWORK> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
send(master, out);
}
@@ -330,7 +350,7 @@ protected:
if (!active)
return;
- Message<F2M_KILL_TASK> out;
+ MSG<F2M_KILL_TASK> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
out.mutable_task_id()->MergeFrom(taskId);
send(master, out);
@@ -343,7 +363,7 @@ protected:
if (!active)
return;
- Message<F2M_RESOURCE_OFFER_REPLY> out;
+ MSG<F2M_RESOURCE_OFFER_REPLY> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
out.mutable_offer_id()->MergeFrom(offerId);
@@ -377,7 +397,7 @@ protected:
if (!active)
return;
- Message<F2M_REVIVE_OFFERS> out;
+ MSG<F2M_REVIVE_OFFERS> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
send(master, out);
}
@@ -401,7 +421,7 @@ protected:
CHECK(slave != PID());
// TODO(benh): This is kind of wierd, M2S?
- Message<M2S_FRAMEWORK_MESSAGE> out;
+ MSG<M2S_FRAMEWORK_MESSAGE> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
out.mutable_message()->MergeFrom(message);
send(slave, out);
@@ -409,7 +429,7 @@ protected:
VLOG(1) << "Cannot send directly to slave " << message.slave_id()
<< "; sending through master";
- Message<F2M_FRAMEWORK_MESSAGE> out;
+ MSG<F2M_FRAMEWORK_MESSAGE> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
out.mutable_message()->MergeFrom(message);
send(master, out);
@@ -432,8 +452,6 @@ private:
unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
unordered_map<SlaveID, PID> savedSlavePids;
- unordered_set<TaskID> tasks;
-
// Timers to ensure we get a status update for each task we launch.
unordered_map<TaskID, StatusUpdateTimer *> timers;
};
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp Sun Jun 5 09:08:02 2011
@@ -244,8 +244,8 @@ void LxcIsolationModule::Reaper::operato
{
link(module->slave->self());
while (true) {
- switch (receive(1)) {
- case PROCESS_TIMEOUT: {
+ receive(1);
+ if (name() == TIMEOUT) {
// Check whether any child process has exited
pid_t pid;
int status;
@@ -264,10 +264,7 @@ void LxcIsolationModule::Reaper::operato
}
}
}
- break;
- }
- case SHUTDOWN_REAPER:
- case PROCESS_EXIT:
+ } else if (name() == TERMINATE || name() == EXIT) {
return;
}
}
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Sun Jun 5 09:08:02 2011
@@ -23,15 +23,12 @@ public:
LxcIsolationModule* module;
protected:
- void operator () ();
+ virtual void operator () ();
public:
Reaper(LxcIsolationModule* module);
};
- // Extra shutdown message for reaper
- enum { SHUTDOWN_REAPER = PROCESS_MSGID };
-
// Per-framework information object maintained in info hashmap
struct FrameworkInfo {
string container; // Name of Linux container used for this framework
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun 5 09:08:02 2011
@@ -39,7 +39,7 @@ ProcessBasedIsolationModule::~ProcessBas
// could thus lead to a seg fault!
if (initialized) {
CHECK(reaper != NULL);
- Process::post(reaper->self(), SHUTDOWN_REAPER);
+ Process::post(reaper->self(), TERMINATE);
Process::wait(reaper->self());
delete reaper;
}
@@ -148,8 +148,8 @@ void ProcessBasedIsolationModule::Reaper
{
link(module->slave->self());
while (true) {
- switch (receive(1)) {
- case PROCESS_TIMEOUT: {
+ receive(1);
+ if (name() == TIMEOUT) {
// Check whether any child process has exited.
pid_t pid;
int status;
@@ -171,10 +171,7 @@ void ProcessBasedIsolationModule::Reaper
}
}
}
- break;
- }
- case SHUTDOWN_REAPER:
- case PROCESS_EXIT:
+ } else if (name() == TERMINATE || name() == EXIT) {
return;
}
}
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Sun Jun 5 09:08:02 2011
@@ -34,15 +34,12 @@ public:
ProcessBasedIsolationModule* module;
protected:
- void operator () ();
+ virtual void operator () ();
public:
Reaper(ProcessBasedIsolationModule* module);
};
- // Extra shutdown message for reaper
- enum { SHUTDOWN_REAPER = PROCESS_MSGID };
-
protected:
// Main method executed after a fork() to create a Launcher for launching
// an executor's process. The Launcher will create the child's working
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:08:02 2011
@@ -6,6 +6,8 @@
#include <algorithm>
#include <fstream>
+#include <google/protobuf/descriptor.h>
+
#include "slave.hpp"
#include "webui.hpp"
@@ -14,101 +16,61 @@
#define gethostbyname2(name, _) gethostbyname(name)
#endif
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
+using boost::unordered_map;
+using boost::unordered_set;
+
using std::list;
using std::make_pair;
using std::ostringstream;
-using std::istringstream;
using std::pair;
using std::queue;
using std::string;
using std::vector;
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-namespace {
-
-// Periodically sends heartbeats to the master
-class Heart : public MesosProcess
-{
-private:
- PID master;
- PID slave;
- SlaveID sid;
- double interval;
-
-protected:
- void operator () ()
- {
- link(slave);
- link(master);
- do {
- switch (receive(interval)) {
- case PROCESS_TIMEOUT:
- send(master, pack<SH2M_HEARTBEAT>(sid));
- break;
- case PROCESS_EXIT:
- return;
- }
- } while (true);
- }
-
-public:
- Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
- : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
-};
-
-
-// Default values for CPU cores and memory to include in configuration
-const int32_t DEFAULT_CPUS = 1;
-const int32_t DEFAULT_MEM = 1 * Gigabyte;
-
-
-} /* namespace */
-
-
-Slave::Slave(Resources _resources, bool _local,
+Slave::Slave(const Resources& _resources, bool _local,
IsolationModule *_isolationModule)
- : id(""), resources(_resources), local(_local),
- isolationModule(_isolationModule) {}
+ : resources(_resources), local(_local),
+ isolationModule(_isolationModule), heart(NULL) {}
-Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
- : id(""), conf(_conf), local(_local), isolationModule(_module)
-{
- resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
- conf.get<int32_t>("mem", DEFAULT_MEM));
+Slave::Slave(const Configuration& _conf, bool _local,
+ IsolationModule* _isolationModule)
+ : conf(_conf), local(_local),
+ isolationModule(_isolationModule), heart(NULL)
+{
+ resources =
+ Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
}
-void Slave::registerOptions(Configurator* conf)
-{
- conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
- DEFAULT_CPUS);
- conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
- DEFAULT_MEM);
- conf->addOption<string>("work_dir",
- "Where to place framework work directories\n"
- "(default: MESOS_HOME/work)");
- conf->addOption<string>("hadoop_home",
- "Where to find Hadoop installed (for fetching\n"
- "framework executors from HDFS)\n"
- "(default: look for HADOOP_HOME environment\n"
- "variable or find hadoop on PATH)");
- conf->addOption<bool>("switch_user",
- "Whether to run tasks as the user who\n"
- "submitted them rather than the user running\n"
- "the slave (requires setuid permission)",
- true);
- conf->addOption<string>("frameworks_home",
- "Directory prepended to relative executor\n"
- "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* configurator)
+{
+ // TODO(benh): Is there a way to specify units for the resources?
+ configurator->addOption<string>("resources",
+ "Total consumable resources per slave\n");
+// configurator->addOption<string>("attributes",
+// "Attributes of machine\n");
+ configurator->addOption<string>("work_dir",
+ "Where to place framework work directories\n"
+ "(default: MESOS_HOME/work)");
+ configurator->addOption<string>("hadoop_home",
+ "Where to find Hadoop installed (for\n"
+ "fetching framework executors from HDFS)\n"
+ "(default: look for HADOOP_HOME in\n"
+ "environment or find hadoop on PATH)");
+ configurator->addOption<bool>("switch_user",
+ "Whether to run tasks as the user who\n"
+ "submitted them rather than the user running\n"
+ "the slave (requires setuid permission)",
+ true);
+ configurator->addOption<string>("frameworks_home",
+ "Directory prepended to relative executor\n"
+ "paths (default: MESOS_HOME/frameworks)");
}
@@ -120,25 +82,54 @@ Slave::~Slave()
state::SlaveState *Slave::getState()
{
- std::ostringstream my_pid;
- my_pid << self();
- std::ostringstream master_pid;
- master_pid << master;
+ Resources resources(resources);
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
state::SlaveState *state =
- new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus,
- resources.mem, my_pid.str(), master_pid.str());
+ new state::SlaveState(BUILD_DATE, BUILD_USER, slaveId.value(),
+ cpus.value(), mem.value(), self(), master);
- foreachpair(_, Framework *f, frameworks) {
- state::Framework *framework = new state::Framework(f->id, f->name,
- f->executorInfo.uri, f->executorStatus, f->resources.cpus,
- f->resources.mem);
- state->frameworks.push_back(framework);
- foreachpair(_, Task *t, f->tasks) {
- state::Task *task = new state::Task(t->id, t->name, t->state,
- t->resources.cpus, t->resources.mem);
- framework->tasks.push_back(task);
- }
- }
+// foreachpair (_, Framework *f, frameworks) {
+
+// foreachpair (_, Executor* e, f->executors) {
+
+// Resources resources(e->resources);
+// Resource::Scalar cpus;
+// Resource::Scalar mem;
+// cpus.set_value(-1);
+// mem.set_value(-1);
+// cpus = resources.getScalar("cpus", cpus);
+// mem = resources.getScalar("mem", mem);
+
+// state::Framework *framework =
+// new state::Framework(f->frameworkId.value(), f->info.name(),
+// f->info.executor().uri(), f->executorStatus,
+// cpus.value(), mem.value());
+
+// state->frameworks.push_back(framework);
+
+// foreachpair(_, Task *t, f->tasks) {
+// Resources resources(t->resources());
+// Resource::Scalar cpus;
+// Resource::Scalar mem;
+// cpus.set_value(-1);
+// mem.set_value(-1);
+// cpus = resources.getScalar("cpus", cpus);
+// mem = resources.getScalar("mem", mem);
+
+// state::Task *task =
+// new state::Task(t->task_id().value(), t->name(),
+// TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+// cpus.value(), mem.value());
+
+// framework->tasks.push_back(task);
+// }
+// }
return state;
}
@@ -147,61 +138,60 @@ state::SlaveState *Slave::getState()
void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
+ LOG(INFO) << "Slave resources: " << resources;
// Get our hostname
- char buf[512];
+ char buf[256];
gethostname(buf, sizeof(buf));
hostent *he = gethostbyname2(buf, AF_INET);
string hostname = he->h_name;
- // Get our public Web UI URL. Normally this is our hostname, but on EC2
+ // Get our public DNS name. Normally this is our hostname, but on EC2
// we look for the MESOS_PUBLIC_DNS environment variable. This allows
// the master to display our public name in its web UI.
- LOG(INFO) << "setting up webUIUrl on port " << conf["webui_port"];
- string webUIUrl;
+ string public_hostname = hostname;
if (getenv("MESOS_PUBLIC_DNS") != NULL) {
- webUIUrl = getenv("MESOS_PUBLIC_DNS");
- } else {
- webUIUrl = hostname;
+ public_hostname = getenv("MESOS_PUBLIC_DNS");
}
-#ifdef MESOS_WEBUI
- webUIUrl += ":" + conf["webui_port"];
-#endif
+
+ SlaveInfo slave;
+ slave.set_hostname(hostname);
+ slave.set_public_hostname(public_hostname);
+ slave.mutable_resources()->MergeFrom(resources);
// Initialize isolation module.
isolationModule->initialize(this);
while (true) {
- switch (receive()) {
+ switch (receive(1)) {
case NEW_MASTER_DETECTED: {
- string masterSeq;
- PID masterPid;
- tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
+ const MSG<NEW_MASTER_DETECTED>& msg = message();
- LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+ LOG(INFO) << "New master at " << msg.pid();
- redirect(master, masterPid);
- master = masterPid;
+ master = msg.pid();
link(master);
- if (id.empty()) {
+ if (slaveId == "") {
// Slave started before master.
- send(master, pack<S2M_REGISTER_SLAVE>(hostname, webUIUrl, resources));
+ MSG<S2M_REGISTER_SLAVE> out;
+ out.mutable_slave()->MergeFrom(slave);
+ send(master, out);
} else {
- // Reconnecting, so reconstruct resourcesInUse for the master.
- Resources resourcesInUse;
- vector<Task> taskVec;
-
- foreachpair(_, Framework *framework, frameworks) {
- foreachpair(_, Task *task, framework->tasks) {
- resourcesInUse += task->resources;
- Task ti = *task;
- ti.slaveId = id;
- taskVec.push_back(ti);
- }
- }
+ // Re-registering, so send tasks running.
+ MSG<S2M_REREGISTER_SLAVE> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_slave()->MergeFrom(slave);
+
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (_, Executor* executor, framework->executors) {
+ foreachpair (_, Task* task, executor->tasks) {
+ out.add_task()->MergeFrom(*task);
+ }
+ }
+ }
- send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, webUIUrl, resources, taskVec));
+ send(master, out);
}
break;
}
@@ -211,231 +201,351 @@ void Slave::operator () ()
break;
}
+ case MASTER_DETECTION_FAILURE: {
+ LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
+ break;
+ }
+
case M2S_REGISTER_REPLY: {
- double interval = 0;
- tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
- LOG(INFO) << "Registered with master; given slave ID " << this->id;
- link(spawn(new Heart(master, self(), this->id, interval)));
+ const MSG<M2S_REGISTER_REPLY>& msg = message();
+ slaveId = msg.slave_id();
+
+ LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+
+ heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
+ link(spawn(heart));
break;
}
case M2S_REREGISTER_REPLY: {
- SlaveID sid;
- double interval = 0;
- tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
- LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
- if (this->id == "")
- this->id = sid;
- CHECK(this->id == sid);
- link(spawn(new Heart(master, self(), this->id, interval)));
+ const MSG<M2S_REREGISTER_REPLY>& msg = message();
+
+ LOG(INFO) << "Re-registered with master";
+
+ if (!(slaveId == msg.slave_id())) {
+ LOG(FATAL) << "Slave re-registered but got wrong ID";
+ }
+
+ if (heart != NULL) {
+ send(heart->self(), MESOS_MSGID);
+ wait(heart->self());
+ delete heart;
+ }
+
+ heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
+ link(spawn(heart));
break;
}
case M2S_RUN_TASK: {
- FrameworkID fid;
- TaskID tid;
- string fwName, user, taskName, taskArg;
- ExecutorInfo execInfo;
- Params params;
- PID pid;
- tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
- unpack<M2S_RUN_TASK>(body());
- LOG(INFO) << "Got assigned task " << fid << ":" << tid;
- Resources res;
- res.cpus = params.getInt32("cpus", -1);
- res.mem = params.getInt64("mem", -1);
- Framework *framework = getFramework(fid);
+ const MSG<M2S_RUN_TASK>& msg = message();
+
+ const TaskDescription& task = msg.task();
+
+ LOG(INFO) << "Got assigned task " << task.task_id()
+ << " for framework " << msg.framework_id();
+
+ Framework *framework = getFramework(msg.framework_id());
if (framework == NULL) {
- // Framework not yet created on this node - create it.
- framework = new Framework(fid, fwName, user, execInfo, pid);
- frameworks[fid] = framework;
- isolationModule->startExecutor(framework);
- }
- Task *task = framework->addTask(tid, taskName, res);
- Executor *executor = getExecutor(fid);
- if (executor) {
- send(executor->pid,
- pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
- isolationModule->resourcesChanged(framework);
+ framework =
+ new Framework(msg.framework_id(), msg.framework(), msg.pid());
+ frameworks[msg.framework_id()] = framework;
+ }
+
+ // Either send the task to an executor or start a new executor
+ // and queue the task until the executor has started.
+ Executor* executor = task.has_executor()
+ ? framework->getExecutor(task.executor().executor_id())
+ : framework->getExecutor(framework->info.executor().executor_id());
+
+ if (executor != NULL) {
+ if (!executor->pid) {
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+ } else {
+ // Add the task to the executor.
+ executor->addTask(task);
+
+ MSG<S2E_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task);
+ send(executor->pid, out);
+ isolationModule->resourcesChanged(framework, executor);
+ }
} else {
- // Executor not yet registered; queue task for when it starts up
- TaskDescription *td = new TaskDescription(
- tid, taskName, taskArg, params.str());
- framework->queuedTasks.push_back(td);
+ // Launch an executor for this task.
+ if (task.has_executor()) {
+ executor = framework->createExecutor(task.executor());
+ } else {
+ executor = framework->createExecutor(framework->info.executor());
+ }
+
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+
+ // Tell the isolation module to launch the executor.
+ isolationModule->launchExecutor(framework, executor);
}
break;
}
case M2S_KILL_TASK: {
- FrameworkID fid;
- TaskID tid;
- tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
- LOG(INFO) << "Killing task " << fid << ":" << tid;
- if (Executor *ex = getExecutor(fid)) {
- send(ex->pid, pack<S2E_KILL_TASK>(tid));
- }
- if (Framework *fw = getFramework(fid)) {
- fw->removeTask(tid);
- isolationModule->resourcesChanged(fw);
+ const MSG<M2S_KILL_TASK>& msg = message();
+
+ LOG(INFO) << "Asked to kill task " << msg.task_id()
+ << " of framework " << msg.framework_id();
+
+ Framework* framework = getFramework(msg.framework_id());
+ if (framework != NULL) {
+ // Tell the executor to kill the task if it is up and
+ // running, otherwise, consider the task lost.
+ Executor* executor = framework->getExecutor(msg.task_id());
+ if (executor == NULL || !executor->pid) {
+ // Update the resources locally, if an executor comes up
+ // after this then it just won't receive this task.
+ executor->removeTask(msg.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(msg.task_id());
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+ send(master, out);
+
+ double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status->task_id()] = *status;
+ } else {
+ // Otherwise, send a message to the executor and wait for
+ // it to send us a status update.
+ MSG<S2E_KILL_TASK> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_task_id()->MergeFrom(msg.task_id());
+ send(executor->pid, out);
+ }
+ } else {
+ LOG(WARNING) << "Cannot kill task " << msg.task_id()
+ << " of framework " << msg.framework_id()
+ << " because no such framework is running";
+
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(msg.task_id());
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+ send(master, out);
+
+ double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status->task_id()] = *status;
}
break;
}
case M2S_KILL_FRAMEWORK: {
- FrameworkID fid;
- tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
- LOG(INFO) << "Asked to kill framework " << fid;
- Framework *fw = getFramework(fid);
- if (fw != NULL)
- killFramework(fw);
+ const MSG<M2S_KILL_FRAMEWORK>&msg = message();
+
+ LOG(INFO) << "Asked to kill framework " << msg.framework_id();
+
+ Framework *framework = getFramework(msg.framework_id());
+ if (framework != NULL)
+ killFramework(framework);
break;
}
case M2S_FRAMEWORK_MESSAGE: {
- FrameworkID fid;
- FrameworkMessage message;
- tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
- if (Executor *ex = getExecutor(fid)) {
- VLOG(1) << "Relaying framework message for framework " << fid;
- send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+ const MSG<M2S_FRAMEWORK_MESSAGE>&msg = message();
+
+ Framework* framework = getFramework(msg.framework_id());
+ if (framework != NULL) {
+ const FrameworkMessage& message = msg.message();
+
+ Executor* executor = framework->getExecutor(message.executor_id());
+ if (executor == NULL) {
+ LOG(WARNING) << "Dropping message for executor '"
+ << message.executor_id() << "' of framework "
+ << msg.framework_id()
+ << " because executor does not exist";
+ } else if (!executor->pid) {
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ LOG(WARNING) << "Dropping message for executor '"
+ << message.executor_id() << "' of framework "
+ << msg.framework_id()
+ << " because executor is not running";
+ } else {
+ MSG<S2E_FRAMEWORK_MESSAGE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_message()->MergeFrom(message);
+ send(executor->pid, out);
+ }
} else {
- VLOG(1) << "Dropping framework message for framework " << fid
- << " because its executor is not running";
+ LOG(WARNING) << "Dropping message for framework "
+ << msg.framework_id()
+ << " because it does not exist";
+ }
+ break;
+ }
+
+ case M2S_UPDATE_FRAMEWORK: {
+ const MSG<M2S_UPDATE_FRAMEWORK>&msg = message();
+
+ Framework *framework = getFramework(msg.framework_id());
+ if (framework != NULL) {
+ LOG(INFO) << "Updating framework " << msg.framework_id()
+ << " pid to " << msg.pid();
+ framework->pid = msg.pid();
}
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
break;
}
- case M2S_UPDATE_FRAMEWORK_PID: {
- FrameworkID fid;
- PID pid;
- tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
- Framework *framework = getFramework(fid);
+ case M2S_STATUS_UPDATE_ACK: {
+ const MSG<M2S_STATUS_UPDATE_ACK>& msg = message();
+
+ Framework* framework = getFramework(msg.framework_id());
if (framework != NULL) {
- LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
- framework->pid = pid;
+ foreachpair (double deadline, _, framework->statuses) {
+ if (framework->statuses[deadline].count(msg.task_id()) > 0) {
+ LOG(INFO) << "Got acknowledgement of status update"
+ << " for task " << msg.task_id()
+ << " of framework " << framework->frameworkId;
+ framework->statuses[deadline].erase(msg.task_id());
+ break;
+ }
+ }
}
break;
}
case E2S_REGISTER_EXECUTOR: {
- FrameworkID fid;
- tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
- LOG(INFO) << "Got executor registration for framework " << fid;
- if (Framework *fw = getFramework(fid)) {
- if (getExecutor(fid) != 0) {
- LOG(ERROR) << "Executor for framework " << fid
- << "already exists";
- send(from(), pack<S2E_KILL_EXECUTOR>());
- break;
+ const MSG<E2S_REGISTER_EXECUTOR>& msg = message();
+
+ LOG(INFO) << "Got registration for executor '"
+ << msg.executor_id() << "' of framework "
+ << msg.framework_id();
+
+ Framework* framework = getFramework(msg.framework_id());
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(msg.executor_id());
+
+ // Check the status of the executor.
+ if (executor == NULL) {
+ LOG(WARNING) << "Not expecting executor '" << msg.executor_id()
+ << "' of framework " << msg.framework_id();
+ send(from(), S2E_KILL_EXECUTOR);
+ } else if (executor->pid != PID()) {
+ LOG(WARNING) << "Not good, executor '" << msg.executor_id()
+ << "' of framework " << msg.framework_id()
+ << " is already running";
+ send(from(), S2E_KILL_EXECUTOR);
+ } else {
+ // Save the pid for the executor.
+ executor->pid = from();
+
+ // Now that the executor is up, set its resource limits.
+ isolationModule->resourcesChanged(framework, executor);
+
+ // Tell executor it's registered and give it any queued tasks.
+ MSG<S2E_REGISTER_REPLY> out;
+ ExecutorArgs* args = out.mutable_args();
+ args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+ args->set_name(framework->info.name());
+ args->mutable_slave_id()->MergeFrom(slaveId);
+ args->set_hostname(hostname);
+ args->set_data(framework->info.executor().data());
+ send(executor->pid, out);
+ sendQueuedTasks(framework, executor);
}
- Executor *executor = new Executor(fid, from());
- executors[fid] = executor;
- link(from());
- // Now that the executor is up, set its resource limits
- isolationModule->resourcesChanged(fw);
- // Tell executor that it's registered and give it its queued tasks
- send(from(), pack<S2E_REGISTER_REPLY>(this->id,
- hostname,
- fw->name,
- fw->executorInfo.initArg));
- sendQueuedTasks(fw);
} else {
- // Framework is gone; tell the executor to exit
- send(from(), pack<S2E_KILL_EXECUTOR>());
+ // Framework is gone; tell the executor to exit.
+ LOG(WARNING) << "Framework " << msg.framework_id()
+ << " does not exist (it may have been killed),"
+ << " telling executor to exit";
+
+ // TODO(benh): Don't we also want to tell the isolation
+ // module to shut this guy down!
+ send(from(), S2E_KILL_EXECUTOR);
}
break;
}
case E2S_STATUS_UPDATE: {
- FrameworkID fid;
- TaskID tid;
- TaskState taskState;
- string data;
- tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
+ const MSG<E2S_STATUS_UPDATE>& msg = message();
- Framework *framework = getFramework(fid);
- if (framework != NULL) {
- LOG(INFO) << "Got status update for task " << fid << ":" << tid;
- if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
- taskState == TASK_KILLED || taskState == TASK_LOST) {
- LOG(INFO) << "Task " << fid << ":" << tid << " done";
+ const TaskStatus& status = msg.status();
- framework->removeTask(tid);
- isolationModule->resourcesChanged(framework);
- }
+ LOG(INFO) << "Status update: task " << status.task_id()
+ << " of framework " << msg.framework_id()
+ << " is now in state "
+ << TaskState_descriptor()->FindValueByNumber(status.state())->name();
- // Reliably send message and save sequence number for
- // canceling later.
- int seq = rsend(master, framework->pid,
- pack<S2M_STATUS_UPDATE>(id, fid, tid,
- taskState, data));
- seqs[fid].insert(seq);
+ Framework *framework = getFramework(msg.framework_id());
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(status.task_id());
+ if (executor != NULL) {
+ if (status.state() == TASK_FINISHED ||
+ status.state() == TASK_FAILED ||
+ status.state() == TASK_KILLED ||
+ status.state() == TASK_LOST) {
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+ }
+
+ // Send message and record the status for possible resending.
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_status()->MergeFrom(status);
+ send(master, out);
+
+ double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+ framework->statuses[deadline][status.task_id()] = status;
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "executor for framework " << msg.framework_id();
+ }
} else {
- LOG(WARNING) << "Got status update for UNKNOWN task "
- << fid << ":" << tid;
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "framework " << msg.framework_id();
}
break;
}
case E2S_FRAMEWORK_MESSAGE: {
- FrameworkID fid;
- FrameworkMessage message;
- tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
+ const MSG<E2S_FRAMEWORK_MESSAGE>& msg = message();
+
+ const FrameworkMessage& message = msg.message();
- Framework *framework = getFramework(fid);
+ Framework *framework = getFramework(msg.framework_id());
if (framework != NULL) {
- LOG(INFO) << "Sending message for framework " << fid
+ LOG(INFO) << "Sending message for framework "
+ << framework->frameworkId
<< " to " << framework->pid;
- // Set slave ID in case framework omitted it.
- message.slaveId = this->id;
- VLOG(1) << "Sending framework message to framework " << fid
- << " with PID " << framework->pid;
- send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+ // TODO(benh): This is weird, sending an M2F message.
+ MSG<M2F_FRAMEWORK_MESSAGE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_message()->MergeFrom(message);
+ out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
+ send(framework->pid, out);
}
break;
}
case S2S_GET_STATE: {
- send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
- break;
- }
-
- case PROCESS_EXIT: {
- LOG(INFO) << "Process exited: " << from();
-
- if (from() == master) {
- LOG(WARNING) << "Master disconnected! "
- << "Waiting for a new master to be elected.";
- // TODO(benh): After so long waiting for a master, commit suicide.
- } else {
- // Check if an executor has exited (this is technically
- // redundant because the isolation module should be doing
- // this for us).
- foreachpair (_, Executor *ex, executors) {
- if (from() == ex->pid) {
- LOG(INFO) << "Executor for framework " << ex->frameworkId
- << " disconnected";
- Framework *framework = getFramework(ex->frameworkId);
- if (framework != NULL) {
- send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
- killFramework(framework);
- }
- break;
- }
- }
- }
-
+ state::SlaveState *state = getState();
+ MSG<S2S_GET_STATE_REPLY> out;
+ out.set_pointer((char *) &state, sizeof(state));
+ send(from(), out);
break;
}
case M2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by master: " << from();
- unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
- foreachpair (_, Framework *framework, frameworksCopy) {
+ foreachpaircopy (_, Framework *framework, frameworks) {
killFramework(framework);
}
return;
@@ -443,16 +553,46 @@ void Slave::operator () ()
case S2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by " << from();
- unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
- foreachpair (_, Framework *framework, frameworksCopy) {
+ foreachpaircopy (_, Framework *framework, frameworks) {
killFramework(framework);
}
return;
}
+ case PROCESS_EXIT: {
+ LOG(INFO) << "Process exited: " << from();
+
+ if (from() == master) {
+ LOG(WARNING) << "Master disconnected! "
+ << "Waiting for a new master to be elected.";
+ // TODO(benh): After so long waiting for a master, commit suicide.
+ }
+ break;
+ }
+
+ case PROCESS_TIMEOUT: {
+ // Check and see if we should re-send any status updates.
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (double deadline, _, framework->statuses) {
+ if (deadline <= elapsed()) {
+ foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
+ LOG(WARNING) << "Resending status update"
+ << " for task " << status.task_id()
+ << " of framework " << framework->frameworkId;
+ MSG<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.mutable_status()->MergeFrom(status);
+ send(master, out);
+ }
+ }
+ }
+ }
+ break;
+ }
+
default: {
- LOG(ERROR) << "Received unknown message ID " << msgid()
- << " from " << from();
+ LOG(ERROR) << "Received unknown message (" << msgid()
+ << ") from " << from();
break;
}
}
@@ -460,70 +600,66 @@ void Slave::operator () ()
}
-Framework * Slave::getFramework(FrameworkID frameworkId)
+Framework* Slave::getFramework(const FrameworkID& frameworkId)
{
- FrameworkMap::iterator it = frameworks.find(frameworkId);
- if (it == frameworks.end()) return NULL;
- return it->second;
-}
-
+ if (frameworks.count(frameworkId) > 0) {
+ return frameworks[frameworkId];
+ }
-Executor * Slave::getExecutor(FrameworkID frameworkId)
-{
- ExecutorMap::iterator it = executors.find(frameworkId);
- if (it == executors.end()) return NULL;
- return it->second;
+ return NULL;
}
// Send any tasks queued up for the given framework to its executor
// (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework *framework)
+void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
{
- LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
- Executor *executor = getExecutor(framework->id);
- if (!executor) return;
- foreach(TaskDescription *td, framework->queuedTasks) {
- send(executor->pid,
- pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
- delete td;
+ LOG(INFO) << "Flushing queued tasks for framework "
+ << framework->frameworkId;
+
+ CHECK(executor->pid != PID());
+
+ foreach (const TaskDescription& task, executor->queuedTasks) {
+ // Add the task to the executor.
+ executor->addTask(task);
+
+ MSG<S2E_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task);
+ send(executor->pid, out);
}
- framework->queuedTasks.clear();
+
+ executor->queuedTasks.clear();
}
// Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutor)
+void Slave::killFramework(Framework *framework, bool killExecutors)
{
- LOG(INFO) << "Cleaning up framework " << framework->id;
+ LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
- // Cancel sending any reliable messages for this framework.
- foreach (int seq, seqs[framework->id])
- cancel(seq);
+ // Shutdown all executors of this framework.
+ foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
+ if (killExecutors) {
+ LOG(INFO) << "Killing executor '" << executorId
+ << "' of framework " << framework->frameworkId;
- seqs.erase(framework->id);
+ send(executor->pid, S2E_KILL_EXECUTOR);
- // Remove its allocated resources.
- framework->resources = Resources();
-
- // If an executor is running, tell it to exit and kill it.
- if (Executor *ex = getExecutor(framework->id)) {
- if (killExecutor) {
- LOG(INFO) << "Killing executor for framework " << framework->id;
// TODO(benh): There really isn't ANY time between when an
// executor gets a S2E_KILL_EXECUTOR message and the isolation
// module goes and kills it. We should really think about making
// the semantics of this better.
- send(ex->pid, pack<S2E_KILL_EXECUTOR>());
- isolationModule->killExecutor(framework);
+
+ isolationModule->killExecutor(framework, executor);
}
- LOG(INFO) << "Cleaning up executor for framework " << framework->id;
- delete ex;
- executors.erase(framework->id);
+ framework->destroyExecutor(executorId);
}
- frameworks.erase(framework->id);
+ frameworks.erase(framework->frameworkId);
delete framework;
}
@@ -531,18 +667,44 @@ void Slave::killFramework(Framework *fra
// Called by isolation module when an executor process exits
// TODO(benh): Make this callback be a message so that we can avoid
// race conditions.
-void Slave::executorExited(FrameworkID fid, int status)
+void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status)
{
- if (Framework *f = getFramework(fid)) {
- LOG(INFO) << "Executor for framework " << fid << " exited "
- << "with status " << status;
- send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
- killFramework(f, false);
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor != NULL) {
+ LOG(INFO) << "Exited executor '" << executorId
+ << "' of framework " << frameworkId
+ << " with status " << status;
+
+ MSG<S2M_EXITED_EXECUTOR> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
+ out.set_status(status);
+ send(master, out);
+
+ framework->destroyExecutor(executorId);
+
+ // TODO(benh): When should we kill the presence of an entire
+ // framework on a slave?
+ if (framework->executors.size() == 0) {
+ killFramework(framework);
+ }
+ } else {
+ LOG(WARNING) << "UNKNOWN executor '" << executorId
+ << "' of framework " << frameworkId
+ << " has exited with status " << status;
+ }
+ } else {
+ LOG(WARNING) << "UNKNOWN executor '" << executorId
+ << "' of UNKNOWN framework " << frameworkId
+ << " has exited with status " << status;
}
};
-string Slave::getUniqueWorkDirectory(FrameworkID fid)
+string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId)
{
string workDir;
if (conf.contains("work_dir")) {
@@ -554,7 +716,7 @@ string Slave::getUniqueWorkDirectory(Fra
}
ostringstream os(std::ios_base::app | std::ios_base::out);
- os << workDir << "/slave-" << id << "/fw-" << fid;
+ os << workDir << "/slave-" << slaveId << "/fw-" << frameworkId;
// Find a unique directory based on the path given by the slave
// (this is because we might launch multiple executors from the same
@@ -575,7 +737,7 @@ string Slave::getUniqueWorkDirectory(Fra
}
-const Params& Slave::getConf()
+const Configuration& Slave::getConfiguration()
{
return conf;
}
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun 5 09:08:02 2011
@@ -27,7 +27,7 @@
#include <sys/types.h>
#include <sys/wait.h>
-#include <reliable.hpp>
+#include <process.hpp>
#include "isolation_module.hpp"
#include "state.hpp"
@@ -195,7 +195,7 @@ protected:
do {
switch (receive(interval)) {
case PROCESS_TIMEOUT: {
- Message<SH2M_HEARTBEAT> msg;
+ MSG<SH2M_HEARTBEAT> msg;
msg.mutable_slave_id()->MergeFrom(slaveId);
send(master, msg);
break;
Modified: incubator/mesos/trunk/src/slave/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.cpp (original)
+++ incubator/mesos/trunk/src/slave/webui.cpp Sun Jun 5 09:08:02 2011
@@ -96,7 +96,7 @@ public:
receive();
CHECK(msgid() == S2S_GET_STATE_REPLY);
- const Message<S2S_GET_STATE_REPLY>& msg = message();
+ const MSG<S2S_GET_STATE_REPLY>& msg = message();
slaveState =
*(state::SlaveState **) msg.pointer().data();
Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun 5 09:08:02 2011
@@ -490,7 +490,7 @@ TEST(MasterTest, SlavePartitioned)
EXPECT_CALL(sched, slaveLost(&driver, _))
.WillOnce(Trigger(&slaveLostCall));
- EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
+ EXPECT_MSG(filter, Eq(MesosProcess::names[SH2M_HEARTBEAT]), _, _)
.WillRepeatedly(Return(true));
driver.start();
@@ -774,7 +774,7 @@ TEST(MasterTest, SchedulerFailoverStatus
EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
.Times(1);
- EXPECT_MSG(filter, Eq(M2F_STATUS_UPDATE), _, Ne(master))
+ EXPECT_MSG(filter, Eq(MesosProcess::names[M2F_STATUS_UPDATE]), _, Ne(master))
.WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
.RetiresOnSaturation();
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Sun Jun 5 09:08:02 2011
@@ -10,6 +10,8 @@
#include <gmock/gmock.h>
+#include <messaging/messages.hpp>
+
namespace mesos { namespace internal { namespace test {
@@ -99,8 +101,8 @@ public:
*/
class MockFilter : public Filter
{
- public:
- MOCK_METHOD1(filter, bool(struct msg *));
+public:
+ MOCK_METHOD1(filter, bool(Message *));
};
@@ -108,9 +110,9 @@ class MockFilter : public Filter
* A message can be matched against in conjunction with the MockFilter
* (see above) to perform specific actions based for messages.
*/
-MATCHER_P3(MsgMatcher, id, from, to, "")
+MATCHER_P3(MsgMatcher, name, from, to, "")
{
- return (testing::Matcher<MSGID>(id).Matches(arg->id) &&
+ return (testing::Matcher<std::string>(name).Matches(arg->name) &&
testing::Matcher<PID>(from).Matches(arg->from) &&
testing::Matcher<PID>(to).Matches(arg->to));
}
@@ -121,8 +123,8 @@ MATCHER_P3(MsgMatcher, id, from, to, "")
* using the message matcher (see above) as well as the MockFilter
* (see above).
*/
-#define EXPECT_MSG(filter, id, from, to) \
- EXPECT_CALL(filter, filter(MsgMatcher(id, from, to)))
+#define EXPECT_MSG(filter, name, from, to) \
+ EXPECT_CALL(filter, filter(MsgMatcher(name, from, to)))
/**
Modified: incubator/mesos/trunk/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/Makefile.in?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/third_party/libprocess/Makefile.in Sun Jun 5 09:08:02 2011
@@ -15,13 +15,16 @@ CXXFLAGS += -I@abs_srcdir@/third_party/l
# Add Boost to CXXFLAGS.
CXXFLAGS += -I@abs_srcdir@/third_party/boost-1.37.0
+# Add http parser to CXXFLAGS.
+CXXFLAGS += -I@abs_srcdir@/third_party/ry-http-parser-1c3624a
+
# Add -fPIC to CXXFLAGS.
-CXXFLAGS += -fPIC
+CXXFLAGS += -fPIC -D_XOPEN_SOURCE
# Add dependency tracking to CXXFLAGS.
CXXFLAGS += -MMD -MP
-LIB_OBJ = process.o pid.o reliable.o fatal.o
+LIB_OBJ = process.o pid.o fatal.o tokenize.o
LIB = libprocess.a
OBJS = $(LIB_OBJ)
@@ -36,10 +39,12 @@ $(LIB_OBJ): %.o: @abs_srcdir@/%.cpp
$(CXX) -c $(CXXFLAGS) -o $@ $<
$(LIB): $(LIB_OBJ)
- $(AR) rcs $@ $^
+ $(AR) rcs $@ $^ third_party/ry-http-parser-1c3624a/http_parser_g.o
third_party:
$(MAKE) -C @abs_builddir@/third_party/libev-3.8
+ cp -r @abs_srcdir@/third_party/ry-http-parser-1c3624a @abs_builddir@/third_party/
+ $(MAKE) -C @abs_builddir@/third_party/ry-http-parser-1c3624a http_parser_g.o
all: third_party $(LIBS)