You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:08:03 UTC

svn commit: r1132253 [2/5] - in /incubator/mesos/trunk: src/ src/detector/ src/exec/ src/master/ src/messaging/ src/sched/ src/slave/ src/tests/ third_party/libprocess/ third_party/libprocess/third_party/ry-http-parser-1c3624a/

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 09:08:02 2011
@@ -1,34 +1,40 @@
-#ifndef MESSAGES_HPP
-#define MESSAGES_HPP
+#ifndef __MESSAGES_HPP__
+#define __MESSAGES_HPP__
 
 #include <float.h>
 
+#include <glog/logging.h>
+
 #include <string>
 
+#include <tr1/functional>
+
 #include <mesos.hpp>
 #include <process.hpp>
 
+#include <boost/unordered_map.hpp>
+
 #include "messaging/messages.pb.h"
 
 
 namespace mesos { namespace internal {
 
+enum MSGID {
+  // Artifacts from libprocess.
+  PROCESS_TIMEOUT,
+  PROCESS_EXIT,
 
-// TODO(benh): Eliminate versioning once message ids become strings.
-const std::string MESOS_MESSAGING_VERSION = "2";
-
-
-enum MessageType {
-  /* From framework to master. */
-  F2M_REGISTER_FRAMEWORK = PROCESS_MSGID,
+  // From framework to master.
+  F2M_REGISTER_FRAMEWORK,
   F2M_REREGISTER_FRAMEWORK,
   F2M_UNREGISTER_FRAMEWORK,
   F2M_RESOURCE_OFFER_REPLY,
   F2M_REVIVE_OFFERS,
   F2M_KILL_TASK,
   F2M_FRAMEWORK_MESSAGE,
+  F2M_STATUS_UPDATE_ACK,
   
-  /* From master to framework. */
+  // From master to framework.
   M2F_REGISTER_REPLY,
   M2F_RESOURCE_OFFER,
   M2F_RESCIND_OFFER,
@@ -37,7 +43,7 @@ enum MessageType {
   M2F_FRAMEWORK_MESSAGE,
   M2F_ERROR,
   
-  /* From slave to master. */
+  // From slave to master.
   S2M_REGISTER_SLAVE,
   S2M_REREGISTER_SLAVE,
   S2M_UNREGISTER_SLAVE,
@@ -45,15 +51,10 @@ enum MessageType {
   S2M_FRAMEWORK_MESSAGE,
   S2M_EXITED_EXECUTOR,
 
-  /* From slave heart to master. */
+  // From slave heart to master.
   SH2M_HEARTBEAT,
-
-  /* From master detector to processes */
-  GOT_MASTER_TOKEN,
-  NEW_MASTER_DETECTED,
-  NO_MASTER_DETECTED,
   
-  /* From master to slave. */
+  // From master to slave.
   M2S_REGISTER_REPLY,
   M2S_REREGISTER_REPLY,
   M2S_RUN_TASK,
@@ -61,14 +62,15 @@ enum MessageType {
   M2S_KILL_FRAMEWORK,
   M2S_FRAMEWORK_MESSAGE,
   M2S_UPDATE_FRAMEWORK,
+  M2S_STATUS_UPDATE_ACK,
   M2S_SHUTDOWN, // Used in unit tests to shut down cluster
 
-  /* From executor to slave. */
+  // From executor to slave.
   E2S_REGISTER_EXECUTOR,
   E2S_STATUS_UPDATE,
   E2S_FRAMEWORK_MESSAGE,
 
-  /* From slave to executor. */
+  // From slave to executor.
   S2E_REGISTER_REPLY,
   S2E_RUN_TASK,
   S2E_KILL_TASK,
@@ -76,62 +78,56 @@ enum MessageType {
   S2E_KILL_EXECUTOR,
 
 #ifdef __sun__
-  /* From projd to slave. */
+  // From projd to slave.
   PD2S_REGISTER_PROJD,
   PD2S_PROJECT_READY,
 
-  /* From slave to projd. */
+  // From slave to projd.
   S2PD_UPDATE_RESOURCES,
   S2PD_KILL_ALL,
-#endif /* __sun__ */
-
-  /* Internal to framework. */
-  F2F_RESOURCE_OFFER_REPLY,
-  F2F_FRAMEWORK_MESSAGE,
+#endif // __sun__
 
-  /* Internal to master. */
+  // Internal to master.
   M2M_GET_STATE,         // Used by web UI
   M2M_GET_STATE_REPLY,
   M2M_TIMER_TICK,        // Timer for expiring filters etc
   M2M_FRAMEWORK_EXPIRED, // Timer for expiring frameworks
   M2M_SHUTDOWN,          // Used in tests to shut down master
 
-  /* Internal to slave. */
+  // Internal to slave.
   S2S_GET_STATE,         // Used by web UI
   S2S_GET_STATE_REPLY,
   S2S_SHUTDOWN,          // Used in tests to shut down slave
 
-  /* Generic. */
-  TERMINATE,
+  // From master detector to processes.
+  GOT_MASTER_TOKEN,
+  NEW_MASTER_DETECTED,
+  NO_MASTER_DETECTED,
+  MASTER_DETECTION_FAILURE,
 
-  // TODO(benh): Put these all in their right place.
-  MASTER_DETECTION_FAILURE, 
-  F2M_STATUS_UPDATE_ACK,
-  M2S_STATUS_UPDATE_ACK,
+  // HTTP messages.
+  vars,
 
-  MESOS_MSGID,
+  MESOS_MSGID
 };
 
 
-/**
- * To couple a MSGID with a protocol buffer we use a templated class
- * that extends the necessary protocol buffer type (this also allows
- * the code to be better isolated from protocol buffer naming). While
- * protocol buffers are allegedly not meant to be inherited, we
- * decided this was an acceptable option since we don't add any new
- * functionality (or do any thing with the existing functionality).
- *
- * To add another message that uses a protocol buffer you need to
- * provide a specialization of the Message class (i.e., using the
- * MESSAGE macro defined below).
- */
+// To couple a MSGID with a protocol buffer we use a templated class
+// that extends the necessary protocol buffer type (this also allows
+// the code to be better isolated from protocol buffer naming). While
+// protocol buffers are allegedly not meant to be inherited, we
+// decided this was an acceptable option since we don't add any new
+// functionality (or do any thing with the existing functionality).
+//
+// To add another message that uses a protocol buffer you need to
+// provide a specialization of the Message class (i.e., using the
+// MESSAGE macro defined below).
 template <MSGID ID>
-class Message;
-
+class MSG;
 
 #define MESSAGE(ID, T)                          \
   template <>                                   \
-  class Message<ID> : public T {}
+  class MSG<ID> : public T {}
 
 
 class AnyMessage
@@ -141,9 +137,9 @@ public:
     : data(data_) {}
 
   template <MSGID ID>
-  operator Message<ID> () const
+  operator MSG<ID> () const
   {
-    Message<ID> msg;
+    MSG<ID> msg;
     msg.ParseFromString(data);
     return msg;
   }
@@ -156,76 +152,326 @@ private:
 class MesosProcess : public Process
 {
 public:
+  MesosProcess(const std::string& id = "") : Process(id) {}
+
+  virtual ~MesosProcess() {}
+
   static void post(const PID &to, MSGID id)
   {
-    const std::string &data = MESOS_MESSAGING_VERSION + "|";
-    Process::post(to, id, data.data(), data.size());
+    CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
+    Process::post(to, names[id]);
   }
 
   template <MSGID ID>
-  static void post(const PID &to, const Message<ID> &msg)
+  static void post(const PID &to, const MSG<ID> &msg)
   {
+    CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
     std::string data;
     msg.SerializeToString(&data);
-    data = MESOS_MESSAGING_VERSION + "|" + data;
-    Process::post(to, ID, data.data(), data.size());
+    Process::post(to, names[ID], data.data(), data.size());
   }
 
+  static boost::unordered_map<std::string, MSGID> ids;
+  static boost::unordered_map<MSGID, std::string> names;
+
 protected:
   AnyMessage message() const
   {
     return AnyMessage(body());
   }
 
+  MSGID msgid() const
+  {
+    CHECK(ids.count(name()) > 0) << "Missing MSGID for '" << name() << "'";
+    return ids[name()];
+  }
+
   std::string body() const
   {
     size_t size;
-    const char *s = Process::body(&size);
-    const std::string data(s, size);
-    size_t index = data.find('|');
-    CHECK(index != std::string::npos);
-    return data.substr(index + 1);
+    const char *data = Process::body(&size);
+    return std::string(data, size);
   }
 
-  virtual void send(const PID &to, MSGID id)
+  void send(const PID &to, MSGID id)
   {
-    const std::string &data = MESOS_MESSAGING_VERSION + "|";
-    Process::send(to, id, data.data(), data.size());
+    CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
+    Process::send(to, names[id]);
   }
 
   template <MSGID ID>
-  void send(const PID &to, const Message<ID> &msg)
+  void send(const PID &to, const MSG<ID> &msg)
   {
+    CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
     std::string data;
     msg.SerializeToString(&data);
-    data = MESOS_MESSAGING_VERSION + "|" + data;
-    Process::send(to, ID, data.data(), data.size());
+    Process::send(to, names[ID], data.data(), data.size());
+  }
+
+  MSGID receive(double secs = 0)
+  {
+    while (true) {
+      Process::receive(secs);
+      if (ids.count(name()) > 0) {
+        // Check if this has been bound and invoke the handler.
+        if (handlers.count(name()) > 0) {
+          size_t length;
+          const char* data = Process::body(&length);
+          handlers[name()](data, length);
+        } else {
+          return ids[name()];
+        }
+      } else {
+        LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+                     << " from: " << from() << " to: " << self();
+      }
+    }
   }
 
-  virtual MSGID receive(double secs = 0)
+  MSGID serve(double secs = 0)
   {
-    bool indefinite = secs == 0;
-    double now = elapsed();
-    MSGID id = Process::receive(secs);
-    if (PROCESS_MSGID < id && id < MESOS_MSGID) {
-      size_t size;
-      const char *s = Process::body(&size);
-      const std::string data(s, size);
-      size_t index = data.find('|');
-      if (index == std::string::npos ||
-          MESOS_MESSAGING_VERSION != data.substr(0, index)) {
-        LOG(ERROR) << "Dropping message from " << from()
-                   << " with incorrect messaging version!";
-        if (!indefinite) {
-          double remaining = secs - (elapsed() - now);
-          return receive(remaining <= 0 ? DBL_EPSILON : remaining);
+    while (true) {
+      Process::serve(secs);
+      if (ids.count(name()) > 0) {
+        // Check if this has been bound and invoke the handler.
+        if (handlers.count(name()) > 0) {
+          size_t length;
+          const char* data = Process::body(&length);
+          handlers[name()](data, length);
         } else {
-          return receive(0);
+          return ids[name()];
         }
+      } else {
+        LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+                     << " from: " << from() << " to: " << self();
       }
     }
-    return id;
   }
+
+  template <typename T>
+  void handle(MSGID id, void (T::*method)())
+  {
+    T* t = static_cast<T*>(this);
+    CHECK(names.count(id) > 0);
+    handlers[names[id]] =
+      std::tr1::bind(&MesosProcess::handler0<T>, t,
+                     method, std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1>
+  void handle(MSGID id, void (T::*method)(P1),
+              P1 (PB::*param1)() const)
+  {
+    T* t = static_cast<T*>(this);
+    CHECK(names.count(id) > 0);
+    handlers[names[id]] =
+      std::tr1::bind(&MesosProcess::handler1<T, PB, P1>, t,
+                     method, param1,
+                     std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2>
+  void handle(MSGID id, void (T::*method)(P1, P2),
+              P1 (PB::*p1)() const,
+              P2 (PB::*p2)() const)
+  {
+    T* t = static_cast<T*>(this);
+    CHECK(names.count(id) > 0);
+    handlers[names[id]] =
+      std::tr1::bind(&MesosProcess::handler2<T, PB, P1, P2>, t,
+                     method, p1, p2,
+                     std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2,
+            typename P3>
+  void handle(MSGID id,
+              void (T::*method)(P1, P2, P3),
+              P1 (PB::*p1)() const,
+              P2 (PB::*p2)() const,
+              P3 (PB::*p3)() const)
+  {
+    T* t = static_cast<T*>(this);
+    CHECK(names.count(id) > 0);
+    handlers[names[id]] =
+      std::tr1::bind(&MesosProcess::handler3<T, PB, P1, P2, P3>, t,
+                     method, p1, p2, p3,
+                     std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2,
+            typename P3,
+            typename P4>
+  void handle(MSGID id,
+              void (T::*method)(P1, P2, P3, P4),
+              P1 (PB::*p1)() const,
+              P2 (PB::*p2)() const,
+              P3 (PB::*p3)() const,
+              P4 (PB::*p4)() const)
+  {
+    T* t = static_cast<T*>(this);
+    CHECK(names.count(id) > 0);
+    handlers[names[id]] =
+      std::tr1::bind(&MesosProcess::handler4<T, PB, P1, P2, P3, P4>, t,
+                     method, p1, p2, p3, p4,
+                     std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2,
+            typename P3,
+            typename P4,
+            typename P5>
+  void handle(MSGID id,
+              void (T::*method)(P1, P2, P3, P4, P5),
+              P1 (PB::*p1)() const,
+              P2 (PB::*p2)() const,
+              P3 (PB::*p3)() const,
+              P4 (PB::*p4)() const,
+              P5 (PB::*p5)() const)
+  {
+    T* t = static_cast<T*>(this);
+    CHECK(names.count(id) > 0);
+    handlers[names[id]] =
+      std::tr1::bind(&MesosProcess::handler5<T, PB, P1, P2, P3, P4, P5>, t,
+                     method, p1, p2, p3, p4, p5,
+                     std::tr1::placeholders::_1,
+                     std::tr1::placeholders::_2);
+  }
+
+private:
+  template <typename T>
+  static void handler0(T* t, void (T::*method)(),
+                       const char* data, size_t length)
+  {
+    (t->*method)();
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1>
+  static void handler1(T* t, void (T::*method)(P1),
+                       P1 (PB::*p1)() const,
+                       const char* data, size_t length)
+  {
+    PB pb;
+    pb.ParseFromArray(data, length);
+    if (pb.IsInitialized()) {
+      (t->*method)((&pb->*p1)());
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << pb.InitializationErrorString();
+    }
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2>
+  static void handler2(T* t, void (T::*method)(P1, P2),
+                       P1 (PB::*p1)() const,
+                       P2 (PB::*p2)() const,
+                       const char* data, size_t length)
+  {
+    PB pb;
+    pb.ParseFromArray(data, length);
+    if (pb.IsInitialized()) {
+      (t->*method)((&pb->*p1)(), (&pb->*p2)());
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << pb.InitializationErrorString();
+    }
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2,
+            typename P3>
+  static void handler3(T* t, void (T::*method)(P1, P2, P3),
+                       P1 (PB::*p1)() const,
+                       P2 (PB::*p2)() const,
+                       P3 (PB::*p3)() const,
+                       const char* data, size_t length)
+  {
+    PB pb;
+    pb.ParseFromArray(data, length);
+    if (pb.IsInitialized()) {
+      (t->*method)((&pb->*p1)(), (&pb->*p2)(), (&pb->*p3)());
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << pb.InitializationErrorString();
+    }
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2,
+            typename P3,
+            typename P4>
+  static void handler4(T* t, void (T::*method)(P1, P2, P3, P4),
+                       P1 (PB::*p1)() const,
+                       P2 (PB::*p2)() const,
+                       P3 (PB::*p3)() const,
+                       P4 (PB::*p4)() const,
+                       const char* data, size_t length)
+  {
+    PB pb;
+    pb.ParseFromArray(data, length);
+    if (pb.IsInitialized()) {
+      (t->*method)((&pb->*p1)(), (&pb->*p2)(), (&pb->*p3)(), (&pb->*p4)());
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << pb.InitializationErrorString();
+    }
+  }
+
+  template <typename T,
+            typename PB,
+            typename P1,
+            typename P2,
+            typename P3,
+            typename P4,
+            typename P5>
+  static void handler5(T* t, void (T::*method)(P1, P2, P3, P4, P5),
+                       P1 (PB::*p1)() const,
+                       P2 (PB::*p2)() const,
+                       P3 (PB::*p3)() const,
+                       P4 (PB::*p4)() const,
+                       P5 (PB::*p5)() const,
+                       const char* data, size_t length)
+  {
+    PB pb;
+    pb.ParseFromArray(data, length);
+    if (pb.IsInitialized()) {
+      (t->*method)((&pb->*p1)(), (&pb->*p2)(), (&pb->*p3)(), (&pb->*p4)(),
+                   (&pb->*p5)());
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << pb.InitializationErrorString();
+    }
+  }
+
+  boost::unordered_map<std::string, std::tr1::function<void (const char*, size_t)> > handlers;
 };
 
 
@@ -277,7 +523,7 @@ MESSAGE(S2E_FRAMEWORK_MESSAGE, Framework
 MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
 MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
 MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
-#endif /* __sun__ */
+#endif // __sun__
 
 MESSAGE(M2M_GET_STATE_REPLY, StateMessage);
 MESSAGE(M2M_FRAMEWORK_EXPIRED, FrameworkExpiredMessage);
@@ -287,7 +533,7 @@ MESSAGE(S2S_GET_STATE_REPLY, StateMessag
 MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
 MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
 
-}} /* namespace mesos { namespace internal { */
+}} // namespace mesos { namespace internal {
 
 
-#endif /* MESSAGES_HPP */
+#endif // __MESSAGES_HPP__

Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun  5 09:08:02 2011
@@ -56,8 +56,8 @@ message UnregisterFrameworkMessage {
 
 message ResourceOfferMessage {
   required OfferID offer_id = 1;
-  repeated SlaveOffer offer = 2;
-  repeated string pid = 3;
+  repeated SlaveOffer offers = 2;
+  repeated string pids = 3;
 }
 
 

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun  5 09:08:02 2011
@@ -13,11 +13,12 @@
 #include <string>
 #include <sstream>
 
+#include <tr1/functional>
+
 #include <mesos.hpp>
 #include <mesos_sched.hpp>
-#include <reliable.hpp>
+#include <process.hpp>
 
-#include <boost/bind.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 
@@ -37,14 +38,17 @@
 using namespace mesos;
 using namespace mesos::internal;
 
+using boost::cref;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using google::protobuf::RepeatedPtrField;
+
 using std::map;
 using std::string;
 using std::vector;
 
-using boost::bind;
-using boost::cref;
-using boost::unordered_map;
-using boost::unordered_set;
+using std::tr1::bind;
 
 
 namespace mesos { namespace internal {
@@ -78,7 +82,7 @@ protected:
           VLOG(1) << "No status updates received for task ID: "
                   << taskId << " after "
                   << STATUS_UPDATE_TIMEOUT << ", assuming task was lost";
-          Message<M2F_STATUS_UPDATE> out;
+          MSG<M2F_STATUS_UPDATE> out;
           out.mutable_framework_id()->MergeFrom(frameworkId);
           TaskStatus* status = out.mutable_status();
           status->mutable_task_id()->MergeFrom(taskId);
@@ -117,9 +121,42 @@ public:
 		   const FrameworkID& _frameworkId,
                    const FrameworkInfo& _framework)
     : driver(_driver), sched(_sched), frameworkId(_frameworkId),
-      framework(_framework), generation(0), master(PID()), terminate(false) {}
+      framework(_framework), generation(0), master(PID()), terminate(false)
+  {
+    handle(NEW_MASTER_DETECTED, &SchedulerProcess::newMasterDetected,
+           &NewMasterDetectedMessage::pid);
+
+    handle(NO_MASTER_DETECTED, &SchedulerProcess::noMasterDetected);
 
-  ~SchedulerProcess()
+    handle(MASTER_DETECTION_FAILURE, &SchedulerProcess::masterDetectionFailure);
+
+    handle(M2F_REGISTER_REPLY, &SchedulerProcess::registerReply,
+           &FrameworkRegisteredMessage::framework_id);
+
+    handle(M2F_RESOURCE_OFFER, &SchedulerProcess::resourceOffer
+           &ResourceOfferMessage::offer_id,
+           &ResourceOfferMessage::offers,
+           &ResourceOfferMessage::pids);
+
+    handle(M2F_RESCIND_OFFER, &SchedulerProcess::rescindOffer,
+           &RescindResourceOfferMessage::offer_id);
+
+    handle(M2F_STATUS_UPDATE, &SchedulerProcess::statusUpdate,
+           &StatusUpdateMessage::framework_id,
+           &StatusUpdateMessage::status);
+
+    handle(M2F_LOST_SLAVE, &SchedulerProcess::lostSlave,
+           &LostSlaveMessage::slave_id);
+
+    handle(M2F_FRAMEWORK_MESSAGE, &SchedulerProcess::frameworkMessage,
+           &FrameworkMessageMessage::message);
+
+    handle(M2F_ERROR, &SchedulerProcess::error,
+           &FrameworkErrorMessage::code,
+           &FrameworkErrorMessage::message);
+  }
+
+  virtual ~SchedulerProcess()
   {
     // Cleanup any remaining timers.
     foreachpair (const TaskID& taskId, StatusUpdateTimer* timer, timers) {
@@ -154,165 +191,148 @@ protected:
       // above for why sending a message will still require us to use
       // the terminate flag).
       switch (serve(2)) {
-
-      case NEW_MASTER_DETECTED: {
-        const Message<NEW_MASTER_DETECTED>& msg = message();
-
-        VLOG(1) << "New master at " << msg.pid();
-
-        master = msg.pid();
-        link(master);
-
-        if (frameworkId == "") {
-          // Touched for the very first time.
-          Message<F2M_REGISTER_FRAMEWORK> out;
-          out.mutable_framework()->MergeFrom(framework);
-          send(master, out);
-        } else {
-          // Not the first time, or failing over.
-          Message<F2M_REREGISTER_FRAMEWORK> out;
-          out.mutable_framework()->MergeFrom(framework);
-          out.mutable_framework_id()->MergeFrom(frameworkId);
-          out.set_generation(generation++);
-          send(master, out);
+        case PROCESS_EXIT: {
+          // TODO(benh): Don't wait for a new master forever.
+          if (from() == master)
+            VLOG(1) << "Connection to master lost .. waiting for new master";
+          break;
         }
 
-	active = true;
-
-	break;
-      }
+        case PROCESS_TIMEOUT: {
+          break;
+        }
 
-      case NO_MASTER_DETECTED: {
-	// In this case, we don't actually invoke Scheduler::error
-	// since we might get reconnected to a master imminently.
-	active = false;
-	VLOG(1) << "No master detected, waiting for another master";
-	break;
+        default: {
+          VLOG(1) << "Received unknown message " << msgid()
+                  << " from " << from();
+          break;
+        }
       }
+    }
+  }
 
-      case MASTER_DETECTION_FAILURE: {
-	active = false;
-	// TODO(benh): Better error codes/messages!
-        int32_t code = 1;
-        const string& message = "Failed to detect master(s)";
-        invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
-	break;
-      }
+  void newMasterDetected(const string& pid)
+  {
+    VLOG(1) << "New master at " << pid;
 
-      case M2F_REGISTER_REPLY: {
-        const Message<M2F_REGISTER_REPLY>& msg = message();
-        frameworkId = msg.framework_id();
-        invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
-        break;
-      }
+    master = pid;
+    link(master);
 
-      case M2F_RESOURCE_OFFER: {
-        const Message<M2F_RESOURCE_OFFER>& msg = message();
+    if (frameworkId == "") {
+      // Touched for the very first time.
+      MSG<F2M_REGISTER_FRAMEWORK> out;
+      out.mutable_framework()->MergeFrom(framework);
+      send(master, out);
+    } else {
+      // Not the first time, or failing over.
+      MSG<F2M_REREGISTER_FRAMEWORK> out;
+      out.mutable_framework()->MergeFrom(framework);
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.set_generation(generation++);
+      send(master, out);
+    }
 
-        // Construct a vector for the offers. Also save the pid
-        // associated with each slave (one per SlaveOffer) so later we
-        // can send framework messages directly.
-        vector<SlaveOffer> offers;
-
-        for (int i = 0; i < msg.offer_size(); i++) {
-          const SlaveOffer& offer = msg.offer(i);
-          PID pid(msg.pid(i));
-          CHECK(pid != PID());
-          savedOffers[msg.offer_id()][offer.slave_id()] = pid;
-          offers.push_back(offer);
-        }
+    active = true;
+  }
 
-        invoke(bind(&Scheduler::resourceOffer, sched, driver,
-                    cref(msg.offer_id()), cref(offers)));
-        break;
-      }
+  void noMasterDetected()
+  {
+    // In this case, we don't actually invoke Scheduler::error
+    // since we might get reconnected to a master imminently.
+    active = false;
+    VLOG(1) << "No master detected, waiting for another master";
+  }
 
-      case M2F_RESCIND_OFFER: {
-        const Message<M2F_RESCIND_OFFER>& msg = message();
-        savedOffers.erase(msg.offer_id());
-        invoke(bind(&Scheduler::offerRescinded, sched, driver,
-                    cref(msg.offer_id())));
-        break;
-      }
+  void masterDetectionFailure()
+  {
+    active = false;
+    // TODO(benh): Better error codes/messages!
+    int32_t code = 1;
+    const string& message = "Failed to detect master(s)";
+    invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
+  }
 
-      case M2F_STATUS_UPDATE: {
-        const Message<M2F_STATUS_UPDATE>& msg = message();
+  void registerReply(const FrameworkID& frameworkId)
+  {
+    this->frameworkId = frameworkId;
+    invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
+  }
 
-        const TaskStatus &status = msg.status();
+  void resourceOffer(const OfferID& offerId,
+                     const RepeatedPtrField<SlaveOffer>& offers,
+                     const RepeatedPtrField<string>& pids)
+  {
+    // Construct a vector for the offers. Also save the pid
+    // associated with each slave (one per SlaveOffer) so later we
+    // can send framework messages directly.
+    vector<SlaveOffer> temp;
+
+    for (int i = 0; i < offers.size(); i++) {
+      PID pid(pids.Get(i));
+      CHECK(pid != PID());
+      savedOffers[offerId][offers.Get(i).slave_id()] = pid;
+      temp.push_back(offers.Get(i));
+    }
 
-        // TODO(benh): Note that this maybe a duplicate status update!
-        // Once we get support to try and have a more consistent view
-        // of what's running in the cluster, we'll just let this one
-        // slide. The alternative is possibly dealing with a scheduler
-        // failover and not correctly giving the scheduler it's status
-        // update, which seems worse than giving a status update
-        // multiple times (of course, if a scheduler re-uses a TaskID,
-        // that could be bad.
-
-        // Stop any status update timers we might have had running.
-        if (timers.count(status.task_id()) > 0) {
-          StatusUpdateTimer* timer = timers[status.task_id()];
-          timers.erase(status.task_id());
-          send(timer->self(), MESOS_MSGID);
-          wait(timer->self());
-          delete timer;
-        }
+    invoke(bind(&Scheduler::resourceOffer, sched, driver, cref(offerId),
+                cref(temp)));
+  }
 
-        invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
+  void rescindOffer(const OfferID& offerId)
+  {
+    savedOffers.erase(offerId);
+    invoke(bind(&Scheduler::offerRescinded, sched, driver, cref(offerId)));
+  }
 
-        // Acknowledge the message (we do this last, after we invoked
-        // the scheduler, if we did at all, in case it causes a crash,
-        // since this way the message might get resent/routed after
-        // the scheduler comes back online).
-        Message<F2M_STATUS_UPDATE_ACK> out;
-        out.mutable_framework_id()->MergeFrom(frameworkId);
-        out.mutable_slave_id()->MergeFrom(status.slave_id());
-        out.mutable_task_id()->MergeFrom(status.task_id());
-        send(master, out);
-        break;
-      }
+  void statusUpdate(const FrameworkID& frameworkId, const TaskStatus& status)
+  {
+    CHECK(this->frameworkId == frameworkId);
 
-      case M2F_FRAMEWORK_MESSAGE: {
-        const Message<M2F_FRAMEWORK_MESSAGE>& msg = message();
-        invoke(bind(&Scheduler::frameworkMessage, sched, driver,
-                    cref(msg.message())));
-        break;
-      }
+    // TODO(benh): Note that this maybe a duplicate status update!
+    // Once we get support to try and have a more consistent view
+    // of what's running in the cluster, we'll just let this one
+    // slide. The alternative is possibly dealing with a scheduler
+    // failover and not correctly giving the scheduler it's status
+    // update, which seems worse than giving a status update
+    // multiple times (of course, if a scheduler re-uses a TaskID,
+    // that could be bad.
+
+    // Stop any status update timers we might have had running.
+    if (timers.count(status.task_id()) > 0) {
+      StatusUpdateTimer* timer = timers[status.task_id()];
+      timers.erase(status.task_id());
+      send(timer->self(), MESOS_MSGID);
+      wait(timer->self());
+      delete timer;
+    }
 
-      case M2F_LOST_SLAVE: {
-        const Message<M2F_LOST_SLAVE>& msg = message();
-	savedSlavePids.erase(msg.slave_id());
-        invoke(bind(&Scheduler::slaveLost, sched, driver,
-                    cref(msg.slave_id())));
-        break;
-      }
+    invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
 
-      case M2F_ERROR: {
-        const Message<M2F_ERROR>& msg = message();
-        int32_t code = msg.code();
-        const string& message = msg.message();
-        invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
-        break;
-      }
+    // Acknowledge the message (we do this last, after we invoked
+    // the scheduler, if we did at all, in case it causes a crash,
+    // since this way the message might get resent/routed after
+    // the scheduler comes back online).
+    MSG<F2M_STATUS_UPDATE_ACK> out;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
+    out.mutable_slave_id()->MergeFrom(status.slave_id());
+    out.mutable_task_id()->MergeFrom(status.task_id());
+    send(master, out);
+  }
 
-      case PROCESS_EXIT: {
-	// TODO(benh): Don't wait for a new master forever.
-        if (from() == master)
-          VLOG(1) << "Connection to master lost .. waiting for new master";
-        break;
-      }
+  void lostSlave(const SlaveID& slaveId)
+  {
+    savedSlavePids.erase(slaveId);
+    invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
+  }
 
-      case PROCESS_TIMEOUT: {
-        break;
-      }
+  void frameworkMessage(const FrameworkMessage& message)
+  {
+    invoke(bind(&Scheduler::frameworkMessage, sched, driver, cref(message)));
+  }
 
-      default: {
-        VLOG(1) << "Received unknown message " << msgid()
-                << " from " << from();
-        break;
-      }
-      }
-    }
+  void error(int32_t code, const string& message)
+  {
+    invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
   }
 
   void stop()
@@ -320,7 +340,7 @@ protected:
     if (!active)
       return;
 
-    Message<F2M_UNREGISTER_FRAMEWORK> out;
+    MSG<F2M_UNREGISTER_FRAMEWORK> out;
     out.mutable_framework_id()->MergeFrom(frameworkId);
     send(master, out);
   }
@@ -330,7 +350,7 @@ protected:
     if (!active)
       return;
 
-    Message<F2M_KILL_TASK> out;
+    MSG<F2M_KILL_TASK> out;
     out.mutable_framework_id()->MergeFrom(frameworkId);
     out.mutable_task_id()->MergeFrom(taskId);
     send(master, out);
@@ -343,7 +363,7 @@ protected:
     if (!active)
       return;
 
-    Message<F2M_RESOURCE_OFFER_REPLY> out;
+    MSG<F2M_RESOURCE_OFFER_REPLY> out;
     out.mutable_framework_id()->MergeFrom(frameworkId);
     out.mutable_offer_id()->MergeFrom(offerId);
 
@@ -377,7 +397,7 @@ protected:
     if (!active)
       return;
 
-    Message<F2M_REVIVE_OFFERS> out;
+    MSG<F2M_REVIVE_OFFERS> out;
     out.mutable_framework_id()->MergeFrom(frameworkId);
     send(master, out);
   }
@@ -401,7 +421,7 @@ protected:
       CHECK(slave != PID());
 
       // TODO(benh): This is kind of wierd, M2S?
-      Message<M2S_FRAMEWORK_MESSAGE> out;
+      MSG<M2S_FRAMEWORK_MESSAGE> out;
       out.mutable_framework_id()->MergeFrom(frameworkId);
       out.mutable_message()->MergeFrom(message);
       send(slave, out);
@@ -409,7 +429,7 @@ protected:
       VLOG(1) << "Cannot send directly to slave " << message.slave_id()
 	      << "; sending through master";
 
-      Message<F2M_FRAMEWORK_MESSAGE> out;
+      MSG<F2M_FRAMEWORK_MESSAGE> out;
       out.mutable_framework_id()->MergeFrom(frameworkId);
       out.mutable_message()->MergeFrom(message);
       send(master, out);
@@ -432,8 +452,6 @@ private:
   unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
   unordered_map<SlaveID, PID> savedSlavePids;
 
-  unordered_set<TaskID> tasks;
-
   // Timers to ensure we get a status update for each task we launch.
   unordered_map<TaskID, StatusUpdateTimer *> timers;
 };

Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp Sun Jun  5 09:08:02 2011
@@ -244,8 +244,8 @@ void LxcIsolationModule::Reaper::operato
 {
   link(module->slave->self());
   while (true) {
-    switch (receive(1)) {
-    case PROCESS_TIMEOUT: {
+    receive(1);
+    if (name() == TIMEOUT) {
       // Check whether any child process has exited
       pid_t pid;
       int status;
@@ -264,10 +264,7 @@ void LxcIsolationModule::Reaper::operato
           }
         }
       }
-      break;
-    }
-    case SHUTDOWN_REAPER:
-    case PROCESS_EXIT:
+    } else if (name() == TERMINATE || name() == EXIT) {
       return;
     }
   }

Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Sun Jun  5 09:08:02 2011
@@ -23,15 +23,12 @@ public:
     LxcIsolationModule* module;
 
   protected:
-    void operator () ();
+    virtual void operator () ();
 
   public:
     Reaper(LxcIsolationModule* module);
   };
 
-  // Extra shutdown message for reaper
-  enum { SHUTDOWN_REAPER = PROCESS_MSGID };
-
   // Per-framework information object maintained in info hashmap
   struct FrameworkInfo {
     string container;    // Name of Linux container used for this framework

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun  5 09:08:02 2011
@@ -39,7 +39,7 @@ ProcessBasedIsolationModule::~ProcessBas
   // could thus lead to a seg fault!
   if (initialized) {
     CHECK(reaper != NULL);
-    Process::post(reaper->self(), SHUTDOWN_REAPER);
+    Process::post(reaper->self(), TERMINATE);
     Process::wait(reaper->self());
     delete reaper;
   }
@@ -148,8 +148,8 @@ void ProcessBasedIsolationModule::Reaper
 {
   link(module->slave->self());
   while (true) {
-    switch (receive(1)) {
-    case PROCESS_TIMEOUT: {
+    receive(1);
+    if (name() == TIMEOUT) {
       // Check whether any child process has exited.
       pid_t pid;
       int status;
@@ -171,10 +171,7 @@ void ProcessBasedIsolationModule::Reaper
           }
         }
       }
-      break;
-    }
-    case SHUTDOWN_REAPER:
-    case PROCESS_EXIT:
+    } else if (name() == TERMINATE || name() == EXIT) {
       return;
     }
   }

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Sun Jun  5 09:08:02 2011
@@ -34,15 +34,12 @@ public:
     ProcessBasedIsolationModule* module;
 
   protected:
-    void operator () ();
+    virtual void operator () ();
 
   public:
     Reaper(ProcessBasedIsolationModule* module);
   };
 
-  // Extra shutdown message for reaper
-  enum { SHUTDOWN_REAPER = PROCESS_MSGID };
-
 protected:
   // Main method executed after a fork() to create a Launcher for launching
   // an executor's process. The Launcher will create the child's working

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 09:08:02 2011
@@ -6,6 +6,8 @@
 #include <algorithm>
 #include <fstream>
 
+#include <google/protobuf/descriptor.h>
+
 #include "slave.hpp"
 #include "webui.hpp"
 
@@ -14,101 +16,61 @@
 #define gethostbyname2(name, _) gethostbyname(name)
 #endif
 
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
+using boost::unordered_map;
+using boost::unordered_set;
+
 using std::list;
 using std::make_pair;
 using std::ostringstream;
-using std::istringstream;
 using std::pair;
 using std::queue;
 using std::string;
 using std::vector;
 
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
 
-namespace {
-
-// Periodically sends heartbeats to the master
-class Heart : public MesosProcess
-{
-private:
-  PID master;
-  PID slave;
-  SlaveID sid;
-  double interval;
-
-protected:
-  void operator () ()
-  {
-    link(slave);
-    link(master);
-    do {
-      switch (receive(interval)) {
-      case PROCESS_TIMEOUT:
-	send(master, pack<SH2M_HEARTBEAT>(sid));
-	break;
-      case PROCESS_EXIT:
-	return;
-      }
-    } while (true);
-  }
-
-public:
-  Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
-    : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
-};
-
-
-// Default values for CPU cores and memory to include in configuration
-const int32_t DEFAULT_CPUS = 1;
-const int32_t DEFAULT_MEM = 1 * Gigabyte;
-
-
-} /* namespace */
-
-
-Slave::Slave(Resources _resources, bool _local,
+Slave::Slave(const Resources& _resources, bool _local,
              IsolationModule *_isolationModule)
-  : id(""), resources(_resources), local(_local),
-    isolationModule(_isolationModule) {}
+  : resources(_resources), local(_local),
+    isolationModule(_isolationModule), heart(NULL) {}
 
 
-Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
-  : id(""), conf(_conf), local(_local), isolationModule(_module)
-{
-  resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
-                        conf.get<int32_t>("mem", DEFAULT_MEM));
+Slave::Slave(const Configuration& _conf, bool _local,
+             IsolationModule* _isolationModule)
+  : conf(_conf), local(_local),
+    isolationModule(_isolationModule), heart(NULL)
+{
+  resources =
+    Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
 }
 
 
-void Slave::registerOptions(Configurator* conf)
-{
-  conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
-                           DEFAULT_CPUS);
-  conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
-                           DEFAULT_MEM);
-  conf->addOption<string>("work_dir",
-                          "Where to place framework work directories\n"
-                          "(default: MESOS_HOME/work)");
-  conf->addOption<string>("hadoop_home",
-                          "Where to find Hadoop installed (for fetching\n"
-                          "framework executors from HDFS)\n"
-                          "(default: look for HADOOP_HOME environment\n"
-                          "variable or find hadoop on PATH)");
-  conf->addOption<bool>("switch_user", 
-                        "Whether to run tasks as the user who\n"
-                        "submitted them rather than the user running\n"
-                        "the slave (requires setuid permission)",
-                        true);
-   conf->addOption<string>("frameworks_home",
-                           "Directory prepended to relative executor\n"
-                           "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* configurator)
+{
+  // TODO(benh): Is there a way to specify units for the resources?
+  configurator->addOption<string>("resources",
+                                  "Total consumable resources per slave\n");
+//   configurator->addOption<string>("attributes",
+//                                   "Attributes of machine\n");
+  configurator->addOption<string>("work_dir",
+                                  "Where to place framework work directories\n"
+                                  "(default: MESOS_HOME/work)");
+  configurator->addOption<string>("hadoop_home",
+                                  "Where to find Hadoop installed (for\n"
+                                  "fetching framework executors from HDFS)\n"
+                                  "(default: look for HADOOP_HOME in\n"
+                                  "environment or find hadoop on PATH)");
+  configurator->addOption<bool>("switch_user", 
+                                "Whether to run tasks as the user who\n"
+                                "submitted them rather than the user running\n"
+                                "the slave (requires setuid permission)",
+                                true);
+  configurator->addOption<string>("frameworks_home",
+                                  "Directory prepended to relative executor\n"
+                                  "paths (default: MESOS_HOME/frameworks)");
 }
 
 
@@ -120,25 +82,54 @@ Slave::~Slave()
 
 state::SlaveState *Slave::getState()
 {
-  std::ostringstream my_pid;
-  my_pid << self();
-  std::ostringstream master_pid;
-  master_pid << master;
+  Resources resources(resources);
+  Resource::Scalar cpus;
+  Resource::Scalar mem;
+  cpus.set_value(-1);
+  mem.set_value(-1);
+  cpus = resources.getScalar("cpus", cpus);
+  mem = resources.getScalar("mem", mem);
+
   state::SlaveState *state =
-    new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus, 
-        resources.mem, my_pid.str(), master_pid.str());
+    new state::SlaveState(BUILD_DATE, BUILD_USER, slaveId.value(),
+                          cpus.value(), mem.value(), self(), master);
 
-  foreachpair(_, Framework *f, frameworks) {
-    state::Framework *framework = new state::Framework(f->id, f->name, 
-        f->executorInfo.uri, f->executorStatus, f->resources.cpus,
-        f->resources.mem);
-    state->frameworks.push_back(framework);
-    foreachpair(_, Task *t, f->tasks) {
-      state::Task *task = new state::Task(t->id, t->name, t->state,
-          t->resources.cpus, t->resources.mem);
-      framework->tasks.push_back(task);
-    }
-  }
+//   foreachpair (_, Framework *f, frameworks) {
+
+//     foreachpair (_, Executor* e, f->executors) {
+
+//       Resources resources(e->resources);
+//       Resource::Scalar cpus;
+//       Resource::Scalar mem;
+//       cpus.set_value(-1);
+//       mem.set_value(-1);
+//       cpus = resources.getScalar("cpus", cpus);
+//       mem = resources.getScalar("mem", mem);
+
+//     state::Framework *framework =
+//       new state::Framework(f->frameworkId.value(), f->info.name(),
+//                            f->info.executor().uri(), f->executorStatus,
+//                            cpus.value(), mem.value());
+
+//     state->frameworks.push_back(framework);
+
+//     foreachpair(_, Task *t, f->tasks) {
+//       Resources resources(t->resources());
+//       Resource::Scalar cpus;
+//       Resource::Scalar mem;
+//       cpus.set_value(-1);
+//       mem.set_value(-1);
+//       cpus = resources.getScalar("cpus", cpus);
+//       mem = resources.getScalar("mem", mem);
+
+//       state::Task *task =
+//         new state::Task(t->task_id().value(), t->name(),
+//                         TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+//                         cpus.value(), mem.value());
+
+//       framework->tasks.push_back(task);
+//     }
+//   }
 
   return state;
 }
@@ -147,61 +138,60 @@ state::SlaveState *Slave::getState()
 void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
+  LOG(INFO) << "Slave resources: " << resources;
 
   // Get our hostname
-  char buf[512];
+  char buf[256];
   gethostname(buf, sizeof(buf));
   hostent *he = gethostbyname2(buf, AF_INET);
   string hostname = he->h_name;
 
-  // Get our public Web UI URL. Normally this is our hostname, but on EC2
+  // Get our public DNS name. Normally this is our hostname, but on EC2
   // we look for the MESOS_PUBLIC_DNS environment variable. This allows
   // the master to display our public name in its web UI.
-  LOG(INFO) << "setting up webUIUrl on port " << conf["webui_port"];
-  string webUIUrl;
+  string public_hostname = hostname;
   if (getenv("MESOS_PUBLIC_DNS") != NULL) {
-    webUIUrl = getenv("MESOS_PUBLIC_DNS");
-  } else {
-    webUIUrl = hostname;
+    public_hostname = getenv("MESOS_PUBLIC_DNS");
   }
-#ifdef MESOS_WEBUI
-  webUIUrl += ":" + conf["webui_port"];
-#endif
+
+  SlaveInfo slave;
+  slave.set_hostname(hostname);
+  slave.set_public_hostname(public_hostname);
+  slave.mutable_resources()->MergeFrom(resources);
 
   // Initialize isolation module.
   isolationModule->initialize(this);
 
   while (true) {
-    switch (receive()) {
+    switch (receive(1)) {
       case NEW_MASTER_DETECTED: {
-	string masterSeq;
-	PID masterPid;
-	tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
+        const MSG<NEW_MASTER_DETECTED>& msg = message();
 
-	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+	LOG(INFO) << "New master at " << msg.pid();
 
-        redirect(master, masterPid);
-	master = masterPid;
+	master = msg.pid();
 	link(master);
 
-	if (id.empty()) {
+	if (slaveId == "") {
 	  // Slave started before master.
-	  send(master, pack<S2M_REGISTER_SLAVE>(hostname, webUIUrl, resources));
+          MSG<S2M_REGISTER_SLAVE> out;
+          out.mutable_slave()->MergeFrom(slave);
+	  send(master, out);
 	} else {
-	  // Reconnecting, so reconstruct resourcesInUse for the master.
-	  Resources resourcesInUse; 
-	  vector<Task> taskVec;
-
-	  foreachpair(_, Framework *framework, frameworks) {
-	    foreachpair(_, Task *task, framework->tasks) {
-	      resourcesInUse += task->resources;
-	      Task ti = *task;
-	      ti.slaveId = id;
-	      taskVec.push_back(ti);
-	    }
-	  }
+	  // Re-registering, so send tasks running.
+          MSG<S2M_REREGISTER_SLAVE> out;
+          out.mutable_slave_id()->MergeFrom(slaveId);
+          out.mutable_slave()->MergeFrom(slave);
+
+	  foreachpair (_, Framework* framework, frameworks) {
+	    foreachpair (_, Executor* executor, framework->executors) {
+              foreachpair (_, Task* task, executor->tasks) {
+                out.add_task()->MergeFrom(*task);
+              }
+            }
+          }
 
-	  send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, webUIUrl, resources, taskVec));
+	  send(master, out);
 	}
 	break;
       }
@@ -211,231 +201,351 @@ void Slave::operator () ()
 	break;
       }
 
+      case MASTER_DETECTION_FAILURE: {
+	LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
+	break;
+      }
+
       case M2S_REGISTER_REPLY: {
-	double interval = 0;
-        tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
-        LOG(INFO) << "Registered with master; given slave ID " << this->id;
-        link(spawn(new Heart(master, self(), this->id, interval)));
+        const MSG<M2S_REGISTER_REPLY>& msg = message();
+        slaveId = msg.slave_id();
+
+        LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+
+        heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
+        link(spawn(heart));
         break;
       }
       
       case M2S_REREGISTER_REPLY: {
-        SlaveID sid;
-	double interval = 0;
-        tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
-        LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
-        if (this->id == "")
-          this->id = sid;
-        CHECK(this->id == sid);
-        link(spawn(new Heart(master, self(), this->id, interval)));
+        const MSG<M2S_REREGISTER_REPLY>& msg = message();
+
+        LOG(INFO) << "Re-registered with master";
+
+        if (!(slaveId == msg.slave_id())) {
+          LOG(FATAL) << "Slave re-registered but got wrong ID";
+        }
+
+        if (heart != NULL) {
+          send(heart->self(), MESOS_MSGID);
+          wait(heart->self());
+          delete heart;
+        }
+
+        heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
+        link(spawn(heart));
         break;
       }
       
       case M2S_RUN_TASK: {
-	FrameworkID fid;
-        TaskID tid;
-        string fwName, user, taskName, taskArg;
-        ExecutorInfo execInfo;
-        Params params;
-        PID pid;
-        tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
-          unpack<M2S_RUN_TASK>(body());
-        LOG(INFO) << "Got assigned task " << fid << ":" << tid;
-        Resources res;
-        res.cpus = params.getInt32("cpus", -1);
-        res.mem = params.getInt64("mem", -1);
-        Framework *framework = getFramework(fid);
+        const MSG<M2S_RUN_TASK>& msg = message();
+
+        const TaskDescription& task = msg.task();
+
+        LOG(INFO) << "Got assigned task " << task.task_id()
+                  << " for framework " << msg.framework_id();
+
+        Framework *framework = getFramework(msg.framework_id());
         if (framework == NULL) {
-          // Framework not yet created on this node - create it.
-          framework = new Framework(fid, fwName, user, execInfo, pid);
-          frameworks[fid] = framework;
-          isolationModule->startExecutor(framework);
-        }
-        Task *task = framework->addTask(tid, taskName, res);
-        Executor *executor = getExecutor(fid);
-        if (executor) {
-          send(executor->pid,
-               pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
-          isolationModule->resourcesChanged(framework);
+          framework =
+            new Framework(msg.framework_id(), msg.framework(), msg.pid());
+          frameworks[msg.framework_id()] = framework;
+        }
+
+        // Either send the task to an executor or start a new executor
+        // and queue the task until the executor has started.
+        Executor* executor = task.has_executor()
+          ? framework->getExecutor(task.executor().executor_id())
+          : framework->getExecutor(framework->info.executor().executor_id());
+        
+        if (executor != NULL) {
+          if (!executor->pid) {
+            // Queue task until the executor starts up.
+            executor->queuedTasks.push_back(task);
+          } else {
+            // Add the task to the executor.
+            executor->addTask(task);
+
+            MSG<S2E_RUN_TASK> out;
+            out.mutable_framework()->MergeFrom(framework->info);
+            out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+            out.set_pid(framework->pid);
+            out.mutable_task()->MergeFrom(task);
+            send(executor->pid, out);
+            isolationModule->resourcesChanged(framework, executor);
+          }
         } else {
-          // Executor not yet registered; queue task for when it starts up
-          TaskDescription *td = new TaskDescription(
-              tid, taskName, taskArg, params.str());
-          framework->queuedTasks.push_back(td);
+          // Launch an executor for this task.
+          if (task.has_executor()) {
+            executor = framework->createExecutor(task.executor());
+          } else {
+            executor = framework->createExecutor(framework->info.executor());
+          }
+
+          // Queue task until the executor starts up.
+          executor->queuedTasks.push_back(task);
+
+          // Tell the isolation module to launch the executor.
+          isolationModule->launchExecutor(framework, executor);
         }
         break;
       }
 
       case M2S_KILL_TASK: {
-        FrameworkID fid;
-        TaskID tid;
-        tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
-        LOG(INFO) << "Killing task " << fid << ":" << tid;
-        if (Executor *ex = getExecutor(fid)) {
-          send(ex->pid, pack<S2E_KILL_TASK>(tid));
-        }
-        if (Framework *fw = getFramework(fid)) {
-          fw->removeTask(tid);
-          isolationModule->resourcesChanged(fw);
+        const MSG<M2S_KILL_TASK>& msg = message();
+
+        LOG(INFO) << "Asked to kill task " << msg.task_id()
+                  << " of framework " << msg.framework_id();
+
+        Framework* framework = getFramework(msg.framework_id());
+        if (framework != NULL) {
+          // Tell the executor to kill the task if it is up and
+          // running, otherwise, consider the task lost.
+          Executor* executor = framework->getExecutor(msg.task_id());
+          if (executor == NULL || !executor->pid) {
+            // Update the resources locally, if an executor comes up
+            // after this then it just won't receive this task.
+            executor->removeTask(msg.task_id());
+            isolationModule->resourcesChanged(framework, executor);
+
+            MSG<S2M_STATUS_UPDATE> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            TaskStatus *status = out.mutable_status();
+            status->mutable_task_id()->MergeFrom(msg.task_id());
+            status->mutable_slave_id()->MergeFrom(slaveId);
+            status->set_state(TASK_LOST);
+            send(master, out);
+
+            double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+            framework->statuses[deadline][status->task_id()] = *status;
+          } else {
+            // Otherwise, send a message to the executor and wait for
+            // it to send us a status update.
+            MSG<S2E_KILL_TASK> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            out.mutable_task_id()->MergeFrom(msg.task_id());
+            send(executor->pid, out);
+          }
+        } else {
+          LOG(WARNING) << "Cannot kill task " << msg.task_id()
+                       << " of framework " << msg.framework_id()
+                       << " because no such framework is running";
+
+          MSG<S2M_STATUS_UPDATE> out;
+          out.mutable_framework_id()->MergeFrom(msg.framework_id());
+          TaskStatus *status = out.mutable_status();
+          status->mutable_task_id()->MergeFrom(msg.task_id());
+          status->mutable_slave_id()->MergeFrom(slaveId);
+          status->set_state(TASK_LOST);
+          send(master, out);
+
+          double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+          framework->statuses[deadline][status->task_id()] = *status;
         }
         break;
       }
 
       case M2S_KILL_FRAMEWORK: {
-        FrameworkID fid;
-        tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
-        LOG(INFO) << "Asked to kill framework " << fid;
-        Framework *fw = getFramework(fid);
-        if (fw != NULL)
-          killFramework(fw);
+        const MSG<M2S_KILL_FRAMEWORK>&msg = message();
+
+        LOG(INFO) << "Asked to kill framework " << msg.framework_id();
+
+        Framework *framework = getFramework(msg.framework_id());
+        if (framework != NULL)
+          killFramework(framework);
         break;
       }
 
       case M2S_FRAMEWORK_MESSAGE: {
-        FrameworkID fid;
-        FrameworkMessage message;
-        tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
-        if (Executor *ex = getExecutor(fid)) {
-          VLOG(1) << "Relaying framework message for framework " << fid;
-          send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+        const MSG<M2S_FRAMEWORK_MESSAGE>&msg = message();
+
+        Framework* framework = getFramework(msg.framework_id());
+        if (framework != NULL) {
+          const FrameworkMessage& message = msg.message();
+
+          Executor* executor = framework->getExecutor(message.executor_id());
+          if (executor == NULL) {
+            LOG(WARNING) << "Dropping message for executor '"
+                         << message.executor_id() << "' of framework "
+                         << msg.framework_id()
+                         << " because executor does not exist";
+          } else if (!executor->pid) {
+            // TODO(*): If executor is not started, queue framework message?
+            // (It's probably okay to just drop it since frameworks can have
+            // the executor send a message to the master to say when it's ready.)
+            LOG(WARNING) << "Dropping message for executor '"
+                         << message.executor_id() << "' of framework "
+                         << msg.framework_id()
+                         << " because executor is not running";
+          } else {
+            MSG<S2E_FRAMEWORK_MESSAGE> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            out.mutable_message()->MergeFrom(message);
+            send(executor->pid, out);
+          }
         } else {
-          VLOG(1) << "Dropping framework message for framework " << fid
-                  << " because its executor is not running";
+          LOG(WARNING) << "Dropping message for framework "
+                       << msg.framework_id()
+                       << " because it does not exist";
+        }
+        break;
+      }
+
+      case M2S_UPDATE_FRAMEWORK: {
+        const MSG<M2S_UPDATE_FRAMEWORK>&msg = message();
+
+        Framework *framework = getFramework(msg.framework_id());
+        if (framework != NULL) {
+          LOG(INFO) << "Updating framework " << msg.framework_id()
+                    << " pid to " << msg.pid();
+          framework->pid = msg.pid();
         }
-        // TODO(*): If executor is not started, queue framework message?
-        // (It's probably okay to just drop it since frameworks can have
-        // the executor send a message to the master to say when it's ready.)
         break;
       }
 
-      case M2S_UPDATE_FRAMEWORK_PID: {
-        FrameworkID fid;
-        PID pid;
-        tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
-        Framework *framework = getFramework(fid);
+      case M2S_STATUS_UPDATE_ACK: {
+        const MSG<M2S_STATUS_UPDATE_ACK>& msg = message();
+
+        Framework* framework = getFramework(msg.framework_id());
         if (framework != NULL) {
-          LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
-          framework->pid = pid;
+          foreachpair (double deadline, _, framework->statuses) {
+            if (framework->statuses[deadline].count(msg.task_id()) > 0) {
+              LOG(INFO) << "Got acknowledgement of status update"
+                        << " for task " << msg.task_id()
+                        << " of framework " << framework->frameworkId;
+              framework->statuses[deadline].erase(msg.task_id());
+              break;
+            }
+          }
         }
         break;
       }
 
       case E2S_REGISTER_EXECUTOR: {
-        FrameworkID fid;
-        tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
-        LOG(INFO) << "Got executor registration for framework " << fid;
-        if (Framework *fw = getFramework(fid)) {
-          if (getExecutor(fid) != 0) {
-            LOG(ERROR) << "Executor for framework " << fid
-                       << "already exists";
-            send(from(), pack<S2E_KILL_EXECUTOR>());
-            break;
+        const MSG<E2S_REGISTER_EXECUTOR>& msg = message();
+
+        LOG(INFO) << "Got registration for executor '"
+                  << msg.executor_id() << "' of framework "
+                  << msg.framework_id();
+
+        Framework* framework = getFramework(msg.framework_id());
+        if (framework != NULL) {
+          Executor* executor = framework->getExecutor(msg.executor_id());
+
+          // Check the status of the executor.
+          if (executor == NULL) {
+            LOG(WARNING) << "Not expecting executor '" << msg.executor_id()
+                         << "' of framework " << msg.framework_id();
+            send(from(), S2E_KILL_EXECUTOR);
+          } else if (executor->pid != PID()) {
+            LOG(WARNING) << "Not good, executor '" << msg.executor_id()
+                         << "' of framework " << msg.framework_id()
+                         << " is already running";
+            send(from(), S2E_KILL_EXECUTOR);
+          } else {
+            // Save the pid for the executor.
+            executor->pid = from();
+
+            // Now that the executor is up, set its resource limits.
+            isolationModule->resourcesChanged(framework, executor);
+
+            // Tell executor it's registered and give it any queued tasks.
+            MSG<S2E_REGISTER_REPLY> out;
+            ExecutorArgs* args = out.mutable_args();
+            args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+            args->set_name(framework->info.name());
+            args->mutable_slave_id()->MergeFrom(slaveId);
+            args->set_hostname(hostname);
+            args->set_data(framework->info.executor().data());
+            send(executor->pid, out);
+            sendQueuedTasks(framework, executor);
           }
-          Executor *executor = new Executor(fid, from());
-          executors[fid] = executor;
-          link(from());
-          // Now that the executor is up, set its resource limits
-          isolationModule->resourcesChanged(fw);
-          // Tell executor that it's registered and give it its queued tasks
-          send(from(), pack<S2E_REGISTER_REPLY>(this->id,
-                                                hostname,
-                                                fw->name,
-                                                fw->executorInfo.initArg));
-          sendQueuedTasks(fw);
         } else {
-          // Framework is gone; tell the executor to exit
-          send(from(), pack<S2E_KILL_EXECUTOR>());
+          // Framework is gone; tell the executor to exit.
+          LOG(WARNING) << "Framework " << msg.framework_id()
+                       << " does not exist (it may have been killed),"
+                       << " telling executor to exit";
+
+          // TODO(benh): Don't we also want to tell the isolation
+          // module to shut this guy down!
+          send(from(), S2E_KILL_EXECUTOR);
         }
         break;
       }
 
       case E2S_STATUS_UPDATE: {
-        FrameworkID fid;
-        TaskID tid;
-        TaskState taskState;
-        string data;
-        tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
+        const MSG<E2S_STATUS_UPDATE>& msg = message();
 
-        Framework *framework = getFramework(fid);
-        if (framework != NULL) {
-	  LOG(INFO) << "Got status update for task " << fid << ":" << tid;
-	  if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
-	      taskState == TASK_KILLED || taskState == TASK_LOST) {
-	    LOG(INFO) << "Task " << fid << ":" << tid << " done";
+        const TaskStatus& status = msg.status();
 
-            framework->removeTask(tid);
-            isolationModule->resourcesChanged(framework);
-          }
+	LOG(INFO) << "Status update: task " << status.task_id()
+		  << " of framework " << msg.framework_id()
+		  << " is now in state "
+		  << TaskState_descriptor()->FindValueByNumber(status.state())->name();
 
-	  // Reliably send message and save sequence number for
-	  // canceling later.
-	  int seq = rsend(master, framework->pid,
-			  pack<S2M_STATUS_UPDATE>(id, fid, tid,
-                                                  taskState, data));
-	  seqs[fid].insert(seq);
+        Framework *framework = getFramework(msg.framework_id());
+        if (framework != NULL) {
+          Executor* executor = framework->getExecutor(status.task_id());
+          if (executor != NULL) {
+            if (status.state() == TASK_FINISHED ||
+                status.state() == TASK_FAILED ||
+                status.state() == TASK_KILLED ||
+                status.state() == TASK_LOST) {
+              executor->removeTask(status.task_id());
+              isolationModule->resourcesChanged(framework, executor);
+            }
+
+            // Send message and record the status for possible resending.
+            MSG<S2M_STATUS_UPDATE> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            out.mutable_status()->MergeFrom(status);
+            send(master, out);
+
+            double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+            framework->statuses[deadline][status.task_id()] = status;
+          } else {
+            LOG(WARNING) << "Status update error: couldn't lookup "
+                         << "executor for framework " << msg.framework_id();
+          }
 	} else {
-	  LOG(WARNING) << "Got status update for UNKNOWN task "
-		       << fid << ":" << tid;
+          LOG(WARNING) << "Status update error: couldn't lookup "
+                       << "framework " << msg.framework_id();
 	}
         break;
       }
 
       case E2S_FRAMEWORK_MESSAGE: {
-        FrameworkID fid;
-        FrameworkMessage message;
-        tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
+        const MSG<E2S_FRAMEWORK_MESSAGE>& msg = message();
+
+        const FrameworkMessage& message = msg.message();
 
-        Framework *framework = getFramework(fid);
+        Framework *framework = getFramework(msg.framework_id());
         if (framework != NULL) {
-	  LOG(INFO) << "Sending message for framework " << fid
+	  LOG(INFO) << "Sending message for framework "
+                    << framework->frameworkId
 		    << " to " << framework->pid;
 
-          // Set slave ID in case framework omitted it.
-          message.slaveId = this->id;
-          VLOG(1) << "Sending framework message to framework " << fid
-                  << " with PID " << framework->pid;
-          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+          // TODO(benh): This is weird, sending an M2F message.
+          MSG<M2F_FRAMEWORK_MESSAGE> out;
+          out.mutable_framework_id()->MergeFrom(msg.framework_id());
+          out.mutable_message()->MergeFrom(message);
+          out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
+          send(framework->pid, out);
         }
         break;
       }
 
       case S2S_GET_STATE: {
- 	send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
-	break;
-      }
-
-      case PROCESS_EXIT: {
-        LOG(INFO) << "Process exited: " << from();
-
-        if (from() == master) {
-	  LOG(WARNING) << "Master disconnected! "
-		       << "Waiting for a new master to be elected.";
-	  // TODO(benh): After so long waiting for a master, commit suicide.
-	} else {
-	  // Check if an executor has exited (this is technically
-	  // redundant because the isolation module should be doing
-	  // this for us).
-	  foreachpair (_, Executor *ex, executors) {
-	    if (from() == ex->pid) {
-	      LOG(INFO) << "Executor for framework " << ex->frameworkId
-			<< " disconnected";
-	      Framework *framework = getFramework(ex->frameworkId);
-	      if (framework != NULL) {
-		send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
-		killFramework(framework);
-	      }
-	      break;
-	    }
-	  }
-	}
-
+        state::SlaveState *state = getState();
+        MSG<S2S_GET_STATE_REPLY> out;
+        out.set_pointer((char *) &state, sizeof(state));
+        send(from(), out);
         break;
       }
 
       case M2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by master: " << from();
-        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
-        foreachpair (_, Framework *framework, frameworksCopy) {
+        foreachpaircopy (_, Framework *framework, frameworks) {
           killFramework(framework);
         }
         return;
@@ -443,16 +553,46 @@ void Slave::operator () ()
 
       case S2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by " << from();
-        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
-        foreachpair (_, Framework *framework, frameworksCopy) {
+        foreachpaircopy (_, Framework *framework, frameworks) {
           killFramework(framework);
         }
         return;
       }
 
+      case PROCESS_EXIT: {
+        LOG(INFO) << "Process exited: " << from();
+
+        if (from() == master) {
+	  LOG(WARNING) << "Master disconnected! "
+		       << "Waiting for a new master to be elected.";
+	  // TODO(benh): After so long waiting for a master, commit suicide.
+	}
+        break;
+      }
+
+      case PROCESS_TIMEOUT: {
+        // Check and see if we should re-send any status updates.
+        foreachpair (_, Framework* framework, frameworks) {
+          foreachpair (double deadline, _, framework->statuses) {
+            if (deadline <= elapsed()) {
+              foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
+                LOG(WARNING) << "Resending status update"
+                             << " for task " << status.task_id()
+                             << " of framework " << framework->frameworkId;
+                MSG<S2M_STATUS_UPDATE> out;
+                out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+                out.mutable_status()->MergeFrom(status);
+                send(master, out);
+              }
+            }
+          }
+        }
+        break;
+      }
+
       default: {
-        LOG(ERROR) << "Received unknown message ID " << msgid()
-                   << " from " << from();
+        LOG(ERROR) << "Received unknown message (" << msgid()
+                   << ") from " << from();
         break;
       }
     }
@@ -460,70 +600,66 @@ void Slave::operator () ()
 }
 
 
-Framework * Slave::getFramework(FrameworkID frameworkId)
+Framework* Slave::getFramework(const FrameworkID& frameworkId)
 {
-  FrameworkMap::iterator it = frameworks.find(frameworkId);
-  if (it == frameworks.end()) return NULL;
-  return it->second;
-}
-
+  if (frameworks.count(frameworkId) > 0) {
+    return frameworks[frameworkId];
+  }
 
-Executor * Slave::getExecutor(FrameworkID frameworkId)
-{
-  ExecutorMap::iterator it = executors.find(frameworkId);
-  if (it == executors.end()) return NULL;
-  return it->second;
+  return NULL;
 }
 
 
 // Send any tasks queued up for the given framework to its executor
 // (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework *framework)
+void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
 {
-  LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
-  Executor *executor = getExecutor(framework->id);
-  if (!executor) return;
-  foreach(TaskDescription *td, framework->queuedTasks) {
-    send(executor->pid,
-        pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
-    delete td;
+  LOG(INFO) << "Flushing queued tasks for framework "
+            << framework->frameworkId;
+
+  CHECK(executor->pid != PID());
+
+  foreach (const TaskDescription& task, executor->queuedTasks) {
+    // Add the task to the executor.
+    executor->addTask(task);
+
+    MSG<S2E_RUN_TASK> out;
+    out.mutable_framework()->MergeFrom(framework->info);
+    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+    out.set_pid(framework->pid);
+    out.mutable_task()->MergeFrom(task);
+    send(executor->pid, out);
   }
-  framework->queuedTasks.clear();
+
+  executor->queuedTasks.clear();
 }
 
 
 // Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutor)
+void Slave::killFramework(Framework *framework, bool killExecutors)
 {
-  LOG(INFO) << "Cleaning up framework " << framework->id;
+  LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
 
-  // Cancel sending any reliable messages for this framework.
-  foreach (int seq, seqs[framework->id])
-    cancel(seq);
+  // Shutdown all executors of this framework.
+  foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
+    if (killExecutors) {
+      LOG(INFO) << "Killing executor '" << executorId
+                << "' of framework " << framework->frameworkId;
 
-  seqs.erase(framework->id);
+      send(executor->pid, S2E_KILL_EXECUTOR);
 
-  // Remove its allocated resources.
-  framework->resources = Resources();
-
-  // If an executor is running, tell it to exit and kill it.
-  if (Executor *ex = getExecutor(framework->id)) {
-    if (killExecutor) {
-      LOG(INFO) << "Killing executor for framework " << framework->id;
       // TODO(benh): There really isn't ANY time between when an
       // executor gets a S2E_KILL_EXECUTOR message and the isolation
       // module goes and kills it. We should really think about making
       // the semantics of this better.
-      send(ex->pid, pack<S2E_KILL_EXECUTOR>());
-      isolationModule->killExecutor(framework);
+
+      isolationModule->killExecutor(framework, executor);
     }
 
-    LOG(INFO) << "Cleaning up executor for framework " << framework->id;
-    delete ex;
-    executors.erase(framework->id);
+    framework->destroyExecutor(executorId);
   }
 
-  frameworks.erase(framework->id);
+  frameworks.erase(framework->frameworkId);
   delete framework;
 }
 
@@ -531,18 +667,44 @@ void Slave::killFramework(Framework *fra
 // Called by isolation module when an executor process exits
 // TODO(benh): Make this callback be a message so that we can avoid
 // race conditions.
-void Slave::executorExited(FrameworkID fid, int status)
+void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status)
 {
-  if (Framework *f = getFramework(fid)) {
-    LOG(INFO) << "Executor for framework " << fid << " exited "
-              << "with status " << status;
-    send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
-    killFramework(f, false);
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    Executor* executor = framework->getExecutor(executorId);
+    if (executor != NULL) {
+      LOG(INFO) << "Exited executor '" << executorId
+                << "' of framework " << frameworkId
+                << " with status " << status;
+
+      MSG<S2M_EXITED_EXECUTOR> out;
+      out.mutable_slave_id()->MergeFrom(slaveId);
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_executor_id()->MergeFrom(executorId);
+      out.set_status(status);
+      send(master, out);
+
+      framework->destroyExecutor(executorId);
+
+      // TODO(benh): When should we kill the presence of an entire
+      // framework on a slave?
+      if (framework->executors.size() == 0) {
+        killFramework(framework);
+      }
+    } else {
+      LOG(WARNING) << "UNKNOWN executor '" << executorId
+                   << "' of framework " << frameworkId
+                   << " has exited with status " << status;
+    }
+  } else {
+    LOG(WARNING) << "UNKNOWN executor '" << executorId
+                 << "' of UNKNOWN framework " << frameworkId
+                 << " has exited with status " << status;
   }
 };
 
 
-string Slave::getUniqueWorkDirectory(FrameworkID fid)
+string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId)
 {
   string workDir;
   if (conf.contains("work_dir")) {
@@ -554,7 +716,7 @@ string Slave::getUniqueWorkDirectory(Fra
   }
 
   ostringstream os(std::ios_base::app | std::ios_base::out);
-  os << workDir << "/slave-" << id << "/fw-" << fid;
+  os << workDir << "/slave-" << slaveId << "/fw-" << frameworkId;
 
   // Find a unique directory based on the path given by the slave
   // (this is because we might launch multiple executors from the same
@@ -575,7 +737,7 @@ string Slave::getUniqueWorkDirectory(Fra
 }
 
 
-const Params& Slave::getConf()
+const Configuration& Slave::getConfiguration()
 {
   return conf;
 }

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun  5 09:08:02 2011
@@ -27,7 +27,7 @@
 #include <sys/types.h>
 #include <sys/wait.h>
 
-#include <reliable.hpp>
+#include <process.hpp>
 
 #include "isolation_module.hpp"
 #include "state.hpp"
@@ -195,7 +195,7 @@ protected:
     do {
       switch (receive(interval)) {
         case PROCESS_TIMEOUT: {
-          Message<SH2M_HEARTBEAT> msg;
+          MSG<SH2M_HEARTBEAT> msg;
           msg.mutable_slave_id()->MergeFrom(slaveId);
           send(master, msg);
           break;

Modified: incubator/mesos/trunk/src/slave/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.cpp (original)
+++ incubator/mesos/trunk/src/slave/webui.cpp Sun Jun  5 09:08:02 2011
@@ -96,7 +96,7 @@ public:
     receive();
     CHECK(msgid() == S2S_GET_STATE_REPLY);
 
-    const Message<S2S_GET_STATE_REPLY>& msg = message();
+    const MSG<S2S_GET_STATE_REPLY>& msg = message();
 
     slaveState =
       *(state::SlaveState **) msg.pointer().data();

Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun  5 09:08:02 2011
@@ -490,7 +490,7 @@ TEST(MasterTest, SlavePartitioned)
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(Trigger(&slaveLostCall));
 
-  EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
+  EXPECT_MSG(filter, Eq(MesosProcess::names[SH2M_HEARTBEAT]), _, _)
     .WillRepeatedly(Return(true));
 
   driver.start();
@@ -774,7 +774,7 @@ TEST(MasterTest, SchedulerFailoverStatus
   EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
     .Times(1);
 
-  EXPECT_MSG(filter, Eq(M2F_STATUS_UPDATE), _, Ne(master))
+  EXPECT_MSG(filter, Eq(MesosProcess::names[M2F_STATUS_UPDATE]), _, Ne(master))
     .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
     .RetiresOnSaturation();
 

Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Sun Jun  5 09:08:02 2011
@@ -10,6 +10,8 @@
 
 #include <gmock/gmock.h>
 
+#include <messaging/messages.hpp>
+
 
 namespace mesos { namespace internal { namespace test {
 
@@ -99,8 +101,8 @@ public:
  */
 class MockFilter : public Filter
 {
- public:
-  MOCK_METHOD1(filter, bool(struct msg *));
+public:
+  MOCK_METHOD1(filter, bool(Message *));
 };
 
 
@@ -108,9 +110,9 @@ class MockFilter : public Filter
  * A message can be matched against in conjunction with the MockFilter
  * (see above) to perform specific actions based for messages.
  */
-MATCHER_P3(MsgMatcher, id, from, to, "")
+MATCHER_P3(MsgMatcher, name, from, to, "")
 {
-  return (testing::Matcher<MSGID>(id).Matches(arg->id) &&
+  return (testing::Matcher<std::string>(name).Matches(arg->name) &&
           testing::Matcher<PID>(from).Matches(arg->from) &&
           testing::Matcher<PID>(to).Matches(arg->to));
 }
@@ -121,8 +123,8 @@ MATCHER_P3(MsgMatcher, id, from, to, "")
  * using the message matcher (see above) as well as the MockFilter
  * (see above).
  */
-#define EXPECT_MSG(filter, id, from, to)                \
-  EXPECT_CALL(filter, filter(MsgMatcher(id, from, to)))
+#define EXPECT_MSG(filter, name, from, to)                \
+  EXPECT_CALL(filter, filter(MsgMatcher(name, from, to)))
 
 
 /**

Modified: incubator/mesos/trunk/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/Makefile.in?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/third_party/libprocess/Makefile.in Sun Jun  5 09:08:02 2011
@@ -15,13 +15,16 @@ CXXFLAGS += -I@abs_srcdir@/third_party/l
 # Add Boost to CXXFLAGS.
 CXXFLAGS += -I@abs_srcdir@/third_party/boost-1.37.0
 
+# Add http parser to CXXFLAGS.
+CXXFLAGS += -I@abs_srcdir@/third_party/ry-http-parser-1c3624a
+
 # Add -fPIC to CXXFLAGS.
-CXXFLAGS += -fPIC
+CXXFLAGS += -fPIC -D_XOPEN_SOURCE
 
 # Add dependency tracking to CXXFLAGS.
 CXXFLAGS += -MMD -MP
 
-LIB_OBJ = process.o pid.o reliable.o fatal.o
+LIB_OBJ = process.o pid.o fatal.o tokenize.o
 LIB = libprocess.a
 
 OBJS = $(LIB_OBJ)
@@ -36,10 +39,12 @@ $(LIB_OBJ): %.o: @abs_srcdir@/%.cpp 
 	$(CXX) -c $(CXXFLAGS) -o $@ $<
 
 $(LIB): $(LIB_OBJ)
-	$(AR) rcs $@ $^
+	$(AR) rcs $@ $^ third_party/ry-http-parser-1c3624a/http_parser_g.o
 
 third_party:
 	$(MAKE) -C @abs_builddir@/third_party/libev-3.8
+	cp -r @abs_srcdir@/third_party/ry-http-parser-1c3624a @abs_builddir@/third_party/
+	$(MAKE) -C @abs_builddir@/third_party/ry-http-parser-1c3624a http_parser_g.o
 
 all: third_party $(LIBS)