You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:27:16 UTC

svn commit: r1132335 [2/4] - in /incubator/mesos/trunk: include/ include/mesos/ src/ src/common/ src/detector/ src/examples/ src/exec/ src/java/jni/ src/launcher/ src/local/ src/master/ src/messaging/ src/python/native/ src/sched/ src/slave/ src/tests/

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132335&r1=1132334&r2=1132335&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 09:27:14 2011
@@ -1,19 +1,44 @@
 #include <iomanip>
+#include <fstream>
+#include <sstream>
 
 #include <glog/logging.h>
 
+#include <google/protobuf/descriptor.h>
+
+#include <process/run.hpp>
+
+#include "config/config.hpp"
+
+#include "common/build.hpp"
 #include "common/date_utils.hpp"
 
 #include "allocator.hpp"
 #include "allocator_factory.hpp"
 #include "master.hpp"
+#include "slaves_manager.hpp"
 #include "webui.hpp"
 
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::master;
+
+using boost::bad_lexical_cast;
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using process::HttpOKResponse;
+using process::HttpResponse;
+using process::HttpRequest;
+using process::PID;
+using process::Process;
+using process::Promise;
+using process::UPID;
+
 using std::endl;
-using std::make_pair;
 using std::map;
 using std::max;
-using std::min;
 using std::ostringstream;
 using std::pair;
 using std::set;
@@ -22,77 +47,69 @@ using std::setw;
 using std::string;
 using std::vector;
 
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::master;
-
 
 namespace {
 
-// A process that periodically pings the master to check filter expiries, etc
-class AllocatorTimer : public MesosProcess
+// A process that periodically pings the master to check filter
+// expiries, etc.
+class AllocatorTimer : public Process<AllocatorTimer>
 {
-private:
-  const PID master;
+public:
+  AllocatorTimer(const PID<Master>& _master) : master(_master) {}
 
 protected:
-  void operator () ()
+  virtual void operator () ()
   {
     link(master);
-    do {
-      switch (receive(1)) {
-      case PROCESS_TIMEOUT:
-	send(master, pack<M2M_TIMER_TICK>());
-	break;
-      case PROCESS_EXIT:
+    while (true) {
+      receive(1);
+      if (name() == process::TIMEOUT) {
+        process::dispatch(master, &Master::timerTick);
+      } else if (name() == process::EXITED) {
 	return;
       }
-    } while (true);
+    }
   }
 
-public:
-  AllocatorTimer(const PID &_master) : master(_master) {}
+private:
+  const PID<Master> master;
 };
 
 
 // A process that periodically prints frameworks' shares to a file
-class SharesPrinter : public MesosProcess
+class SharesPrinter : public Process<SharesPrinter>
 {
-protected:
-  PID master;
+public:
+  SharesPrinter(const PID<Master>& _master) : master(_master) {}
+  ~SharesPrinter() {}
 
-  void operator () ()
+protected:
+  virtual void operator () ()
   {
     int tick = 0;
 
     std::ofstream file ("/mnt/shares");
-    if (!file.is_open())
+    if (!file.is_open()) {
       LOG(FATAL) << "Could not open /mnt/shares";
+    }
 
     while (true) {
       pause(1);
 
-      send(master, pack<M2M_GET_STATE>());
-      receive();
-      CHECK(msgid() == M2M_GET_STATE_REPLY);
-      state::MasterState *state = unpack<M2M_GET_STATE_REPLY, 0>(body());
+      state::MasterState* state = call(master, &Master::getState);
 
       uint32_t total_cpus = 0;
       uint32_t total_mem = 0;
 
-      foreach (state::Slave *s, state->slaves) {
+      foreach (state::Slave* s, state->slaves) {
         total_cpus += s->cpus;
         total_mem += s->mem;
       }
       
       if (state->frameworks.empty()) {
-        file << "--------------------------------" << std::endl;
+        file << "--------------------------------" << endl;
       } else {
-        foreach (state::Framework *f, state->frameworks) {
+        foreach (state::Framework* f, state->frameworks) {
           double cpu_share = f->cpus / (double) total_cpus;
           double mem_share = f->mem / (double) total_mem;
           double max_share = max(cpu_share, mem_share);
@@ -107,25 +124,88 @@ protected:
     file.close();
   }
 
+private:
+  const PID<Master> master;
+};
+
+} // namespace {
+
+
+namespace mesos { namespace internal { namespace master {
+
+class SlaveObserver : public Process<SlaveObserver>
+{
 public:
-  SharesPrinter(const PID &_master) : master(_master) {}
-  ~SharesPrinter() {}
+  SlaveObserver(const UPID& _slave,
+                const SlaveInfo& _slaveInfo,
+                const SlaveID& _slaveId,
+                const PID<SlavesManager>& _slavesManager)
+    : slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId),
+      slavesManager(_slavesManager), timeouts(0), pinged(false) {}
+
+  virtual ~SlaveObserver() {}
+
+protected:
+  virtual void operator () ()
+  {
+    // Send a ping some interval after we heard the last pong. Or if
+    // we don't hear a pong, increment the number of timeouts from the
+    // slave and try to send another ping. If we eventually time out
+    // after too many missed pongs in a row, consider the slave dead.
+    send(slave, PING);
+    pinged = true;
+
+    do {
+      receive(SLAVE_PONG_TIMEOUT);
+      if (name() == PONG) {
+        timeouts = 0;
+        pinged = false;
+      } else if (name() == process::TIMEOUT) {
+        if (pinged) {
+          timeouts++;
+          pinged = false;
+        }
+
+        send(slave, PING);
+        pinged = true;
+      } else if (name() == process::TERMINATE) {
+        return;
+      } 
+    } while (timeouts < MAX_SLAVE_TIMEOUTS);
+
+    // Tell the slave manager to deactivate the slave; this will take
+    // care of updating the master too.
+    while (!process::call(slavesManager, &SlavesManager::deactivate,
+                          slaveInfo.hostname(), slave.port)) {
+      LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
+      pause(5);
+    }
+  }
+
+private:
+  const UPID slave;
+  const SlaveInfo slaveInfo;
+  const SlaveID slaveId;
+  const PID<SlavesManager> slavesManager;
+  int timeouts;
+  bool pinged;
 };
 
-}
+}}} // namespace mesos { namespace internal { namespace master {
 
 
 Master::Master()
-  : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0)
+  : MesosProcess<Master>("master")
 {
-  allocatorType = "simple";
+  initialize();
 }
 
 
-Master::Master(const Params& conf_)
-  : conf(conf_), nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0)
+Master::Master(const Configuration& conf)
+  : MesosProcess<Master>("master"),
+    conf(conf)
 {
-  allocatorType = conf.get("allocator", "simple");
+  initialize();
 }
                    
 
@@ -133,63 +213,113 @@ Master::~Master()
 {
   LOG(INFO) << "Shutting down master";
 
-  delete allocator;
-
-  foreachpair (_, Framework *framework, frameworks) {
-    foreachpair(_, Task *task, framework->tasks)
-      delete task;
-    delete framework;
+  foreachpaircopy (_, Framework* framework, frameworks) {
+    removeFramework(framework);
   }
 
-  foreachpair (_, Slave *slave, slaves) {
-    delete slave;
+  foreachpaircopy (_, Slave* slave, slaves) {
+    removeSlave(slave);
   }
 
-  foreachpair (_, SlotOffer *offer, slotOffers) {
-    delete offer;
-  }
+  delete allocator;
+
+  CHECK(slotOffers.size() == 0);
+
+  process::post(slavesManager->self(), process::TERMINATE);
+  process::wait(slavesManager->self());
+
+  delete slavesManager;
 }
 
 
-void Master::registerOptions(Configurator* conf)
+void Master::registerOptions(Configurator* configurator)
 {
-  conf->addOption<string>("allocator", 'a', "Allocation module name", "simple");
-  conf->addOption<bool>("root_submissions",
-                        "Can root submit frameworks?",
-                        true);
+  SlavesManager::registerOptions(configurator);
+  configurator->addOption<string>("allocator", 'a',
+                                  "Allocation module name",
+                                  "simple");
+  configurator->addOption<bool>("root_submissions",
+                                "Can root submit frameworks?",
+                                true);
 }
 
 
-state::MasterState * Master::getState()
+Promise<state::MasterState*> Master::getState()
 {
-  std::ostringstream oss;
-  oss << self();
-  state::MasterState *state =
-    new state::MasterState(BUILD_DATE, BUILD_USER, oss.str());
+  state::MasterState* state =
+    new state::MasterState(build::DATE, build::USER, self());
+
+  foreachpair (_, Slave* s, slaves) {
+    Resources resources(s->info.resources());
+    Resource::Scalar cpus;
+    Resource::Scalar mem;
+    cpus.set_value(-1);
+    mem.set_value(-1);
+    cpus = resources.getScalar("cpus", cpus);
+    mem = resources.getScalar("mem", mem);
+
+    state::Slave* slave =
+      new state::Slave(s->slaveId.value(), s->info.hostname(),
+                       s->info.public_hostname(), cpus.value(),
+                       mem.value(), s->connectTime);
 
-  foreachpair (_, Slave *s, slaves) {
-    state::Slave *slave = new state::Slave(s->id, s->hostname, s->publicDns,
-        s->resources.cpus, s->resources.mem, s->connectTime);
     state->slaves.push_back(slave);
   }
 
-  foreachpair (_, Framework *f, frameworks) {
-    state::Framework *framework = new state::Framework(f->id, f->user,
-        f->name, f->executorInfo.uri, f->resources.cpus, f->resources.mem,
-        f->connectTime);
+  foreachpair (_, Framework* f, frameworks) {
+    Resources resources(f->resources);
+    Resource::Scalar cpus;
+    Resource::Scalar mem;
+    cpus.set_value(-1);
+    mem.set_value(-1);
+    cpus = resources.getScalar("cpus", cpus);
+    mem = resources.getScalar("mem", mem);
+
+    state::Framework* framework =
+      new state::Framework(f->frameworkId.value(), f->info.user(),
+                           f->info.name(), f->info.executor().uri(),
+                           cpus.value(), mem.value(), f->connectTime);
+
     state->frameworks.push_back(framework);
-    foreachpair (_, Task *t, f->tasks) {
-      state::Task *task = new state::Task(t->id, t->name, t->frameworkId,
-          t->slaveId, t->state, t->resources.cpus, t->resources.mem);
+
+    foreachpair (_, Task* t, f->tasks) {
+      Resources resources(t->resources());
+      Resource::Scalar cpus;
+      Resource::Scalar mem;
+      cpus.set_value(-1);
+      mem.set_value(-1);
+      cpus = resources.getScalar("cpus", cpus);
+      mem = resources.getScalar("mem", mem);
+
+      state::Task* task =
+        new state::Task(t->task_id().value(), t->name(),
+                        t->framework_id().value(), t->slave_id().value(),
+                        TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+                        cpus.value(), mem.value());
+
       framework->tasks.push_back(task);
     }
-    foreach (SlotOffer *o, f->slotOffers) {
-      state::SlotOffer *offer = new state::SlotOffer(o->id, o->frameworkId);
-      foreach (SlaveResources &r, o->resources) {
-        state::SlaveResources *resources = new state::SlaveResources(
-            r.slave->id, r.resources.cpus, r.resources.mem);
-        offer->resources.push_back(resources);
+
+    foreach (SlotOffer* o, f->slotOffers) {
+      state::SlotOffer* offer =
+        new state::SlotOffer(o->offerId.value(), o->frameworkId.value());
+
+      foreach (const SlaveResources& r, o->resources) {
+        Resources resources(r.resources);
+        Resource::Scalar cpus;
+        Resource::Scalar mem;
+        cpus.set_value(-1);
+        mem.set_value(-1);
+        cpus = resources.getScalar("cpus", cpus);
+        mem = resources.getScalar("mem", mem);
+
+        state::SlaveResources* sr =
+          new state::SlaveResources(r.slave->slaveId.value(),
+                                    cpus.value(), mem.value());
+
+        offer->resources.push_back(sr);
       }
+
       framework->offers.push_back(offer);
     }
   }
@@ -199,57 +329,58 @@ state::MasterState * Master::getState()
 
 
 // Return connected frameworks that are not in the process of being removed
-vector<Framework *> Master::getActiveFrameworks()
+vector<Framework*> Master::getActiveFrameworks()
 {
-  vector <Framework *> result;
-  foreachpair(_, Framework *framework, frameworks)
-    if (framework->active)
+  vector <Framework*> result;
+  foreachpair(_, Framework* framework, frameworks) {
+    if (framework->active) {
       result.push_back(framework);
+    }
+  }
   return result;
 }
 
 
 // Return connected slaves that are not in the process of being removed
-vector<Slave *> Master::getActiveSlaves()
+vector<Slave*> Master::getActiveSlaves()
 {
-  vector <Slave *> result;
-  foreachpair(_, Slave *slave, slaves)
-    if (slave->active)
+  vector <Slave*> result;
+  foreachpair(_, Slave* slave, slaves) {
+    if (slave->active) {
       result.push_back(slave);
+    }
+  }
   return result;
 }
 
 
-Framework * Master::lookupFramework(FrameworkID fid)
+Framework* Master::lookupFramework(const FrameworkID& frameworkId)
 {
-  unordered_map<FrameworkID, Framework *>::iterator it =
-    frameworks.find(fid);
-  if (it != frameworks.end())
-    return it->second;
-  else
+  if (frameworks.count(frameworkId) > 0) {
+    return frameworks[frameworkId];
+  } else {
     return NULL;
+  }
 }
 
 
-Slave * Master::lookupSlave(SlaveID sid)
+Slave* Master::lookupSlave(const SlaveID& slaveId)
 {
-  unordered_map<SlaveID, Slave *>::iterator it =
-    slaves.find(sid);
-  if (it != slaves.end())
-    return it->second;
-  else
+  if (slaves.count(slaveId) > 0) {
+    return slaves[slaveId];
+  } else {
     return NULL;
+  }
 }
 
 
-SlotOffer * Master::lookupSlotOffer(OfferID oid)
+SlotOffer* Master::lookupSlotOffer(const OfferID& offerId)
 {
-  unordered_map<OfferID, SlotOffer *>::iterator it =
-    slotOffers.find(oid);
-  if (it != slotOffers.end()) 
-    return it->second;
-  else
+  if (slotOffers.count(offerId) > 0) {
+    return slotOffers[offerId];
+  } else {
     return NULL;
+  }
 }
 
 
@@ -257,662 +388,1091 @@ void Master::operator () ()
 {
   LOG(INFO) << "Master started at mesos://" << self();
 
-  // Don't do anything until we get a master ID.
-  while (receive() != GOT_MASTER_ID) {
+  // Don't do anything until we get a master token.
+  while (receive() != GOT_MASTER_TOKEN) {
     LOG(INFO) << "Oops! We're dropping a message since "
               << "we haven't received an identifier yet!";  
   }
-  string faultToleranceId;
-  tie(faultToleranceId) = unpack<GOT_MASTER_ID>(body());
-  masterId = DateUtils::currentDate() + "-" + faultToleranceId;
+
+  MSG<GOT_MASTER_TOKEN> msg;
+  msg.ParseFromString(body());
+
+  // The master ID is comprised of the current date and some ephemeral
+  // token (e.g., determined by ZooKeeper).
+
+  masterId = DateUtils::currentDate() + "-" + msg.token();
   LOG(INFO) << "Master ID: " << masterId;
 
+  // Setup slave manager.
+  slavesManager = new SlavesManager(conf, self());
+  process::spawn(slavesManager);
+
   // Create the allocator (we do this after the constructor because it
   // leaks 'this').
   allocator = createAllocator();
-  if (!allocator)
+  if (!allocator) {
     LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
+  }
 
   link(spawn(new AllocatorTimer(self())));
   //link(spawn(new SharesPrinter(self())));
 
   while (true) {
-    switch (receive()) {
-
-    case NEW_MASTER_DETECTED: {
-      // TODO(benh): We might have been the master, but then got
-      // partitioned, and now we are finding out once we reconnect
-      // that we are no longer the master, so we should just die.
-      LOG(INFO) << "New master detected ... maybe it's us!";
+    serve();
+    if (name() == process::TERMINATE) {
+      LOG(INFO) << "Asked to terminate by " << from();
+      foreachpair (_, Slave* slave, slaves) {
+        send(slave->pid, process::TERMINATE);
+      }
       break;
+    } else {
+      LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+                   << " from: " << from();
     }
+  }
+}
 
-    case NO_MASTER_DETECTED: {
-      LOG(INFO) << "No master detected ... maybe we're next!";
-      break;
-    }
 
-    case F2M_REGISTER_FRAMEWORK: {
-      FrameworkID fid = newFrameworkId();
-      Framework *framework = new Framework(from(), fid, elapsed());
-
-      tie(framework->name, framework->user, framework->executorInfo) =
-        unpack<F2M_REGISTER_FRAMEWORK>(body());
-
-      LOG(INFO) << "Registering " << framework << " at " << framework->pid;
-
-      if (framework->executorInfo.uri == "") {
-        LOG(INFO) << framework << " registering without an executor URI";
-        send(framework->pid, pack<M2F_ERROR>(1, "No executor URI given"));
-        delete framework;
-        break;
-      }
+void Master::initialize()
+{
+  active = false;
 
-      bool rootSubmissions = conf.get<bool>("root_submissions", true);
-      if (framework->user == "root" && rootSubmissions == false) {
-        LOG(INFO) << framework << " registering as root, but "
-                  << "root submissions are disabled on this cluster";
-        send(framework->pid, pack<M2F_ERROR>(
-              1, "Root is not allowed to submit jobs on this cluster"));
-        delete framework;
-        break;
-      }
+  nextFrameworkId = 0;
+  nextSlaveId = 0;
+  nextOfferId = 0;
 
-      addFramework(framework);
-      break;
-    }
+  allocatorType = conf.get("allocator", "simple");
 
-    case F2M_REREGISTER_FRAMEWORK: {
-      FrameworkID fid;
-      string name;
-      string user;
-      ExecutorInfo executorInfo;
-      int32_t generation;
-
-      tie(fid, name, user, executorInfo, generation) =
-        unpack<F2M_REREGISTER_FRAMEWORK>(body());
-
-      if (executorInfo.uri == "") {
-        LOG(INFO) << "Framework " << fid << " re-registering "
-                  << "without an executor URI";
-        send(from(), pack<M2F_ERROR>(1, "No executor URI given"));
-        break;
-      }
+  // Start all the statistics at 0.
+  statistics.launched_tasks = 0;
+  statistics.finished_tasks = 0;
+  statistics.killed_tasks = 0;
+  statistics.failed_tasks = 0;
+  statistics.lost_tasks = 0;
+  statistics.valid_status_updates = 0;
+  statistics.invalid_status_updates = 0;
+  statistics.valid_framework_messages = 0;
+  statistics.invalid_framework_messages = 0;
 
-      if (fid == "") {
-        LOG(ERROR) << "Framework re-registering without an id!";
-        send(from(), pack<M2F_ERROR>(1, "Missing framework id"));
-        break;
-      }
+  // Install handler functions for certain messages.
+  install(NEW_MASTER_DETECTED, &Master::newMasterDetected,
+          &NewMasterDetectedMessage::pid);
 
-      LOG(INFO) << "Re-registering framework " << fid << " at " << from();
+  install(NO_MASTER_DETECTED, &Master::noMasterDetected);
 
-      if (frameworks.count(fid) > 0) {
-        // Using the "generation" of the scheduler allows us to keep a
-        // scheduler that got partitioned but didn't die (in ZooKeeper
-        // speak this means didn't lose their session) and then
-        // eventually tried to connect to this master even though
-        // another instance of their scheduler has reconnected. This
-        // might not be an issue in the future when the
-        // master/allocator launches the scheduler can get restarted
-        // (if necessary) by the master and the master will always
-        // know which scheduler is the correct one.
-        if (generation == 0) {
-          LOG(INFO) << "Framework " << fid << " failed over";
-          failoverFramework(frameworks[fid], from());
-          // TODO: Should we check whether the new scheduler has given
-          // us a different framework name, user name or executor info?
-        } else {
-          LOG(INFO) << "Framework " << fid << " re-registering with an "
-		    << "already used id and not failing over!";
-          send(from(), pack<M2F_ERROR>(1, "Framework id in use"));
-          break;
-        }
-      } else {
-        // We don't have a framework with this ID, so we must be a newly
-        // elected Mesos master to which either an existing scheduler or a
-        // failed-over one is connecting. Create a Framework object and add
-        // any tasks it has that have been reported by reconnecting slaves.
-        Framework *framework = new Framework(from(), fid, elapsed());
-        framework->name = name;
-        framework->user = user;
-        framework->executorInfo = executorInfo;
-        addFramework(framework);
-        // Add any running tasks reported by slaves for this framework.
-        foreachpair (SlaveID slaveId, Slave *slave, slaves) {
-          foreachpair (_, Task *task, slave->tasks) {
-            if (framework->id == task->frameworkId) {
-              framework->addTask(task);
-            }
-          }
-        }
-      }
+  install(F2M_REGISTER_FRAMEWORK, &Master::registerFramework,
+          &RegisterFrameworkMessage::framework);
 
-      CHECK(frameworks.find(fid) != frameworks.end());
+  install(F2M_REREGISTER_FRAMEWORK, &Master::reregisterFramework,
+          &ReregisterFrameworkMessage::framework_id,
+          &ReregisterFrameworkMessage::framework,
+          &ReregisterFrameworkMessage::generation);
 
-      // Broadcast the new framework pid to all the slaves. We have to
-      // broadcast because an executor might be running on a slave but
-      // it currently isn't running any tasks. This could be a
-      // potential scalability issue ...
-      foreachpair (_, Slave *slave, slaves) {
-        send(slave->pid, pack<M2S_UPDATE_FRAMEWORK_PID>(fid, from()));
-      }
+  install(F2M_UNREGISTER_FRAMEWORK, &Master::unregisterFramework,
+          &UnregisterFrameworkMessage::framework_id);
 
-      break;
-    }
+  install(F2M_RESOURCE_OFFER_REPLY, &Master::resourceOfferReply,
+          &ResourceOfferReplyMessage::framework_id,
+          &ResourceOfferReplyMessage::offer_id,
+          &ResourceOfferReplyMessage::tasks,
+          &ResourceOfferReplyMessage::params);
 
-    case F2M_UNREGISTER_FRAMEWORK: {
-      FrameworkID fid;
-      tie(fid) = unpack<F2M_UNREGISTER_FRAMEWORK>(body());
-      LOG(INFO) << "Asked to unregister framework " << fid;
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL && framework->pid == from())
-        removeFramework(framework);
-      else
-        LOG(WARNING) << "Non-authoratative PID attempting framework "
-                     << "unregistration ... ignoring";
-      break;
+  install(F2M_REVIVE_OFFERS, &Master::reviveOffers,
+          &ReviveOffersMessage::framework_id);
+
+  install(F2M_KILL_TASK, &Master::killTask,
+          &KillTaskMessage::framework_id,
+          &KillTaskMessage::task_id);
+
+  install(F2M_FRAMEWORK_MESSAGE, &Master::schedulerMessage,
+          &FrameworkMessageMessage::slave_id,
+          &FrameworkMessageMessage::framework_id,
+          &FrameworkMessageMessage::executor_id,
+          &FrameworkMessageMessage::data);
+
+  install(F2M_STATUS_UPDATE_ACK, &Master::statusUpdateAck,
+          &StatusUpdateAckMessage::framework_id,
+          &StatusUpdateAckMessage::task_id,
+          &StatusUpdateAckMessage::slave_id);
+
+  install(S2M_REGISTER_SLAVE, &Master::registerSlave,
+          &RegisterSlaveMessage::slave);
+
+  install(S2M_REREGISTER_SLAVE, &Master::reregisterSlave,
+          &ReregisterSlaveMessage::slave_id,
+          &ReregisterSlaveMessage::slave,
+          &ReregisterSlaveMessage::tasks);
+
+  install(S2M_UNREGISTER_SLAVE, &Master::unregisterSlave,
+          &UnregisterSlaveMessage::slave_id);
+
+  install(S2M_STATUS_UPDATE, &Master::statusUpdate,
+          &StatusUpdateMessage::framework_id,
+          &StatusUpdateMessage::status);
+
+  install(S2M_FRAMEWORK_MESSAGE, &Master::executorMessage,
+          &FrameworkMessageMessage::slave_id,
+          &FrameworkMessageMessage::framework_id,
+          &FrameworkMessageMessage::executor_id,
+          &FrameworkMessageMessage::data);
+
+  install(S2M_EXITED_EXECUTOR, &Master::exitedExecutor,
+          &ExitedExecutorMessage::slave_id,
+          &ExitedExecutorMessage::framework_id,
+          &ExitedExecutorMessage::executor_id,
+          &ExitedExecutorMessage::result);
+
+  install(process::EXITED, &Master::exited);
+
+  // Install HTTP request handlers.
+  Process<Master>::install("vars", &Master::vars);
+  Process<Master>::install("stats", &Master::stats);
+}
+
+
+void Master::newMasterDetected(const string& pid)
+{
+  // Check and see if we are (1) still waiting to be the active
+  // master, (2) newly active master, (3) no longer active master,
+  // or (4) still active master.
+
+  UPID master(pid);
+
+  if (master != self() && !active) {
+    LOG(INFO) << "Waiting to be master!";
+  } else if (master == self() && !active) {
+    LOG(INFO) << "Acting as master!";
+    active = true;
+  } else if (master != self() && active) {
+    LOG(FATAL) << "No longer active master ... committing suicide!";
+  } else if (master == self() && active) {
+    LOG(INFO) << "Still acting as master!";
+  }
+}
+
+
+void Master::noMasterDetected()
+{
+  if (active) {
+    LOG(FATAL) << "No longer active master ... committing suicide!";
+  } else {
+    LOG(FATAL) << "No master detected (?) ... committing suicide!";
+  }
+}
+
+
+void Master::registerFramework(const FrameworkInfo& frameworkInfo)
+{
+  Framework* framework =
+    new Framework(frameworkInfo, newFrameworkId(), from(), elapsed());
+
+  LOG(INFO) << "Registering " << framework << " at " << from();
+
+  if (framework->info.executor().uri() == "") {
+    LOG(INFO) << framework << " registering without an executor URI";
+    MSG<M2F_ERROR> out;
+    out.set_code(1);
+    out.set_message("No executor URI given");
+    send(from(), out);
+    delete framework;
+  } else {
+    bool rootSubmissions = conf.get<bool>("root_submissions", true);
+    if (framework->info.user() == "root" && rootSubmissions == false) {
+      LOG(INFO) << framework << " registering as root, but "
+                << "root submissions are disabled on this cluster";
+      MSG<M2F_ERROR> out;
+      out.set_code(1);
+      out.set_message("User 'root' is not allowed to run frameworks");
+      send(from(), out);
+      delete framework;
     }
+  }
+
+  addFramework(framework);
+}
+
+
+void Master::reregisterFramework(const FrameworkID& frameworkId,
+                                 const FrameworkInfo& frameworkInfo,
+                                 int32_t generation)
+{
+  if (frameworkId == "") {
+    LOG(ERROR) << "Framework re-registering without an id!";
+    MSG<M2F_ERROR> out;
+    out.set_code(1);
+    out.set_message("Missing framework id");
+    send(from(), out);
+  } else if (frameworkInfo.executor().uri() == "") {
+    LOG(INFO) << "Framework " << frameworkId << " re-registering "
+              << "without an executor URI";
+    MSG<M2F_ERROR> out;
+    out.set_code(1);
+    out.set_message("No executor URI given");
+    send(from(), out);
+  } else {
+    LOG(INFO) << "Re-registering framework " << frameworkId
+              << " at " << from();
+
+    if (frameworks.count(frameworkId) > 0) {
+      // Using the "generation" of the scheduler allows us to keep a
+      // scheduler that got partitioned but didn't die (in ZooKeeper
+      // speak this means didn't lose their session) and then
+      // eventually tried to connect to this master even though
+      // another instance of their scheduler has reconnected. This
+      // might not be an issue in the future when the
+      // master/allocator launches the scheduler can get restarted
+      // (if necessary) by the master and the master will always
+      // know which scheduler is the correct one.
+      if (generation == 0) {
+        LOG(INFO) << "Framework " << frameworkId << " failed over";
+        failoverFramework(frameworks[frameworkId], from());
+        // TODO: Should we check whether the new scheduler has given
+        // us a different framework name, user name or executor info?
+      } else {
+        LOG(INFO) << "Framework " << frameworkId
+                  << " re-registering with an already used id"
+                  << " and not failing over!";
+        MSG<M2F_ERROR> out;
+        out.set_code(1);
+        out.set_message("Framework id in use");
+        send(from(), out);
+        return;
+      }
+    } else {
+      // We don't have a framework with this ID, so we must be a newly
+      // elected Mesos master to which either an existing scheduler or a
+      // failed-over one is connecting. Create a Framework object and add
+      // any tasks it has that have been reported by reconnecting slaves.
+      Framework* framework =
+        new Framework(frameworkInfo, frameworkId, from(), elapsed());
 
-    case F2M_SLOT_OFFER_REPLY: {
-      FrameworkID fid;
-      OfferID oid;
-      vector<TaskDescription> tasks;
-      Params params;
-      tie(fid, oid, tasks, params) = unpack<F2M_SLOT_OFFER_REPLY>(body());
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL) {
-        SlotOffer *offer = lookupSlotOffer(oid);
-        if (offer != NULL) {
-          processOfferReply(offer, tasks, params);
-        } else {
-          // The slot offer is gone, meaning that we rescinded it or that
-          // the slave was lost; immediately report any tasks in it as lost
-          foreach (const TaskDescription &t, tasks) {
-            send(framework->pid,
-                 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+      // TODO(benh): Check for root submissions like above!
+
+      addFramework(framework);
+      // Add any running tasks reported by slaves for this framework.
+      foreachpair (const SlaveID& slaveId, Slave* slave, slaves) {
+        foreachpair (_, Task* task, slave->tasks) {
+          if (framework->frameworkId == task->framework_id()) {
+            framework->addTask(task);
           }
         }
       }
-      break;
     }
 
-    case F2M_REVIVE_OFFERS: {
-      FrameworkID fid;
-      tie(fid) = unpack<F2M_REVIVE_OFFERS>(body());
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL) {
-        LOG(INFO) << "Reviving offers for " << framework;
-        framework->slaveFilter.clear();
-        allocator->offersRevived(framework);
-      }
-      break;
+    CHECK(frameworks.count(frameworkId) > 0);
+
+    // Broadcast the new framework pid to all the slaves. We have to
+    // broadcast because an executor might be running on a slave but
+    // it currently isn't running any tasks. This could be a
+    // potential scalability issue ...
+    foreachpair (_, Slave* slave, slaves) {
+      MSG<M2S_UPDATE_FRAMEWORK> out;
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.set_pid(from());
+      send(slave->pid, out);
     }
+  }
+}
 
-    case F2M_KILL_TASK: {
-      FrameworkID fid;
-      TaskID tid;
-      tie(fid, tid) = unpack<F2M_KILL_TASK>(body());
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL) {
-        Task *task = framework->lookupTask(tid);
-        if (task != NULL) {
-          LOG(INFO) << "Asked to kill " << task << " by its framework";
-          killTask(task);
-	} else {
-	  LOG(INFO) << "Asked to kill UNKNOWN task by its framework";
-	  send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
-        }
-      }
-      break;
+
+void Master::unregisterFramework(const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Asked to unregister framework " << frameworkId;
+
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    if (framework->pid == from()) {
+      removeFramework(framework);
+    } else {
+      LOG(WARNING) << from() << " tried to unregister framework; "
+                   << "expecting " << framework->pid;
     }
+  }
+}
+
 
-    case F2M_FRAMEWORK_MESSAGE: {
-      FrameworkID fid;
-      FrameworkMessage message;
-      tie(fid, message) = unpack<F2M_FRAMEWORK_MESSAGE>(body());
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL) {
-        Slave *slave = lookupSlave(message.slaveId);
-        if (slave != NULL)
-          send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
+void Master::resourceOfferReply(const FrameworkID& frameworkId,
+                                const OfferID& offerId,
+                                const vector<TaskDescription>& tasks,
+                                const Params& params)
+{
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    SlotOffer* offer = lookupSlotOffer(offerId);
+    if (offer != NULL) {
+      processOfferReply(offer, tasks, params);
+    } else {
+      // The slot offer is gone, meaning that we rescinded it, it
+      // has already been replied to, or that the slave was lost;
+      // immediately report any tasks in it as lost (it would
+      // probably be better to have better error messages here).
+      foreach (const TaskDescription& task, tasks) {
+        MSG<M2F_STATUS_UPDATE> out;
+        out.mutable_framework_id()->MergeFrom(frameworkId);
+        TaskStatus* status = out.mutable_status();
+        status->mutable_task_id()->MergeFrom(task.task_id());
+        status->mutable_slave_id()->MergeFrom(task.slave_id());
+        status->set_state(TASK_LOST);
+        send(framework->pid, out);
       }
-      break;
     }
+  }
+}
 
-    case S2M_REGISTER_SLAVE: {
-      string slaveId = masterId + "-" + lexical_cast<string>(nextSlaveId++);
-      Slave *slave = new Slave(from(), slaveId, elapsed());
-      tie(slave->hostname, slave->publicDns, slave->resources) =
-        unpack<S2M_REGISTER_SLAVE>(body());
-      LOG(INFO) << "Registering " << slave << " at " << slave->pid;
-      slaves[slave->id] = slave;
-      pidToSid[slave->pid] = slave->id;
-      link(slave->pid);
-      send(slave->pid,
-	   pack<M2S_REGISTER_REPLY>(slave->id, HEARTBEAT_INTERVAL));
-      allocator->slaveAdded(slave);
-      break;
-    }
 
-    case S2M_REREGISTER_SLAVE: {
-      Slave *slave = new Slave(from(), "", elapsed());
-      vector<Task> tasks;
-      tie(slave->id, slave->hostname, slave->publicDns,
-          slave->resources, tasks) = unpack<S2M_REREGISTER_SLAVE>(body());
-
-      if (slave->id == "") {
-        slave->id = masterId + "-" + lexical_cast<string>(nextSlaveId++);
-        LOG(ERROR) << "Slave re-registered without a SlaveID, "
-                   << "generating a new id for it.";
-      }
+void Master::reviveOffers(const FrameworkID& frameworkId)
+{
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    LOG(INFO) << "Reviving offers for " << framework;
+    framework->slaveFilter.clear();
+    allocator->offersRevived(framework);
+  }
+}
 
-      LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
-      slaves[slave->id] = slave;
-      pidToSid[slave->pid] = slave->id;
-      link(slave->pid);
-      send(slave->pid,
-           pack<M2S_REREGISTER_REPLY>(slave->id, HEARTBEAT_INTERVAL));
-
-      allocator->slaveAdded(slave);
-
-      foreach (const Task &t, tasks) {
-        Task *task = new Task(t);
-        slave->addTask(task);
-
-        // Tell this slave the current framework pid for this task.
-        Framework *framework = lookupFramework(task->frameworkId);
-        if (framework != NULL) {
-          framework->addTask(task);
-          send(slave->pid, pack<M2S_UPDATE_FRAMEWORK_PID>(framework->id,
-                                                          framework->pid));
-        }
-      }
 
-      // TODO(benh|alig): We should put a timeout on how long we keep
-      // tasks running that never have frameworks reregister that
-      // claim them.
+void Master::killTask(const FrameworkID& frameworkId,
+                      const TaskID& taskId)
+{
+  LOG(INFO) << "Asked to kill task " << taskId
+            << " of framework " << frameworkId;
 
-      break;
-    }
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    Task* task = framework->lookupTask(taskId);
+    if (task != NULL) {
+      Slave* slave = lookupSlave(task->slave_id());
+      CHECK(slave != NULL);
 
-    case S2M_UNREGISTER_SLAVE: {
-      SlaveID sid;
-      tie(sid) = unpack<S2M_UNREGISTER_SLAVE>(body());
-      LOG(INFO) << "Asked to unregister slave " << sid;
-      Slave *slave = lookupSlave(sid);
-      if (slave != NULL)
-        removeSlave(slave);
-      break;
+      LOG(INFO) << "Telling slave " << slave->slaveId
+                << " to kill task " << taskId
+                << " of framework " << frameworkId;
+
+      MSG<M2S_KILL_TASK> out;
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_task_id()->MergeFrom(taskId);
+      send(slave->pid, out);
+    } else {
+      LOG(WARNING) << "Cannot kill task " << taskId
+                   << " of framework " << frameworkId
+                   << " because it cannot be found";
+      MSG<M2F_STATUS_UPDATE> out;
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      TaskStatus *status = out.mutable_status();
+      status->mutable_task_id()->MergeFrom(taskId);
+      status->mutable_slave_id()->set_value("UNKNOWN");
+      status->set_state(TASK_LOST);
+      send(framework->pid, out);
     }
+  }
+}
 
-    case S2M_STATUS_UPDATE: {
-      SlaveID sid;
-      FrameworkID fid;
-      TaskID tid;
-      TaskState state;
-      string data;
-      tie(sid, fid, tid, state, data) = unpack<S2M_STATUS_UPDATE>(body());
-
-      if (Slave *slave = lookupSlave(sid)) {
-        if (Framework *framework = lookupFramework(fid)) {
-	  // Pass on the (transformed) status update to the framework.
-          forward(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
-          if (duplicate()) {
-            LOG(WARNING) << "Locally ignoring duplicate message with id:" << seq();
-            break;
-          }
-          // Update the task state locally.
-          Task *task = slave->lookupTask(fid, tid);
-          if (task != NULL) {
-            LOG(INFO) << "Status update: " << task << " is in state " << state;
-            task->state = state;
-            // Remove the task if it finished or failed
-            if (state == TASK_FINISHED || state == TASK_FAILED ||
-                state == TASK_KILLED || state == TASK_LOST) {
-              LOG(INFO) << "Removing " << task << " because it's done";
-              removeTask(task, TRR_TASK_ENDED);
-            }
-          }
-        } else {
-          LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup "
-                     << "framework id " << fid;
-        }
-      } else {
-        LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup slave id "
-                   << sid;
-      }
-      break;
+
+void Master::schedulerMessage(const SlaveID& slaveId,
+			      const FrameworkID& frameworkId,
+			      const ExecutorID& executorId,
+                              const string& data)
+{
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    Slave* slave = lookupSlave(slaveId);
+    if (slave != NULL) {
+      LOG(INFO) << "Sending framework message for framework "
+                << frameworkId << " to slave " << slaveId;
+      MSG<M2S_FRAMEWORK_MESSAGE> out;
+      out.mutable_slave_id()->MergeFrom(slaveId);
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_executor_id()->MergeFrom(executorId);
+      out.set_data(data);
+      send(slave->pid, out);
+    } else {
+      LOG(WARNING) << "Cannot send framework message for framework "
+                   << frameworkId << " to slave " << slaveId
+                   << " because slave does not exist";
     }
+  } else {
+    LOG(WARNING) << "Cannot send framework message for framework "
+                 << frameworkId << " to slave " << slaveId
+                 << " because framework does not exist";
+  }
+}
 
-    case S2M_FRAMEWORK_MESSAGE: {
-      SlaveID sid;
-      FrameworkID fid;
-      FrameworkMessage message;
-      tie(sid, fid, message) = unpack<S2M_FRAMEWORK_MESSAGE>(body());
-      Slave *slave = lookupSlave(sid);
-      if (slave != NULL) {
-        Framework *framework = lookupFramework(fid);
-        if (framework != NULL)
-          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
-      }
-      break;
+
+void Master::statusUpdateAck(const FrameworkID& frameworkId,
+                             const TaskID& taskId,
+                             const SlaveID& slaveId)
+{
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    Slave* slave = lookupSlave(slaveId);
+    if (slave != NULL) {
+      LOG(INFO) << "Sending slave " << slaveId
+                << " status update acknowledgement for task " << taskId
+                << " of framework " << frameworkId;
+      MSG<M2S_STATUS_UPDATE_ACK> out;
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_slave_id()->MergeFrom(slaveId);
+      out.mutable_task_id()->MergeFrom(taskId);
+      send(slave->pid, out);
+    } else {
+      LOG(WARNING) << "Cannot tell slave " << slaveId
+                   << " of status update acknowledgement for task " << taskId
+                   << " of framework " << frameworkId
+                   << " because slave does not exist";
     }
+  } else {
+    LOG(WARNING) << "Cannot tell slave " << slaveId
+                 << " of status update acknowledgement for task " << taskId
+                 << " of framework " << frameworkId
+                 << " because framework does not exist";
+  }
+}
 
-    case S2M_LOST_EXECUTOR: {
-      SlaveID sid;
-      FrameworkID fid;
-      int32_t status;
-      tie(sid, fid, status) = unpack<S2M_LOST_EXECUTOR>(body());
-      Slave *slave = lookupSlave(sid);
-      if (slave != NULL) {
-        Framework *framework = lookupFramework(fid);
-        if (framework != NULL) {
-          // TODO(benh): Send the framework it's executor's exit status?
-          if (status == -1) {
-            LOG(INFO) << "Executor on " << slave << " (" << slave->hostname
-                      << ") disconnected";
-          } else {
-            LOG(INFO) << "Executor on " << slave << " (" << slave->hostname
-                      << ") exited with status " << status;
-          }
 
-          // Collect all the lost tasks for this framework.
-          set<Task*> tasks;
-          foreachpair (_, Task* task, framework->tasks)
-            if (task->slaveId == slave->id)
-              tasks.insert(task);
-
-          // Tell the framework they have been lost and remove them.
-          foreach (Task* task, tasks) {
-            send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
-                                                         task->message));
+void Master::registerSlave(const SlaveInfo& slaveInfo)
+{
+  Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsed());
+
+  LOG(INFO) << "Attempting to register slave " << slave->slaveId
+            << " at " << slave->pid;
+
+  struct SlaveRegistrar
+  {
+    static bool run(Slave* slave, const PID<Master>& master)
+    {
+      // TODO(benh): Do a reverse lookup to ensure IP maps to
+      // hostname, or check credentials of this slave.
+      process::dispatch(master, &Master::addSlave, slave);
+    }
+
+    static bool run(Slave* slave,
+                    const PID<Master>& master,
+                    const PID<SlavesManager>& slavesManager)
+    {
+      if (!process::call(slavesManager, &SlavesManager::add,
+                         slave->info.hostname(), slave->pid.port)) {
+        LOG(WARNING) << "Could not register slave because failed"
+                     << " to add it to the slaves manager";
+        delete slave;
+        return false;
+      }
+
+      return run(slave, master);
+    }
+  };
+
+  // Check whether this slave can be accepted, or if all slaves are accepted.
+  if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
+    process::run(&SlaveRegistrar::run, slave, self());
+  } else if (conf.get<string>("slaves", "*") == "*") {
+    process::run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
+  } else {
+    LOG(WARNING) << "Cannot register slave at "
+                 << slaveInfo.hostname() << ":" << from().port
+                 << " because not in allocated set of slaves!";
+    send(from(), process::TERMINATE);
+  }
+}
+
+
+void Master::reregisterSlave(const SlaveID& slaveId,
+                             const SlaveInfo& slaveInfo,
+                             const vector<Task>& tasks)
+{
+  if (slaveId == "") {
+    LOG(ERROR) << "Slave re-registered without an id!";
+    send(from(), process::TERMINATE);
+  } else {
+    Slave* slave = lookupSlave(slaveId);
+    if (slave != NULL) {
+      // TODO(benh): It's still unclear whether or not
+      // MasterDetector::detectMaster will cause spurious
+      // Slave::newMasterDetected to get invoked even though the
+      // ephemeral znode hasn't changed. If that does happen, the
+      // re-register that the slave is trying to do is just
+      // bogus. Letting it re-register might not be all that bad now,
+      // but maybe in the future it's bad because during that
+      // "disconnected" time it might not have received certain
+      // messages from us (like launching a task), and so until we
+      // have some form of task reconciliation between all the
+      // different components, the safe thing to do is have the slave
+      // restart (kind of defeats the purpose of session expiration
+      // support in ZooKeeper if the spurious calls happen each time).
+      LOG(ERROR) << "Slave at " << from()
+		 << " attempted to re-register with an already in use id ("
+		 << slaveId << ")";
+      send(from(), process::TERMINATE);
+    } else {
+      Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsed());
+
+      LOG(INFO) << "Attempting to re-register slave " << slave->slaveId
+                << " at " << slave->pid;
+
+      struct SlaveReregistrar
+      {
+        static bool run(Slave* slave,
+                        const vector<Task>& tasks,
+                        const PID<Master>& master)
+        {
+          // TODO(benh): Do a reverse lookup to ensure IP maps to
+          // hostname, or check credentials of this slave.
+          process::dispatch(master, &Master::readdSlave, slave, tasks);
+        }
 
-            LOG(INFO) << "Removing " << task << " because of lost executor";
-            removeTask(task, TRR_EXECUTOR_LOST);
+        static bool run(Slave* slave,
+                        const vector<Task>& tasks,
+                        const PID<Master>& master,
+                        const PID<SlavesManager>& slavesManager)
+        {
+          if (!process::call(slavesManager, &SlavesManager::add,
+                             slave->info.hostname(), slave->pid.port)) {
+            LOG(WARNING) << "Could not register slave because failed"
+                         << " to add it to the slaves manager";
+            delete slave;
+            return false;
           }
 
-          // TODO(benh): Might we still want something like M2F_EXECUTOR_LOST?
+          return run(slave, tasks, master);
         }
-      }
-      break;
-    }
+      };
 
-    case SH2M_HEARTBEAT: {
-      SlaveID sid;
-      tie(sid) = unpack<SH2M_HEARTBEAT>(body());
-      Slave *slave = lookupSlave(sid);
-      if (slave != NULL) {
-        slave->lastHeartbeat = elapsed();
+      // Check whether this slave can be accepted, or if all slaves are accepted.
+      if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
+        process::run(&SlaveReregistrar::run, slave, tasks, self());
+      } else if (conf.get<string>("slaves", "*") == "*") {
+        process::run(&SlaveReregistrar::run,
+                     slave, tasks, self(), slavesManager->self());
       } else {
-        LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
-                     << " from " << from();
+        LOG(WARNING) << "Cannot re-register slave at "
+                     << slaveInfo.hostname() << ":" << from().port
+                     << " because not in allocated set of slaves!";
+        send(from(), process::TERMINATE);
       }
-      break;
     }
+  }
+}
 
-    case M2M_TIMER_TICK: {
-      unordered_map<SlaveID, Slave *> slavesCopy = slaves;
-      foreachpair (_, Slave *slave, slavesCopy) {
-	if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
-	  LOG(INFO) << slave << " missing heartbeats ... considering disconnected";
-	  removeSlave(slave);
+
+void Master::unregisterSlave(const SlaveID& slaveId)
+{
+  LOG(INFO) << "Asked to unregister slave " << slaveId;
+
+  // TODO(benh): Check that only the slave is asking to unregister?
+
+  Slave* slave = lookupSlave(slaveId);
+  if (slave != NULL) {
+    removeSlave(slave);
+  }
+}
+
+
+void Master::statusUpdate(const FrameworkID& frameworkId,
+                          const TaskStatus& status)
+{
+  LOG(INFO) << "Status update: task " << status.task_id()
+            << " of framework " << frameworkId
+            << " is now in state "
+            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+
+  Slave* slave = lookupSlave(status.slave_id());
+  if (slave != NULL) {
+    Framework* framework = lookupFramework(frameworkId);
+    if (framework != NULL) {
+      // Pass on the (transformed) status update to the framework.
+      MSG<M2F_STATUS_UPDATE> out;
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_status()->MergeFrom(status);
+      send(framework->pid, out);
+
+      // Lookup the task and see if we need to update anything locally.
+      Task* task = slave->lookupTask(frameworkId, status.task_id());
+      if (task != NULL) {
+        task->set_state(status.state());
+
+        // Remove the task if necessary, and update statistics.
+	switch (status.state()) {
+	  case TASK_FINISHED:
+	    statistics.finished_tasks++;
+	    removeTask(task, TRR_TASK_ENDED);
+	    break;
+	  case TASK_FAILED:
+	    statistics.failed_tasks++;
+	    removeTask(task, TRR_TASK_ENDED);
+	    break;
+	  case TASK_KILLED:
+	    statistics.killed_tasks++;
+	    removeTask(task, TRR_TASK_ENDED);
+	    break;
+	  case TASK_LOST:
+	    statistics.lost_tasks++;
+	    removeTask(task, TRR_TASK_ENDED);
+	    break;
 	}
+
+	statistics.valid_status_updates++;
+      } else {
+        LOG(WARNING) << "Status update error: couldn't lookup "
+                     << "task " << status.task_id();
+	statistics.invalid_status_updates++;
+      }
+    } else {
+      LOG(WARNING) << "Status update error: couldn't lookup "
+                   << "framework " << frameworkId;
+      statistics.invalid_status_updates++;
+    }
+  } else {
+    LOG(WARNING) << "Status update error: couldn't lookup slave "
+                 << status.slave_id();
+    statistics.invalid_status_updates++;
+  }
+}
+
+
+void Master::executorMessage(const SlaveID& slaveId,
+			     const FrameworkID& frameworkId,
+			     const ExecutorID& executorId,
+                             const string& data)
+{
+  Slave* slave = lookupSlave(slaveId);
+  if (slave != NULL) {
+    Framework* framework = lookupFramework(frameworkId);
+    if (framework != NULL) {
+      LOG(INFO) << "Sending framework message from slave " << slaveId
+                << " to framework " << frameworkId;
+      MSG<M2S_FRAMEWORK_MESSAGE> out;
+      out.mutable_slave_id()->MergeFrom(slaveId);
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_executor_id()->MergeFrom(executorId);
+      out.set_data(data);
+      send(framework->pid, out);
+
+      statistics.valid_framework_messages++;
+    } else {
+      LOG(WARNING) << "Cannot send framework message from slave "
+                   << slaveId << " to framework " << frameworkId
+                   << " because framework does not exist";
+      statistics.invalid_framework_messages++;
+    }
+  } else {
+    LOG(WARNING) << "Cannot send framework message from slave "
+                 << slaveId << " to framework " << frameworkId
+                 << " because slave does not exist";
+    statistics.invalid_framework_messages++;
+  }
+}
+
+
+void Master::exitedExecutor(const SlaveID& slaveId,
+                            const FrameworkID& frameworkId,
+                            const ExecutorID& executorId,
+                            int32_t result)
+{
+  Slave* slave = lookupSlave(slaveId);
+  if (slave != NULL) {
+    Framework* framework = lookupFramework(frameworkId);
+    if (framework != NULL) {
+      LOG(INFO) << "Executor " << executorId
+                << " of framework " << framework->frameworkId
+                << " on slave " << slave->slaveId
+                << " (" << slave->info.hostname() << ") "
+                << "exited with result " << result;
+
+      // Tell the framework which tasks have been lost.
+      foreachpaircopy (_, Task* task, framework->tasks) {
+        if (task->slave_id() == slave->slaveId &&
+            task->executor_id() == executorId) {
+          MSG<M2F_STATUS_UPDATE> out;
+          out.mutable_framework_id()->MergeFrom(task->framework_id());
+          TaskStatus* status = out.mutable_status();
+          status->mutable_task_id()->MergeFrom(task->task_id());
+          status->mutable_slave_id()->MergeFrom(task->slave_id());
+          status->set_state(TASK_LOST);
+          send(framework->pid, out);
+
+          LOG(INFO) << "Removing task " << task->task_id()
+                    << " of framework " << frameworkId
+                    << " because of lost executor";
+
+          removeTask(task, TRR_EXECUTOR_LOST);
+        }
       }
 
-      // Check which framework filters can be expired.
-      foreachpair (_, Framework *framework, frameworks)
-        framework->removeExpiredFilters(elapsed());
-
-      // Do allocations!
-      allocator->timerTick();
-
-      // int cnts = 0;
-      // foreachpair(_, Framework *framework, frameworks) {
-      // 	VLOG(1) << (cnts++) << " resourceInUse:" << framework->resources;
-      // }
-      break;
+      // TODO(benh): Send the framework its executor's exit
+      // status? Or maybe at least have something like
+      // M2F_EXECUTOR_LOST?
     }
+  }
+}
+
 
-    case M2M_FRAMEWORK_EXPIRED: {
-      FrameworkID fid;
-      tie(fid) = unpack<M2M_FRAMEWORK_EXPIRED>(body());
-      if (Framework *framework = lookupFramework(fid)) {
-	LOG(INFO) << "Framework failover timer expired, removing framework "
-		  << framework;
-	removeFramework(framework);
+void Master::activatedSlaveHostnamePort(const string& hostname, uint16_t port)
+{
+  LOG(INFO) << "Master now considering a slave at "
+            << hostname << ":" << port << " as active";
+  slaveHostnamePorts.insert(hostname, port);
+}
+
+
+void Master::deactivatedSlaveHostnamePort(const string& hostname, uint16_t port)
+{
+  if (slaveHostnamePorts.count(hostname, port) > 0) {
+    // Look for a connected slave and remove it.
+    foreachpair (_, Slave* slave, slaves) {
+      if (slave->info.hostname() == hostname && slave->pid.port == port) {
+        LOG(WARNING) << "Removing slave " << slave->slaveId << " at "
+		     << hostname << ":" << port
+                     << " because it has been deactivated";
+	send(slave->pid, process::TERMINATE);
+        removeSlave(slave);
+        break;
       }
-      break;
     }
 
-    case PROCESS_EXIT: {
-      // TODO(benh): Could we get PROCESS_EXIT from a network partition?
-      LOG(INFO) << "Process exited: " << from();
-      if (pidToFid.find(from()) != pidToFid.end()) {
-        FrameworkID fid = pidToFid[from()];
-        if (Framework *framework = lookupFramework(fid)) {
-          LOG(INFO) << framework << " disconnected";
-//   	  framework->failoverTimer = new FrameworkFailoverTimer(self(), fid);
-//   	  link(spawn(framework->failoverTimer));
-	  removeFramework(framework);
-        }
-      } else if (pidToSid.find(from()) != pidToSid.end()) {
-        SlaveID sid = pidToSid[from()];
-        if (Slave *slave = lookupSlave(sid)) {
-          LOG(INFO) << slave << " disconnected";
-          removeSlave(slave);
-        }
-      } else {
-	foreachpair (_, Framework *framework, frameworks) {
-	  if (framework->failoverTimer != NULL &&
-	      framework->failoverTimer->self() == from()) {
-	    LOG(INFO) << "Lost framework failover timer, removing framework "
-		      << framework;
-	    removeFramework(framework);
-	    break;
-	  }
-	}
+    LOG(INFO) << "Master now considering a slave at "
+	      << hostname << ":" << port << " as inactive";
+    slaveHostnamePorts.erase(hostname, port);
+  }
+}
+
+
+void Master::timerTick()
+{
+  // Check which framework filters can be expired.
+  foreachpair (_, Framework* framework, frameworks) {
+    framework->removeExpiredFilters(elapsed());
+  }
+
+  // Do allocations!
+  allocator->timerTick();
+}
+
+
+void Master::frameworkExpired(const FrameworkID& frameworkId)
+{
+  Framework* framework = lookupFramework(frameworkId);
+  if (framework != NULL) {
+    LOG(INFO) << "Framework failover timer expired, removing "
+              << framework;
+    removeFramework(framework);
+  }
+}
+
+
+void Master::exited()
+{
+  // TODO(benh): Could we get PROCESS_EXIT from a network partition?
+  LOG(INFO) << "Process exited: " << from();
+  if (pidToFrameworkId.count(from()) > 0) {
+    const FrameworkID& frameworkId = pidToFrameworkId[from()];
+    Framework* framework = lookupFramework(frameworkId);
+    if (framework != NULL) {
+      LOG(INFO) << "Framework " << frameworkId << " disconnected";
+
+      // Stop sending offers here for now.
+      framework->active = false;
+
+      // Remove the framework's slot offers.
+      foreachcopy (SlotOffer* offer, framework->slotOffers) {
+        removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
       }
-      break;
-    }
 
-    case M2M_GET_STATE: {
-      send(from(), pack<M2M_GET_STATE_REPLY>(getState()));
-      break;
+      framework->failoverTimer =
+        new FrameworkFailoverTimer(self(), frameworkId);
+      link(spawn(framework->failoverTimer));
+//       removeFramework(framework);
     }
-    
-    case M2M_SHUTDOWN: {
-      LOG(INFO) << "Asked to shut down by " << from();
-      foreachpair (_, Slave *slave, slaves)
-        send(slave->pid, pack<M2S_SHUTDOWN>());
-      return;
+  } else if (pidToSlaveId.count(from()) > 0) {
+    const SlaveID& slaveId = pidToSlaveId[from()];
+    Slave* slave = lookupSlave(slaveId);
+    if (slave != NULL) {
+      LOG(INFO) << slave << " disconnected";
+      removeSlave(slave);
     }
-
-    default:
-      LOG(ERROR) << "Received unknown MSGID " << msgid() << " from " << from();
-      break;
+  } else {
+    foreachpair (_, Framework* framework, frameworks) {
+      if (framework->failoverTimer != NULL &&
+          framework->failoverTimer->self() == from()) {
+        LOG(ERROR) << "Bad framework failover timer, removing "
+                   << framework;
+        removeFramework(framework);
+        break;
+      }
     }
   }
 }
 
 
-OfferID Master::makeOffer(Framework *framework,
+Promise<HttpResponse> Master::vars(const HttpRequest& request)
+{
+  LOG(INFO) << "Request for 'vars'";
+
+  ostringstream out;
+
+  out <<
+    "build_date " << build::DATE << "\n" <<
+    "build_user " << build::USER << "\n" <<
+    "build_flags " << build::FLAGS << "\n" <<
+    "frameworks_count " << frameworks.size() << "\n";
+
+  // Also add the configuration values.
+  foreachpair (const string& key, const string& value, conf.getMap()) {
+    out << key << " " << value << "\n";
+  }
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/plain";
+  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.body = out.str().data();
+  return response;
+}
+
+
+Promise<HttpResponse> Master::stats(const HttpRequest& request)
+{
+  LOG(INFO) << "Request for 'stats'";
+
+  ostringstream out;
+
+  out <<
+    "{" <<
+    "\"total_schedulers\":" << frameworks.size() << "," <<
+    "\"active_schedulers\":" << getActiveFrameworks().size() << "," <<
+    "\"activated_slaves\":" << slaveHostnamePorts.size() << "," <<
+    "\"connected_slaves\":" << slaves.size() << "," <<
+    "\"launched_tasks\":" << statistics.launched_tasks << "," <<
+    "\"finished_tasks\":" << statistics.finished_tasks << "," <<
+    "\"killed_tasks\":" << statistics.killed_tasks << "," <<
+    "\"failed_tasks\":" << statistics.failed_tasks << "," <<
+    "\"lost_tasks\":" << statistics.lost_tasks << "," <<
+    "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
+    "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
+    "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
+    "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
+    "}";
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
+  response.body = out.str().data();
+  return response;
+}
+
+
+OfferID Master::makeOffer(Framework* framework,
                           const vector<SlaveResources>& resources)
 {
-  OfferID oid = masterId + "-" + lexical_cast<string>(nextSlotOfferId++);
+  const OfferID& offerId = newOfferId();
+
+  SlotOffer* offer = new SlotOffer(offerId, framework->frameworkId, resources);
 
-  SlotOffer *offer = new SlotOffer(oid, framework->id, resources);
-  slotOffers[offer->id] = offer;
+  slotOffers[offer->offerId] = offer;
   framework->addOffer(offer);
+
+  // Update the resource information within each of the slave objects. Gross!
   foreach (const SlaveResources& r, resources) {
     r.slave->slotOffers.insert(offer);
     r.slave->resourcesOffered += r.resources;
   }
-  LOG(INFO) << "Sending " << offer << " to " << framework;
-  vector<SlaveOffer> offers;
-  map<SlaveID, PID> pids;
+
+  LOG(INFO) << "Sending offer " << offer->offerId
+            << " to framework " << framework->frameworkId;
+
+  MSG<M2F_RESOURCE_OFFER> out;
+  out.mutable_offer_id()->MergeFrom(offerId);
+
   foreach (const SlaveResources& r, resources) {
-    Params params;
-    params.set("cpus", r.resources.cpus);
-    params.set("mem", r.resources.mem);
-    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
-    offers.push_back(offer);
-    pids[r.slave->id] = r.slave->pid;
+    SlaveOffer* offer = out.add_offers();
+    offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
+    offer->set_hostname(r.slave->info.hostname());
+    offer->mutable_resources()->MergeFrom(r.resources);
+
+    out.add_pids(r.slave->pid);
   }
-  send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers, pids));
-  return oid;
+
+  send(framework->pid, out);
+
+  return offerId;
 }
 
 
 // Process a resource offer reply (for a non-cancelled offer) by launching
 // the desired tasks (if the offer contains a valid set of tasks) and
-// reporting any unused resources to the allocator
-void Master::processOfferReply(SlotOffer *offer,
-    const vector<TaskDescription>& tasks, const Params& params)
+// reporting any unused resources to the allocator.
+void Master::processOfferReply(SlotOffer* offer,
+                               const vector<TaskDescription>& tasks,
+                               const Params& params)
 {
   LOG(INFO) << "Received reply for " << offer;
 
-  Framework *framework = lookupFramework(offer->frameworkId);
+  Framework* framework = lookupFramework(offer->frameworkId);
   CHECK(framework != NULL);
 
-  // Count resources in the offer
-  unordered_map<Slave *, Resources> offerResources;
-  foreach (SlaveResources &r, offer->resources) {
-    offerResources[r.slave] = r.resources;
-  }
-
-  // Count resources in the response, and check that its tasks are valid
-  unordered_map<Slave *, Resources> responseResources;
-  foreach (const TaskDescription &t, tasks) {
-    // Check whether this task size is valid
-    Params params(t.params);
-    Resources res(params.getInt32("cpus", -1),
-                  params.getInt32("mem", -1));
-    if (res.cpus < MIN_CPUS || res.mem < MIN_MEM || 
-        res.cpus > MAX_CPUS || res.mem > MAX_MEM) {
-      terminateFramework(framework, 0,
-          "Invalid task size: " + lexical_cast<string>(res));
+  // Count resources in the offer.
+  unordered_map<Slave*, Resources> resourcesOffered;
+  foreach (const SlaveResources& r, offer->resources) {
+    resourcesOffered[r.slave] = r.resources;
+  }
+
+  // Count used resources and check that its tasks are valid.
+  unordered_map<Slave*, Resources> resourcesUsed;
+  foreach (const TaskDescription& task, tasks) {
+    // Check whether the task is on a valid slave.
+    Slave* slave = lookupSlave(task.slave_id());
+    if (slave == NULL || resourcesOffered.count(slave) == 0) {
+      terminateFramework(framework, 0, "Invalid slave in offer reply");
       return;
     }
-    // Check whether the task is on a valid slave
-    Slave *slave = lookupSlave(t.slaveId);
-    if (!slave || offerResources.find(slave) == offerResources.end()) {
-      terminateFramework(framework, 0, "Invalid slave in offer reply");
+
+    // Check whether or not the resources for the task are valid.
+    // TODO(benh): In the future maybe we can also augment the
+    // protobuf to deal with fragmentation by providing some
+    // sort of minimum amount of resources required per task.
+
+    if (task.resources().size() == 0) {
+      terminateFramework(framework, 0, "Invalid resources for task");
       return;
     }
-    responseResources[slave] += res;
+
+    foreach (const Resource& resource, task.resources()) {
+      if (!Resources::isAllocatable(resource)) {
+        // TODO(benh): Also send back the invalid resources as a string?
+        terminateFramework(framework, 0, "Invalid resources for task");
+        return;
+      }
+    }
+
+    resourcesUsed[slave] += task.resources();
   }
 
-  // Check that the total accepted on each slave isn't more than offered
-  foreachpair (Slave *s, Resources& respRes, responseResources) {
-    Resources &offRes = offerResources[s];
-    if (respRes.cpus > offRes.cpus || respRes.mem > offRes.mem) {
+  // Check that the total accepted on each slave isn't more than offered.
+  foreachpair (Slave* slave, const Resources& used, resourcesUsed) {
+    if (!(used <= resourcesOffered[slave])) {
       terminateFramework(framework, 0, "Too many resources accepted");
       return;
     }
   }
 
-  // Check that there are no duplicate task IDs
+  // Check that there are no duplicate task IDs.
   unordered_set<TaskID> idsInResponse;
-  foreach (const TaskDescription &t, tasks) {
-    if (framework->tasks.find(t.taskId) != framework->tasks.end() ||
-        idsInResponse.find(t.taskId) != idsInResponse.end()) {
-      terminateFramework(framework, 0,
-          "Duplicate task ID: " + lexical_cast<string>(t.taskId));
+  foreach (const TaskDescription& task, tasks) {
+    if (framework->tasks.count(task.task_id()) > 0 ||
+        idsInResponse.count(task.task_id()) > 0) {
+      terminateFramework(framework, 0, "Duplicate task ID: " +
+                         lexical_cast<string>(task.task_id()));
       return;
     }
-    idsInResponse.insert(t.taskId);
+    idsInResponse.insert(task.task_id());
   }
 
-  // Launch the tasks in the response
-  foreach (const TaskDescription &t, tasks) {
-    launchTask(framework, t);
-  }
-
-  // If there are resources left on some slaves, add filters for them
-  vector<SlaveResources> resourcesLeft;
-  int timeout = params.getInt32("timeout", DEFAULT_REFUSAL_TIMEOUT);
-  double expiry = (timeout == -1) ? 0 : elapsed() + timeout;
-  foreachpair (Slave *s, Resources offRes, offerResources) {
-    Resources respRes = responseResources[s];
-    Resources left = offRes - respRes;
-    if (left.cpus > 0 || left.mem > 0) {
-      resourcesLeft.push_back(SlaveResources(s, left));
-    }
-    if (timeout != 0 && respRes.cpus == 0 && respRes.mem == 0) {
-      LOG(INFO) << "Adding filter on " << s << " to " << framework
-                << " for  " << timeout << " seconds";
-      framework->slaveFilter[s] = expiry;
-    }
+  // Launch the tasks in the response.
+  foreach (const TaskDescription& task, tasks) {
+    launchTask(framework, task);
   }
-  
-  // Return the resources left to the allocator
-  removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesLeft);
-}
 
+  // Get out the timeout for leftover resources (if present), and use
+  // that to calculate the expiry timeout.
+  int timeout = DEFAULT_REFUSAL_TIMEOUT;
 
-void Master::launchTask(Framework *framework, const TaskDescription& t)
-{
-  Params params(t.params);
-  Resources res(params.getInt32("cpus", -1),
-                params.getInt32("mem", -1));
+  for (int i = 0; i < params.param_size(); i++) {
+    if (params.param(i).key() == "timeout") {
+      timeout = lexical_cast<int>(params.param(i).value());
+      break;
+    }
+  }
 
-  // The invariant right now is that launchTask is called only for
-  // TaskDescriptions where the slave is still valid (see the code
-  // above in processOfferReply).
-  Slave *slave = lookupSlave(t.slaveId);
-  CHECK(slave != NULL);
+  double expiry = (timeout == -1) ? 0 : elapsed() + timeout;  
 
-  Task *task = new Task(t.taskId, framework->id, res, TASK_STARTING,
-                        t.name, "", slave->id);
+  // Now check for unused resources on slaves and add filters for them.
+  vector<SlaveResources> resourcesUnused;
 
-  framework->addTask(task);
-  slave->addTask(task);
+  foreachpair (Slave* slave, const Resources& offered, resourcesOffered) {
+    Resources used = resourcesUsed[slave];
+    Resources unused = offered - used;
 
-  allocator->taskAdded(task);
+    CHECK(used == used.allocatable());
 
-  LOG(INFO) << "Launching " << task << " on " << slave;
-  send(slave->pid, pack<M2S_RUN_TASK>(
-        framework->id, t.taskId, framework->name, framework->user,
-        framework->executorInfo, t.name, t.arg, t.params, framework->pid));
-}
+    Resources allocatable = unused.allocatable();
 
+    if (allocatable.size() > 0) {
+      resourcesUnused.push_back(SlaveResources(slave, allocatable));
+    }
 
-void Master::rescindOffer(SlotOffer *offer)
-{
-  removeSlotOffer(offer, ORR_OFFER_RESCINDED, offer->resources);
+    // Only add a filter on a slave if none of the resources are used.
+    if (timeout != 0 && used.size() == 0) {
+      LOG(INFO) << "Adding filter on " << slave << " to " << framework
+                << " for " << timeout << " seconds";
+      framework->slaveFilter[slave] = expiry;
+    }
+  }
+  
+  // Return the resources left to the allocator.
+  removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
 }
 
 
-void Master::killTask(Task *task)
+void Master::launchTask(Framework* framework, const TaskDescription& task)
 {
-  LOG(INFO) << "Killing " << task;
-  Framework *framework = lookupFramework(task->frameworkId);
-  Slave *slave = lookupSlave(task->slaveId);
-  CHECK(framework != NULL);
+  // The invariant right now is that launchTask is called only for
+  // TaskDescriptions where the slave is still valid (see the code
+  // above in processOfferReply).
+  Slave* slave = lookupSlave(task.slave_id());
   CHECK(slave != NULL);
-  send(slave->pid, pack<M2S_KILL_TASK>(framework->id, task->id));
+
+  // Determine the executor ID for this task.
+  const ExecutorID& executorId = task.has_executor()
+    ? task.executor().executor_id()
+    : framework->info.executor().executor_id();
+
+  Task* t = new Task();
+  t->mutable_framework_id()->MergeFrom(framework->frameworkId);
+  t->mutable_executor_id()->MergeFrom(executorId);
+  t->set_state(TASK_STARTING);
+  t->set_name(task.name());
+  t->mutable_task_id()->MergeFrom(task.task_id());
+  t->mutable_slave_id()->MergeFrom(task.slave_id());
+  t->mutable_resources()->MergeFrom(task.resources());
+
+  framework->addTask(t);
+  slave->addTask(t);
+
+  allocator->taskAdded(t);
+
+  LOG(INFO) << "Launching " << t << " on " << slave;
+
+  MSG<M2S_RUN_TASK> out;
+  out.mutable_framework()->MergeFrom(framework->info);
+  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+  out.set_pid(framework->pid);
+  out.mutable_task()->MergeFrom(task);
+  send(slave->pid, out);
+
+  statistics.launched_tasks++;
 }
 
 
 // Terminate a framework, sending it a particular error message
 // TODO: Make the error codes and messages programmer-friendly
-void Master::terminateFramework(Framework *framework,
+void Master::terminateFramework(Framework* framework,
                                 int32_t code,
-                                const std::string& message)
+                                const string& message)
 {
   LOG(INFO) << "Terminating " << framework << " due to error: " << message;
-  send(framework->pid, pack<M2F_ERROR>(code, message));
+
+  MSG<M2F_ERROR> out;
+  out.set_code(code);
+  out.set_message(message);
+  send(framework->pid, out);
+
   removeFramework(framework);
 }
 
 
 // Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeSlotOffer(SlotOffer *offer,
+void Master::removeSlotOffer(SlotOffer* offer,
                              OfferReturnReason reason,
-                             const vector<SlaveResources>& resourcesLeft)
+                             const vector<SlaveResources>& resourcesUnused)
 {
-  // Remove from slaves
+  // Remove from slaves.
   foreach (SlaveResources& r, offer->resources) {
     CHECK(r.slave != NULL);
     r.slave->resourcesOffered -= r.resources;
@@ -923,30 +1483,35 @@ void Master::removeSlotOffer(SlotOffer *
   Framework *framework = lookupFramework(offer->frameworkId);
   CHECK(framework != NULL);
   framework->removeOffer(offer);
+
   // Also send framework a rescind message unless the reason we are
   // removing the offer is that the framework replied to it
   if (reason != ORR_FRAMEWORK_REPLIED) {
-    send(framework->pid, pack<M2F_RESCIND_OFFER>(offer->id));
+    MSG<M2F_RESCIND_OFFER> out;
+    out.mutable_offer_id()->MergeFrom(offer->offerId);
+    send(framework->pid, out);
   }
   
-  // Tell the allocator about the resources freed up
-  allocator->offerReturned(offer, reason, resourcesLeft);
+  // Tell the allocator about the unused resources.
+  allocator->offerReturned(offer, reason, resourcesUnused);
   
   // Delete it
-  slotOffers.erase(offer->id);
+  slotOffers.erase(offer->offerId);
   delete offer;
 }
 
 
-void Master::addFramework(Framework *framework)
+void Master::addFramework(Framework* framework)
 {
-  CHECK(frameworks.find(framework->id) == frameworks.end());
+  CHECK(frameworks.count(framework->frameworkId) == 0);
 
-  frameworks[framework->id] = framework;
-  pidToFid[framework->pid] = framework->id;
+  frameworks[framework->frameworkId] = framework;
+  pidToFrameworkId[framework->pid] = framework->frameworkId;
   link(framework->pid);
 
-  send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
+  MSG<M2F_REGISTER_REPLY> out;
+  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+  send(framework->pid, out);
 
   allocator->frameworkAdded(framework);
 }
@@ -954,51 +1519,68 @@ void Master::addFramework(Framework *fra
 
 // Replace the scheduler for a framework with a new process ID, in the
 // event of a scheduler failover.
-void Master::failoverFramework(Framework *framework, const PID &newPid)
+void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
-  PID oldPid = framework->pid;
+  const UPID& oldPid = framework->pid;
 
-  // Remove the framework's slot offers.
+  // Remove the framework's slot offers (if they weren't removed before).
   // TODO(benh): Consider just reoffering these to the new framework.
-  unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
-  foreach (SlotOffer* offer, slotOffersCopy) {
+  foreachcopy (SlotOffer* offer, framework->slotOffers) {
     removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
   }
 
-  send(oldPid, pack<M2F_ERROR>(1, "Framework failover"));
+  MSG<M2F_ERROR> out;
+  out.set_code(1);
+  out.set_message("Framework failover");
+  send(oldPid, out);
+
+  // TODO(benh): unlink(oldPid);
+  pidToFrameworkId.erase(oldPid);
+  pidToFrameworkId[newPid] = framework->frameworkId;
 
-  // TODO(benh): unlink(old->pid);
-  pidToFid.erase(oldPid);
-  pidToFid[newPid] = framework->id;
   framework->pid = newPid;
   link(newPid);
 
-  send(newPid, pack<M2F_REGISTER_REPLY>(framework->id));
+  // Kill the failover timer.
+  if (framework->failoverTimer != NULL) {
+    process::post(framework->failoverTimer->self(), process::TERMINATE);
+    process::wait(framework->failoverTimer->self());
+    delete framework->failoverTimer;
+    framework->failoverTimer = NULL;
+  }
+
+  // Make sure we can get offers again.
+  framework->active = true;
+
+  MSG<M2F_REGISTER_REPLY> reply;
+  reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
+  send(newPid, reply);
 }
 
 
 // Kill all of a framework's tasks, delete the framework object, and
 // reschedule slot offers for slots that were assigned to this framework
-void Master::removeFramework(Framework *framework)
+void Master::removeFramework(Framework* framework)
 { 
   framework->active = false;
   // TODO: Notify allocator that a framework removal is beginning?
   
   // Tell slaves to kill the framework
-  foreachpair (_, Slave *slave, slaves)
-    send(slave->pid, pack<M2S_KILL_FRAMEWORK>(framework->id));
+  foreachpair (_, Slave *slave, slaves) {
+    MSG<M2S_KILL_FRAMEWORK> out;
+    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+    send(slave->pid, out);
+  }
 
   // Remove pointers to the framework's tasks in slaves
-  unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
-  foreachpair (_, Task *task, tasksCopy) {
-    Slave *slave = lookupSlave(task->slaveId);
+  foreachpaircopy (_, Task *task, framework->tasks) {
+    Slave *slave = lookupSlave(task->slave_id());
     CHECK(slave != NULL);
     removeTask(task, TRR_FRAMEWORK_LOST);
   }
   
-  // Remove the framework's slot offers
-  unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
-  foreach (SlotOffer* offer, slotOffersCopy) {
+  // Remove the framework's slot offers (if they weren't removed before).
+  foreachcopy (SlotOffer* offer, framework->slotOffers) {
     removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
   }
 
@@ -1006,25 +1588,88 @@ void Master::removeFramework(Framework *
   // failoverFramework needs to be shared!
 
   // TODO(benh): unlink(framework->pid);
-  pidToFid.erase(framework->pid);
+  pidToFrameworkId.erase(framework->pid);
 
-  // Delete it
-  frameworks.erase(framework->id);
+  // Delete it.
+  frameworks.erase(framework->frameworkId);
   allocator->frameworkRemoved(framework);
   delete framework;
 }
 
 
+void Master::addSlave(Slave* slave)
+{
+  CHECK(slave != NULL);
+
+  slaves[slave->slaveId] = slave;
+  pidToSlaveId[slave->pid] = slave->slaveId;
+  link(slave->pid);
+
+  allocator->slaveAdded(slave);
+
+  MSG<M2S_REGISTER_REPLY> out;
+  out.mutable_slave_id()->MergeFrom(slave->slaveId);
+  send(slave->pid, out);
+
+  // TODO(benh):
+  //     // Ask the slaves manager to monitor this slave for us.
+  //     process::dispatch(slavesManager->self(), &SlavesManager::monitor,
+  //                       slave->pid, slave->info, slave->slaveId);
+
+  // Set up an observer for the slave.
+  slave->observer = new SlaveObserver(slave->pid, slave->info,
+                                      slave->slaveId, slavesManager->self());
+  process::spawn(slave->observer);
+}
+
+
+void Master::readdSlave(Slave* slave, const vector<Task>& tasks)
+{
+  CHECK(slave != NULL);
+
+  addSlave(slave);
+
+  for (int i = 0; i < tasks.size(); i++) {
+    Task* task = new Task(tasks[i]);
+
+    // Add the task to the slave.
+    slave->addTask(task);
+
+    // Try and add the task to the framework too, but since the
+    // framework might not yet be connected we won't be able to
+    // add them. However, when the framework connects later we
+    // will add them then. We also tell this slave the current
+    // framework pid for this task. Again, we do the same thing
+    // if a framework currently isn't registered.
+    Framework* framework = lookupFramework(task->framework_id());
+    if (framework != NULL) {
+      framework->addTask(task);
+      MSG<M2S_UPDATE_FRAMEWORK> out;
+      out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+      out.set_pid(framework->pid);
+      send(slave->pid, out);
+    } else {
+      // TODO(benh): We should really put a timeout on how long we
+      // keep tasks running on a slave that never have frameworks
+      // reregister and claim them.
+      LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+                   << " of framework " << task->framework_id()
+                   << " running on slave " << slave->slaveId;
+    }
+  }
+}
+
+
 // Lose all of a slave's tasks and delete the slave object
-void Master::removeSlave(Slave *slave)
+void Master::removeSlave(Slave* slave)
 { 
   slave->active = false;
+
   // TODO: Notify allocator that a slave removal is beginning?
   
   // Remove pointers to slave's tasks in frameworks, and send status updates
-  unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
-  foreachpair (_, Task *task, tasksCopy) {
-    Framework *framework = lookupFramework(task->frameworkId);
+  foreachpaircopy (_, Task* task, slave->tasks) {
+    Framework *framework = lookupFramework(task->framework_id());
     // A framework might not actually exist because the master failed
     // over and the framework hasn't reconnected. This can be a tricky
     // situation for frameworks that want to have high-availability,
@@ -1033,18 +1678,23 @@ void Master::removeSlave(Slave *slave)
     // want to do is create a local Framework object to represent that
     // framework until it fails over. See the TODO above in
     // S2M_REREGISTER_SLAVE.
-    if (framework != NULL)
-      send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
-						   task->message));
+    if (framework != NULL) {
+      MSG<M2F_STATUS_UPDATE> out;
+      out.mutable_framework_id()->MergeFrom(task->framework_id());
+      TaskStatus* status = out.mutable_status();
+      status->mutable_task_id()->MergeFrom(task->task_id());
+      status->mutable_slave_id()->MergeFrom(task->slave_id());
+      status->set_state(TASK_LOST);
+      send(framework->pid, out);
+    }
     removeTask(task, TRR_SLAVE_LOST);
   }
 
   // Remove slot offers from the slave; this will also rescind them
-  unordered_set<SlotOffer *> slotOffersCopy = slave->slotOffers;
-  foreach (SlotOffer *offer, slotOffersCopy) {
+  foreachcopy (SlotOffer* offer, slave->slotOffers) {
     // Only report resources on slaves other than this one to the allocator
     vector<SlaveResources> otherSlaveResources;
-    foreach (SlaveResources& r, offer->resources) {
+    foreach (const SlaveResources& r, offer->resources) {
       if (r.slave != slave) {
         otherSlaveResources.push_back(r);
       }
@@ -1053,32 +1703,46 @@ void Master::removeSlave(Slave *slave)
   }
   
   // Remove slave from any filters
-  foreachpair (_, Framework *framework, frameworks)
+  foreachpair (_, Framework* framework, frameworks) {
     framework->slaveFilter.erase(slave);
+  }
   
   // Send lost-slave message to all frameworks (this helps them re-run
   // previously finished tasks whose output was on the lost slave)
-  foreachpair (_, Framework *framework, frameworks)
-    send(framework->pid, pack<M2F_LOST_SLAVE>(slave->id));
+  foreachpair (_, Framework* framework, frameworks) {
+    MSG<M2F_LOST_SLAVE> out;
+    out.mutable_slave_id()->MergeFrom(slave->slaveId);
+    send(framework->pid, out);
+  }
+
+  // TODO(benh):
+  //     // Tell the slaves manager to stop monitoring this slave for us.
+  //     process::dispatch(slavesManager->self(), &SlavesManager::forget,
+  //                       slave->pid, slave->info, slave->slaveId);
+
+  // Kill the slave observer.
+  process::post(slave->observer->self(), process::TERMINATE);
+  process::wait(slave->observer->self());
+  delete slave->observer;
 
   // TODO(benh): unlink(slave->pid);
-  pidToSid.erase(slave->pid);
+  pidToSlaveId.erase(slave->pid);
 
   // Delete it
-  slaves.erase(slave->id);
+  slaves.erase(slave->slaveId);
   allocator->slaveRemoved(slave);
   delete slave;
 }
 
 
 // Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeTask(Task *task, TaskRemovalReason reason)
+void Master::removeTask(Task* task, TaskRemovalReason reason)
 {
-  Framework *framework = lookupFramework(task->frameworkId);
-  Slave *slave = lookupSlave(task->slaveId);
+  Framework* framework = lookupFramework(task->framework_id());
+  Slave* slave = lookupSlave(task->slave_id());
   CHECK(framework != NULL);
   CHECK(slave != NULL);
-  framework->removeTask(task->id);
+  framework->removeTask(task->task_id());
   slave->removeTask(task);
   allocator->taskRemoved(task, reason);
   delete task;
@@ -1097,14 +1761,31 @@ Allocator* Master::createAllocator()
 // and FWID is an increasing integer.
 FrameworkID Master::newFrameworkId()
 {
-  int fwId = nextFrameworkId++;
   ostringstream oss;
-  oss << masterId << "-" << setw(4) << setfill('0') << fwId;
-  return oss.str();
+  oss << masterId << "-" << setw(4) << setfill('0') << nextFrameworkId++;
+  FrameworkID frameworkId;
+  frameworkId.set_value(oss.str());
+  return frameworkId;
+}
+
+
+OfferID Master::newOfferId()
+{
+  OfferID offerId;
+  offerId.set_value(masterId + "-" + lexical_cast<string>(nextOfferId++));
+  return offerId;
+}
+
+
+SlaveID Master::newSlaveId()
+{
+  SlaveID slaveId;
+  slaveId.set_value(masterId + "-" + lexical_cast<string>(nextSlaveId++));
+  return slaveId;
 }
 
 
-const Params& Master::getConf()
+const Configuration& Master::getConfiguration()
 {
   return conf;
 }