You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:07:21 UTC

svn commit: r1132249 - /incubator/mesos/trunk/src/slave/slave.cpp

Author: benh
Date: Sun Jun  5 09:07:21 2011
New Revision: 1132249

URL: http://svn.apache.org/viewvc?rev=1132249&view=rev
Log:
Don't use HOST_NAME_MAX because it is not defined on all platforms

Modified:
    incubator/mesos/trunk/src/slave/slave.cpp

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132249&r1=1132248&r2=1132249&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 09:07:21 2011
@@ -6,8 +6,6 @@
 #include <algorithm>
 #include <fstream>
 
-#include <google/protobuf/descriptor.h>
-
 #include "slave.hpp"
 #include "webui.hpp"
 
@@ -16,61 +14,101 @@
 #define gethostbyname2(name, _) gethostbyname(name)
 #endif
 
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using boost::unordered_map;
-using boost::unordered_set;
-
 using std::list;
 using std::make_pair;
 using std::ostringstream;
+using std::istringstream;
 using std::pair;
 using std::queue;
 using std::string;
 using std::vector;
 
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
 
-Slave::Slave(const Resources& _resources, bool _local,
+namespace {
+
+// Periodically sends heartbeats to the master
+class Heart : public MesosProcess
+{
+private:
+  PID master;
+  PID slave;
+  SlaveID sid;
+  double interval;
+
+protected:
+  void operator () ()
+  {
+    link(slave);
+    link(master);
+    do {
+      switch (receive(interval)) {
+      case PROCESS_TIMEOUT:
+	send(master, pack<SH2M_HEARTBEAT>(sid));
+	break;
+      case PROCESS_EXIT:
+	return;
+      }
+    } while (true);
+  }
+
+public:
+  Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
+    : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
+};
+
+
+// Default values for CPU cores and memory to include in configuration
+const int32_t DEFAULT_CPUS = 1;
+const int32_t DEFAULT_MEM = 1 * Gigabyte;
+
+
+} /* namespace */
+
+
+Slave::Slave(Resources _resources, bool _local,
              IsolationModule *_isolationModule)
-  : resources(_resources), local(_local),
-    isolationModule(_isolationModule), heart(NULL) {}
+  : id(""), resources(_resources), local(_local),
+    isolationModule(_isolationModule) {}
 
 
-Slave::Slave(const Configuration& _conf, bool _local,
-             IsolationModule* _isolationModule)
-  : conf(_conf), local(_local),
-    isolationModule(_isolationModule), heart(NULL)
-{
-  resources =
-    Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
+  : id(""), conf(_conf), local(_local), isolationModule(_module)
+{
+  resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
+                        conf.get<int32_t>("mem", DEFAULT_MEM));
 }
 
 
-void Slave::registerOptions(Configurator* configurator)
-{
-  // TODO(benh): Is there a way to specify units for the resources?
-  configurator->addOption<string>("resources",
-                                  "Total consumable resources per slave\n");
-//   configurator->addOption<string>("attributes",
-//                                   "Attributes of machine\n");
-  configurator->addOption<string>("work_dir",
-                                  "Where to place framework work directories\n"
-                                  "(default: MESOS_HOME/work)");
-  configurator->addOption<string>("hadoop_home",
-                                  "Where to find Hadoop installed (for\n"
-                                  "fetching framework executors from HDFS)\n"
-                                  "(default: look for HADOOP_HOME in\n"
-                                  "environment or find hadoop on PATH)");
-  configurator->addOption<bool>("switch_user", 
-                                "Whether to run tasks as the user who\n"
-                                "submitted them rather than the user running\n"
-                                "the slave (requires setuid permission)",
-                                true);
-  configurator->addOption<string>("frameworks_home",
-                                  "Directory prepended to relative executor\n"
-                                  "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* conf)
+{
+  conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
+                           DEFAULT_CPUS);
+  conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
+                           DEFAULT_MEM);
+  conf->addOption<string>("work_dir",
+                          "Where to place framework work directories\n"
+                          "(default: MESOS_HOME/work)");
+  conf->addOption<string>("hadoop_home",
+                          "Where to find Hadoop installed (for fetching\n"
+                          "framework executors from HDFS)\n"
+                          "(default: look for HADOOP_HOME environment\n"
+                          "variable or find hadoop on PATH)");
+  conf->addOption<bool>("switch_user", 
+                        "Whether to run tasks as the user who\n"
+                        "submitted them rather than the user running\n"
+                        "the slave (requires setuid permission)",
+                        true);
+   conf->addOption<string>("frameworks_home",
+                           "Directory prepended to relative executor\n"
+                           "paths (default: MESOS_HOME/frameworks)");
 }
 
 
@@ -82,54 +120,25 @@ Slave::~Slave()
 
 state::SlaveState *Slave::getState()
 {
-  Resources resources(resources);
-  Resource::Scalar cpus;
-  Resource::Scalar mem;
-  cpus.set_value(-1);
-  mem.set_value(-1);
-  cpus = resources.getScalar("cpus", cpus);
-  mem = resources.getScalar("mem", mem);
-
+  std::ostringstream my_pid;
+  my_pid << self();
+  std::ostringstream master_pid;
+  master_pid << master;
   state::SlaveState *state =
-    new state::SlaveState(BUILD_DATE, BUILD_USER, slaveId.value(),
-                          cpus.value(), mem.value(), self(), master);
-
-//   foreachpair (_, Framework *f, frameworks) {
-
-//     foreachpair (_, Executor* e, f->executors) {
+    new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus, 
+        resources.mem, my_pid.str(), master_pid.str());
 
-//       Resources resources(e->resources);
-//       Resource::Scalar cpus;
-//       Resource::Scalar mem;
-//       cpus.set_value(-1);
-//       mem.set_value(-1);
-//       cpus = resources.getScalar("cpus", cpus);
-//       mem = resources.getScalar("mem", mem);
-
-//     state::Framework *framework =
-//       new state::Framework(f->frameworkId.value(), f->info.name(),
-//                            f->info.executor().uri(), f->executorStatus,
-//                            cpus.value(), mem.value());
-
-//     state->frameworks.push_back(framework);
-
-//     foreachpair(_, Task *t, f->tasks) {
-//       Resources resources(t->resources());
-//       Resource::Scalar cpus;
-//       Resource::Scalar mem;
-//       cpus.set_value(-1);
-//       mem.set_value(-1);
-//       cpus = resources.getScalar("cpus", cpus);
-//       mem = resources.getScalar("mem", mem);
-
-//       state::Task *task =
-//         new state::Task(t->task_id().value(), t->name(),
-//                         TaskState_descriptor()->FindValueByNumber(t->state())->name(),
-//                         cpus.value(), mem.value());
-
-//       framework->tasks.push_back(task);
-//     }
-//   }
+  foreachpair(_, Framework *f, frameworks) {
+    state::Framework *framework = new state::Framework(f->id, f->name, 
+        f->executorInfo.uri, f->executorStatus, f->resources.cpus,
+        f->resources.mem);
+    state->frameworks.push_back(framework);
+    foreachpair(_, Task *t, f->tasks) {
+      state::Task *task = new state::Task(t->id, t->name, t->state,
+          t->resources.cpus, t->resources.mem);
+      framework->tasks.push_back(task);
+    }
+  }
 
   return state;
 }
@@ -138,60 +147,61 @@ state::SlaveState *Slave::getState()
 void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
-  LOG(INFO) << "Slave resources: " << resources;
 
   // Get our hostname
-  char buf[256];
+  char buf[512];
   gethostname(buf, sizeof(buf));
   hostent *he = gethostbyname2(buf, AF_INET);
   string hostname = he->h_name;
 
-  // Get our public DNS name. Normally this is our hostname, but on EC2
+  // Get our public Web UI URL. Normally this is our hostname, but on EC2
   // we look for the MESOS_PUBLIC_DNS environment variable. This allows
   // the master to display our public name in its web UI.
-  string public_hostname = hostname;
+  LOG(INFO) << "setting up webUIUrl on port " << conf["webui_port"];
+  string webUIUrl;
   if (getenv("MESOS_PUBLIC_DNS") != NULL) {
-    public_hostname = getenv("MESOS_PUBLIC_DNS");
+    webUIUrl = getenv("MESOS_PUBLIC_DNS");
+  } else {
+    webUIUrl = hostname;
   }
-
-  SlaveInfo slave;
-  slave.set_hostname(hostname);
-  slave.set_public_hostname(public_hostname);
-  slave.mutable_resources()->MergeFrom(resources);
+#ifdef MESOS_WEBUI
+  webUIUrl += ":" + conf["webui_port"];
+#endif
 
   // Initialize isolation module.
   isolationModule->initialize(this);
 
   while (true) {
-    switch (receive(1)) {
+    switch (receive()) {
       case NEW_MASTER_DETECTED: {
-        const Message<NEW_MASTER_DETECTED>& msg = message();
+	string masterSeq;
+	PID masterPid;
+	tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
 
-	LOG(INFO) << "New master at " << msg.pid();
+	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
 
-	master = msg.pid();
+        redirect(master, masterPid);
+	master = masterPid;
 	link(master);
 
-	if (slaveId == "") {
+	if (id.empty()) {
 	  // Slave started before master.
-          Message<S2M_REGISTER_SLAVE> out;
-          out.mutable_slave()->MergeFrom(slave);
-	  send(master, out);
+	  send(master, pack<S2M_REGISTER_SLAVE>(hostname, webUIUrl, resources));
 	} else {
-	  // Re-registering, so send tasks running.
-          Message<S2M_REREGISTER_SLAVE> out;
-          out.mutable_slave_id()->MergeFrom(slaveId);
-          out.mutable_slave()->MergeFrom(slave);
-
-	  foreachpair (_, Framework* framework, frameworks) {
-	    foreachpair (_, Executor* executor, framework->executors) {
-              foreachpair (_, Task* task, executor->tasks) {
-                out.add_task()->MergeFrom(*task);
-              }
-            }
-          }
+	  // Reconnecting, so reconstruct resourcesInUse for the master.
+	  Resources resourcesInUse; 
+	  vector<Task> taskVec;
+
+	  foreachpair(_, Framework *framework, frameworks) {
+	    foreachpair(_, Task *task, framework->tasks) {
+	      resourcesInUse += task->resources;
+	      Task ti = *task;
+	      ti.slaveId = id;
+	      taskVec.push_back(ti);
+	    }
+	  }
 
-	  send(master, out);
+	  send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, webUIUrl, resources, taskVec));
 	}
 	break;
       }
@@ -201,351 +211,231 @@ void Slave::operator () ()
 	break;
       }
 
-      case MASTER_DETECTION_FAILURE: {
-	LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
-	break;
-      }
-
       case M2S_REGISTER_REPLY: {
-        const Message<M2S_REGISTER_REPLY>& msg = message();
-        slaveId = msg.slave_id();
-
-        LOG(INFO) << "Registered with master; given slave ID " << slaveId;
-
-        heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
-        link(spawn(heart));
+	double interval = 0;
+        tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
+        LOG(INFO) << "Registered with master; given slave ID " << this->id;
+        link(spawn(new Heart(master, self(), this->id, interval)));
         break;
       }
       
       case M2S_REREGISTER_REPLY: {
-        const Message<M2S_REREGISTER_REPLY>& msg = message();
-
-        LOG(INFO) << "Re-registered with master";
-
-        if (!(slaveId == msg.slave_id())) {
-          LOG(FATAL) << "Slave re-registered but got wrong ID";
-        }
-
-        if (heart != NULL) {
-          send(heart->self(), MESOS_MSGID);
-          wait(heart->self());
-          delete heart;
-        }
-
-        heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
-        link(spawn(heart));
+        SlaveID sid;
+	double interval = 0;
+        tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
+        LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
+        if (this->id == "")
+          this->id = sid;
+        CHECK(this->id == sid);
+        link(spawn(new Heart(master, self(), this->id, interval)));
         break;
       }
       
       case M2S_RUN_TASK: {
-        const Message<M2S_RUN_TASK>& msg = message();
-
-        const TaskDescription& task = msg.task();
-
-        LOG(INFO) << "Got assigned task " << task.task_id()
-                  << " for framework " << msg.framework_id();
-
-        Framework *framework = getFramework(msg.framework_id());
+	FrameworkID fid;
+        TaskID tid;
+        string fwName, user, taskName, taskArg;
+        ExecutorInfo execInfo;
+        Params params;
+        PID pid;
+        tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
+          unpack<M2S_RUN_TASK>(body());
+        LOG(INFO) << "Got assigned task " << fid << ":" << tid;
+        Resources res;
+        res.cpus = params.getInt32("cpus", -1);
+        res.mem = params.getInt64("mem", -1);
+        Framework *framework = getFramework(fid);
         if (framework == NULL) {
-          framework =
-            new Framework(msg.framework_id(), msg.framework(), msg.pid());
-          frameworks[msg.framework_id()] = framework;
-        }
-
-        // Either send the task to an executor or start a new executor
-        // and queue the task until the executor has started.
-        Executor* executor = task.has_executor()
-          ? framework->getExecutor(task.executor().executor_id())
-          : framework->getExecutor(framework->info.executor().executor_id());
-        
-        if (executor != NULL) {
-          if (!executor->pid) {
-            // Queue task until the executor starts up.
-            executor->queuedTasks.push_back(task);
-          } else {
-            // Add the task to the executor.
-            executor->addTask(task);
-
-            Message<S2E_RUN_TASK> out;
-            out.mutable_framework()->MergeFrom(framework->info);
-            out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-            out.set_pid(framework->pid);
-            out.mutable_task()->MergeFrom(task);
-            send(executor->pid, out);
-            isolationModule->resourcesChanged(framework, executor);
-          }
+          // Framework not yet created on this node - create it.
+          framework = new Framework(fid, fwName, user, execInfo, pid);
+          frameworks[fid] = framework;
+          isolationModule->startExecutor(framework);
+        }
+        Task *task = framework->addTask(tid, taskName, res);
+        Executor *executor = getExecutor(fid);
+        if (executor) {
+          send(executor->pid,
+               pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
+          isolationModule->resourcesChanged(framework);
         } else {
-          // Launch an executor for this task.
-          if (task.has_executor()) {
-            executor = framework->createExecutor(task.executor());
-          } else {
-            executor = framework->createExecutor(framework->info.executor());
-          }
-
-          // Queue task until the executor starts up.
-          executor->queuedTasks.push_back(task);
-
-          // Tell the isolation module to launch the executor.
-          isolationModule->launchExecutor(framework, executor);
+          // Executor not yet registered; queue task for when it starts up
+          TaskDescription *td = new TaskDescription(
+              tid, taskName, taskArg, params.str());
+          framework->queuedTasks.push_back(td);
         }
         break;
       }
 
       case M2S_KILL_TASK: {
-        const Message<M2S_KILL_TASK>& msg = message();
-
-        LOG(INFO) << "Asked to kill task " << msg.task_id()
-                  << " of framework " << msg.framework_id();
-
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          // Tell the executor to kill the task if it is up and
-          // running, otherwise, consider the task lost.
-          Executor* executor = framework->getExecutor(msg.task_id());
-          if (executor == NULL || !executor->pid) {
-            // Update the resources locally, if an executor comes up
-            // after this then it just won't receive this task.
-            executor->removeTask(msg.task_id());
-            isolationModule->resourcesChanged(framework, executor);
-
-            Message<S2M_STATUS_UPDATE> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            TaskStatus *status = out.mutable_status();
-            status->mutable_task_id()->MergeFrom(msg.task_id());
-            status->mutable_slave_id()->MergeFrom(slaveId);
-            status->set_state(TASK_LOST);
-            send(master, out);
-
-            double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-            framework->statuses[deadline][status->task_id()] = *status;
-          } else {
-            // Otherwise, send a message to the executor and wait for
-            // it to send us a status update.
-            Message<S2E_KILL_TASK> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            out.mutable_task_id()->MergeFrom(msg.task_id());
-            send(executor->pid, out);
-          }
-        } else {
-          LOG(WARNING) << "Cannot kill task " << msg.task_id()
-                       << " of framework " << msg.framework_id()
-                       << " because no such framework is running";
-
-          Message<S2M_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(msg.framework_id());
-          TaskStatus *status = out.mutable_status();
-          status->mutable_task_id()->MergeFrom(msg.task_id());
-          status->mutable_slave_id()->MergeFrom(slaveId);
-          status->set_state(TASK_LOST);
-          send(master, out);
-
-          double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-          framework->statuses[deadline][status->task_id()] = *status;
+        FrameworkID fid;
+        TaskID tid;
+        tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
+        LOG(INFO) << "Killing task " << fid << ":" << tid;
+        if (Executor *ex = getExecutor(fid)) {
+          send(ex->pid, pack<S2E_KILL_TASK>(tid));
+        }
+        if (Framework *fw = getFramework(fid)) {
+          fw->removeTask(tid);
+          isolationModule->resourcesChanged(fw);
         }
         break;
       }
 
       case M2S_KILL_FRAMEWORK: {
-        const Message<M2S_KILL_FRAMEWORK>&msg = message();
-
-        LOG(INFO) << "Asked to kill framework " << msg.framework_id();
-
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework != NULL)
-          killFramework(framework);
+        FrameworkID fid;
+        tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
+        LOG(INFO) << "Asked to kill framework " << fid;
+        Framework *fw = getFramework(fid);
+        if (fw != NULL)
+          killFramework(fw);
         break;
       }
 
       case M2S_FRAMEWORK_MESSAGE: {
-        const Message<M2S_FRAMEWORK_MESSAGE>&msg = message();
-
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          const FrameworkMessage& message = msg.message();
-
-          Executor* executor = framework->getExecutor(message.executor_id());
-          if (executor == NULL) {
-            LOG(WARNING) << "Dropping message for executor "
-                         << message.executor_id() << " of framework "
-                         << msg.framework_id()
-                         << " because executor does not exist";
-          } else if (!executor->pid) {
-            // TODO(*): If executor is not started, queue framework message?
-            // (It's probably okay to just drop it since frameworks can have
-            // the executor send a message to the master to say when it's ready.)
-            LOG(WARNING) << "Dropping message for executor "
-                         << message.executor_id() << " of framework "
-                         << msg.framework_id()
-                         << " because executor is not running";
-          } else {
-            Message<S2E_FRAMEWORK_MESSAGE> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            out.mutable_message()->MergeFrom(message);
-            send(executor->pid, out);
-          }
+        FrameworkID fid;
+        FrameworkMessage message;
+        tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
+        if (Executor *ex = getExecutor(fid)) {
+          VLOG(1) << "Relaying framework message for framework " << fid;
+          send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
         } else {
-          LOG(WARNING) << "Dropping message for framework "
-                       << msg.framework_id()
-                       << " because it does not exist";
-        }
-        break;
-      }
-
-      case M2S_UPDATE_FRAMEWORK: {
-        const Message<M2S_UPDATE_FRAMEWORK>&msg = message();
-
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          LOG(INFO) << "Updating framework " << msg.framework_id()
-                    << " pid to " << msg.pid();
-          framework->pid = msg.pid();
+          VLOG(1) << "Dropping framework message for framework " << fid
+                  << " because its executor is not running";
         }
+        // TODO(*): If executor is not started, queue framework message?
+        // (It's probably okay to just drop it since frameworks can have
+        // the executor send a message to the master to say when it's ready.)
         break;
       }
 
-      case M2S_STATUS_UPDATE_ACK: {
-        const Message<M2S_STATUS_UPDATE_ACK>& msg = message();
-
-        Framework* framework = getFramework(msg.framework_id());
+      case M2S_UPDATE_FRAMEWORK_PID: {
+        FrameworkID fid;
+        PID pid;
+        tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
+        Framework *framework = getFramework(fid);
         if (framework != NULL) {
-          foreachpair (double deadline, _, framework->statuses) {
-            if (framework->statuses[deadline].count(msg.task_id()) > 0) {
-              LOG(INFO) << "Got acknowledgement of status update"
-                        << " for task " << msg.task_id()
-                        << " of framework " << framework->frameworkId;
-              framework->statuses[deadline].erase(msg.task_id());
-              break;
-            }
-          }
+          LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
+          framework->pid = pid;
         }
         break;
       }
 
       case E2S_REGISTER_EXECUTOR: {
-        const Message<E2S_REGISTER_EXECUTOR>& msg = message();
-
-        LOG(INFO) << "Got registration for executor "
-                  << msg.executor_id() << " of framework "
-                  << msg.framework_id();
-
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          Executor* executor = framework->getExecutor(msg.executor_id());
-
-          // Check the status of the executor.
-          if (executor == NULL) {
-            LOG(WARNING) << "Not expecting executor " << msg.executor_id()
-                         << " of framework " << msg.framework_id();
-            send(from(), S2E_KILL_EXECUTOR);
-          } else if (executor->pid != PID()) {
-            LOG(WARNING) << "Not good, executor " << msg.executor_id()
-                         << " of framework " << msg.framework_id()
-                         << " is already running";
-            send(from(), S2E_KILL_EXECUTOR);
-          } else {
-            // Save the pid for the executor.
-            executor->pid = from();
-
-            // Now that the executor is up, set its resource limits.
-            isolationModule->resourcesChanged(framework, executor);
-
-            // Tell executor it's registered and give it any queued tasks.
-            Message<S2E_REGISTER_REPLY> out;
-            ExecutorArgs* args = out.mutable_args();
-            args->mutable_framework_id()->MergeFrom(framework->frameworkId);
-            args->set_name(framework->info.name());
-            args->mutable_slave_id()->MergeFrom(slaveId);
-            args->set_hostname(hostname);
-            args->set_data(framework->info.executor().data());
-            send(executor->pid, out);
-            sendQueuedTasks(framework, executor);
+        FrameworkID fid;
+        tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
+        LOG(INFO) << "Got executor registration for framework " << fid;
+        if (Framework *fw = getFramework(fid)) {
+          if (getExecutor(fid) != 0) {
+            LOG(ERROR) << "Executor for framework " << fid
+                       << "already exists";
+            send(from(), pack<S2E_KILL_EXECUTOR>());
+            break;
           }
+          Executor *executor = new Executor(fid, from());
+          executors[fid] = executor;
+          link(from());
+          // Now that the executor is up, set its resource limits
+          isolationModule->resourcesChanged(fw);
+          // Tell executor that it's registered and give it its queued tasks
+          send(from(), pack<S2E_REGISTER_REPLY>(this->id,
+                                                hostname,
+                                                fw->name,
+                                                fw->executorInfo.initArg));
+          sendQueuedTasks(fw);
         } else {
-          // Framework is gone; tell the executor to exit.
-          LOG(WARNING) << "Framework " << msg.framework_id()
-                       << " does not exist (it may have been killed),"
-                       << " telling executor to exit";
-
-          // TODO(benh): Don't we also want to tell the isolation
-          // module to shut this guy down!
-          send(from(), S2E_KILL_EXECUTOR);
+          // Framework is gone; tell the executor to exit
+          send(from(), pack<S2E_KILL_EXECUTOR>());
         }
         break;
       }
 
       case E2S_STATUS_UPDATE: {
-        const Message<E2S_STATUS_UPDATE>& msg = message();
-
-        const TaskStatus& status = msg.status();
+        FrameworkID fid;
+        TaskID tid;
+        TaskState taskState;
+        string data;
+        tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
 
-	LOG(INFO) << "Status update: task " << status.task_id()
-		  << " of framework " << msg.framework_id()
-		  << " is now in state "
-		  << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
-        Framework *framework = getFramework(msg.framework_id());
+        Framework *framework = getFramework(fid);
         if (framework != NULL) {
-          Executor* executor = framework->getExecutor(status.task_id());
-          if (executor != NULL) {
-            if (status.state() == TASK_FINISHED ||
-                status.state() == TASK_FAILED ||
-                status.state() == TASK_KILLED ||
-                status.state() == TASK_LOST) {
-              executor->removeTask(status.task_id());
-              isolationModule->resourcesChanged(framework, executor);
-            }
-
-            // Send message and record the status for possible resending.
-            Message<S2M_STATUS_UPDATE> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            out.mutable_status()->MergeFrom(status);
-            send(master, out);
-
-            double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-            framework->statuses[deadline][status.task_id()] = status;
-          } else {
-            LOG(WARNING) << "Status update error: couldn't lookup "
-                         << "executor for framework " << msg.framework_id();
+	  LOG(INFO) << "Got status update for task " << fid << ":" << tid;
+	  if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
+	      taskState == TASK_KILLED || taskState == TASK_LOST) {
+	    LOG(INFO) << "Task " << fid << ":" << tid << " done";
+
+            framework->removeTask(tid);
+            isolationModule->resourcesChanged(framework);
           }
+
+	  // Reliably send message and save sequence number for
+	  // canceling later.
+	  int seq = rsend(master, framework->pid,
+			  pack<S2M_STATUS_UPDATE>(id, fid, tid,
+                                                  taskState, data));
+	  seqs[fid].insert(seq);
 	} else {
-          LOG(WARNING) << "Status update error: couldn't lookup "
-                       << "framework " << msg.framework_id();
+	  LOG(WARNING) << "Got status update for UNKNOWN task "
+		       << fid << ":" << tid;
 	}
         break;
       }
 
       case E2S_FRAMEWORK_MESSAGE: {
-        const Message<E2S_FRAMEWORK_MESSAGE>& msg = message();
-
-        const FrameworkMessage& message = msg.message();
+        FrameworkID fid;
+        FrameworkMessage message;
+        tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
 
-        Framework *framework = getFramework(msg.framework_id());
+        Framework *framework = getFramework(fid);
         if (framework != NULL) {
-	  LOG(INFO) << "Sending message for framework "
-                    << framework->frameworkId
+	  LOG(INFO) << "Sending message for framework " << fid
 		    << " to " << framework->pid;
 
-          // TODO(benh): This is weird, sending an M2F message.
-          Message<M2F_FRAMEWORK_MESSAGE> out;
-          out.mutable_framework_id()->MergeFrom(msg.framework_id());
-          out.mutable_message()->MergeFrom(message);
-          out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
-          send(framework->pid, out);
+          // Set slave ID in case framework omitted it.
+          message.slaveId = this->id;
+          VLOG(1) << "Sending framework message to framework " << fid
+                  << " with PID " << framework->pid;
+          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
         }
         break;
       }
 
       case S2S_GET_STATE: {
-        state::SlaveState *state = getState();
-        Message<S2S_GET_STATE_REPLY> out;
-        out.set_pointer((char *) &state, sizeof(state));
-        send(from(), out);
+ 	send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
+	break;
+      }
+
+      case PROCESS_EXIT: {
+        LOG(INFO) << "Process exited: " << from();
+
+        if (from() == master) {
+	  LOG(WARNING) << "Master disconnected! "
+		       << "Waiting for a new master to be elected.";
+	  // TODO(benh): After so long waiting for a master, commit suicide.
+	} else {
+	  // Check if an executor has exited (this is technically
+	  // redundant because the isolation module should be doing
+	  // this for us).
+	  foreachpair (_, Executor *ex, executors) {
+	    if (from() == ex->pid) {
+	      LOG(INFO) << "Executor for framework " << ex->frameworkId
+			<< " disconnected";
+	      Framework *framework = getFramework(ex->frameworkId);
+	      if (framework != NULL) {
+		send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
+		killFramework(framework);
+	      }
+	      break;
+	    }
+	  }
+	}
+
         break;
       }
 
       case M2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by master: " << from();
-        foreachpaircopy (_, Framework *framework, frameworks) {
+        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+        foreachpair (_, Framework *framework, frameworksCopy) {
           killFramework(framework);
         }
         return;
@@ -553,46 +443,16 @@ void Slave::operator () ()
 
       case S2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by " << from();
-        foreachpaircopy (_, Framework *framework, frameworks) {
+        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+        foreachpair (_, Framework *framework, frameworksCopy) {
           killFramework(framework);
         }
         return;
       }
 
-      case PROCESS_EXIT: {
-        LOG(INFO) << "Process exited: " << from();
-
-        if (from() == master) {
-	  LOG(WARNING) << "Master disconnected! "
-		       << "Waiting for a new master to be elected.";
-	  // TODO(benh): After so long waiting for a master, commit suicide.
-	}
-        break;
-      }
-
-      case PROCESS_TIMEOUT: {
-        // Check and see if we should re-send any status updates.
-        foreachpair (_, Framework* framework, frameworks) {
-          foreachpair (double deadline, _, framework->statuses) {
-            if (deadline <= elapsed()) {
-              foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
-                LOG(WARNING) << "Resending status update"
-                             << " for task " << status.task_id()
-                             << " of framework " << framework->frameworkId;
-                Message<S2M_STATUS_UPDATE> out;
-                out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-                out.mutable_status()->MergeFrom(status);
-                send(master, out);
-              }
-            }
-          }
-        }
-        break;
-      }
-
       default: {
-        LOG(ERROR) << "Received unknown message (" << msgid()
-                   << ") from " << from();
+        LOG(ERROR) << "Received unknown message ID " << msgid()
+                   << " from " << from();
         break;
       }
     }
@@ -600,66 +460,70 @@ void Slave::operator () ()
 }
 
 
-Framework* Slave::getFramework(const FrameworkID& frameworkId)
+Framework * Slave::getFramework(FrameworkID frameworkId)
 {
-  if (frameworks.count(frameworkId) > 0) {
-    return frameworks[frameworkId];
-  }
+  FrameworkMap::iterator it = frameworks.find(frameworkId);
+  if (it == frameworks.end()) return NULL;
+  return it->second;
+}
+
 
-  return NULL;
+Executor * Slave::getExecutor(FrameworkID frameworkId)
+{
+  ExecutorMap::iterator it = executors.find(frameworkId);
+  if (it == executors.end()) return NULL;
+  return it->second;
 }
 
 
 // Send any tasks queued up for the given framework to its executor
 // (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
+void Slave::sendQueuedTasks(Framework *framework)
 {
-  LOG(INFO) << "Flushing queued tasks for framework "
-            << framework->frameworkId;
-
-  CHECK(executor->pid != PID());
-
-  foreach (const TaskDescription& task, executor->queuedTasks) {
-    // Add the task to the executor.
-    executor->addTask(task);
-
-    Message<S2E_RUN_TASK> out;
-    out.mutable_framework()->MergeFrom(framework->info);
-    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-    out.set_pid(framework->pid);
-    out.mutable_task()->MergeFrom(task);
-    send(executor->pid, out);
+  LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
+  Executor *executor = getExecutor(framework->id);
+  if (!executor) return;
+  foreach(TaskDescription *td, framework->queuedTasks) {
+    send(executor->pid,
+        pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
+    delete td;
   }
-
-  executor->queuedTasks.clear();
+  framework->queuedTasks.clear();
 }
 
 
 // Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutors)
+void Slave::killFramework(Framework *framework, bool killExecutor)
 {
-  LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
+  LOG(INFO) << "Cleaning up framework " << framework->id;
 
-  // Shutdown all executors of this framework.
-  foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
-    if (killExecutors) {
-      LOG(INFO) << "Killing executor " << executorId
-                << " of framework " << framework->frameworkId;
+  // Cancel sending any reliable messages for this framework.
+  foreach (int seq, seqs[framework->id])
+    cancel(seq);
 
-      send(executor->pid, S2E_KILL_EXECUTOR);
+  seqs.erase(framework->id);
 
+  // Remove its allocated resources.
+  framework->resources = Resources();
+
+  // If an executor is running, tell it to exit and kill it.
+  if (Executor *ex = getExecutor(framework->id)) {
+    if (killExecutor) {
+      LOG(INFO) << "Killing executor for framework " << framework->id;
       // TODO(benh): There really isn't ANY time between when an
       // executor gets a S2E_KILL_EXECUTOR message and the isolation
       // module goes and kills it. We should really think about making
       // the semantics of this better.
-
-      isolationModule->killExecutor(framework, executor);
+      send(ex->pid, pack<S2E_KILL_EXECUTOR>());
+      isolationModule->killExecutor(framework);
     }
 
-    framework->destroyExecutor(executorId);
+    LOG(INFO) << "Cleaning up executor for framework " << framework->id;
+    delete ex;
+    executors.erase(framework->id);
   }
 
-  frameworks.erase(framework->frameworkId);
+  frameworks.erase(framework->id);
   delete framework;
 }
 
@@ -667,44 +531,18 @@ void Slave::killFramework(Framework *fra
 // Called by isolation module when an executor process exits
 // TODO(benh): Make this callback be a message so that we can avoid
 // race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status)
+void Slave::executorExited(FrameworkID fid, int status)
 {
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    Executor* executor = framework->getExecutor(executorId);
-    if (executor != NULL) {
-      LOG(INFO) << "Exited executor " << executorId
-                << " of framework " << frameworkId
-                << " with status " << status;
-
-      Message<S2M_EXITED_EXECUTOR> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_status(status);
-      send(master, out);
-
-      framework->destroyExecutor(executorId);
-
-      // TODO(benh): When should we kill the presence of an entire
-      // framework on a slave?
-      if (framework->executors.size() == 0) {
-        killFramework(framework);
-      }
-    } else {
-      LOG(WARNING) << "UNKNOWN executor " << executorId
-                   << " of framework " << frameworkId
-                   << " has exited with status " << status;
-    }
-  } else {
-    LOG(WARNING) << "UNKNOWN executor " << executorId
-                 << " of UNKNOWN framework " << frameworkId
-                 << " has exited with status " << status;
+  if (Framework *f = getFramework(fid)) {
+    LOG(INFO) << "Executor for framework " << fid << " exited "
+              << "with status " << status;
+    send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
+    killFramework(f, false);
   }
 };
 
 
-string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId)
+string Slave::getUniqueWorkDirectory(FrameworkID fid)
 {
   string workDir;
   if (conf.contains("work_dir")) {
@@ -716,7 +554,7 @@ string Slave::getUniqueWorkDirectory(con
   }
 
   ostringstream os(std::ios_base::app | std::ios_base::out);
-  os << workDir << "/slave-" << slaveId << "/fw-" << frameworkId;
+  os << workDir << "/slave-" << id << "/fw-" << fid;
 
   // Find a unique directory based on the path given by the slave
   // (this is because we might launch multiple executors from the same
@@ -737,7 +575,7 @@ string Slave::getUniqueWorkDirectory(con
 }
 
 
-const Configuration& Slave::getConfiguration()
+const Params& Slave::getConf()
 {
   return conf;
 }