You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:25:43 UTC
svn commit: r1132329 [5/6] - in /incubator/mesos/trunk: ./ src/ src/common/
src/config/ src/event_history/ src/examples/ src/local/ src/master/
src/messaging/ src/slave/ src/tests/ third_party/sqlite-3.6.23.1/
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun 5 09:25:41 2011
@@ -1,30 +1,56 @@
#ifndef __MASTER_HPP__
#define __MASTER_HPP__
+#include <time.h>
+#include <arpa/inet.h>
+
+#include <algorithm>
+#include <fstream>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdexcept>
#include <string>
#include <vector>
-#include <process.hpp>
+#include <reliable.hpp>
#include <glog/logging.h>
+#include <boost/lexical_cast.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include "state.hpp"
+#include "common/fatal.hpp"
#include "common/foreach.hpp"
-#include "common/multimap.hpp"
+#include "common/params.hpp"
#include "common/resources.hpp"
-#include "common/type_utils.hpp"
+#include "common/task.hpp"
#include "configurator/configurator.hpp"
+#include "detector/detector.hpp"
+
#include "messaging/messages.hpp"
namespace mesos { namespace internal { namespace master {
+using namespace mesos;
+using namespace mesos::internal;
+
+using std::make_pair;
+using std::map;
+using std::pair;
+using std::set;
+using std::string;
+using std::vector;
+
+using boost::unordered_map;
+using boost::unordered_set;
+
using foreach::_;
@@ -46,461 +72,373 @@ const int32_t MAX_CPUS = 1000 * 1000;
// Maximum amount of memory / machine.
const int32_t MAX_MEM = 1024 * 1024 * Megabyte;
-// Acceptable timeout for slave PONG.
-const double SLAVE_PONG_TIMEOUT = 15.0;
+// Interval at which slaves should send heartbeats.
+const double HEARTBEAT_INTERVAL = 2;
-// Maximum number of timeouts until slave is considered failed.
-const int MAX_SLAVE_TIMEOUTS = 5;
+// Acceptable time since we saw the last heartbeat (7.5 heartbeat intervals).
+const double HEARTBEAT_TIMEOUT = 15;
// Time to wait for a framework to fail over (TODO(benh): Make configurable).
-const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;
+const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60;
+// Some forward declarations
+struct Slave;
+class Allocator;
-// Reasons why offers might be returned to the Allocator.
-enum OfferReturnReason
+
+class FrameworkFailoverTimer : public MesosProcess
{
- ORR_FRAMEWORK_REPLIED,
- ORR_OFFER_RESCINDED,
- ORR_FRAMEWORK_LOST,
- ORR_FRAMEWORK_FAILOVER,
- ORR_SLAVE_LOST
-};
+private:
+ const PID master;
+ const FrameworkID fid;
+protected:
+ void operator () ()
+ {
+ link(master);
+ do {
+ switch (receive(FRAMEWORK_FAILOVER_TIMEOUT)) {
+ case PROCESS_TIMEOUT:
+ send(master, pack<M2M_FRAMEWORK_EXPIRED>(fid));
+ return;
+ case PROCESS_EXIT:
+ return;
+ case M2M_SHUTDOWN:
+ return;
+ }
+ } while (true);
+ }
-// Reasons why tasks might be removed, passed to the Allocator.
-enum TaskRemovalReason
-{
- TRR_TASK_ENDED,
- TRR_FRAMEWORK_LOST,
- TRR_EXECUTOR_LOST,
- TRR_SLAVE_LOST
+public:
+ FrameworkFailoverTimer(const PID &_master, FrameworkID _fid)
+ : master(_master), fid(_fid) {}
};
-// Some forward declarations.
-class Allocator;
-class SlavesManager;
-struct Framework;
-struct Slave;
-struct SlaveResources;
-class SlaveObserver;
-struct SlotOffer;
+// Resources offered on a particular slave.
+struct SlaveResources
+{
+ Slave *slave;
+ Resources resources;
+
+ SlaveResources() {}
+
+ SlaveResources(Slave *s, Resources r): slave(s), resources(r) {}
+};
-class Master : public MesosProcess<Master>
+// A resource offer.
+struct SlotOffer
{
-public:
- Master();
- Master(const Configuration& conf);
+ OfferID id;
+ FrameworkID frameworkId;
+ vector<SlaveResources> resources;
- virtual ~Master();
+ SlotOffer(OfferID i, FrameworkID f, const vector<SlaveResources>& r)
+ : id(i), frameworkId(f), resources(r) {}
+};
- static void registerOptions(Configurator* configurator);
+// A connected framework.
+struct Framework
+{
+ PID pid;
+ FrameworkID id;
+ bool active; // Turns false when framework is being removed
+ string name;
+ string user;
+ ExecutorInfo executorInfo;
+ double connectTime;
- process::Promise<state::MasterState*> getState();
-
- OfferID makeOffer(Framework* framework,
- const std::vector<SlaveResources>& resources);
+ unordered_map<TaskID, Task *> tasks;
+ unordered_set<SlotOffer *> slotOffers; // Active offers given to this framework
- // Return connected frameworks that are not in the process of being removed
- std::vector<Framework*> getActiveFrameworks();
+ Resources resources; // Total resources owned by framework (tasks + offers)
- // Return connected slaves that are not in the process of being removed
- std::vector<Slave*> getActiveSlaves();
+ // Contains a time of unfiltering for each slave we've filtered,
+ // or 0 for slaves that we want to keep filtered forever
+ unordered_map<Slave *, double> slaveFilter;
- void newMasterDetected(const std::string& pid);
- void noMasterDetected();
- void masterDetectionFailure();
- void registerFramework(const FrameworkInfo& frameworkInfo);
- void reregisterFramework(const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- int32_t generation);
- void unregisterFramework(const FrameworkID& frameworkId);
- void resourceOfferReply(const FrameworkID& frameworkId,
- const OfferID& offerId,
- const std::vector<TaskDescription>& tasks,
- const Params& params);
- void reviveOffers(const FrameworkID& frameworkId);
- void killTask(const FrameworkID& frameworkId,
- const TaskID& taskId);
- void schedulerMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const std::string& data);
- void statusUpdateAck(const FrameworkID& frameworkId,
- const TaskID& taskId,
- const SlaveID& slaveId);
- void registerSlave(const SlaveInfo& slaveInfo);
- void reregisterSlave(const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const std::vector<Task>& tasks);
- void unregisterSlave(const SlaveID& slaveId);
- void statusUpdate(const FrameworkID& frameworkId,
- const TaskStatus& status);
- void executorMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const std::string& data);
- void exitedExecutor(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- int32_t result);
- void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
- void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
- void timerTick();
- void frameworkExpired(const FrameworkID& frameworkId);
- void exited();
-
- process::Promise<process::HttpResponse> vars(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> stats(const process::HttpRequest& request);
-
- Framework* lookupFramework(const FrameworkID& frameworkId);
- Slave* lookupSlave(const SlaveID& slaveId);
- SlotOffer* lookupSlotOffer(const OfferID& offerId);
-
-protected:
- virtual void operator () ();
-
- void initialize();
+ // A failover timer if the connection to this framework is lost.
+ FrameworkFailoverTimer *failoverTimer;
- // Process a resource offer reply (for a non-cancelled offer) by launching
- // the desired tasks (if the offer contains a valid set of tasks) and
- // reporting any unused resources to the allocator
- void processOfferReply(SlotOffer* offer,
- const std::vector<TaskDescription>& tasks,
- const Params& params);
+ Framework(const PID &_pid, FrameworkID _id, double time)
+ : pid(_pid), id(_id), active(true), connectTime(time),
+ failoverTimer(NULL) {}
- // Launch a task described in a slot offer response
- void launchTask(Framework* framework, const TaskDescription& task);
+ ~Framework()
+ {
+ if (failoverTimer != NULL) {
+ MesosProcess::post(failoverTimer->self(), pack<M2M_SHUTDOWN>());
+ Process::wait(failoverTimer->self());
+ delete failoverTimer;
+ failoverTimer = NULL;
+ }
+ }
- // Terminate a framework, sending it a particular error message
- // TODO: Make the error codes and messages programmer-friendly
- void terminateFramework(Framework* framework,
- int32_t code,
- const std::string& message);
+ Task * lookupTask(TaskID tid)
+ {
+ unordered_map<TaskID, Task *>::iterator it = tasks.find(tid);
+ if (it != tasks.end())
+ return it->second;
+ else
+ return NULL;
+ }
- // Remove a slot offer (because it was replied to, or we want to rescind it,
- // or we lost a framework or a slave)
- void removeSlotOffer(SlotOffer* offer,
- OfferReturnReason reason,
- const std::vector<SlaveResources>& resourcesLeft);
-
- void removeTask(Task* task, TaskRemovalReason reason);
-
- void addFramework(Framework* framework);
-
- // Replace the scheduler for a framework with a new process ID, in the
- // event of a scheduler failover.
- void failoverFramework(Framework* framework, const process::UPID& newPid);
-
- // Kill all of a framework's tasks, delete the framework object, and
- // reschedule slot offers for slots that were assigned to this framework
- void removeFramework(Framework* framework);
-
- // Add a slave.
- void addSlave(Slave* slave);
-
- void readdSlave(Slave* slave, const std::vector<Task>& tasks);
-
- // Lose all of a slave's tasks and delete the slave object
- void removeSlave(Slave* slave);
-
- virtual Allocator* createAllocator();
-
- FrameworkID newFrameworkId();
- OfferID newOfferId();
- SlaveID newSlaveId();
-
- const Configuration& getConfiguration();
-
-private:
- const Configuration conf;
-
- SlavesManager* slavesManager;
-
- multimap<std::string, uint16_t> slaveHostnamePorts;
-
- boost::unordered_map<FrameworkID, Framework*> frameworks;
- boost::unordered_map<SlaveID, Slave*> slaves;
- boost::unordered_map<OfferID, SlotOffer*> slotOffers;
-
- boost::unordered_map<process::UPID, FrameworkID> pidToFrameworkId;
- boost::unordered_map<process::UPID, SlaveID> pidToSlaveId;
-
- int64_t nextFrameworkId; // Used to give each framework a unique ID.
- int64_t nextOfferId; // Used to give each slot offer a unique ID.
- int64_t nextSlaveId; // Used to give each slave a unique ID.
-
- std::string allocatorType;
- Allocator* allocator;
-
- bool active;
-
- // Contains the date the master was launched and
- // some ephemeral token (e.g. returned from
- // ZooKeeper). Used in framework and slave IDs
- // created by this master.
- std::string masterId;
-
- // Statistics (initialized in Master::initialize).
- struct {
- uint64_t launched_tasks;
- uint64_t finished_tasks;
- uint64_t killed_tasks;
- uint64_t failed_tasks;
- uint64_t lost_tasks;
- uint64_t valid_status_updates;
- uint64_t invalid_status_updates;
- uint64_t valid_framework_messages;
- uint64_t invalid_framework_messages;
- } statistics;
-};
-
-
-// A resource offer.
-struct SlotOffer
-{
- OfferID offerId;
- FrameworkID frameworkId;
- std::vector<SlaveResources> resources;
+ void addTask(Task *task)
+ {
+ CHECK(tasks.count(task->id) == 0);
+ tasks[task->id] = task;
+ this->resources += task->resources;
+ }
+
+ void removeTask(TaskID tid)
+ {
+ CHECK(tasks.find(tid) != tasks.end());
+ unordered_map<TaskID, Task *>::iterator it = tasks.find(tid);
+ this->resources -= it->second->resources;
+ tasks.erase(it);
+ }
+
+ void addOffer(SlotOffer *offer)
+ {
+ CHECK(slotOffers.find(offer) == slotOffers.end());
+ slotOffers.insert(offer);
+ foreach (SlaveResources &r, offer->resources)
+ this->resources += r.resources;
+ }
- SlotOffer(const OfferID& _offerId,
- const FrameworkID& _frameworkId,
- const std::vector<SlaveResources>& _resources)
- : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
+ void removeOffer(SlotOffer *offer)
+ {
+ CHECK(slotOffers.find(offer) != slotOffers.end());
+ slotOffers.erase(offer);
+ foreach (SlaveResources &r, offer->resources)
+ this->resources -= r.resources;
+ }
+
+ bool filters(Slave *slave, Resources resources)
+ {
+ // TODO: Implement other filters
+ return slaveFilter.find(slave) != slaveFilter.end();
+ }
+
+ void removeExpiredFilters(double now)
+ {
+ vector<Slave *> toRemove;
+ foreachpair (Slave *slave, double removalTime, slaveFilter)
+ if (removalTime != 0 && removalTime <= now)
+ toRemove.push_back(slave);
+ foreach (Slave *slave, toRemove)
+ slaveFilter.erase(slave);
+ }
};
// A connected slave.
struct Slave
-{
- SlaveInfo info;
- SlaveID slaveId;
- process::UPID pid;
-
+{
+ PID pid;
+ SlaveID id;
bool active; // Turns false when slave is being removed
+ string hostname;
+ string publicDns;
double connectTime;
double lastHeartbeat;
+ Resources resources; // Total resources on slave
Resources resourcesOffered; // Resources currently in offers
Resources resourcesInUse; // Resources currently used by tasks
- boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
- boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
-
- SlaveObserver* observer;
+ unordered_map<pair<FrameworkID, TaskID>, Task *> tasks;
+ unordered_set<SlotOffer *> slotOffers; // Active offers of slots on this slave
- Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
- const process::UPID& _pid, double time)
- : info(_info), slaveId(_slaveId), pid(_pid), active(true),
- connectTime(time), lastHeartbeat(time) {}
-
- ~Slave() {}
+ Slave(const PID &_pid, SlaveID _id, double time)
+ : pid(_pid), id(_id), active(true)
+ {
+ connectTime = lastHeartbeat = time;
+ }
- Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
+ Task * lookupTask(FrameworkID fid, TaskID tid)
{
- foreachpair (_, Task* task, tasks) {
- if (task->framework_id() == frameworkId && task->task_id() == taskId) {
+ foreachpair (_, Task *task, tasks)
+ if (task->frameworkId == fid && task->id == tid)
return task;
- }
- }
return NULL;
}
- void addTask(Task* task)
+ void addTask(Task *task)
{
- std::pair<FrameworkID, TaskID> key =
- std::make_pair(task->framework_id(), task->task_id());
- CHECK(tasks.count(key) == 0);
- tasks[key] = task;
- foreach (const Resource& resource, task->resources()) {
- resourcesInUse += resource;
- }
+ CHECK(tasks.find(make_pair(task->frameworkId, task->id)) == tasks.end());
+ tasks[make_pair(task->frameworkId, task->id)] = task;
+ resourcesInUse += task->resources;
}
- void removeTask(Task* task)
+ void removeTask(Task *task)
{
- std::pair<FrameworkID, TaskID> key =
- std::make_pair(task->framework_id(), task->task_id());
- CHECK(tasks.count(key) > 0);
- tasks.erase(key);
- foreach (const Resource& resource, task->resources()) {
- resourcesInUse -= resource;
- }
+ CHECK(tasks.find(make_pair(task->frameworkId, task->id)) != tasks.end());
+ tasks.erase(make_pair(task->frameworkId, task->id));
+ resourcesInUse -= task->resources;
}
Resources resourcesFree()
{
- Resources resources;
- foreach (const Resource& resource, info.resources()) {
- resources += resource;
- }
return resources - (resourcesOffered + resourcesInUse);
}
};
-// Resources offered on a particular slave.
-struct SlaveResources
+// Reasons why offers might be returned to the Allocator.
+enum OfferReturnReason
{
- SlaveResources() {}
- SlaveResources(Slave* s, Resources r): slave(s), resources(r) {}
-
- Slave* slave;
- Resources resources;
+ ORR_FRAMEWORK_REPLIED,
+ ORR_OFFER_RESCINDED,
+ ORR_FRAMEWORK_LOST,
+ ORR_FRAMEWORK_FAILOVER,
+ ORR_SLAVE_LOST
};
-class FrameworkFailoverTimer : public process::Process<FrameworkFailoverTimer>
+// Reasons why tasks might be removed, passed to the Allocator.
+enum TaskRemovalReason
{
-public:
- FrameworkFailoverTimer(const process::PID<Master>& _master,
- const FrameworkID& _frameworkId)
- : master(_master), frameworkId(_frameworkId) {}
+ TRR_TASK_ENDED,
+ TRR_FRAMEWORK_LOST,
+ TRR_EXECUTOR_LOST,
+ TRR_SLAVE_LOST
+};
+
+class Master : public MesosProcess
+{
protected:
- virtual void operator () ()
- {
- link(master);
- while (true) {
- receive(FRAMEWORK_FAILOVER_TIMEOUT);
- if (name() == process::TIMEOUT) {
- process::dispatch(master, &Master::frameworkExpired, frameworkId);
- return;
- } else if (name() == process::EXITED || name() == process::TERMINATE) {
- return;
- }
- }
- }
+ Params conf;
-private:
- const process::PID<Master> master;
- const FrameworkID frameworkId;
-};
+ unordered_map<FrameworkID, Framework *> frameworks;
+ unordered_map<SlaveID, Slave *> slaves;
+ unordered_map<OfferID, SlotOffer *> slotOffers;
+ unordered_map<PID, FrameworkID> pidToFid;
+ unordered_map<PID, SlaveID> pidToSid;
-// An connected framework.
-struct Framework
-{
- FrameworkInfo info;
- FrameworkID frameworkId;
- process::UPID pid;
+ int64_t nextFrameworkId; // Used to give each framework a unique ID.
+ int64_t nextSlaveId; // Used to give each slave a unique ID.
+ int64_t nextSlotOfferId; // Used to give each slot offer a unique ID.
- bool active; // Turns false when framework is being removed
- double connectTime;
+ string allocatorType;
+ Allocator *allocator;
- boost::unordered_map<TaskID, Task*> tasks;
- boost::unordered_set<SlotOffer*> slotOffers; // Active offers for framework.
+ string masterId; // Contains the date the master was launched and its fault
+ // tolerance ID (e.g. ephemeral ID returned from ZooKeeper).
+ // Used in framework and slave IDs created by this master.
- Resources resources; // Total resources owned by framework (tasks + offers)
-
- // Contains a time of unfiltering for each slave we've filtered,
- // or 0 for slaves that we want to keep filtered forever
- boost::unordered_map<Slave*, double> slaveFilter;
+public:
+ Master();
- // A failover timer if the connection to this framework is lost.
- FrameworkFailoverTimer* failoverTimer;
+ Master(const Params& conf);
+
+ ~Master();
- Framework(const FrameworkInfo& _info, const FrameworkID& _frameworkId,
- const process::UPID& _pid, double time)
- : info(_info), frameworkId(_frameworkId), pid(_pid), active(true),
- connectTime(time), failoverTimer(NULL) {}
+ static void registerOptions(Configurator* conf);
- ~Framework()
- {
- if (failoverTimer != NULL) {
- process::post(failoverTimer->self(), process::TERMINATE);
- process::wait(failoverTimer->self());
- delete failoverTimer;
- }
- }
+ state::MasterState *getState();
- Task* lookupTask(const TaskID& taskId)
- {
- if (tasks.count(taskId) > 0) {
- return tasks[taskId];
- } else {
- return NULL;
- }
- }
+ OfferID makeOffer(Framework *framework,
+ const vector<SlaveResources>& resources);
- void addTask(Task* task)
- {
- CHECK(tasks.count(task->task_id()) == 0);
- tasks[task->task_id()] = task;
- for (int i = 0; i < task->resources_size(); i++) {
- resources += task->resources(i);
- }
- }
+ void rescindOffer(SlotOffer *offer);
- void removeTask(const TaskID& taskId)
- {
- CHECK(tasks.count(taskId) > 0);
- Task* task = tasks[taskId];
- for (int i = 0; i < task->resources_size(); i++) {
- resources -= task->resources(i);
- }
- tasks.erase(taskId);
- }
+ void killTask(Task *task);
- void addOffer(SlotOffer* offer)
- {
- CHECK(slotOffers.count(offer) == 0);
- slotOffers.insert(offer);
- foreach (const SlaveResources& sr, offer->resources) {
- resources += sr.resources;
- }
- }
+ Framework * lookupFramework(FrameworkID fid);
- void removeOffer(SlotOffer* offer)
- {
- CHECK(slotOffers.find(offer) != slotOffers.end());
- slotOffers.erase(offer);
- foreach (const SlaveResources& sr, offer->resources) {
- resources -= sr.resources;
- }
- }
+ Slave * lookupSlave(SlaveID sid);
+
+ SlotOffer * lookupSlotOffer(OfferID soid);
+
+ // Return connected frameworks that are not in the process of being removed
+ vector<Framework *> getActiveFrameworks();
- bool filters(Slave* slave, Resources resources)
- {
- // TODO: Implement other filters
- return slaveFilter.find(slave) != slaveFilter.end();
- }
+ // Return connected slaves that are not in the process of being removed
+ vector<Slave *> getActiveSlaves();
+
+ const Params& getConf();
+
+protected:
+ void operator () ();
+
+ // Process a resource offer reply (for a non-cancelled offer) by launching
+ // the desired tasks (if the offer contains a valid set of tasks) and
+ // reporting any unused resources to the allocator
+ void processOfferReply(SlotOffer *offer,
+ const vector<TaskDescription>& tasks, const Params& params);
+
+ // Launch a task described in a slot offer response
+ void launchTask(Framework *framework, const TaskDescription& task);
- void removeExpiredFilters(double now)
- {
- foreachpaircopy (Slave* slave, double removalTime, slaveFilter) {
- if (removalTime != 0 && removalTime <= now) {
- slaveFilter.erase(slave);
- }
- }
- }
+ // Terminate a framework, sending it a particular error message
+ // TODO: Make the error codes and messages programmer-friendly
+ void terminateFramework(Framework *framework,
+ int32_t code,
+ const std::string& message);
+
+ // Remove a slot offer (because it was replied to, or we want to rescind it,
+ // or we lost a framework or a slave)
+ void removeSlotOffer(SlotOffer *offer,
+ OfferReturnReason reason,
+ const vector<SlaveResources>& resourcesLeft);
+
+ void removeTask(Task *task, TaskRemovalReason reason);
+
+ void addFramework(Framework *framework);
+
+ // Replace the scheduler for a framework with a new process ID, in the
+ // event of a scheduler failover.
+ void failoverFramework(Framework *framework, const PID &newPid);
+
+ // Kill all of a framework's tasks, delete the framework object, and
+ // reschedule slot offers for slots that were assigned to this framework
+ void removeFramework(Framework *framework);
+
+ // Lose all of a slave's tasks and delete the slave object
+ void removeSlave(Slave *slave);
+
+ virtual Allocator* createAllocator();
+
+ FrameworkID newFrameworkId();
+
+ string currentDate();
};
-// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc.
+// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc
inline std::ostream& operator << (std::ostream& stream, const SlotOffer *o)
{
- stream << "offer " << o->offerId;
+ stream << "offer " << o->id;
return stream;
}
inline std::ostream& operator << (std::ostream& stream, const Slave *s)
{
- stream << "slave " << s->slaveId;
+ stream << "slave " << s->id;
return stream;
}
inline std::ostream& operator << (std::ostream& stream, const Framework *f)
{
- stream << "framework " << f->frameworkId;
+ stream << "framework " << f->id;
+ return stream;
+}
+
+
+inline std::ostream& operator << (std::ostream& stream, const Task *t)
+{
+ stream << "task " << t->frameworkId << ":" << t->id;
return stream;
}
-}}} // namespace mesos { namespace internal { namespace master {
+}}} /* namespace */
-#endif // __MASTER_HPP__
+#endif /* __MASTER_HPP__ */
Modified: incubator/mesos/trunk/src/master/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/state.hpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/state.hpp (original)
+++ incubator/mesos/trunk/src/master/state.hpp Sun Jun 5 09:25:41 2011
@@ -1,40 +1,38 @@
-#ifndef __MASTER_STATE_HPP__
-#define __MASTER_STATE_HPP__
+#ifndef MASTER_STATE_HPP
+#define MASTER_STATE_HPP
#include <iostream>
#include <string>
#include <vector>
+#include <mesos_types.hpp>
+
#include "common/foreach.hpp"
#include "config/config.hpp"
-// TODO(...): Make all the variable naming in here consistant with the
-// rest of the code base. This will require cleaning up some Python code.
-
-
namespace mesos { namespace internal { namespace master { namespace state {
struct SlaveResources
{
- std::string slave_id;
+ SlaveID slave_id;
int32_t cpus;
int64_t mem;
- SlaveResources(std::string _slaveId, int32_t _cpus, int64_t _mem)
- : slave_id(_slaveId), cpus(_cpus), mem(_mem) {}
+ SlaveResources(SlaveID _sid, int32_t _cpus, int64_t _mem)
+ : slave_id(_sid), cpus(_cpus), mem(_mem) {}
};
struct SlotOffer
{
- std::string id;
- std::string framework_id;
+ OfferID id;
+ FrameworkID framework_id;
std::vector<SlaveResources *> resources;
- SlotOffer(std::string _id, std::string _frameworkId)
- : id(_id), framework_id(_frameworkId) {}
+ SlotOffer(OfferID _id, FrameworkID _fid)
+ : id(_id), framework_id(_fid) {}
~SlotOffer()
{
@@ -46,15 +44,14 @@ struct SlotOffer
struct Slave
{
- Slave(std::string id_, const std::string& host_,
- const std::string& public_dns_,
+ Slave(SlaveID id_, const std::string& host_, const std::string& public_dns_,
int32_t cpus_, int64_t mem_, time_t connect_)
: id(id_), host(host_), public_dns(public_dns_),
cpus(cpus_), mem(mem_), connect_time(connect_) {}
Slave() {}
- std::string id;
+ SlaveID id;
std::string host;
std::string public_dns;
int32_t cpus;
@@ -65,18 +62,18 @@ struct Slave
struct Task
{
- Task(std::string id_, const std::string& name_, std::string framework_id_,
- std::string slaveId_, std::string state_, int32_t _cpus, int64_t _mem)
- : id(id_), name(name_), framework_id(framework_id_), slave_id(slaveId_),
- state(state_), cpus(_cpus), mem(_mem) {}
+ Task(TaskID id_, const std::string& name_, FrameworkID fid_, SlaveID sid_,
+ TaskState state_, int32_t _cpus, int64_t _mem)
+ : id(id_), name(name_), framework_id(fid_), slave_id(sid_), state(state_),
+ cpus(_cpus), mem(_mem) {}
Task() {}
- std::string id;
+ TaskID id;
std::string name;
- std::string framework_id;
- std::string slave_id;
- std::string state;
+ FrameworkID framework_id;
+ SlaveID slave_id;
+ TaskState state;
int32_t cpus;
int64_t mem;
};
@@ -84,9 +81,9 @@ struct Task
struct Framework
{
- Framework(std::string id_, const std::string& user_,
- const std::string& name_, const std::string& executor_,
- int32_t cpus_, int64_t mem_, time_t connect_)
+ Framework(FrameworkID id_, const std::string& user_,
+ const std::string& name_, const std::string& executor_,
+ int32_t cpus_, int64_t mem_, time_t connect_)
: id(id_), user(user_), name(name_), executor(executor_),
cpus(cpus_), mem(mem_), connect_time(connect_) {}
@@ -100,7 +97,7 @@ struct Framework
delete offer;
}
- std::string id;
+ FrameworkID id;
std::string user;
std::string name;
std::string executor;
@@ -116,8 +113,8 @@ struct Framework
struct MasterState
{
MasterState(const std::string& build_date_, const std::string& build_user_,
- const std::string& pid_)
- : build_date(build_date_), build_user(build_user_), pid(pid_) {}
+ const std::string& pid_, bool _isFT = false)
+ : build_date(build_date_), build_user(build_user_), pid(pid_), isFT(_isFT) {}
MasterState() {}
@@ -135,8 +132,9 @@ struct MasterState
std::vector<Slave *> slaves;
std::vector<Framework *> frameworks;
+ bool isFT;
};
-}}}} // namespace mesos { namespace internal { namespace master { namespace state {
+}}}} /* namespace */
-#endif // __MASTER_STATE_HPP__
+#endif /* MASTER_STATE_HPP */
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 09:25:41 2011
@@ -1,393 +1,529 @@
-#ifndef __MESSAGES_HPP__
-#define __MESSAGES_HPP__
+#ifndef MESSAGES_HPP
+#define MESSAGES_HPP
#include <float.h>
+#include <stdint.h>
-#include <glog/logging.h>
-
+#include <map>
#include <string>
#include <vector>
-#include <tr1/functional>
+#include <reliable.hpp>
+
+#include <tuples/tuples.hpp>
#include <mesos.hpp>
-#include <process.hpp>
+#include <mesos_types.hpp>
-#include <boost/unordered_map.hpp>
+#include "common/foreach.hpp"
+#include "common/logging.hpp"
+#include "common/params.hpp"
+#include "common/resources.hpp"
+#include "common/task.hpp"
-#include "common/utils.hpp"
+#include "master/state.hpp"
-#include "messaging/messages.pb.h"
+#include "slave/state.hpp"
namespace mesos { namespace internal {
-// To couple a message name with a protocol buffer we use a templated
-// class that extends the necessary protocol buffer type (this also
-// allows the code to be better isolated from protocol buffer
-// naming). While protocol buffers are allegedly not meant to be
-// inherited, we decided this was an acceptable option since we don't
-// add any new functionality (or do any thing with the existing
-// functionality).
-//
-// To add another message that uses a protocol buffer you need to
-// provide a specialization of the Message class (i.e., using the
-// MESSAGE macro defined below).
-template <const char* name>
-class MSG;
-
-#define MESSAGE1(name) \
- extern char name[]
-
-#define MESSAGE2(name, T) \
- extern char name[]; \
- template <> \
- class MSG<name> : public T {}
-
-#define MESSAGE(...) \
- CONCAT(MESSAGE, VA_NUM_ARGS(__VA_ARGS__))(__VA_ARGS__)
-
-
-// From framework to master.
-MESSAGE(F2M_REGISTER_FRAMEWORK, RegisterFrameworkMessage);
-MESSAGE(F2M_REREGISTER_FRAMEWORK, ReregisterFrameworkMessage);
-MESSAGE(F2M_UNREGISTER_FRAMEWORK, UnregisterFrameworkMessage);
-MESSAGE(F2M_RESOURCE_OFFER_REPLY, ResourceOfferReplyMessage);
-MESSAGE(F2M_REVIVE_OFFERS, ReviveOffersMessage);
-MESSAGE(F2M_KILL_TASK, KillTaskMessage);
-MESSAGE(F2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(F2M_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-// From master to framework.
-MESSAGE(M2F_REGISTER_REPLY, FrameworkRegisteredMessage);
-MESSAGE(M2F_RESOURCE_OFFER, ResourceOfferMessage);
-MESSAGE(M2F_RESCIND_OFFER, RescindResourceOfferMessage);
-MESSAGE(M2F_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(M2F_LOST_SLAVE, LostSlaveMessage);
-MESSAGE(M2F_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2F_ERROR, FrameworkErrorMessage);
-
-// From slave to master.
-MESSAGE(S2M_REGISTER_SLAVE, RegisterSlaveMessage);
-MESSAGE(S2M_REREGISTER_SLAVE, ReregisterSlaveMessage);
-MESSAGE(S2M_UNREGISTER_SLAVE, UnregisterSlaveMessage);
-MESSAGE(S2M_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
-
-// From master to slave.
-MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_RUN_TASK, RunTaskMessage);
-MESSAGE(M2S_KILL_TASK, KillTaskMessage);
-MESSAGE(M2S_KILL_FRAMEWORK, KillFrameworkMessage);
-MESSAGE(M2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2S_UPDATE_FRAMEWORK, UpdateFrameworkMessage);
-MESSAGE(M2S_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-// From executor to slave.
-MESSAGE(E2S_REGISTER_EXECUTOR, RegisterExecutorMessage);
-MESSAGE(E2S_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(E2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-
-// From slave to executor.
-MESSAGE(S2E_REGISTER_REPLY, ExecutorRegisteredMessage);
-MESSAGE(S2E_RUN_TASK, RunTaskMessage);
-MESSAGE(S2E_KILL_TASK, KillTaskMessage);
-MESSAGE(S2E_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(S2E_KILL_EXECUTOR);
+const std::string MESOS_MESSAGING_VERSION = "0";
+
+/*
+ * Identifiers of all Mesos messages. Values are assigned sequentially
+ * starting at RELIABLE_MSGID, so F2M_REGISTER_FRAMEWORK is equal to
+ * RELIABLE_MSGID, and MESOS_MSGID (the last enumerator) is one past the
+ * last real message id. MesosProcess::receive() compares ids against
+ * RELIABLE_MSGID and MESOS_MSGID to decide which messages carry the
+ * version prefix. Because the projd entries are only compiled under
+ * __sun__, every enumerator after S2E_KILL_EXECUTOR has a different
+ * numeric value depending on whether __sun__ is defined.
+ */
+enum MessageType {
+ /* From framework to master. */
+ F2M_REGISTER_FRAMEWORK = RELIABLE_MSGID,
+ F2M_REREGISTER_FRAMEWORK,
+ F2M_UNREGISTER_FRAMEWORK,
+ F2M_SLOT_OFFER_REPLY,
+ F2M_REVIVE_OFFERS,
+ F2M_KILL_TASK,
+ F2M_FRAMEWORK_MESSAGE,
+
+ F2F_TASK_RUNNING_STATUS,
+
+ /* From master to framework. */
+ M2F_REGISTER_REPLY,
+ M2F_SLOT_OFFER,
+ M2F_RESCIND_OFFER,
+ M2F_STATUS_UPDATE,
+ M2F_LOST_SLAVE,
+ M2F_FRAMEWORK_MESSAGE,
+ M2F_ERROR,
+
+ /* From slave to master. */
+ S2M_REGISTER_SLAVE,
+ S2M_REREGISTER_SLAVE,
+ S2M_UNREGISTER_SLAVE,
+ S2M_STATUS_UPDATE,
+ S2M_FRAMEWORK_MESSAGE,
+ S2M_LOST_EXECUTOR,
+
+ /* From slave heart to master. */
+ SH2M_HEARTBEAT,
+
+ /* From master detector to processes */
+ GOT_MASTER_ID,
+ NEW_MASTER_DETECTED,
+ NO_MASTER_DETECTED,
+
+ /* From master to slave. */
+ M2S_REGISTER_REPLY,
+ M2S_REREGISTER_REPLY,
+ M2S_RUN_TASK,
+ M2S_KILL_TASK,
+ M2S_KILL_FRAMEWORK,
+ M2S_FRAMEWORK_MESSAGE,
+ M2S_UPDATE_FRAMEWORK_PID,
+ M2S_SHUTDOWN, // Used in unit tests to shut down cluster
+
+ /* From executor to slave. */
+ E2S_REGISTER_EXECUTOR,
+ E2S_STATUS_UPDATE,
+ E2S_FRAMEWORK_MESSAGE,
+
+ /* From slave to executor. */
+ S2E_REGISTER_REPLY,
+ S2E_RUN_TASK,
+ S2E_KILL_TASK,
+ S2E_FRAMEWORK_MESSAGE,
+ S2E_KILL_EXECUTOR,
#ifdef __sun__
-// From projd to slave.
-MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
-MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
-
-// From slave to projd.
-MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
-MESSAGE(S2PD_KILL_ALL);
-#endif // __sun__
-
-// From master detector to processes.
-MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
-MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
-MESSAGE(NO_MASTER_DETECTED);
-
-// Generic messages.
-MESSAGE(PING);
-MESSAGE(PONG);
-
-
-// Type conversions helpful for changing between protocol buffer types
-// and standard C++ types (for parameters).
-template <typename T>
-const T& convert(const T& t)
-{
- return t;
-}
+ /* From projd to slave. */
+ PD2S_REGISTER_PROJD,
+ PD2S_PROJECT_READY,
+
+ /* From slave to projd. */
+ S2PD_UPDATE_RESOURCES,
+ S2PD_KILL_ALL,
+#endif /* __sun__ */
+
+ /* Internal to master */
+ M2M_GET_STATE, // Used by web UI
+ M2M_GET_STATE_REPLY,
+ M2M_TIMER_TICK, // Timer for expiring filters etc
+ M2M_FRAMEWORK_EXPIRED, // Timer for expiring frameworks
+ M2M_SHUTDOWN, // Used in tests to shut down master
+
+ /* Internal to slave */
+ S2S_GET_STATE, // Used by web UI
+ S2S_GET_STATE_REPLY,
+ S2S_SHUTDOWN, // Used in tests to shut down slave
+ MESOS_MSGID,
+};
-template <typename T>
-std::vector<T> convert(const google::protobuf::RepeatedPtrField<T>& items)
-{
- std::vector<T> result;
- for (int i = 0; i < items.size(); i++) {
- result.push_back(items.Get(i));
- }
- return result;
-}
+/*
+ * Include tuples details for our namespace (this strategy is
+ * typically called "supermacros" and is often used to build types or
+ * messages).
+ */
+#include <tuples/details.hpp>
-template <typename T>
-class MesosProcess : public process::Process<T>
+/*
+ * A ReliableProcess whose message bodies are framed as
+ * MESOS_MESSAGING_VERSION + "|" + payload. The post/send/forward/rsend
+ * wrappers below add that prefix, body() strips it again, and
+ * receive() drops messages in the Mesos id range whose prefix does not
+ * match this build's version.
+ */
+class MesosProcess : public ReliableProcess
{
public:
- MesosProcess(const std::string& id = "")
- : process::Process<T>(id) {}
-
- virtual ~MesosProcess() {}
-
- template <const char *name>
- static void post(const process::UPID& to, const MSG<name>& msg)
+ /*
+ * Posts message `id` to `to` with an empty payload; the body consists
+ * of the version prefix only.
+ */
+ static void post(const PID &to, MSGID id)
{
- std::string data;
- msg.SerializeToString(&data);
- process::post(to, name, data.data(), data.size());
+ const std::string &data = MESOS_MESSAGING_VERSION + "|";
+ ReliableProcess::post(to, id, data.data(), data.size());
}
-protected:
- void send(const process::UPID& to, const std::string& name)
+ /*
+ * Posts tuple `t` to `to` as message ID. The payload is the tuple's
+ * std::string conversion, placed after the version prefix.
+ */
+ template <MSGID ID>
+ static void post(const PID &to, const tuple<ID> &t)
{
- process::Process<T>::send(to, name);
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ ReliableProcess::post(to, ID, data.data(), data.size());
}
- template <const char* name>
- void send(const process::UPID& to, const MSG<name>& msg)
+protected:
+ /*
+ * Returns the payload of the current message, i.e. everything after
+ * the first '|' of the raw body. A body without any '|' fails the
+ * CHECK.
+ */
+ std::string body() const
{
- std::string data;
- msg.SerializeToString(&data);
- process::Process<T>::send(to, name, data.data(), data.size());
+ size_t size;
+ const char *s = ReliableProcess::body(&size);
+ const std::string data(s, size);
+ size_t index = data.find('|');
+ CHECK(index != std::string::npos);
+ return data.substr(index + 1);
}
- const std::string& serve(double secs = 0, bool once = false)
+ /*
+ * Sends message `id` to `to` with an empty payload (version prefix
+ * only).
+ * NOTE(review): unlike the tuple overload below, this one is static
+ * and delegates to ReliableProcess::post rather than
+ * ReliableProcess::send -- confirm that this is intentional.
+ */
+ static void send(const PID &to, MSGID id)
{
- do {
- process::Process<T>::serve(secs, once);
- if (handlers.count(process::Process<T>::name()) > 0) {
- handlers[process::Process<T>::name()](process::Process<T>::body());
- } else {
- return process::Process<T>::name();
- }
- } while (!once);
+ const std::string &data = MESOS_MESSAGING_VERSION + "|";
+ ReliableProcess::post(to, id, data.data(), data.size());
}
- void install(const std::string& name, void (T::*method)())
+ /*
+ * Sends tuple `t` to `to` as message ID, with the version prefix in
+ * front of the tuple's std::string conversion.
+ */
+ template <MSGID ID>
+ void send(const PID &to, const tuple<ID> &t)
{
- T* t = static_cast<T*>(this);
- handlers[name] =
- std::tr1::bind(&MesosProcess<T>::handler0, t,
- method,
- std::tr1::placeholders::_1);
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ ReliableProcess::send(to, ID, data.data(), data.size());
}
- template <typename PB,
- typename P1, typename P1C>
- void install(const std::string& name, void (T::*method)(P1C),
- P1 (PB::*param1)() const)
+ /*
+ * Forwards tuple `t` to `to` as message ID (version prefix added) and
+ * returns the result of ReliableProcess::forward. The result used to
+ * be discarded, so control reached the end of this bool function
+ * without a return value.
+ */
+ template <MSGID ID>
+ bool forward(const PID &to, const tuple<ID> &t)
{
- T* t = static_cast<T*>(this);
- handlers[name] =
- std::tr1::bind(&handler1<PB, P1, P1C>, t,
- method, param1,
- std::tr1::placeholders::_1);
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ return ReliableProcess::forward(to, ID, data.data(), data.size());
}
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C>
- void install(const std::string& name, void (T::*method)(P1C, P2C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const)
- {
- T* t = static_cast<T*>(this);
- handlers[name] =
- std::tr1::bind(&handler2<PB, P1, P1C, P2, P2C>, t,
- method, p1, p2,
- std::tr1::placeholders::_1);
- }
-
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C,
- typename P3, typename P3C>
- void install(const std::string& name,
- void (T::*method)(P1C, P2C, P3C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- P3 (PB::*p3)() const)
+ /*
+ * Reliably sends tuple `t` to `to` as message ID (version prefix
+ * added) and returns whatever ReliableProcess::rsend returns.
+ */
+ template <MSGID ID>
+ int rsend(const PID &to, const tuple<ID> &t)
{
- T* t = static_cast<T*>(this);
- handlers[name] =
- std::tr1::bind(&handler3<PB, P1, P1C, P2, P2C, P3, P3C>, t,
- method, p1, p2, p3,
- std::tr1::placeholders::_1);
- }
-
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C,
- typename P3, typename P3C,
- typename P4, typename P4C>
- void install(const std::string& name,
- void (T::*method)(P1C, P2C, P3C, P4C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- P3 (PB::*p3)() const,
- P4 (PB::*p4)() const)
- {
- T* t = static_cast<T*>(this);
- handlers[name] =
- std::tr1::bind(&handler4<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C>, t,
- method, p1, p2, p3, p4,
- std::tr1::placeholders::_1);
- }
-
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C,
- typename P3, typename P3C,
- typename P4, typename P4C,
- typename P5, typename P5C>
- void install(const std::string& name,
- void (T::*method)(P1C, P2C, P3C, P4C, P5C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- P3 (PB::*p3)() const,
- P4 (PB::*p4)() const,
- P5 (PB::*p5)() const)
- {
- T* t = static_cast<T*>(this);
- handlers[name] =
- std::tr1::bind(&handler5<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C, P5, P5C>, t,
- method, p1, p2, p3, p4, p5,
- std::tr1::placeholders::_1);
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ return ReliableProcess::rsend(to, ID, data.data(), data.size());
}
-private:
- static void handler0(T* t, void (T::*method)(),
- const std::string& data)
+ /*
+ * Same as rsend(to, t) but passes the additional `via` PID through to
+ * the four-PID-argument form of ReliableProcess::rsend.
+ */
+ template <MSGID ID>
+ int rsend(const PID &via, const PID &to, const tuple<ID> &t)
{
- (t->*method)();
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ return ReliableProcess::rsend(via, to, ID, data.data(), data.size());
}
- template <typename PB,
- typename P1, typename P1C>
- static void handler1(T* t, void (T::*method)(P1C),
- P1 (PB::*p1)() const,
- const std::string& data)
+ /*
+ * Receives the next message through ReliableProcess::receive(secs);
+ * secs == 0 is treated here as "no timeout". A message whose id lies
+ * in [RELIABLE_MSGID, MESOS_MSGID) must start with
+ * MESOS_MESSAGING_VERSION followed by '|'; one that does not is
+ * logged and dropped, and receiving resumes with the time that is
+ * left. The lower bound is inclusive because F2M_REGISTER_FRAMEWORK
+ * is defined as RELIABLE_MSGID and is framed like every other Mesos
+ * message.
+ */
+ virtual MSGID receive(double secs = 0)
{
- PB pb;
- pb.ParseFromString(data);
- if (pb.IsInitialized()) {
- (t->*method)(convert((&pb->*p1)()));
- } else {
- LOG(WARNING) << "Initialization errors: "
- << pb.InitializationErrorString();
+ bool indefinite = secs == 0;
+ double now = elapsed();
+ MSGID id = ReliableProcess::receive(secs);
+ if (RELIABLE_MSGID <= id && id < MESOS_MSGID) {
+ size_t size;
+ const char *s = ReliableProcess::body(&size);
+ const std::string data(s, size);
+ size_t index = data.find('|');
+ if (index == std::string::npos ||
+ MESOS_MESSAGING_VERSION != data.substr(0, index)) {
+ LOG(ERROR) << "Dropping message from " << from()
+ << " with incorrect messaging version!";
+ if (!indefinite) {
+ /* Never pass 0 on: it would turn an expired timeout into an indefinite wait. */
+ double remaining = secs - (elapsed() - now);
+ return receive(remaining <= 0 ? DBL_EPSILON : remaining);
+ } else {
+ return receive(0);
+ }
+ }
}
+ return id;
}
+};
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C>
- static void handler2(T* t, void (T::*method)(P1C, P2C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- const std::string& data)
- {
- PB pb;
- pb.ParseFromString(data);
- if (pb.IsInitialized()) {
- (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()));
- } else {
- LOG(WARNING) << "Initialization errors: "
- << pb.InitializationErrorString();
- }
+
+using boost::tuples::tie;
+
+
+
+/*
+ * Payload layout of each message id, one TUPLE per MessageType
+ * enumerator, listed in the same groups as the enum.
+ */
+
+/* From framework to master. */
+TUPLE(F2M_REGISTER_FRAMEWORK,
+ (std::string /*name*/,
+ std::string /*user*/,
+ ExecutorInfo));
+
+TUPLE(F2M_REREGISTER_FRAMEWORK,
+ (FrameworkID,
+ std::string /*name*/,
+ std::string /*user*/,
+ ExecutorInfo,
+ int32_t /*generation*/));
+
+TUPLE(F2M_UNREGISTER_FRAMEWORK,
+ (FrameworkID));
+
+TUPLE(F2M_SLOT_OFFER_REPLY,
+ (FrameworkID,
+ OfferID,
+ std::vector<TaskDescription>,
+ Params));
+
+TUPLE(F2M_REVIVE_OFFERS,
+ (FrameworkID));
+
+TUPLE(F2M_KILL_TASK,
+ (FrameworkID,
+ TaskID));
+
+TUPLE(F2M_FRAMEWORK_MESSAGE,
+ (FrameworkID,
+ FrameworkMessage));
+
+TUPLE(F2F_TASK_RUNNING_STATUS,
+ ());
+
+/* From master to framework. */
+TUPLE(M2F_REGISTER_REPLY,
+ (FrameworkID));
+
+TUPLE(M2F_SLOT_OFFER,
+ (OfferID,
+ std::vector<SlaveOffer>,
+ std::map<SlaveID, PID>));
+
+TUPLE(M2F_RESCIND_OFFER,
+ (OfferID));
+
+TUPLE(M2F_STATUS_UPDATE,
+ (TaskID,
+ TaskState,
+ std::string));
+
+TUPLE(M2F_LOST_SLAVE,
+ (SlaveID));
+
+TUPLE(M2F_FRAMEWORK_MESSAGE,
+ (FrameworkMessage));
+
+TUPLE(M2F_ERROR,
+ (int32_t /*code*/,
+ std::string /*msg*/));
+
+
+/* From slave to master. */
+TUPLE(S2M_REGISTER_SLAVE,
+ (std::string /*name*/,
+ std::string /*publicDns*/,
+ Resources));
+
+TUPLE(S2M_REREGISTER_SLAVE,
+ (SlaveID,
+ std::string /*name*/,
+ std::string /*publicDns*/,
+ Resources,
+ std::vector<Task>));
+
+TUPLE(S2M_UNREGISTER_SLAVE,
+ (SlaveID));
+
+TUPLE(S2M_STATUS_UPDATE,
+ (SlaveID,
+ FrameworkID,
+ TaskID,
+ TaskState,
+ std::string));
+
+TUPLE(S2M_FRAMEWORK_MESSAGE,
+ (SlaveID,
+ FrameworkID,
+ FrameworkMessage));
+
+TUPLE(S2M_LOST_EXECUTOR,
+ (SlaveID,
+ FrameworkID,
+ int32_t /*exitStatus*/));
+
+/* From slave heart to master. */
+TUPLE(SH2M_HEARTBEAT,
+ (SlaveID));
+
+/* From master detector to processes. */
+TUPLE(NEW_MASTER_DETECTED,
+ (std::string, /* master seq */
+ PID /* master PID */));
+
+TUPLE(NO_MASTER_DETECTED,
+ ());
+
+TUPLE(GOT_MASTER_ID,
+ (std::string /* id */));
+
+/* From master to slave. */
+TUPLE(M2S_REGISTER_REPLY,
+ (SlaveID,
+ double /*heartbeat interval*/));
+
+TUPLE(M2S_REREGISTER_REPLY,
+ (SlaveID,
+ double /*heartbeat interval*/));
+
+TUPLE(M2S_RUN_TASK,
+ (FrameworkID,
+ TaskID,
+ std::string /*frameworkName*/,
+ std::string /*user*/,
+ ExecutorInfo,
+ std::string /*taskName*/,
+ std::string /*taskArgs*/,
+ Params,
+ PID /*framework PID*/));
+
+TUPLE(M2S_KILL_TASK,
+ (FrameworkID,
+ TaskID));
+
+TUPLE(M2S_KILL_FRAMEWORK,
+ (FrameworkID));
+
+TUPLE(M2S_FRAMEWORK_MESSAGE,
+ (FrameworkID,
+ FrameworkMessage));
+
+TUPLE(M2S_UPDATE_FRAMEWORK_PID,
+ (FrameworkID,
+ PID));
+
+TUPLE(M2S_SHUTDOWN,
+ ());
+
+/* From executor to slave. */
+TUPLE(E2S_REGISTER_EXECUTOR,
+ (FrameworkID));
+
+TUPLE(E2S_STATUS_UPDATE,
+ (FrameworkID,
+ TaskID,
+ TaskState,
+ std::string));
+
+TUPLE(E2S_FRAMEWORK_MESSAGE,
+ (FrameworkID,
+ FrameworkMessage));
+
+/* From slave to executor. */
+TUPLE(S2E_REGISTER_REPLY,
+ (SlaveID,
+ std::string /*hostname*/,
+ std::string /*frameworkName*/,
+ std::string /*initArg*/));
+
+TUPLE(S2E_RUN_TASK,
+ (TaskID,
+ std::string /*name*/,
+ std::string /*arg*/,
+ Params));
+
+TUPLE(S2E_KILL_TASK,
+ (TaskID));
+
+TUPLE(S2E_FRAMEWORK_MESSAGE,
+ (FrameworkMessage));
+
+TUPLE(S2E_KILL_EXECUTOR,
+ ());
+
+/* Between projd and slave; only compiled when __sun__ is defined. */
+#ifdef __sun__
+TUPLE(PD2S_REGISTER_PROJD,
+ (std::string /*project*/));
+
+TUPLE(PD2S_PROJECT_READY,
+ (std::string /*project*/));
+
+TUPLE(S2PD_UPDATE_RESOURCES,
+ (Resources));
+
+TUPLE(S2PD_KILL_ALL,
+ ());
+#endif /* __sun__ */
+
+/*
+ * Internal to master.
+ * NOTE(review): M2M_GET_STATE_REPLY carries a raw MasterState pointer,
+ * presumably only meaningful inside the master process -- confirm.
+ */
+TUPLE(M2M_GET_STATE,
+ ());
+
+TUPLE(M2M_GET_STATE_REPLY,
+ (master::state::MasterState *));
+
+TUPLE(M2M_TIMER_TICK,
+ ());
+
+TUPLE(M2M_FRAMEWORK_EXPIRED,
+ (FrameworkID));
+
+TUPLE(M2M_SHUTDOWN,
+ ());
+
+/*
+ * Internal to slave.
+ * NOTE(review): S2S_GET_STATE_REPLY carries a raw SlaveState pointer,
+ * presumably only meaningful inside the slave process -- confirm.
+ */
+TUPLE(S2S_GET_STATE,
+ ());
+
+TUPLE(S2S_GET_STATE_REPLY,
+ (slave::state::SlaveState *));
+
+TUPLE(S2S_SHUTDOWN,
+ ());
+
+
+/*
+ * Serialization functions for sharing objects of local Mesos types.
+ * These overloads take pointers to the state objects rather than the
+ * objects themselves; only the declarations are in this header.
+ * NOTE(review): presumably the pointer value itself is what is written
+ * and read back, which would only make sense within one process --
+ * confirm against the definitions.
+ */
+
+void operator & (process::tuples::serializer&, const master::state::MasterState *);
+void operator & (process::tuples::deserializer&, master::state::MasterState *&);
+
+void operator & (process::tuples::serializer&, const slave::state::SlaveState *);
+void operator & (process::tuples::deserializer&, slave::state::SlaveState *&);
+
+
+/*
+ * Serialization functions for various Mesos data types. Declarations
+ * only: each type gets a serializer overload taking a const reference
+ * and a deserializer overload taking a mutable reference to fill in.
+ */
+
+void operator & (process::tuples::serializer&, const TaskState&);
+void operator & (process::tuples::deserializer&, TaskState&);
+
+void operator & (process::tuples::serializer&, const SlaveOffer&);
+void operator & (process::tuples::deserializer&, SlaveOffer&);
+
+void operator & (process::tuples::serializer&, const TaskDescription&);
+void operator & (process::tuples::deserializer&, TaskDescription&);
+
+void operator & (process::tuples::serializer&, const FrameworkMessage&);
+void operator & (process::tuples::deserializer&, FrameworkMessage&);
+
+void operator & (process::tuples::serializer&, const ExecutorInfo&);
+void operator & (process::tuples::deserializer&, ExecutorInfo&);
+
+void operator & (process::tuples::serializer&, const Params&);
+void operator & (process::tuples::deserializer&, Params&);
+
+void operator & (process::tuples::serializer&, const Resources&);
+void operator & (process::tuples::deserializer&, Resources&);
+
+void operator & (process::tuples::serializer&, const Task&);
+void operator & (process::tuples::deserializer&, Task&);
+
+
+/* Serialization functions for STL vectors. */
+
+/*
+ * Writes a vector as a 32-bit element count followed by every element
+ * in order, each through its own operator &.
+ */
+template<typename T>
+void operator & (process::tuples::serializer& s, const std::vector<T>& v)
+{
+ int32_t count = (int32_t) v.size();
+ s & count;
+ typename std::vector<T>::const_iterator it = v.begin();
+ for (; it != v.end(); ++it) {
+ s & *it;
}
+}
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C,
- typename P3, typename P3C>
- static void handler3(T* t, void (T::*method)(P1C, P2C, P3C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- P3 (PB::*p3)() const,
- const std::string& data)
- {
- PB pb;
- pb.ParseFromString(data);
- if (pb.IsInitialized()) {
- (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
- convert((&pb->*p3)()));
- } else {
- LOG(WARNING) << "Initialization errors: "
- << pb.InitializationErrorString();
- }
+
+/*
+ * Reads a vector written by the matching serializer: a 32-bit element
+ * count, then that many elements. The vector is resized to the count
+ * first and each slot is filled in place. The count is handed to
+ * resize() unchecked.
+ */
+template<typename T>
+void operator & (process::tuples::deserializer& d, std::vector<T>& v)
+{
+ int32_t count;
+ d & count;
+ v.resize(count);
+ typename std::vector<T>::iterator slot = v.begin();
+ while (slot != v.end()) {
+ d & *slot;
+ ++slot;
}
+}
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C,
- typename P3, typename P3C,
- typename P4, typename P4C>
- static void handler4(T* t, void (T::*method)(P1C, P2C, P3C, P4C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- P3 (PB::*p3)() const,
- P4 (PB::*p4)() const,
- const std::string& data)
- {
- PB pb;
- pb.ParseFromString(data);
- if (pb.IsInitialized()) {
- (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
- convert((&pb->*p3)()), convert((&pb->*p4)()));
- } else {
- LOG(WARNING) << "Initialization errors: "
- << pb.InitializationErrorString();
- }
+
+/* Serialization functions for STL maps. */
+
+/*
+ * Writes a map as a 32-bit entry count followed by the key and then
+ * the value of every entry.
+ */
+template<typename K, typename V>
+void operator & (process::tuples::serializer& s, const std::map<K, V>& m)
+{
+ int32_t count = (int32_t) m.size();
+ s & count;
+ typename std::map<K, V>::const_iterator entry;
+ for (entry = m.begin(); entry != m.end(); ++entry) {
+ s & entry->first;
+ s & entry->second;
}
+}
- template <typename PB,
- typename P1, typename P1C,
- typename P2, typename P2C,
- typename P3, typename P3C,
- typename P4, typename P4C,
- typename P5, typename P5C>
- static void handler5(T* t, void (T::*method)(P1C, P2C, P3C, P4C, P5C),
- P1 (PB::*p1)() const,
- P2 (PB::*p2)() const,
- P3 (PB::*p3)() const,
- P4 (PB::*p4)() const,
- P5 (PB::*p5)() const,
- const std::string& data)
- {
- PB pb;
- pb.ParseFromString(data);
- if (pb.IsInitialized()) {
- (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
- convert((&pb->*p3)()), convert((&pb->*p4)()),
- convert((&pb->*p5)()));
- } else {
- LOG(WARNING) << "Initialization errors: "
- << pb.InitializationErrorString();
- }
+
+/*
+ * Reads a map written by the matching serializer: a 32-bit entry
+ * count, then that many key/value pairs. Any previous contents of `m`
+ * are discarded. The loop index is signed like the count, so a
+ * negative count yields an empty map instead of being converted to a
+ * huge unsigned loop bound.
+ */
+template<typename K, typename V>
+void operator & (process::tuples::deserializer& d, std::map<K, V>& m)
+{
+ m.clear();
+ int32_t size = 0;
+ d & size;
+ K k;
+ V v;
+ for (int32_t i = 0; i < size; i++) {
+ d & k;
+ d & v;
+ m[k] = v;
}
+}
- boost::unordered_map<std::string, std::tr1::function<void (const std::string&)> > handlers;
-};
-}} // namespace mesos { namespace internal {
+}} /* namespace mesos { namespace internal { */
-#endif // __MESSAGES_HPP__
+#endif /* MESSAGES_HPP */