You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:25:43 UTC

svn commit: r1132329 [5/6] - in /incubator/mesos/trunk: ./ src/ src/common/ src/config/ src/event_history/ src/examples/ src/local/ src/master/ src/messaging/ src/slave/ src/tests/ third_party/sqlite-3.6.23.1/

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun  5 09:25:41 2011
@@ -1,30 +1,56 @@
 #ifndef __MASTER_HPP__
 #define __MASTER_HPP__
 
+#include <time.h>
+#include <arpa/inet.h>
+
+#include <algorithm>
+#include <fstream>
+#include <map>
+#include <set>
+#include <sstream>
+#include <stdexcept>
 #include <string>
 #include <vector>
 
-#include <process.hpp>
+#include <reliable.hpp>
 
 #include <glog/logging.h>
 
+#include <boost/lexical_cast.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 
 #include "state.hpp"
 
+#include "common/fatal.hpp"
 #include "common/foreach.hpp"
-#include "common/multimap.hpp"
+#include "common/params.hpp"
 #include "common/resources.hpp"
-#include "common/type_utils.hpp"
+#include "common/task.hpp"
 
 #include "configurator/configurator.hpp"
 
+#include "detector/detector.hpp"
+
 #include "messaging/messages.hpp"
 
 
 namespace mesos { namespace internal { namespace master {
 
+using namespace mesos;
+using namespace mesos::internal;
+
+using std::make_pair;
+using std::map;
+using std::pair;
+using std::set;
+using std::string;
+using std::vector;
+
+using boost::unordered_map;
+using boost::unordered_set;
+
 using foreach::_;
 
 
@@ -46,461 +72,373 @@ const int32_t MAX_CPUS = 1000 * 1000;
 // Maximum amount of memory / machine.
 const int32_t MAX_MEM = 1024 * 1024 * Megabyte;
 
-// Acceptable timeout for slave PONG.
-const double SLAVE_PONG_TIMEOUT = 15.0;
+// Interval at which slaves should send heartbeats.
+const double HEARTBEAT_INTERVAL = 2;
 
-// Maximum number of timeouts until slave is considered failed.
-const int MAX_SLAVE_TIMEOUTS = 5;
+// Acceptable time since we saw the last heartbeat (7.5 heartbeat intervals).
+const double HEARTBEAT_TIMEOUT = 15;
 
 // Time to wait for a framework to failover (TODO(benh): Make configurable)).
-const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;
+const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60;
 
+// Some forward declarations
+struct Slave;
+class Allocator;
 
-// Reasons why offers might be returned to the Allocator.
-enum OfferReturnReason
+
+class FrameworkFailoverTimer : public MesosProcess
 {
-  ORR_FRAMEWORK_REPLIED,
-  ORR_OFFER_RESCINDED,
-  ORR_FRAMEWORK_LOST,
-  ORR_FRAMEWORK_FAILOVER,
-  ORR_SLAVE_LOST
-};
+private:
+  const PID master;
+  const FrameworkID fid;
 
+protected:
+  void operator () ()
+  {
+    link(master);
+    do {
+      switch (receive(FRAMEWORK_FAILOVER_TIMEOUT)) {
+      case PROCESS_TIMEOUT:
+        send(master, pack<M2M_FRAMEWORK_EXPIRED>(fid));
+	return;
+      case PROCESS_EXIT:
+	return;
+      case M2M_SHUTDOWN:
+	return;
+      }
+    } while (true);
+  }
 
-// Reasons why tasks might be removed, passed to the Allocator.
-enum TaskRemovalReason
-{
-  TRR_TASK_ENDED,
-  TRR_FRAMEWORK_LOST,
-  TRR_EXECUTOR_LOST,
-  TRR_SLAVE_LOST
+public:
+  FrameworkFailoverTimer(const PID &_master, FrameworkID _fid)
+    : master(_master), fid(_fid) {}
 };
 
 
-// Some forward declarations.
-class Allocator;
-class SlavesManager;
-struct Framework;
-struct Slave;
-struct SlaveResources;
-class SlaveObserver;
-struct SlotOffer;
+// Resources offered on a particular slave.
+struct SlaveResources
+{
+  Slave *slave;
+  Resources resources;
+  
+  SlaveResources() {}
+  
+  SlaveResources(Slave *s, Resources r): slave(s), resources(r) {}
+};
 
 
-class Master : public MesosProcess<Master>
+// A resource offer.
+struct SlotOffer
 {
-public:
-  Master();
-  Master(const Configuration& conf);
+  OfferID id;
+  FrameworkID frameworkId;
+  vector<SlaveResources> resources;
   
-  virtual ~Master();
+  SlotOffer(OfferID i, FrameworkID f, const vector<SlaveResources>& r)
+    : id(i), frameworkId(f), resources(r) {}
+};
 
-  static void registerOptions(Configurator* configurator);
+// A connected framework.
+struct Framework
+{
+  PID pid;
+  FrameworkID id;
+  bool active; // Turns false when framework is being removed
+  string name;
+  string user;
+  ExecutorInfo executorInfo;
+  double connectTime;
 
-  process::Promise<state::MasterState*> getState();
-  
-  OfferID makeOffer(Framework* framework,
-		    const std::vector<SlaveResources>& resources);
+  unordered_map<TaskID, Task *> tasks;
+  unordered_set<SlotOffer *> slotOffers; // Active offers given to this framework
 
-  // Return connected frameworks that are not in the process of being removed
-  std::vector<Framework*> getActiveFrameworks();
+  Resources resources; // Total resources owned by framework (tasks + offers)
   
-  // Return connected slaves that are not in the process of being removed
-  std::vector<Slave*> getActiveSlaves();
+  // Contains a time of unfiltering for each slave we've filtered,
+  // or 0 for slaves that we want to keep filtered forever
+  unordered_map<Slave *, double> slaveFilter;
 
-  void newMasterDetected(const std::string& pid);
-  void noMasterDetected();
-  void masterDetectionFailure();
-  void registerFramework(const FrameworkInfo& frameworkInfo);
-  void reregisterFramework(const FrameworkID& frameworkId,
-                           const FrameworkInfo& frameworkInfo,
-                           int32_t generation);
-  void unregisterFramework(const FrameworkID& frameworkId);
-  void resourceOfferReply(const FrameworkID& frameworkId,
-                          const OfferID& offerId,
-                          const std::vector<TaskDescription>& tasks,
-                          const Params& params);
-  void reviveOffers(const FrameworkID& frameworkId);
-  void killTask(const FrameworkID& frameworkId,
-                const TaskID& taskId);
-  void schedulerMessage(const SlaveID& slaveId,
-			const FrameworkID& frameworkId,
-			const ExecutorID& executorId,
-			const std::string& data);
-  void statusUpdateAck(const FrameworkID& frameworkId,
-                       const TaskID& taskId,
-                       const SlaveID& slaveId);
-  void registerSlave(const SlaveInfo& slaveInfo);
-  void reregisterSlave(const SlaveID& slaveId,
-                       const SlaveInfo& slaveInfo,
-                       const std::vector<Task>& tasks);
-  void unregisterSlave(const SlaveID& slaveId);
-  void statusUpdate(const FrameworkID& frameworkId,
-                    const TaskStatus& status);
-  void executorMessage(const SlaveID& slaveId,
-		       const FrameworkID& frameworkId,
-		       const ExecutorID& executorId,
-		       const std::string& data);
-  void exitedExecutor(const SlaveID& slaveId,
-                      const FrameworkID& frameworkId,
-                      const ExecutorID& executorId,
-                      int32_t result);
-  void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
-  void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
-  void timerTick();
-  void frameworkExpired(const FrameworkID& frameworkId);
-  void exited();
-
-  process::Promise<process::HttpResponse> vars(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> stats(const process::HttpRequest& request);
-
-  Framework* lookupFramework(const FrameworkID& frameworkId);
-  Slave* lookupSlave(const SlaveID& slaveId);
-  SlotOffer* lookupSlotOffer(const OfferID& offerId);
-  
-protected:
-  virtual void operator () ();
-  
-  void initialize();
+  // A failover timer if the connection to this framework is lost.
+  FrameworkFailoverTimer *failoverTimer;
 
-  // Process a resource offer reply (for a non-cancelled offer) by launching
-  // the desired tasks (if the offer contains a valid set of tasks) and
-  // reporting any unused resources to the allocator
-  void processOfferReply(SlotOffer* offer,
-                         const std::vector<TaskDescription>& tasks,
-                         const Params& params);
+  Framework(const PID &_pid, FrameworkID _id, double time)
+    : pid(_pid), id(_id), active(true), connectTime(time),
+      failoverTimer(NULL) {}
 
-  // Launch a task described in a slot offer response
-  void launchTask(Framework* framework, const TaskDescription& task);
+  ~Framework()
+  {
+    if (failoverTimer != NULL) {
+      MesosProcess::post(failoverTimer->self(), pack<M2M_SHUTDOWN>());
+      Process::wait(failoverTimer->self());
+      delete failoverTimer;
+      failoverTimer = NULL;
+    }
+  }
   
-  // Terminate a framework, sending it a particular error message
-  // TODO: Make the error codes and messages programmer-friendly
-  void terminateFramework(Framework* framework,
-                          int32_t code,
-                          const std::string& message);
+  Task * lookupTask(TaskID tid)
+  {
+    unordered_map<TaskID, Task *>::iterator it = tasks.find(tid);
+    if (it != tasks.end())
+      return it->second;
+    else
+      return NULL;
+  }
   
-  // Remove a slot offer (because it was replied to, or we want to rescind it,
-  // or we lost a framework or a slave)
-  void removeSlotOffer(SlotOffer* offer,
-                       OfferReturnReason reason,
-                       const std::vector<SlaveResources>& resourcesLeft);
-
-  void removeTask(Task* task, TaskRemovalReason reason);
-
-  void addFramework(Framework* framework);
-
-  // Replace the scheduler for a framework with a new process ID, in the
-  // event of a scheduler failover.
-  void failoverFramework(Framework* framework, const process::UPID& newPid);
-
-  // Kill all of a framework's tasks, delete the framework object, and
-  // reschedule slot offers for slots that were assigned to this framework
-  void removeFramework(Framework* framework);
-
-  // Add a slave.
-  void addSlave(Slave* slave);
-
-  void readdSlave(Slave* slave, const std::vector<Task>& tasks);
-
-  // Lose all of a slave's tasks and delete the slave object
-  void removeSlave(Slave* slave);
-
-  virtual Allocator* createAllocator();
-
-  FrameworkID newFrameworkId();
-  OfferID newOfferId();
-  SlaveID newSlaveId();
-
-  const Configuration& getConfiguration();
-
-private:
-  const Configuration conf;
-
-  SlavesManager* slavesManager;
-
-  multimap<std::string, uint16_t> slaveHostnamePorts;
-
-  boost::unordered_map<FrameworkID, Framework*> frameworks;
-  boost::unordered_map<SlaveID, Slave*> slaves;
-  boost::unordered_map<OfferID, SlotOffer*> slotOffers;
-
-  boost::unordered_map<process::UPID, FrameworkID> pidToFrameworkId;
-  boost::unordered_map<process::UPID, SlaveID> pidToSlaveId;
-
-  int64_t nextFrameworkId; // Used to give each framework a unique ID.
-  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
-  int64_t nextSlaveId;     // Used to give each slave a unique ID.
-
-  std::string allocatorType;
-  Allocator* allocator;
-
-  bool active;
-
-  // Contains the date the master was launched and
-  // some ephemeral token (e.g. returned from
-  // ZooKeeper). Used in framework and slave IDs
-  // created by this master.
-  std::string masterId;
-
-  // Statistics (initialized in Master::initialize).
-  struct {
-    uint64_t launched_tasks;
-    uint64_t finished_tasks;
-    uint64_t killed_tasks;
-    uint64_t failed_tasks;
-    uint64_t lost_tasks;
-    uint64_t valid_status_updates;
-    uint64_t invalid_status_updates;
-    uint64_t valid_framework_messages;
-    uint64_t invalid_framework_messages;
-  } statistics;
-};
-
-
-// A resource offer.
-struct SlotOffer
-{
-  OfferID offerId;
-  FrameworkID frameworkId;
-  std::vector<SlaveResources> resources;
+  void addTask(Task *task)
+  {
+    CHECK(tasks.count(task->id) == 0);
+    tasks[task->id] = task;
+    this->resources += task->resources;
+  }
+  
+  void removeTask(TaskID tid)
+  {
+    CHECK(tasks.find(tid) != tasks.end());
+    unordered_map<TaskID, Task *>::iterator it = tasks.find(tid);
+    this->resources -= it->second->resources;
+    tasks.erase(it);
+  }
+  
+  void addOffer(SlotOffer *offer)
+  {
+    CHECK(slotOffers.find(offer) == slotOffers.end());
+    slotOffers.insert(offer);
+    foreach (SlaveResources &r, offer->resources)
+      this->resources += r.resources;
+  }
 
-  SlotOffer(const OfferID& _offerId,
-            const FrameworkID& _frameworkId,
-            const std::vector<SlaveResources>& _resources)
-    : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
+  void removeOffer(SlotOffer *offer)
+  {
+    CHECK(slotOffers.find(offer) != slotOffers.end());
+    slotOffers.erase(offer);
+    foreach (SlaveResources &r, offer->resources)
+      this->resources -= r.resources;
+  }
+  
+  bool filters(Slave *slave, Resources resources)
+  {
+    // TODO: Implement other filters
+    return slaveFilter.find(slave) != slaveFilter.end();
+  }
+  
+  void removeExpiredFilters(double now)
+  {
+    vector<Slave *> toRemove;
+    foreachpair (Slave *slave, double removalTime, slaveFilter)
+      if (removalTime != 0 && removalTime <= now)
+        toRemove.push_back(slave);
+    foreach (Slave *slave, toRemove)
+      slaveFilter.erase(slave);
+  }
 };
 
 
 // A connected slave.
 struct Slave
-{
-  SlaveInfo info;
-  SlaveID slaveId;
-  process::UPID pid;
-
+{  
+  PID pid;
+  SlaveID id;
   bool active; // Turns false when slave is being removed
+  string hostname;
+  string publicDns;
   double connectTime;
   double lastHeartbeat;
   
+  Resources resources;        // Total resources on slave
   Resources resourcesOffered; // Resources currently in offers
   Resources resourcesInUse;   // Resources currently used by tasks
 
-  boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
-  boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
-
-  SlaveObserver* observer;
+  unordered_map<pair<FrameworkID, TaskID>, Task *> tasks;
+  unordered_set<SlotOffer *> slotOffers; // Active offers of slots on this slave
   
-  Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
-        const process::UPID& _pid, double time)
-    : info(_info), slaveId(_slaveId), pid(_pid), active(true),
-      connectTime(time), lastHeartbeat(time) {}
-
-  ~Slave() {}
+  Slave(const PID &_pid, SlaveID _id, double time)
+    : pid(_pid), id(_id), active(true)
+  {
+    connectTime = lastHeartbeat = time;
+  }
 
-  Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
+  Task * lookupTask(FrameworkID fid, TaskID tid)
   {
-    foreachpair (_, Task* task, tasks) {
-      if (task->framework_id() == frameworkId && task->task_id() == taskId) {
+    foreachpair (_, Task *task, tasks)
+      if (task->frameworkId == fid && task->id == tid)
         return task;
-      }
-    }
 
     return NULL;
   }
 
-  void addTask(Task* task)
+  void addTask(Task *task)
   {
-    std::pair<FrameworkID, TaskID> key =
-      std::make_pair(task->framework_id(), task->task_id());
-    CHECK(tasks.count(key) == 0);
-    tasks[key] = task;
-    foreach (const Resource& resource, task->resources()) {
-      resourcesInUse += resource;
-    }
+    CHECK(tasks.find(make_pair(task->frameworkId, task->id)) == tasks.end());
+    tasks[make_pair(task->frameworkId, task->id)] = task;
+    resourcesInUse += task->resources;
   }
   
-  void removeTask(Task* task)
+  void removeTask(Task *task)
   {
-    std::pair<FrameworkID, TaskID> key =
-      std::make_pair(task->framework_id(), task->task_id());
-    CHECK(tasks.count(key) > 0);
-    tasks.erase(key);
-    foreach (const Resource& resource, task->resources()) {
-      resourcesInUse -= resource;
-    }
+    CHECK(tasks.find(make_pair(task->frameworkId, task->id)) != tasks.end());
+    tasks.erase(make_pair(task->frameworkId, task->id));
+    resourcesInUse -= task->resources;
   }
   
   Resources resourcesFree()
   {
-    Resources resources;
-    foreach (const Resource& resource, info.resources()) {
-      resources += resource;
-    }
     return resources - (resourcesOffered + resourcesInUse);
   }
 };
 
 
-// Resources offered on a particular slave.
-struct SlaveResources
+// Reasons why offers might be returned to the Allocator.
+enum OfferReturnReason
 {
-  SlaveResources() {}
-  SlaveResources(Slave* s, Resources r): slave(s), resources(r) {}
-
-  Slave* slave;
-  Resources resources;
+  ORR_FRAMEWORK_REPLIED,
+  ORR_OFFER_RESCINDED,
+  ORR_FRAMEWORK_LOST,
+  ORR_FRAMEWORK_FAILOVER,
+  ORR_SLAVE_LOST
 };
 
 
-class FrameworkFailoverTimer : public process::Process<FrameworkFailoverTimer>
+// Reasons why tasks might be removed, passed to the Allocator.
+enum TaskRemovalReason
 {
-public:
-  FrameworkFailoverTimer(const process::PID<Master>& _master,
-                         const FrameworkID& _frameworkId)
-    : master(_master), frameworkId(_frameworkId) {}
+  TRR_TASK_ENDED,
+  TRR_FRAMEWORK_LOST,
+  TRR_EXECUTOR_LOST,
+  TRR_SLAVE_LOST
+};
+
 
+class Master : public MesosProcess
+{
 protected:
-  virtual void operator () ()
-  {
-    link(master);
-    while (true) {
-      receive(FRAMEWORK_FAILOVER_TIMEOUT);
-      if (name() == process::TIMEOUT) {
-        process::dispatch(master, &Master::frameworkExpired, frameworkId);
-        return;
-      } else if (name() == process::EXITED || name() == process::TERMINATE) {
-        return;
-      }
-    }
-  }
+  Params conf;
 
-private:
-  const process::PID<Master> master;
-  const FrameworkID frameworkId;
-};
+  unordered_map<FrameworkID, Framework *> frameworks;
+  unordered_map<SlaveID, Slave *> slaves;
+  unordered_map<OfferID, SlotOffer *> slotOffers;
 
+  unordered_map<PID, FrameworkID> pidToFid;
+  unordered_map<PID, SlaveID> pidToSid;
 
-// An connected framework.
-struct Framework
-{
-  FrameworkInfo info;
-  FrameworkID frameworkId;
-  process::UPID pid;
+  int64_t nextFrameworkId; // Used to give each framework a unique ID.
+  int64_t nextSlaveId;     // Used to give each slave a unique ID.
+  int64_t nextSlotOfferId; // Used to give each slot offer a unique ID.
 
-  bool active; // Turns false when framework is being removed
-  double connectTime;
+  string allocatorType;
+  Allocator *allocator;
 
-  boost::unordered_map<TaskID, Task*> tasks;
-  boost::unordered_set<SlotOffer*> slotOffers; // Active offers for framework.
+  string masterId; // Contains the date the master was launched and its fault
+                   // tolerance ID (e.g. ephemeral ID returned from ZooKeeper).
+                   // Used in framework and slave IDs created by this master.
 
-  Resources resources; // Total resources owned by framework (tasks + offers)
-  
-  // Contains a time of unfiltering for each slave we've filtered,
-  // or 0 for slaves that we want to keep filtered forever
-  boost::unordered_map<Slave*, double> slaveFilter;
+public:
+  Master();
 
-  // A failover timer if the connection to this framework is lost.
-  FrameworkFailoverTimer* failoverTimer;
+  Master(const Params& conf);
+  
+  ~Master();
 
-  Framework(const FrameworkInfo& _info, const FrameworkID& _frameworkId,
-            const process::UPID& _pid, double time)
-    : info(_info), frameworkId(_frameworkId), pid(_pid), active(true),
-      connectTime(time), failoverTimer(NULL) {}
+  static void registerOptions(Configurator* conf);
 
-  ~Framework()
-  {
-    if (failoverTimer != NULL) {
-      process::post(failoverTimer->self(), process::TERMINATE);
-      process::wait(failoverTimer->self());
-      delete failoverTimer;
-    }
-  }
+  state::MasterState *getState();
   
-  Task* lookupTask(const TaskID& taskId)
-  {
-    if (tasks.count(taskId) > 0) {
-      return tasks[taskId];
-    } else {
-      return NULL;
-    }
-  }
+  OfferID makeOffer(Framework *framework,
+		    const vector<SlaveResources>& resources);
   
-  void addTask(Task* task)
-  {
-    CHECK(tasks.count(task->task_id()) == 0);
-    tasks[task->task_id()] = task;
-    for (int i = 0; i < task->resources_size(); i++) {
-      resources += task->resources(i);
-    }
-  }
+  void rescindOffer(SlotOffer *offer);
   
-  void removeTask(const TaskID& taskId)
-  {
-    CHECK(tasks.count(taskId) > 0);
-    Task* task = tasks[taskId];
-    for (int i = 0; i < task->resources_size(); i++) {
-      resources -= task->resources(i);
-    }
-    tasks.erase(taskId);
-  }
+  void killTask(Task *task);
   
-  void addOffer(SlotOffer* offer)
-  {
-    CHECK(slotOffers.count(offer) == 0);
-    slotOffers.insert(offer);
-    foreach (const SlaveResources& sr, offer->resources) {
-      resources += sr.resources;
-    }
-  }
+  Framework * lookupFramework(FrameworkID fid);
 
-  void removeOffer(SlotOffer* offer)
-  {
-    CHECK(slotOffers.find(offer) != slotOffers.end());
-    slotOffers.erase(offer);
-    foreach (const SlaveResources& sr, offer->resources) {
-      resources -= sr.resources;
-    }
-  }
+  Slave * lookupSlave(SlaveID sid);
+
+  SlotOffer * lookupSlotOffer(OfferID soid);
+
+  // Return connected frameworks that are not in the process of being removed
+  vector<Framework *> getActiveFrameworks();
   
-  bool filters(Slave* slave, Resources resources)
-  {
-    // TODO: Implement other filters
-    return slaveFilter.find(slave) != slaveFilter.end();
-  }
+  // Return connected slaves that are not in the process of being removed
+  vector<Slave *> getActiveSlaves();
+
+  const Params& getConf();
+
+protected:
+  void operator () ();
+
+  // Process a resource offer reply (for a non-cancelled offer) by launching
+  // the desired tasks (if the offer contains a valid set of tasks) and
+  // reporting any unused resources to the allocator
+  void processOfferReply(SlotOffer *offer,
+      const vector<TaskDescription>& tasks, const Params& params);
+
+  // Launch a task described in a slot offer response
+  void launchTask(Framework *framework, const TaskDescription& task);
   
-  void removeExpiredFilters(double now)
-  {
-    foreachpaircopy (Slave* slave, double removalTime, slaveFilter) {
-      if (removalTime != 0 && removalTime <= now) {
-        slaveFilter.erase(slave);
-      }
-    }
-  }
+  // Terminate a framework, sending it a particular error message
+  // TODO: Make the error codes and messages programmer-friendly
+  void terminateFramework(Framework *framework,
+                          int32_t code,
+                          const std::string& message);
+  
+  // Remove a slot offer (because it was replied to, or we want to rescind it,
+  // or we lost a framework or a slave)
+  void removeSlotOffer(SlotOffer *offer,
+                       OfferReturnReason reason,
+                       const vector<SlaveResources>& resourcesLeft);
+
+  void removeTask(Task *task, TaskRemovalReason reason);
+
+  void addFramework(Framework *framework);
+
+  // Replace the scheduler for a framework with a new process ID, in the
+  // event of a scheduler failover.
+  void failoverFramework(Framework *framework, const PID &newPid);
+
+  // Kill all of a framework's tasks, delete the framework object, and
+  // reschedule slot offers for slots that were assigned to this framework
+  void removeFramework(Framework *framework);
+
+  // Lose all of a slave's tasks and delete the slave object
+  void removeSlave(Slave *slave);
+
+  virtual Allocator* createAllocator();
+
+  FrameworkID newFrameworkId();
+
+  string currentDate();
 };
 
 
-// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc.
+// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc
 
 inline std::ostream& operator << (std::ostream& stream, const SlotOffer *o)
 {
-  stream << "offer " << o->offerId;
+  stream << "offer " << o->id;
   return stream;
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const Slave *s)
 {
-  stream << "slave " << s->slaveId;
+  stream << "slave " << s->id;
   return stream;
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const Framework *f)
 {
-  stream << "framework " << f->frameworkId;
+  stream << "framework " << f->id;
+  return stream;
+}
+
+
+inline std::ostream& operator << (std::ostream& stream, const Task *t)
+{
+  stream << "task " << t->frameworkId << ":" << t->id;
   return stream;
 }
 
-}}} // namespace mesos { namespace internal { namespace master {
+}}} /* namespace */
 
-#endif // __MASTER_HPP__
+#endif /* __MASTER_HPP__ */

Modified: incubator/mesos/trunk/src/master/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/state.hpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/state.hpp (original)
+++ incubator/mesos/trunk/src/master/state.hpp Sun Jun  5 09:25:41 2011
@@ -1,40 +1,38 @@
-#ifndef __MASTER_STATE_HPP__
-#define __MASTER_STATE_HPP__
+#ifndef MASTER_STATE_HPP
+#define MASTER_STATE_HPP
 
 #include <iostream>
 #include <string>
 #include <vector>
 
+#include <mesos_types.hpp>
+
 #include "common/foreach.hpp"
 
 #include "config/config.hpp"
 
 
-// TODO(...): Make all the variable naming in here consistant with the
-// rest of the code base. This will require cleaning up some Python code.
-
-
 namespace mesos { namespace internal { namespace master { namespace state {
 
 struct SlaveResources
 {  
-  std::string slave_id;
+  SlaveID slave_id;
   int32_t cpus;
   int64_t mem;
   
-  SlaveResources(std::string _slaveId, int32_t _cpus, int64_t _mem)
-    : slave_id(_slaveId), cpus(_cpus), mem(_mem) {}
+  SlaveResources(SlaveID _sid, int32_t _cpus, int64_t _mem)
+    : slave_id(_sid), cpus(_cpus), mem(_mem) {}
 };
 
 
 struct SlotOffer
 {  
-  std::string id;
-  std::string framework_id;
+  OfferID id;
+  FrameworkID framework_id;
   std::vector<SlaveResources *> resources;
   
-  SlotOffer(std::string _id, std::string _frameworkId)
-    : id(_id), framework_id(_frameworkId) {}
+  SlotOffer(OfferID _id, FrameworkID _fid)
+    : id(_id), framework_id(_fid) {}
     
   ~SlotOffer()
   {
@@ -46,15 +44,14 @@ struct SlotOffer
 
 struct Slave
 {
-  Slave(std::string id_, const std::string& host_,
-        const std::string& public_dns_,
+  Slave(SlaveID id_, const std::string& host_, const std::string& public_dns_,
 	int32_t cpus_, int64_t mem_, time_t connect_)
     : id(id_), host(host_), public_dns(public_dns_),
       cpus(cpus_), mem(mem_), connect_time(connect_) {}
 
   Slave() {}
 
-  std::string id;
+  SlaveID id;
   std::string host;
   std::string public_dns;
   int32_t cpus;
@@ -65,18 +62,18 @@ struct Slave
 
 struct Task
 {
-  Task(std::string id_, const std::string& name_, std::string framework_id_,
-       std::string slaveId_, std::string state_, int32_t _cpus, int64_t _mem)
-    : id(id_), name(name_), framework_id(framework_id_), slave_id(slaveId_),
-      state(state_), cpus(_cpus), mem(_mem) {}
+  Task(TaskID id_, const std::string& name_, FrameworkID fid_, SlaveID sid_,
+       TaskState state_, int32_t _cpus, int64_t _mem)
+    : id(id_), name(name_), framework_id(fid_), slave_id(sid_), state(state_), 
+      cpus(_cpus), mem(_mem) {}
 
   Task() {}
 
-  std::string id;
+  TaskID id;
   std::string name;
-  std::string framework_id;
-  std::string slave_id;
-  std::string state;
+  FrameworkID framework_id;
+  SlaveID slave_id;
+  TaskState state;
   int32_t cpus;
   int64_t mem;
 };
@@ -84,9 +81,9 @@ struct Task
 
 struct Framework
 {
-  Framework(std::string id_, const std::string& user_,
-            const std::string& name_, const std::string& executor_,
-            int32_t cpus_, int64_t mem_, time_t connect_)
+  Framework(FrameworkID id_, const std::string& user_,
+      const std::string& name_, const std::string& executor_,
+      int32_t cpus_, int64_t mem_, time_t connect_)
     : id(id_), user(user_), name(name_), executor(executor_),
       cpus(cpus_), mem(mem_), connect_time(connect_) {}
 
@@ -100,7 +97,7 @@ struct Framework
       delete offer;
   }
 
-  std::string id;
+  FrameworkID id;
   std::string user;
   std::string name;
   std::string executor;
@@ -116,8 +113,8 @@ struct Framework
 struct MasterState
 {
   MasterState(const std::string& build_date_, const std::string& build_user_,
-	      const std::string& pid_)
-    : build_date(build_date_), build_user(build_user_), pid(pid_) {}
+	      const std::string& pid_, bool _isFT = false)
+    : build_date(build_date_), build_user(build_user_), pid(pid_), isFT(_isFT) {}
 
   MasterState() {}
 
@@ -135,8 +132,9 @@ struct MasterState
 
   std::vector<Slave *> slaves;
   std::vector<Framework *> frameworks;
+  bool isFT;
 };
 
-}}}} // namespace mesos { namespace internal { namespace master { namespace state {
+}}}} /* namespace */
 
-#endif // __MASTER_STATE_HPP__
+#endif /* MASTER_STATE_HPP */

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 09:25:41 2011
@@ -1,393 +1,529 @@
-#ifndef __MESSAGES_HPP__
-#define __MESSAGES_HPP__
+#ifndef MESSAGES_HPP
+#define MESSAGES_HPP
 
 #include <float.h>
+#include <stdint.h>
 
-#include <glog/logging.h>
-
+#include <map>
 #include <string>
 #include <vector>
 
-#include <tr1/functional>
+#include <reliable.hpp>
+
+#include <tuples/tuples.hpp>
 
 #include <mesos.hpp>
-#include <process.hpp>
+#include <mesos_types.hpp>
 
-#include <boost/unordered_map.hpp>
+#include "common/foreach.hpp"
+#include "common/logging.hpp"
+#include "common/params.hpp"
+#include "common/resources.hpp"
+#include "common/task.hpp"
 
-#include "common/utils.hpp"
+#include "master/state.hpp"
 
-#include "messaging/messages.pb.h"
+#include "slave/state.hpp"
 
 
 namespace mesos { namespace internal {
 
-// To couple a message name with a protocol buffer we use a templated
-// class that extends the necessary protocol buffer type (this also
-// allows the code to be better isolated from protocol buffer
-// naming). While protocol buffers are allegedly not meant to be
-// inherited, we decided this was an acceptable option since we don't
-// add any new functionality (or do any thing with the existing
-// functionality).
-//
-// To add another message that uses a protocol buffer you need to
-// provide a specialization of the Message class (i.e., using the
-// MESSAGE macro defined below).
-template <const char* name>
-class MSG;
-
-#define MESSAGE1(name)                          \
-    extern char name[]
-
-#define MESSAGE2(name, T)                           \
-    extern char name[];                             \
-    template <>                                     \
-    class MSG<name> : public T {}
-                                                                                
-#define MESSAGE(...)                                            \
-    CONCAT(MESSAGE, VA_NUM_ARGS(__VA_ARGS__))(__VA_ARGS__)
-
-
-// From framework to master.
-MESSAGE(F2M_REGISTER_FRAMEWORK, RegisterFrameworkMessage);
-MESSAGE(F2M_REREGISTER_FRAMEWORK, ReregisterFrameworkMessage);
-MESSAGE(F2M_UNREGISTER_FRAMEWORK, UnregisterFrameworkMessage);
-MESSAGE(F2M_RESOURCE_OFFER_REPLY, ResourceOfferReplyMessage);
-MESSAGE(F2M_REVIVE_OFFERS, ReviveOffersMessage);
-MESSAGE(F2M_KILL_TASK, KillTaskMessage);
-MESSAGE(F2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(F2M_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-// From master to framework.
-MESSAGE(M2F_REGISTER_REPLY, FrameworkRegisteredMessage);
-MESSAGE(M2F_RESOURCE_OFFER, ResourceOfferMessage);
-MESSAGE(M2F_RESCIND_OFFER, RescindResourceOfferMessage);
-MESSAGE(M2F_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(M2F_LOST_SLAVE, LostSlaveMessage);
-MESSAGE(M2F_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2F_ERROR, FrameworkErrorMessage);
-
-// From slave to master.
-MESSAGE(S2M_REGISTER_SLAVE, RegisterSlaveMessage);
-MESSAGE(S2M_REREGISTER_SLAVE, ReregisterSlaveMessage);
-MESSAGE(S2M_UNREGISTER_SLAVE, UnregisterSlaveMessage);
-MESSAGE(S2M_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
-
-// From master to slave.
-MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_RUN_TASK, RunTaskMessage);
-MESSAGE(M2S_KILL_TASK, KillTaskMessage);
-MESSAGE(M2S_KILL_FRAMEWORK, KillFrameworkMessage);
-MESSAGE(M2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2S_UPDATE_FRAMEWORK, UpdateFrameworkMessage);
-MESSAGE(M2S_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-// From executor to slave.
-MESSAGE(E2S_REGISTER_EXECUTOR, RegisterExecutorMessage);
-MESSAGE(E2S_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(E2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-
-// From slave to executor.
-MESSAGE(S2E_REGISTER_REPLY, ExecutorRegisteredMessage);
-MESSAGE(S2E_RUN_TASK, RunTaskMessage);
-MESSAGE(S2E_KILL_TASK, KillTaskMessage);
-MESSAGE(S2E_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(S2E_KILL_EXECUTOR);
+const std::string MESOS_MESSAGING_VERSION = "0";
+
+enum MessageType {  // Message ids; payloads are declared with TUPLE(...) further down.
+  /* From framework to master. */
+  F2M_REGISTER_FRAMEWORK = RELIABLE_MSGID,  // First Mesos id: numbering starts AT RELIABLE_MSGID.
+  F2M_REREGISTER_FRAMEWORK,
+  F2M_UNREGISTER_FRAMEWORK,
+  F2M_SLOT_OFFER_REPLY,
+  F2M_REVIVE_OFFERS,
+  F2M_KILL_TASK,
+  F2M_FRAMEWORK_MESSAGE,
+
+  F2F_TASK_RUNNING_STATUS,  // NOTE(review): no section heading; presumably framework-internal -- confirm.
+  
+  /* From master to framework. */
+  M2F_REGISTER_REPLY,
+  M2F_SLOT_OFFER,
+  M2F_RESCIND_OFFER,
+  M2F_STATUS_UPDATE,
+  M2F_LOST_SLAVE,
+  M2F_FRAMEWORK_MESSAGE,
+  M2F_ERROR,
+  
+  /* From slave to master. */
+  S2M_REGISTER_SLAVE,
+  S2M_REREGISTER_SLAVE,
+  S2M_UNREGISTER_SLAVE,
+  S2M_STATUS_UPDATE,
+  S2M_FRAMEWORK_MESSAGE,
+  S2M_LOST_EXECUTOR,
+
+  /* From slave heart to master. */
+  SH2M_HEARTBEAT,
+
+  /* From master detector to processes */
+  GOT_MASTER_ID,
+  NEW_MASTER_DETECTED,
+  NO_MASTER_DETECTED,
+  
+  /* From master to slave. */
+  M2S_REGISTER_REPLY,
+  M2S_REREGISTER_REPLY,
+  M2S_RUN_TASK,
+  M2S_KILL_TASK,
+  M2S_KILL_FRAMEWORK,
+  M2S_FRAMEWORK_MESSAGE,
+  M2S_UPDATE_FRAMEWORK_PID,
+  M2S_SHUTDOWN, // Used in unit tests to shut down cluster
+
+  /* From executor to slave. */
+  E2S_REGISTER_EXECUTOR,
+  E2S_STATUS_UPDATE,
+  E2S_FRAMEWORK_MESSAGE,
+
+  /* From slave to executor. */
+  S2E_REGISTER_REPLY,
+  S2E_RUN_TASK,
+  S2E_KILL_TASK,
+  S2E_FRAMEWORK_MESSAGE,
+  S2E_KILL_EXECUTOR,
 
 #ifdef __sun__
-// From projd to slave.
-MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
-MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
-
-// From slave to projd.
-MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
-MESSAGE(S2PD_KILL_ALL);
-#endif // __sun__
-
-// From master detector to processes.
-MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
-MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
-MESSAGE(NO_MASTER_DETECTED);
-
-// Generic messages.
-MESSAGE(PING);
-MESSAGE(PONG);
-
-
-// Type conversions helpful for changing between protocol buffer types
-// and standard C++ types (for parameters).
-template <typename T>
-const T& convert(const T& t)
-{
-  return t;
-}
+  /* From projd to slave. */
+  PD2S_REGISTER_PROJD,
+  PD2S_PROJECT_READY,
+
+  /* From slave to projd. */
+  S2PD_UPDATE_RESOURCES,
+  S2PD_KILL_ALL,
+#endif /* __sun__ */
+
+  /* Internal to master */
+  M2M_GET_STATE,         // Used by web UI
+  M2M_GET_STATE_REPLY,
+  M2M_TIMER_TICK,        // Timer for expiring filters etc
+  M2M_FRAMEWORK_EXPIRED, // Timer for expiring frameworks
+  M2M_SHUTDOWN,          // Used in tests to shut down master
+
+  /* Internal to slave */
+  S2S_GET_STATE,         // Used by web UI
+  S2S_GET_STATE_REPLY,
+  S2S_SHUTDOWN,          // Used in tests to shut down slave
 
+  MESOS_MSGID,  // End sentinel: MesosProcess::receive() uses it as the exclusive upper bound.
+};
 
-template <typename T>
-std::vector<T> convert(const google::protobuf::RepeatedPtrField<T>& items)
-{
-  std::vector<T> result;
-  for (int i = 0; i < items.size(); i++) {
-    result.push_back(items.Get(i));
-  }
 
-  return result;
-}
+/*
+ * Include tuples details for our namespace (this strategy is
+ * typically called "supermacros" and is often used to build types or
+ * messages).
+ */
+#include <tuples/details.hpp>
 
 
-template <typename T>
-class MesosProcess : public process::Process<T>
+class MesosProcess : public ReliableProcess
 {
 public:
-  MesosProcess(const std::string& id = "")
-    : process::Process<T>(id) {}
-
-  virtual ~MesosProcess() {}
-
-  template <const char *name>
-  static void post(const process::UPID& to, const MSG<name>& msg)
+  static void post(const PID &to, MSGID id)  // Posts a message that carries no tuple payload.
   {
-    std::string data;
-    msg.SerializeToString(&data);
-    process::post(to, name, data.data(), data.size());
+    const std::string &data = MESOS_MESSAGING_VERSION + "|";  // Wire body is just "<version>|".
+    ReliableProcess::post(to, id, data.data(), data.size());
   }
 
-protected:
-  void send(const process::UPID& to, const std::string& name)
+  template <MSGID ID>  // ID selects both the tuple type and the message id that is posted.
+  static void post(const PID &to, const tuple<ID> &t)
   {
-    process::Process<T>::send(to, name);
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);  // "<version>|<tuple as string>"
+    ReliableProcess::post(to, ID, data.data(), data.size());
   }
 
-  template <const char* name>
-  void send(const process::UPID& to, const MSG<name>& msg)
+protected:
+  std::string body() const  // Returns the current message body with the "<version>|" prefix removed.
   {
-    std::string data;
-    msg.SerializeToString(&data);
-    process::Process<T>::send(to, name, data.data(), data.size());
+    size_t size;
+    const char *s = ReliableProcess::body(&size);
+    const std::string data(s, size);  // Length-bounded copy: the raw body is taken as (pointer, size).
+    size_t index = data.find('|');  // First '|' ends the version prefix.
+    CHECK(index != std::string::npos);  // A body without any '|' fails this CHECK.
+    return data.substr(index + 1);
   }
 
-  const std::string& serve(double secs = 0, bool once = false)
+  static void send(const PID &to, MSGID id)  // Payload-less message: body is only "<version>|".
   {
-    do {
-      process::Process<T>::serve(secs, once);
-      if (handlers.count(process::Process<T>::name()) > 0) {
-        handlers[process::Process<T>::name()](process::Process<T>::body());
-      } else {
-        return process::Process<T>::name();
-      }
-    } while (!once);
+    const std::string &data = MESOS_MESSAGING_VERSION + "|";
+    ReliableProcess::post(to, id, data.data(), data.size());  // NOTE(review): static + post(), while send<ID> calls ReliableProcess::send -- confirm intended.
   }
 
-  void install(const std::string& name, void (T::*method)())
+  template <MSGID ID>  // ID selects both the tuple type and the message id that is sent.
+  void send(const PID &to, const tuple<ID> &t)
   {
-    T* t = static_cast<T*>(this);
-    handlers[name] =
-      std::tr1::bind(&MesosProcess<T>::handler0, t,
-                     method,
-                     std::tr1::placeholders::_1);
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);  // "<version>|<tuple as string>"
+    ReliableProcess::send(to, ID, data.data(), data.size());
   }
 
-  template <typename PB,
-            typename P1, typename P1C>
-  void install(const std::string& name, void (T::*method)(P1C),
-               P1 (PB::*param1)() const)
+  template <MSGID ID>  // Forwards tuple 't' to 'to', prefixed with the messaging version.
+  bool forward(const PID &to, const tuple<ID> &t)
   {
-    T* t = static_cast<T*>(this);
-    handlers[name] =
-      std::tr1::bind(&handler1<PB, P1, P1C>, t,
-                     method, param1,
-                     std::tr1::placeholders::_1);
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);  // "<version>|<tuple as string>"
+    return ReliableProcess::forward(to, ID, data.data(), data.size());  // Propagate the result: a bool function must not fall off its end.
   }
 
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C>
-  void install(const std::string& name, void (T::*method)(P1C, P2C),
-               P1 (PB::*p1)() const,
-               P2 (PB::*p2)() const)
-  {
-    T* t = static_cast<T*>(this);
-    handlers[name] =
-      std::tr1::bind(&handler2<PB, P1, P1C, P2, P2C>, t,
-                     method, p1, p2,
-                     std::tr1::placeholders::_1);
-  }
-
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C,
-            typename P3, typename P3C>
-  void install(const std::string& name,
-               void (T::*method)(P1C, P2C, P3C),
-               P1 (PB::*p1)() const,
-               P2 (PB::*p2)() const,
-               P3 (PB::*p3)() const)
+  template <MSGID ID>  // Versioned wrapper around ReliableProcess::rsend(to, ...).
+  int rsend(const PID &to, const tuple<ID> &t)
   {
-    T* t = static_cast<T*>(this);
-    handlers[name] =
-      std::tr1::bind(&handler3<PB, P1, P1C, P2, P2C, P3, P3C>, t,
-                     method, p1, p2, p3,
-                     std::tr1::placeholders::_1);
-  }
-
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C,
-            typename P3, typename P3C,
-            typename P4, typename P4C>
-  void install(const std::string& name,
-               void (T::*method)(P1C, P2C, P3C, P4C),
-               P1 (PB::*p1)() const,
-               P2 (PB::*p2)() const,
-               P3 (PB::*p3)() const,
-               P4 (PB::*p4)() const)
-  {
-    T* t = static_cast<T*>(this);
-    handlers[name] =
-      std::tr1::bind(&handler4<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C>, t,
-                     method, p1, p2, p3, p4,
-                     std::tr1::placeholders::_1);
-  }
-
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C,
-            typename P3, typename P3C,
-            typename P4, typename P4C,
-            typename P5, typename P5C>
-  void install(const std::string& name,
-               void (T::*method)(P1C, P2C, P3C, P4C, P5C),
-               P1 (PB::*p1)() const,
-               P2 (PB::*p2)() const,
-               P3 (PB::*p3)() const,
-               P4 (PB::*p4)() const,
-               P5 (PB::*p5)() const)
-  {
-    T* t = static_cast<T*>(this);
-    handlers[name] =
-      std::tr1::bind(&handler5<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C, P5, P5C>, t,
-                     method, p1, p2, p3, p4, p5,
-                     std::tr1::placeholders::_1);
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);  // "<version>|<tuple as string>"
+    return ReliableProcess::rsend(to, ID, data.data(), data.size());  // Hands back the int that ReliableProcess::rsend returns.
   }
 
-private:
-  static void handler0(T* t, void (T::*method)(),
-                       const std::string& data)
+  template <MSGID ID>  // Versioned wrapper around ReliableProcess::rsend(via, to, ...).
+  int rsend(const PID &via, const PID &to, const tuple<ID> &t)
   {
-    (t->*method)();
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);  // "<version>|<tuple as string>"
+    return ReliableProcess::rsend(via, to, ID, data.data(), data.size());  // Hands back the int that ReliableProcess::rsend returns.
   }
 
-  template <typename PB,
-            typename P1, typename P1C>
-  static void handler1(T* t, void (T::*method)(P1C),
-                       P1 (PB::*p1)() const,
-                       const std::string& data)
+  virtual MSGID receive(double secs = 0)  // Like ReliableProcess::receive, but drops Mesos messages whose version prefix mismatches.
   {
-    PB pb;
-    pb.ParseFromString(data);
-    if (pb.IsInitialized()) {
-      (t->*method)(convert((&pb->*p1)()));
-    } else {
-      LOG(WARNING) << "Initialization errors: "
-                   << pb.InitializationErrorString();
+    bool indefinite = secs == 0;  // secs == 0 is treated as "no timeout".
+    double now = elapsed();  // Start time, used to compute the remaining budget after a drop.
+    MSGID id = ReliableProcess::receive(secs);
+    if (RELIABLE_MSGID <= id && id < MESOS_MSGID) {  // Inclusive lower bound: F2M_REGISTER_FRAMEWORK == RELIABLE_MSGID.
+      size_t size;
+      const char *s = ReliableProcess::body(&size);
+      const std::string data(s, size);
+      size_t index = data.find('|');  // Everything before the first '|' is the sender's version.
+      if (index == std::string::npos ||
+          MESOS_MESSAGING_VERSION != data.substr(0, index)) {
+        LOG(ERROR) << "Dropping message from " << from()
+                   << " with incorrect messaging version!";
+        if (!indefinite) {
+          double remaining = secs - (elapsed() - now);
+          return receive(remaining <= 0 ? DBL_EPSILON : remaining);  // Never pass 0 here: 0 would mean "no timeout".
+        } else {
+          return receive(0);
+        }
+      }
     }
+    return id;  // Ids outside the Mesos range (and valid Mesos messages) pass through unchanged.
   }
+};
 
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C>
-  static void handler2(T* t, void (T::*method)(P1C, P2C),
-                       P1 (PB::*p1)() const,
-                       P2 (PB::*p2)() const,
-                       const std::string& data)
-  {
-    PB pb;
-    pb.ParseFromString(data);
-    if (pb.IsInitialized()) {
-      (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()));
-    } else {
-      LOG(WARNING) << "Initialization errors: "
-                   << pb.InitializationErrorString();
-    }
+
+using boost::tuples::tie;
+
+
+
+TUPLE(F2M_REGISTER_FRAMEWORK,
+      (std::string /*name*/,
+       std::string /*user*/,
+       ExecutorInfo));
+
+TUPLE(F2M_REREGISTER_FRAMEWORK,
+      (FrameworkID,
+       std::string /*name*/,
+       std::string /*user*/,
+       ExecutorInfo,
+       int32_t /*generation*/));
+
+TUPLE(F2M_UNREGISTER_FRAMEWORK,
+      (FrameworkID));
+
+TUPLE(F2M_SLOT_OFFER_REPLY,
+      (FrameworkID,
+       OfferID,
+       std::vector<TaskDescription>,
+       Params));
+
+TUPLE(F2M_REVIVE_OFFERS,
+      (FrameworkID));
+
+TUPLE(F2M_KILL_TASK,
+      (FrameworkID,
+       TaskID));
+
+TUPLE(F2M_FRAMEWORK_MESSAGE,
+      (FrameworkID,
+       FrameworkMessage));
+
+TUPLE(F2F_TASK_RUNNING_STATUS,
+      ());
+
+TUPLE(M2F_REGISTER_REPLY,
+      (FrameworkID));
+
+TUPLE(M2F_SLOT_OFFER,
+      (OfferID,
+       std::vector<SlaveOffer>,
+       std::map<SlaveID, PID>));
+
+TUPLE(M2F_RESCIND_OFFER,
+      (OfferID));
+
+TUPLE(M2F_STATUS_UPDATE,
+      (TaskID,
+       TaskState,
+       std::string));
+
+TUPLE(M2F_LOST_SLAVE,
+      (SlaveID));
+
+TUPLE(M2F_FRAMEWORK_MESSAGE,
+      (FrameworkMessage));
+
+TUPLE(M2F_ERROR,
+      (int32_t /*code*/,
+       std::string /*msg*/));
+
+
+TUPLE(S2M_REGISTER_SLAVE,
+      (std::string /*name*/,
+       std::string /*publicDns*/,
+       Resources));
+
+TUPLE(S2M_REREGISTER_SLAVE,
+      (SlaveID,
+       std::string /*name*/,
+       std::string /*publicDns*/,
+       Resources,
+       std::vector<Task>));
+
+TUPLE(S2M_UNREGISTER_SLAVE,
+      (SlaveID));
+
+TUPLE(S2M_STATUS_UPDATE,
+      (SlaveID,
+       FrameworkID,
+       TaskID,
+       TaskState,
+       std::string));
+
+TUPLE(S2M_FRAMEWORK_MESSAGE,
+      (SlaveID,
+       FrameworkID,
+       FrameworkMessage));
+
+TUPLE(S2M_LOST_EXECUTOR,
+      (SlaveID,
+       FrameworkID,
+       int32_t /*exitStatus*/));
+
+TUPLE(SH2M_HEARTBEAT,
+      (SlaveID));
+    
+TUPLE(NEW_MASTER_DETECTED,
+      (std::string, /* master seq */
+       PID /* master PID */));
+
+TUPLE(NO_MASTER_DETECTED,
+      ());
+
+TUPLE(GOT_MASTER_ID,
+      (std::string /* id */));
+  
+TUPLE(M2S_REGISTER_REPLY,
+      (SlaveID,
+       double /*heartbeat interval*/));
+
+TUPLE(M2S_REREGISTER_REPLY,
+      (SlaveID,
+       double /*heartbeat interval*/));
+
+TUPLE(M2S_RUN_TASK,
+      (FrameworkID,
+       TaskID,
+       std::string /*frameworkName*/,
+       std::string /*user*/,
+       ExecutorInfo,
+       std::string /*taskName*/,
+       std::string /*taskArgs*/,
+       Params,
+       PID /*framework PID*/));
+
+TUPLE(M2S_KILL_TASK,
+      (FrameworkID,
+       TaskID));
+
+TUPLE(M2S_KILL_FRAMEWORK,
+      (FrameworkID));
+
+TUPLE(M2S_FRAMEWORK_MESSAGE,
+      (FrameworkID,
+       FrameworkMessage));
+
+TUPLE(M2S_UPDATE_FRAMEWORK_PID,
+      (FrameworkID,
+       PID));
+
+TUPLE(M2S_SHUTDOWN,
+      ());
+
+TUPLE(E2S_REGISTER_EXECUTOR,
+      (FrameworkID));
+
+TUPLE(E2S_STATUS_UPDATE,
+      (FrameworkID,
+       TaskID,
+       TaskState,
+       std::string));
+
+TUPLE(E2S_FRAMEWORK_MESSAGE,
+      (FrameworkID,
+       FrameworkMessage));
+
+TUPLE(S2E_REGISTER_REPLY,
+      (SlaveID,
+       std::string /*hostname*/,
+       std::string /*frameworkName*/,
+       std::string /*initArg*/));
+
+TUPLE(S2E_RUN_TASK,
+      (TaskID,
+       std::string /*name*/,
+       std::string /*arg*/,
+       Params));
+
+TUPLE(S2E_KILL_TASK,
+      (TaskID));
+
+TUPLE(S2E_FRAMEWORK_MESSAGE,
+      (FrameworkMessage));
+
+TUPLE(S2E_KILL_EXECUTOR,
+      ());
+
+#ifdef __sun__
+TUPLE(PD2S_REGISTER_PROJD,
+      (std::string /*project*/));
+
+TUPLE(PD2S_PROJECT_READY,
+      (std::string /*project*/));
+
+TUPLE(S2PD_UPDATE_RESOURCES,
+      (Resources));
+
+TUPLE(S2PD_KILL_ALL,
+      ());
+#endif /* __sun__ */
+
+TUPLE(M2M_GET_STATE,
+      ());
+
+TUPLE(M2M_GET_STATE_REPLY,
+      (master::state::MasterState *));
+
+TUPLE(M2M_TIMER_TICK,
+      ());
+
+TUPLE(M2M_FRAMEWORK_EXPIRED,
+      (FrameworkID));
+
+TUPLE(M2M_SHUTDOWN,
+      ());
+
+TUPLE(S2S_GET_STATE,
+      ());
+
+TUPLE(S2S_GET_STATE_REPLY,
+      (slave::state::SlaveState *));
+
+TUPLE(S2S_SHUTDOWN,
+      ());
+
+
+/* Serialization functions for sharing objects of local Mesos types. */
+
+void operator & (process::tuples::serializer&, const master::state::MasterState *);
+void operator & (process::tuples::deserializer&, master::state::MasterState *&);
+
+void operator & (process::tuples::serializer&, const slave::state::SlaveState *);
+void operator & (process::tuples::deserializer&, slave::state::SlaveState *&);
+
+
+/* Serialization functions for various Mesos data types. */
+
+void operator & (process::tuples::serializer&, const TaskState&);
+void operator & (process::tuples::deserializer&, TaskState&);
+
+void operator & (process::tuples::serializer&, const SlaveOffer&);
+void operator & (process::tuples::deserializer&, SlaveOffer&);
+
+void operator & (process::tuples::serializer&, const TaskDescription&);
+void operator & (process::tuples::deserializer&, TaskDescription&);
+
+void operator & (process::tuples::serializer&, const FrameworkMessage&);
+void operator & (process::tuples::deserializer&, FrameworkMessage&);
+
+void operator & (process::tuples::serializer&, const ExecutorInfo&);
+void operator & (process::tuples::deserializer&, ExecutorInfo&);
+
+void operator & (process::tuples::serializer&, const Params&);
+void operator & (process::tuples::deserializer&, Params&);
+
+void operator & (process::tuples::serializer&, const Resources&);
+void operator & (process::tuples::deserializer&, Resources&);
+
+void operator & (process::tuples::serializer&, const Task&);
+void operator & (process::tuples::deserializer&, Task&);
+
+
+/* Serialization functions for STL vectors. */
+
+template<typename T>  // Writes a vector as an int32_t element count followed by each element.
+void operator & (process::tuples::serializer& s, const std::vector<T>& v)
+{
+  int32_t size = static_cast<int32_t>(v.size());  // The wire format carries the count as int32_t.
+  s & size;
+  for (int32_t i = 0; i < size; i++) {  // Signed index: same type as the count that was written.
+    s & v[i];
   }
+}
 
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C,
-            typename P3, typename P3C>
-  static void handler3(T* t, void (T::*method)(P1C, P2C, P3C),
-                       P1 (PB::*p1)() const,
-                       P2 (PB::*p2)() const,
-                       P3 (PB::*p3)() const,
-                       const std::string& data)
-  {
-    PB pb;
-    pb.ParseFromString(data);
-    if (pb.IsInitialized()) {
-      (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
-                   convert((&pb->*p3)()));
-    } else {
-      LOG(WARNING) << "Initialization errors: "
-                   << pb.InitializationErrorString();
-    }
+
+template<typename T>  // Reads an int32_t element count, then that many elements, replacing v's contents.
+void operator & (process::tuples::deserializer& d, std::vector<T>& v)
+{
+  int32_t size = 0;  // Defined value even if the read below leaves it untouched.
+  d & size;
+  v.resize(size > 0 ? size : 0);  // A negative wire count must not wrap to a huge size_t; treat it as empty.
+  for (size_t i = 0; i < v.size(); i++) {  // Bounded by the actual vector, not the raw wire value.
+    d & v[i];
   }
+}
 
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C,
-            typename P3, typename P3C,
-            typename P4, typename P4C>
-  static void handler4(T* t, void (T::*method)(P1C, P2C, P3C, P4C),
-                       P1 (PB::*p1)() const,
-                       P2 (PB::*p2)() const,
-                       P3 (PB::*p3)() const,
-                       P4 (PB::*p4)() const,
-                       const std::string& data)
-  {
-    PB pb;
-    pb.ParseFromString(data);
-    if (pb.IsInitialized()) {
-      (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
-                   convert((&pb->*p3)()), convert((&pb->*p4)()));
-    } else {
-      LOG(WARNING) << "Initialization errors: "
-                   << pb.InitializationErrorString();
-    }
+
+/* Serialization functions for STL maps. */
+
+template<typename K, typename V>  // Writes a map as an int32_t entry count followed by key, value pairs.
+void operator & (process::tuples::serializer& s, const std::map<K, V>& m)
+{
+  int32_t size = (int32_t) m.size();  // The wire format carries the count as int32_t.
+  s & size;
+  foreachpair (const K& k, const V& v, m) {
+    s & k;  // Key first, then value: the deserializer reads them in the same order.
+    s & v;
   }
+}
 
-  template <typename PB,
-            typename P1, typename P1C,
-            typename P2, typename P2C,
-            typename P3, typename P3C,
-            typename P4, typename P4C,
-            typename P5, typename P5C>
-  static void handler5(T* t, void (T::*method)(P1C, P2C, P3C, P4C, P5C),
-                       P1 (PB::*p1)() const,
-                       P2 (PB::*p2)() const,
-                       P3 (PB::*p3)() const,
-                       P4 (PB::*p4)() const,
-                       P5 (PB::*p5)() const,
-                       const std::string& data)
-  {
-    PB pb;
-    pb.ParseFromString(data);
-    if (pb.IsInitialized()) {
-      (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
-                   convert((&pb->*p3)()), convert((&pb->*p4)()),
-                   convert((&pb->*p5)()));
-    } else {
-      LOG(WARNING) << "Initialization errors: "
-                   << pb.InitializationErrorString();
-    }
+
+template<typename K, typename V>  // Reads an int32_t entry count, then that many key, value pairs.
+void operator & (process::tuples::deserializer& d, std::map<K, V>& m)
+{
+  m.clear();  // Previous contents are discarded.
+  int32_t size = 0;  // Defined value even if the read below leaves it untouched.
+  d & size;
+  K k;
+  V v;
+  for (int32_t i = 0; i < size; i++) {  // Signed index: a negative wire count now reads nothing instead of wrapping.
+    d & k;
+    d & v;
+    m[k] = v;  // A repeated key keeps the value read last.
   }
+}
 
-  boost::unordered_map<std::string, std::tr1::function<void (const std::string&)> > handlers;
-};
 
-}} // namespace mesos { namespace internal {
+}} /* namespace mesos { namespace internal { */
 
 
-#endif // __MESSAGES_HPP__
+#endif /* MESSAGES_HPP */