You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC

svn commit: r1140024 [3/15] - in /incubator/mesos/trunk: ./ ec2/ ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/ include/mesos/ src/ src/common/ src/configurator/ src/detector/ src/examples/ src/examples/java/ src/examples/python/...

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Mon Jun 27 06:08:33 2011
@@ -1,17 +1,18 @@
-#include <iomanip>
 #include <fstream>
+#include <iomanip>
 #include <sstream>
 
 #include <glog/logging.h>
 
-#include <google/protobuf/descriptor.h>
-
 #include <process/run.hpp>
+#include <process/timer.hpp>
 
 #include "config/config.hpp"
 
 #include "common/build.hpp"
 #include "common/date_utils.hpp"
+#include "common/utils.hpp"
+#include "common/uuid.hpp"
 
 #include "allocator.hpp"
 #include "allocator_factory.hpp"
@@ -19,116 +20,10 @@
 #include "slaves_manager.hpp"
 #include "webui.hpp"
 
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::master;
-
-using boost::bad_lexical_cast;
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using process::HttpOKResponse;
-using process::HttpResponse;
-using process::HttpRequest;
-using process::PID;
-using process::Process;
-using process::Promise;
-using process::UPID;
-
-using std::endl;
-using std::map;
-using std::max;
-using std::ostringstream;
-using std::pair;
-using std::set;
-using std::setfill;
-using std::setw;
 using std::string;
 using std::vector;
 
-
-namespace {
-
-// A process that periodically pings the master to check filter
-// expiries, etc.
-class AllocatorTimer : public Process<AllocatorTimer>
-{
-public:
-  AllocatorTimer(const PID<Master>& _master) : master(_master) {}
-
-protected:
-  virtual void operator () ()
-  {
-    link(master);
-    while (true) {
-      receive(1);
-      if (name() == process::TIMEOUT) {
-        process::dispatch(master, &Master::timerTick);
-      } else if (name() == process::EXITED) {
-	return;
-      }
-    }
-  }
-
-private:
-  const PID<Master> master;
-};
-
-
-// A process that periodically prints frameworks' shares to a file
-class SharesPrinter : public Process<SharesPrinter>
-{
-public:
-  SharesPrinter(const PID<Master>& _master) : master(_master) {}
-  ~SharesPrinter() {}
-
-protected:
-  virtual void operator () ()
-  {
-    int tick = 0;
-
-    std::ofstream file ("/mnt/shares");
-    if (!file.is_open()) {
-      LOG(FATAL) << "Could not open /mnt/shares";
-    }
-
-    while (true) {
-      pause(1);
-
-      state::MasterState* state = call(master, &Master::getState);
-
-      uint32_t total_cpus = 0;
-      uint32_t total_mem = 0;
-
-      foreach (state::Slave* s, state->slaves) {
-        total_cpus += s->cpus;
-        total_mem += s->mem;
-      }
-      
-      if (state->frameworks.empty()) {
-        file << "--------------------------------" << endl;
-      } else {
-        foreach (state::Framework* f, state->frameworks) {
-          double cpu_share = f->cpus / (double) total_cpus;
-          double mem_share = f->mem / (double) total_mem;
-          double max_share = max(cpu_share, mem_share);
-          file << tick << "#" << f->id << "#" << f->name << "#" 
-               << f->cpus << "#" << f->mem << "#"
-               << cpu_share << "#" << mem_share << "#" << max_share << endl;
-        }
-      }
-      delete state;
-      tick++;
-    }
-    file.close();
-  }
-
-private:
-  const PID<Master> master;
-};
-
-} // namespace {
+using process::wait; // Necessary on some OS's to disambiguate.
 
 
 namespace mesos { namespace internal { namespace master {
@@ -140,8 +35,12 @@ public:
                 const SlaveInfo& _slaveInfo,
                 const SlaveID& _slaveId,
                 const PID<SlavesManager>& _slavesManager)
-    : slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId),
-      slavesManager(_slavesManager), timeouts(0), pinged(false) {}
+    : slave(_slave),
+      slaveInfo(_slaveInfo),
+      slaveId(_slaveId),
+      slavesManager(_slavesManager),
+      timeouts(0),
+      pinged(false) {}
 
   virtual ~SlaveObserver() {}
 
@@ -152,31 +51,31 @@ protected:
     // we don't hear a pong, increment the number of timeouts from the
     // slave and try and send another ping. If we eventually timeout too
     // many missed pongs in a row, consider the slave dead.
-    send(slave, PING);
+    send(slave, "PING");
     pinged = true;
 
     do {
       receive(SLAVE_PONG_TIMEOUT);
-      if (name() == PONG) {
+      if (name() == "PONG") {
         timeouts = 0;
         pinged = false;
-      } else if (name() == process::TIMEOUT) {
+      } else if (name() == TIMEOUT) {
         if (pinged) {
           timeouts++;
           pinged = false;
         }
 
-        send(slave, PING);
+        send(slave, "PING");
         pinged = true;
-      } else if (name() == process::TERMINATE) {
+      } else if (name() == TERMINATE) {
         return;
       } 
     } while (timeouts < MAX_SLAVE_TIMEOUTS);
 
     // Tell the slave manager to deactivate the slave, this will take
     // care of updating the master too.
-    while (!process::call(slavesManager, &SlavesManager::deactivate,
-                          slaveInfo.hostname(), slave.port)) {
+    while (!call(slavesManager, &SlavesManager::deactivate,
+                 slaveInfo.hostname(), slave.port)) {
       LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
       pause(5);
     }
@@ -191,18 +90,76 @@ private:
   bool pinged;
 };
 
-}}} // namespace mesos { namespace master { namespace internal {
+
+// Performs slave registration asynchronously. There are two means of
+// doing this, one first tries to add this slave to the slaves
+// manager, while the other one simply tells the master to add the
+// slave.
+struct SlaveRegistrar
+{
+  static bool run(Slave* slave, const PID<Master>& master)
+  {
+    // TODO(benh): Do a reverse lookup to ensure IP maps to
+    // hostname, or check credentials of this slave.
+    dispatch(master, &Master::addSlave, slave, false);
+  }
+
+  static bool run(Slave* slave,
+                  const PID<Master>& master,
+                  const PID<SlavesManager>& slavesManager)
+  {
+    if (!call(slavesManager, &SlavesManager::add,
+              slave->info.hostname(), slave->pid.port)) {
+      LOG(WARNING) << "Could not register slave because failed"
+                   << " to add it to the slaves manager";
+      delete slave;
+      return false;
+    }
+
+    return run(slave, master);
+  }
+};
+
+
+// Performs slave re-registration asynchronously as above.
+struct SlaveReregistrar
+{
+  static bool run(Slave* slave,
+                  const vector<Task>& tasks,
+                  const PID<Master>& master)
+  {
+    // TODO(benh): Do a reverse lookup to ensure IP maps to
+    // hostname, or check credentials of this slave.
+    dispatch(master, &Master::readdSlave, slave, tasks);
+  }
+
+  static bool run(Slave* slave,
+                  const vector<Task>& tasks,
+                  const PID<Master>& master,
+                  const PID<SlavesManager>& slavesManager)
+  {
+    if (!call(slavesManager, &SlavesManager::add,
+              slave->info.hostname(), slave->pid.port)) {
+      LOG(WARNING) << "Could not register slave because failed"
+                   << " to add it to the slaves manager";
+      delete slave;
+      return false;
+    }
+
+    return run(slave, tasks, master);
+  }
+};
 
 
 Master::Master()
-  : MesosProcess<Master>("master")
+  : ProcessBase("master")
 {
   initialize();
 }
 
 
 Master::Master(const Configuration& conf)
-  : MesosProcess<Master>("master"),
+  : ProcessBase("master"),
     conf(conf)
 {
   initialize();
@@ -213,34 +170,39 @@ Master::~Master()
 {
   LOG(INFO) << "Shutting down master";
 
-  foreachpaircopy (_, Framework* framework, frameworks) {
+  foreachvalue (Framework* framework, utils::copy(frameworks)) {
     removeFramework(framework);
   }
 
-  foreachpaircopy (_, Slave* slave, slaves) {
+  foreachvalue (Slave* slave, utils::copy(slaves)) {
     removeSlave(slave);
   }
 
-  delete allocator;
-
-  CHECK(slotOffers.size() == 0);
+  CHECK(offers.size() == 0);
 
-  process::post(slavesManager->self(), process::TERMINATE);
-  process::wait(slavesManager->self());
+  terminate(slavesManager);
+  wait(slavesManager);
 
   delete slavesManager;
+
+  delete allocator;
 }
 
 
 void Master::registerOptions(Configurator* configurator)
 {
   SlavesManager::registerOptions(configurator);
-  configurator->addOption<string>("allocator", 'a',
-                                  "Allocation module name",
-                                  "simple");
-  configurator->addOption<bool>("root_submissions",
-                                "Can root submit frameworks?",
-                                true);
+
+  configurator->addOption<string>(
+      "allocator",
+      'a',
+      "Allocation module name",
+      "simple");
+
+  configurator->addOption<bool>(
+      "root_submissions",
+      "Can root submit frameworks?",
+      true);
 }
 
 
@@ -249,7 +211,7 @@ Promise<state::MasterState*> Master::get
   state::MasterState* state =
     new state::MasterState(build::DATE, build::USER, self());
 
-  foreachpair (_, Slave* s, slaves) {
+  foreachvalue (Slave* s, slaves) {
     Resources resources(s->info.resources());
     Resource::Scalar cpus;
     Resource::Scalar mem;
@@ -259,14 +221,14 @@ Promise<state::MasterState*> Master::get
     mem = resources.getScalar("mem", mem);
 
     state::Slave* slave =
-      new state::Slave(s->slaveId.value(), s->info.hostname(),
+      new state::Slave(s->id.value(), s->info.hostname(),
                        s->info.public_hostname(), cpus.value(),
-                       mem.value(), s->connectTime);
+                       mem.value(), s->registeredTime);
 
     state->slaves.push_back(slave);
   }
 
-  foreachpair (_, Framework* f, frameworks) {
+  foreachvalue (Framework* f, frameworks) {
     Resources resources(f->resources);
     Resource::Scalar cpus;
     Resource::Scalar mem;
@@ -276,13 +238,13 @@ Promise<state::MasterState*> Master::get
     mem = resources.getScalar("mem", mem);
 
     state::Framework* framework =
-      new state::Framework(f->frameworkId.value(), f->info.user(),
+      new state::Framework(f->id.value(), f->info.user(),
                            f->info.name(), f->info.executor().uri(),
-                           cpus.value(), mem.value(), f->connectTime);
+                           cpus.value(), mem.value(), f->registeredTime);
 
     state->frameworks.push_back(framework);
 
-    foreachpair (_, Task* t, f->tasks) {
+    foreachvalue (Task* t, f->tasks) {
       Resources resources(t->resources());
       Resource::Scalar cpus;
       Resource::Scalar mem;
@@ -300,9 +262,9 @@ Promise<state::MasterState*> Master::get
       framework->tasks.push_back(task);
     }
 
-    foreach (SlotOffer* o, f->slotOffers) {
-      state::SlotOffer* offer =
-        new state::SlotOffer(o->offerId.value(), o->frameworkId.value());
+    foreach (Offer* o, f->offers) {
+      state::Offer* offer =
+        new state::Offer(o->id.value(), o->frameworkId.value());
 
       foreach (const SlaveResources& r, o->resources) {
         Resources resources(r.resources);
@@ -314,7 +276,7 @@ Promise<state::MasterState*> Master::get
         mem = resources.getScalar("mem", mem);
 
         state::SlaveResources* sr =
-          new state::SlaveResources(r.slave->slaveId.value(),
+          new state::SlaveResources(r.slave->id.value(),
                                     cpus.value(), mem.value());
 
         offer->resources.push_back(sr);
@@ -332,7 +294,7 @@ Promise<state::MasterState*> Master::get
 vector<Framework*> Master::getActiveFrameworks()
 {
   vector <Framework*> result;
-  foreachpair(_, Framework* framework, frameworks) {
+  foreachvalue (Framework* framework, frameworks) {
     if (framework->active) {
       result.push_back(framework);
     }
@@ -345,7 +307,7 @@ vector<Framework*> Master::getActiveFram
 vector<Slave*> Master::getActiveSlaves()
 {
   vector <Slave*> result;
-  foreachpair(_, Slave* slave, slaves) {
+  foreachvalue (Slave* slave, slaves) {
     if (slave->active) {
       result.push_back(slave);
     }
@@ -354,75 +316,48 @@ vector<Slave*> Master::getActiveSlaves()
 }
 
 
-Framework* Master::lookupFramework(const FrameworkID& frameworkId)
-{
-  if (frameworks.count(frameworkId) > 0) {
-    return frameworks[frameworkId];
-  } else {
-    return NULL;
-  }
-}
-
-
-Slave* Master::lookupSlave(const SlaveID& slaveId)
-{
-  if (slaves.count(slaveId) > 0) {
-    return slaves[slaveId];
-  } else {
-    return NULL;
-  }
-}
-
-
-SlotOffer* Master::lookupSlotOffer(const OfferID& offerId)
-{
-  if (slotOffers.count(offerId) > 0) {
-    return slotOffers[offerId];
-  } else {
-    return NULL;
-  }
-}
-
-
 void Master::operator () ()
 {
   LOG(INFO) << "Master started at mesos://" << self();
 
   // Don't do anything until we get a master token.
-  while (receive() != GOT_MASTER_TOKEN) {
+  while (receive() != GotMasterTokenMessage().GetTypeName()) {
     LOG(INFO) << "Oops! We're dropping a message since "
               << "we haven't received an identifier yet!";  
   }
 
-  MSG<GOT_MASTER_TOKEN> msg;
-  msg.ParseFromString(body());
+  GotMasterTokenMessage message;
+  message.ParseFromString(body());
 
   // The master ID is comprised of the current date and some ephemeral
   // token (e.g., determined by ZooKeeper).
 
-  masterId = DateUtils::currentDate() + "-" + msg.token();
+  masterId = DateUtils::currentDate() + "-" + message.token();
   LOG(INFO) << "Master ID: " << masterId;
 
   // Setup slave manager.
   slavesManager = new SlavesManager(conf, self());
-  process::spawn(slavesManager);
+  spawn(slavesManager);
 
   // Create the allocator (we do this after the constructor because it
   // leaks 'this').
-  allocator = createAllocator();
+  string type = conf.get("allocator", "simple");
+  LOG(INFO) << "Creating \"" << type << "\" allocator";
+  allocator = AllocatorFactory::instantiate(type, this);
+
   if (!allocator) {
-    LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
+    LOG(FATAL) << "Unrecognized allocator type: " << type;
   }
 
-  link(spawn(new AllocatorTimer(self())));
-  //link(spawn(new SharesPrinter(self())));
+  // Start our timer ticks.
+  delay(1.0, self(), &Master::timerTick);
 
   while (true) {
     serve();
-    if (name() == process::TERMINATE) {
+    if (name() == TERMINATE) {
       LOG(INFO) << "Asked to terminate by " << from();
-      foreachpair (_, Slave* slave, slaves) {
-        send(slave->pid, process::TERMINATE);
+      foreachvalue (Slave* slave, slaves) {
+        send(slave->pid, TERMINATE);
       }
       break;
     } else {
@@ -441,90 +376,102 @@ void Master::initialize()
   nextSlaveId = 0;
   nextOfferId = 0;
 
-  allocatorType = conf.get("allocator", "simple");
-
   // Start all the statistics at 0.
-  statistics.launched_tasks = 0;
-  statistics.finished_tasks = 0;
-  statistics.killed_tasks = 0;
-  statistics.failed_tasks = 0;
-  statistics.lost_tasks = 0;
-  statistics.valid_status_updates = 0;
-  statistics.invalid_status_updates = 0;
-  statistics.valid_framework_messages = 0;
-  statistics.invalid_framework_messages = 0;
+  CHECK(TASK_STARTING == TaskState_MIN);
+  CHECK(TASK_LOST == TaskState_MAX);
+  stats.tasks[TASK_STARTING] = 0;
+  stats.tasks[TASK_RUNNING] = 0;
+  stats.tasks[TASK_FINISHED] = 0;
+  stats.tasks[TASK_FAILED] = 0;
+  stats.tasks[TASK_KILLED] = 0;
+  stats.tasks[TASK_LOST] = 0;
+  stats.validStatusUpdates = 0;
+  stats.invalidStatusUpdates = 0;
+  stats.validFrameworkMessages = 0;
+  stats.invalidFrameworkMessages = 0;
 
   startTime = elapsedTime();
 
   // Install handler functions for certain messages.
-  install(NEW_MASTER_DETECTED, &Master::newMasterDetected,
-          &NewMasterDetectedMessage::pid);
-
-  install(NO_MASTER_DETECTED, &Master::noMasterDetected);
+  installProtobufHandler<NewMasterDetectedMessage>(
+      &Master::newMasterDetected,
+      &NewMasterDetectedMessage::pid);
+
+  installProtobufHandler<NoMasterDetectedMessage>(
+      &Master::noMasterDetected);
+
+  installProtobufHandler<RegisterFrameworkMessage>(
+      &Master::registerFramework,
+      &RegisterFrameworkMessage::framework);
+
+  installProtobufHandler<ReregisterFrameworkMessage>(
+      &Master::reregisterFramework,
+      &ReregisterFrameworkMessage::framework_id,
+      &ReregisterFrameworkMessage::framework,
+      &ReregisterFrameworkMessage::generation);
+
+  installProtobufHandler<UnregisterFrameworkMessage>(
+      &Master::unregisterFramework,
+      &UnregisterFrameworkMessage::framework_id);
+
+  installProtobufHandler<ResourceOfferReplyMessage>(
+      &Master::resourceOfferReply,
+      &ResourceOfferReplyMessage::framework_id,
+      &ResourceOfferReplyMessage::offer_id,
+      &ResourceOfferReplyMessage::tasks,
+      &ResourceOfferReplyMessage::params);
+
+  installProtobufHandler<ReviveOffersMessage>(
+      &Master::reviveOffers,
+      &ReviveOffersMessage::framework_id);
+
+  installProtobufHandler<KillTaskMessage>(
+      &Master::killTask,
+      &KillTaskMessage::framework_id,
+      &KillTaskMessage::task_id);
+
+  installProtobufHandler<FrameworkToExecutorMessage>(
+      &Master::schedulerMessage,
+      &FrameworkToExecutorMessage::slave_id,
+      &FrameworkToExecutorMessage::framework_id,
+      &FrameworkToExecutorMessage::executor_id,
+      &FrameworkToExecutorMessage::data);
+
+  installProtobufHandler<RegisterSlaveMessage>(
+      &Master::registerSlave,
+      &RegisterSlaveMessage::slave);
+
+  installProtobufHandler<ReregisterSlaveMessage>(
+      &Master::reregisterSlave,
+      &ReregisterSlaveMessage::slave_id,
+      &ReregisterSlaveMessage::slave,
+      &ReregisterSlaveMessage::tasks);
+
+  installProtobufHandler<UnregisterSlaveMessage>(
+      &Master::unregisterSlave,
+      &UnregisterSlaveMessage::slave_id);
+
+  installProtobufHandler<StatusUpdateMessage>(
+      &Master::statusUpdate,
+      &StatusUpdateMessage::update,
+      &StatusUpdateMessage::pid);
+
+  installProtobufHandler<ExecutorToFrameworkMessage>(
+      &Master::executorMessage,
+      &ExecutorToFrameworkMessage::slave_id,
+      &ExecutorToFrameworkMessage::framework_id,
+      &ExecutorToFrameworkMessage::executor_id,
+      &ExecutorToFrameworkMessage::data);
+
+  installProtobufHandler<ExitedExecutorMessage>(
+      &Master::exitedExecutor,
+      &ExitedExecutorMessage::slave_id,
+      &ExitedExecutorMessage::framework_id,
+      &ExitedExecutorMessage::executor_id,
+      &ExitedExecutorMessage::status);
 
-  install(F2M_REGISTER_FRAMEWORK, &Master::registerFramework,
-          &RegisterFrameworkMessage::framework);
-
-  install(F2M_REREGISTER_FRAMEWORK, &Master::reregisterFramework,
-          &ReregisterFrameworkMessage::framework_id,
-          &ReregisterFrameworkMessage::framework,
-          &ReregisterFrameworkMessage::generation);
-
-  install(F2M_UNREGISTER_FRAMEWORK, &Master::unregisterFramework,
-          &UnregisterFrameworkMessage::framework_id);
-
-  install(F2M_RESOURCE_OFFER_REPLY, &Master::resourceOfferReply,
-          &ResourceOfferReplyMessage::framework_id,
-          &ResourceOfferReplyMessage::offer_id,
-          &ResourceOfferReplyMessage::tasks,
-          &ResourceOfferReplyMessage::params);
-
-  install(F2M_REVIVE_OFFERS, &Master::reviveOffers,
-          &ReviveOffersMessage::framework_id);
-
-  install(F2M_KILL_TASK, &Master::killTask,
-          &KillTaskMessage::framework_id,
-          &KillTaskMessage::task_id);
-
-  install(F2M_FRAMEWORK_MESSAGE, &Master::schedulerMessage,
-          &FrameworkMessageMessage::slave_id,
-          &FrameworkMessageMessage::framework_id,
-          &FrameworkMessageMessage::executor_id,
-          &FrameworkMessageMessage::data);
-
-  install(F2M_STATUS_UPDATE_ACK, &Master::statusUpdateAck,
-          &StatusUpdateAckMessage::framework_id,
-          &StatusUpdateAckMessage::task_id,
-          &StatusUpdateAckMessage::slave_id);
-
-  install(S2M_REGISTER_SLAVE, &Master::registerSlave,
-          &RegisterSlaveMessage::slave);
-
-  install(S2M_REREGISTER_SLAVE, &Master::reregisterSlave,
-          &ReregisterSlaveMessage::slave_id,
-          &ReregisterSlaveMessage::slave,
-          &ReregisterSlaveMessage::tasks);
-
-  install(S2M_UNREGISTER_SLAVE, &Master::unregisterSlave,
-          &UnregisterSlaveMessage::slave_id);
-
-  install(S2M_STATUS_UPDATE, &Master::statusUpdate,
-          &StatusUpdateMessage::framework_id,
-          &StatusUpdateMessage::status);
-
-  install(S2M_FRAMEWORK_MESSAGE, &Master::executorMessage,
-          &FrameworkMessageMessage::slave_id,
-          &FrameworkMessageMessage::framework_id,
-          &FrameworkMessageMessage::executor_id,
-          &FrameworkMessageMessage::data);
-
-  install(S2M_EXITED_EXECUTOR, &Master::exitedExecutor,
-          &ExitedExecutorMessage::slave_id,
-          &ExitedExecutorMessage::framework_id,
-          &ExitedExecutorMessage::executor_id,
-          &ExitedExecutorMessage::result);
-
-  install(process::EXITED, &Master::exited);
+  // Install some message handlers.
+  installMessageHandler(EXITED, &Master::exited);
 
   // Install HTTP request handlers.
   installHttpHandler("info.json", &Master::http_info_json);
@@ -536,13 +483,13 @@ void Master::initialize()
 }
 
 
-void Master::newMasterDetected(const string& pid)
+void Master::newMasterDetected(const UPID& pid)
 {
   // Check and see if we are (1) still waiting to be the active
   // master, (2) newly active master, (3) no longer active master,
   // or (4) still active master.
 
-  UPID master(pid);
+  UPID master = pid;
 
   if (master != self() && !active) {
     LOG(INFO) << "Waiting to be master!";
@@ -576,20 +523,20 @@ void Master::registerFramework(const Fra
 
   if (framework->info.executor().uri() == "") {
     LOG(INFO) << framework << " registering without an executor URI";
-    MSG<M2F_ERROR> out;
-    out.set_code(1);
-    out.set_message("No executor URI given");
-    send(from(), out);
+    FrameworkErrorMessage message;
+    message.set_code(1);
+    message.set_message("No executor URI given");
+    send(from(), message);
     delete framework;
   } else {
     bool rootSubmissions = conf.get<bool>("root_submissions", true);
     if (framework->info.user() == "root" && rootSubmissions == false) {
       LOG(INFO) << framework << " registering as root, but "
                 << "root submissions are disabled on this cluster";
-      MSG<M2F_ERROR> out;
-      out.set_code(1);
-      out.set_message("User 'root' is not allowed to run frameworks");
-      send(from(), out);
+      FrameworkErrorMessage message;
+      message.set_code(1);
+      message.set_message("User 'root' is not allowed to run frameworks");
+      send(from(), message);
       delete framework;
     }
   }
@@ -604,17 +551,17 @@ void Master::reregisterFramework(const F
 {
   if (frameworkId == "") {
     LOG(ERROR) << "Framework re-registering without an id!";
-    MSG<M2F_ERROR> out;
-    out.set_code(1);
-    out.set_message("Missing framework id");
-    send(from(), out);
+    FrameworkErrorMessage message;
+    message.set_code(1);
+    message.set_message("Missing framework id");
+    send(from(), message);
   } else if (frameworkInfo.executor().uri() == "") {
     LOG(INFO) << "Framework " << frameworkId << " re-registering "
               << "without an executor URI";
-    MSG<M2F_ERROR> out;
-    out.set_code(1);
-    out.set_message("No executor URI given");
-    send(from(), out);
+    FrameworkErrorMessage message;
+    message.set_code(1);
+    message.set_message("No executor URI given");
+    send(from(), message);
   } else {
     LOG(INFO) << "Re-registering framework " << frameworkId
               << " at " << from();
@@ -630,18 +577,18 @@ void Master::reregisterFramework(const F
       // (if necessary) by the master and the master will always
       // know which scheduler is the correct one.
       if (generation == 0) {
-        LOG(INFO) << "Framework " << frameworkId << " failed over";
-        failoverFramework(frameworks[frameworkId], from());
         // TODO: Should we check whether the new scheduler has given
         // us a different framework name, user name or executor info?
+        LOG(INFO) << "Framework " << frameworkId << " failed over";
+        failoverFramework(frameworks[frameworkId], from());
       } else {
         LOG(INFO) << "Framework " << frameworkId
                   << " re-registering with an already used id "
                   << " and not failing over!";
-        MSG<M2F_ERROR> out;
-        out.set_code(1);
-        out.set_message("Framework id in use");
-        send(from(), out);
+        FrameworkErrorMessage message;
+        message.set_code(1);
+        message.set_message("Framework id in use");
+        send(from(), message);
         return;
       }
     } else {
@@ -657,8 +604,8 @@ void Master::reregisterFramework(const F
       addFramework(framework);
       // Add any running tasks reported by slaves for this framework.
       foreachpair (const SlaveID& slaveId, Slave* slave, slaves) {
-        foreachpair (_, Task* task, slave->tasks) {
-          if (framework->frameworkId == task->framework_id()) {
+        foreachvalue (Task* task, slave->tasks) {
+          if (framework->id == task->framework_id()) {
             framework->addTask(task);
           }
         }
@@ -671,11 +618,11 @@ void Master::reregisterFramework(const F
     // broadcast because an executor might be running on a slave but
     // it currently isn't running any tasks. This could be a
     // potential scalability issue ...
-    foreachpair (_, Slave* slave, slaves) {
-      MSG<M2S_UPDATE_FRAMEWORK> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.set_pid(from());
-      send(slave->pid, out);
+    foreachvalue (Slave* slave, slaves) {
+      UpdateFrameworkMessage message;
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.set_pid(from());
+      send(slave->pid, message);
     }
   }
 }
@@ -685,7 +632,7 @@ void Master::unregisterFramework(const F
 {
   LOG(INFO) << "Asked to unregister framework " << frameworkId;
 
-  Framework* framework = lookupFramework(frameworkId);
+  Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
     if (framework->pid == from()) {
       removeFramework(framework);
@@ -702,9 +649,9 @@ void Master::resourceOfferReply(const Fr
                                 const vector<TaskDescription>& tasks,
                                 const Params& params)
 {
-  Framework* framework = lookupFramework(frameworkId);
+  Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    SlotOffer* offer = lookupSlotOffer(offerId);
+    Offer* offer = getOffer(offerId);
     if (offer != NULL) {
       processOfferReply(offer, tasks, params);
     } else {
@@ -713,13 +660,17 @@ void Master::resourceOfferReply(const Fr
       // immediately report any tasks in it as lost (it would
       // probably be better to have better error messages here).
       foreach (const TaskDescription& task, tasks) {
-        MSG<M2F_STATUS_UPDATE> out;
-        out.mutable_framework_id()->MergeFrom(frameworkId);
-        TaskStatus* status = out.mutable_status();
+        StatusUpdateMessage message;
+        StatusUpdate* update = message.mutable_update();
+        update->mutable_framework_id()->MergeFrom(frameworkId);
+        update->mutable_executor_id()->MergeFrom(task.executor().executor_id());
+        update->mutable_slave_id()->MergeFrom(task.slave_id());
+        TaskStatus* status = update->mutable_status();
         status->mutable_task_id()->MergeFrom(task.task_id());
-        status->mutable_slave_id()->MergeFrom(task.slave_id());
         status->set_state(TASK_LOST);
-        send(framework->pid, out);
+        update->set_timestamp(elapsedTime());
+	update->set_uuid(UUID::random().toBytes());
+        send(framework->pid, message);
       }
     }
   }
@@ -728,7 +679,7 @@ void Master::resourceOfferReply(const Fr
 
 void Master::reviveOffers(const FrameworkID& frameworkId)
 {
-  Framework* framework = lookupFramework(frameworkId);
+  Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
     LOG(INFO) << "Reviving offers for " << framework;
     framework->slaveFilter.clear();
@@ -743,32 +694,40 @@ void Master::killTask(const FrameworkID&
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
 
-  Framework* framework = lookupFramework(frameworkId);
+  Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    Task* task = framework->lookupTask(taskId);
+    Task* task = framework->getTask(taskId);
     if (task != NULL) {
-      Slave* slave = lookupSlave(task->slave_id());
+      Slave* slave = getSlave(task->slave_id());
       CHECK(slave != NULL);
 
-      LOG(INFO) << "Telling slave " << slave->slaveId
+      LOG(INFO) << "Telling slave " << slave->id
                 << " to kill task " << taskId
                 << " of framework " << frameworkId;
 
-      MSG<M2S_KILL_TASK> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_task_id()->MergeFrom(taskId);
-      send(slave->pid, out);
+      KillTaskMessage message;
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_task_id()->MergeFrom(taskId);
+      send(slave->pid, message);
     } else {
+      // TODO(benh): Once the scheduler has persistence and
+      // high-availability of its tasks, it will be the one that
+      // determines that this invocation of 'killTask' is silly, and
+      // can just return "locally" (i.e., after hitting only the other
+      // replicas). Unfortunately, it still won't know the slave id.
+
       LOG(WARNING) << "Cannot kill task " << taskId
                    << " of framework " << frameworkId
                    << " because it cannot be found";
-      MSG<M2F_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      TaskStatus *status = out.mutable_status();
+      StatusUpdateMessage message;
+      StatusUpdate* update = message.mutable_update();
+      update->mutable_framework_id()->MergeFrom(frameworkId);
+      TaskStatus* status = update->mutable_status();
       status->mutable_task_id()->MergeFrom(taskId);
-      status->mutable_slave_id()->set_value("UNKNOWN");
       status->set_state(TASK_LOST);
-      send(framework->pid, out);
+      update->set_timestamp(elapsedTime());
+      update->set_uuid(UUID::random().toBytes());
+      send(framework->pid, message);
     }
   }
 }
@@ -779,58 +738,32 @@ void Master::schedulerMessage(const Slav
 			      const ExecutorID& executorId,
                               const string& data)
 {
-  Framework* framework = lookupFramework(frameworkId);
+  Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    Slave* slave = lookupSlave(slaveId);
+    Slave* slave = getSlave(slaveId);
     if (slave != NULL) {
       LOG(INFO) << "Sending framework message for framework "
                 << frameworkId << " to slave " << slaveId;
-      MSG<M2S_FRAMEWORK_MESSAGE> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_data(data);
-      send(slave->pid, out);
+
+      FrameworkToExecutorMessage message;
+      message.mutable_slave_id()->MergeFrom(slaveId);
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_executor_id()->MergeFrom(executorId);
+      message.set_data(data);
+      send(slave->pid, message);
+
+      stats.validFrameworkMessages++;
     } else {
       LOG(WARNING) << "Cannot send framework message for framework "
                    << frameworkId << " to slave " << slaveId
                    << " because slave does not exist";
+      stats.invalidFrameworkMessages++;
     }
   } else {
     LOG(WARNING) << "Cannot send framework message for framework "
                  << frameworkId << " to slave " << slaveId
                  << " because framework does not exist";
-  }
-}
-
-
-void Master::statusUpdateAck(const FrameworkID& frameworkId,
-                             const TaskID& taskId,
-                             const SlaveID& slaveId)
-{
-  Framework* framework = lookupFramework(frameworkId);
-  if (framework != NULL) {
-    Slave* slave = lookupSlave(slaveId);
-    if (slave != NULL) {
-      LOG(INFO) << "Sending slave " << slaveId
-                << " status update acknowledgement for task " << taskId
-                << " of framework " << frameworkId;
-      MSG<M2S_STATUS_UPDATE_ACK> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_task_id()->MergeFrom(taskId);
-      send(slave->pid, out);
-    } else {
-      LOG(WARNING) << "Cannot tell slave " << slaveId
-                   << " of status update acknowledgement for task " << taskId
-                   << " of framework " << frameworkId
-                   << " because slave does not exist";
-    }
-  } else {
-    LOG(WARNING) << "Cannot tell slave " << slaveId
-                 << " of status update acknowledgement for task " << taskId
-                 << " of framework " << frameworkId
-                 << " because framework does not exist";
+    stats.invalidFrameworkMessages++;
   }
 }
 
@@ -839,44 +772,19 @@ void Master::registerSlave(const SlaveIn
 {
   Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsedTime());
 
-  LOG(INFO) << "Attempting to register slave " << slave->slaveId
+  LOG(INFO) << "Attempting to register slave " << slave->id
             << " at " << slave->pid;
 
-  struct SlaveRegistrar
-  {
-    static bool run(Slave* slave, const PID<Master>& master)
-    {
-      // TODO(benh): Do a reverse lookup to ensure IP maps to
-      // hostname, or check credentials of this slave.
-      process::dispatch(master, &Master::addSlave, slave);
-    }
-
-    static bool run(Slave* slave,
-                    const PID<Master>& master,
-                    const PID<SlavesManager>& slavesManager)
-    {
-      if (!process::call(slavesManager, &SlavesManager::add,
-                         slave->info.hostname(), slave->pid.port)) {
-        LOG(WARNING) << "Could not register slave because failed"
-                     << " to add it to the slaves maanger";
-        delete slave;
-        return false;
-      }
-
-      return run(slave, master);
-    }
-  };
-
-  // Check whether this slave can be accepted, or if all slaves are accepted.
+  // Checks if this slave, or if all slaves, can be accepted.
   if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
-    process::run(&SlaveRegistrar::run, slave, self());
+    run(&SlaveRegistrar::run, slave, self());
   } else if (conf.get<string>("slaves", "*") == "*") {
-    process::run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
+    run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
   } else {
     LOG(WARNING) << "Cannot register slave at "
                  << slaveInfo.hostname() << ":" << from().port
                  << " because not in allocated set of slaves!";
-    send(from(), process::TERMINATE);
+    send(from(), TERMINATE);
   }
 }
 
@@ -887,9 +795,9 @@ void Master::reregisterSlave(const Slave
 {
   if (slaveId == "") {
     LOG(ERROR) << "Slave re-registered without an id!";
-    send(from(), process::TERMINATE);
+    send(from(), TERMINATE);
   } else {
-    Slave* slave = lookupSlave(slaveId);
+    Slave* slave = getSlave(slaveId);
     if (slave != NULL) {
       // TODO(benh): It's still unclear whether or not
       // MasterDetector::detectMaster will cause spurious
@@ -907,52 +815,24 @@ void Master::reregisterSlave(const Slave
       LOG(ERROR) << "Slave at " << from()
 		 << " attempted to re-register with an already in use id ("
 		 << slaveId << ")";
-      send(from(), process::TERMINATE);
+      send(from(), TERMINATE);
     } else {
       Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsedTime());
 
-      LOG(INFO) << "Attempting to re-register slave " << slave->slaveId
+      LOG(INFO) << "Attempting to re-register slave " << slave->id
                 << " at " << slave->pid;
 
-      struct SlaveReregistrar
-      {
-        static bool run(Slave* slave,
-                        const vector<Task>& tasks,
-                        const PID<Master>& master)
-        {
-          // TODO(benh): Do a reverse lookup to ensure IP maps to
-          // hostname, or check credentials of this slave.
-          process::dispatch(master, &Master::readdSlave, slave, tasks);
-        }
-
-        static bool run(Slave* slave,
-                        const vector<Task>& tasks,
-                        const PID<Master>& master,
-                        const PID<SlavesManager>& slavesManager)
-        {
-          if (!process::call(slavesManager, &SlavesManager::add,
-                             slave->info.hostname(), slave->pid.port)) {
-            LOG(WARNING) << "Could not register slave because failed"
-                         << " to add it to the slaves maanger";
-            delete slave;
-            return false;
-          }
-
-          return run(slave, tasks, master);
-        }
-      };
-
-      // Check whether this slave can be accepted, or if all slaves are accepted.
+      // Checks if this slave, or if all slaves, can be accepted.
       if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
-        process::run(&SlaveReregistrar::run, slave, tasks, self());
+        run(&SlaveReregistrar::run, slave, tasks, self());
       } else if (conf.get<string>("slaves", "*") == "*") {
-        process::run(&SlaveReregistrar::run,
-                     slave, tasks, self(), slavesManager->self());
+        run(&SlaveReregistrar::run,
+            slave, tasks, self(), slavesManager->self());
       } else {
         LOG(WARNING) << "Cannot re-register slave at "
                      << slaveInfo.hostname() << ":" << from().port
                      << " because not in allocated set of slaves!";
-        send(from(), process::TERMINATE);
+        send(from(), TERMINATE);
       }
     }
   }
@@ -965,71 +845,65 @@ void Master::unregisterSlave(const Slave
 
   // TODO(benh): Check that only the slave is asking to unregister?
 
-  Slave* slave = lookupSlave(slaveId);
+  Slave* slave = getSlave(slaveId);
   if (slave != NULL) {
     removeSlave(slave);
   }
 }
 
 
-void Master::statusUpdate(const FrameworkID& frameworkId,
-                          const TaskStatus& status)
+void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 {
-  LOG(INFO) << "Status update: task " << status.task_id()
-            << " of framework " << frameworkId
-            << " is now in state "
-            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+  const TaskStatus& status = update.status();
 
-  Slave* slave = lookupSlave(status.slave_id());
+  LOG(INFO) << "Status update from " << from()
+            << ": task " << status.task_id()
+            << " of framework " << update.framework_id()
+            << " is now in state " << status.state();
+
+  Slave* slave = getSlave(update.slave_id());
   if (slave != NULL) {
-    Framework* framework = lookupFramework(frameworkId);
+    Framework* framework = getFramework(update.framework_id());
     if (framework != NULL) {
       // Pass on the (transformed) status update to the framework.
-      MSG<M2F_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_status()->MergeFrom(status);
-      send(framework->pid, out);
+      StatusUpdateMessage message;
+      message.mutable_update()->MergeFrom(update);
+      message.set_pid(pid);
+      send(framework->pid, message);
 
       // Lookup the task and see if we need to update anything locally.
-      Task* task = slave->lookupTask(frameworkId, status.task_id());
+      Task* task = slave->getTask(update.framework_id(), status.task_id());
       if (task != NULL) {
         task->set_state(status.state());
 
-        // Remove the task if necessary, and update statistics.
-	switch (status.state()) {
-	  case TASK_FINISHED:
-	    statistics.finished_tasks++;
-	    removeTask(task, TRR_TASK_ENDED);
-	    break;
-	  case TASK_FAILED:
-	    statistics.failed_tasks++;
-	    removeTask(task, TRR_TASK_ENDED);
-	    break;
-	  case TASK_KILLED:
-	    statistics.killed_tasks++;
-	    removeTask(task, TRR_TASK_ENDED);
-	    break;
-	  case TASK_LOST:
-	    statistics.lost_tasks++;
-	    removeTask(task, TRR_TASK_ENDED);
-	    break;
-	}
+        // Handle the task appropriately if it's terminated.
+        if (status.state() == TASK_FINISHED ||
+            status.state() == TASK_FAILED ||
+            status.state() == TASK_KILLED ||
+            status.state() == TASK_LOST) {
+          removeTask(framework, slave, task, TRR_TASK_ENDED);
+        }
+
+        stats.tasks[status.state()]++;
 
-	statistics.valid_status_updates++;
+        stats.validStatusUpdates++;
       } else {
-        LOG(WARNING) << "Status update error: couldn't lookup "
+        LOG(WARNING) << "Status update from " << from()
+                     << ": error, couldn't lookup "
                      << "task " << status.task_id();
-	statistics.invalid_status_updates++;
+	stats.invalidStatusUpdates++;
       }
     } else {
-      LOG(WARNING) << "Status update error: couldn't lookup "
-                   << "framework " << frameworkId;
-      statistics.invalid_status_updates++;
+      LOG(WARNING) << "Status update from " << from()
+                   << ": error, couldn't lookup "
+                   << "framework " << update.framework_id();
+      stats.invalidStatusUpdates++;
     }
   } else {
-    LOG(WARNING) << "Status update error: couldn't lookup slave "
-                 << status.slave_id();
-    statistics.invalid_status_updates++;
+    LOG(WARNING) << "Status update from " << from()
+                 << ": error, couldn't lookup slave "
+                 << update.slave_id();
+    stats.invalidStatusUpdates++;
   }
 }
 
@@ -1039,31 +913,31 @@ void Master::executorMessage(const Slave
 			     const ExecutorID& executorId,
                              const string& data)
 {
-  Slave* slave = lookupSlave(slaveId);
+  Slave* slave = getSlave(slaveId);
   if (slave != NULL) {
-    Framework* framework = lookupFramework(frameworkId);
+    Framework* framework = getFramework(frameworkId);
     if (framework != NULL) {
       LOG(INFO) << "Sending framework message from slave " << slaveId
                 << " to framework " << frameworkId;
-      MSG<M2S_FRAMEWORK_MESSAGE> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_data(data);
-      send(framework->pid, out);
+      ExecutorToFrameworkMessage message;
+      message.mutable_slave_id()->MergeFrom(slaveId);
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_executor_id()->MergeFrom(executorId);
+      message.set_data(data);
+      send(framework->pid, message);
 
-      statistics.valid_framework_messages++;
+      stats.validFrameworkMessages++;
     } else {
       LOG(WARNING) << "Cannot send framework message from slave "
                    << slaveId << " to framework " << frameworkId
                    << " because framework does not exist";
-      statistics.invalid_framework_messages++;
+      stats.invalidFrameworkMessages++;
     }
   } else {
     LOG(WARNING) << "Cannot send framework message from slave "
                  << slaveId << " to framework " << frameworkId
                  << " because slave does not exist";
-    statistics.invalid_framework_messages++;
+    stats.invalidFrameworkMessages++;
   }
 }
 
@@ -1071,35 +945,53 @@ void Master::executorMessage(const Slave
 void Master::exitedExecutor(const SlaveID& slaveId,
                             const FrameworkID& frameworkId,
                             const ExecutorID& executorId,
-                            int32_t result)
+                            int32_t status)
 {
-  Slave* slave = lookupSlave(slaveId);
+  // TODO(benh): Send status updates for the tasks running under this
+  // executor from the slave! Maybe requires adding an extra "reason"
+  // so that people can see that the tasks were lost because of 
+
+  Slave* slave = getSlave(slaveId);
   if (slave != NULL) {
-    Framework* framework = lookupFramework(frameworkId);
+    Framework* framework = getFramework(frameworkId);
     if (framework != NULL) {
       LOG(INFO) << "Executor " << executorId
-                << " of framework " << framework->frameworkId
-                << " on slave " << slave->slaveId
+                << " of framework " << framework->id
+                << " on slave " << slave->id
                 << " (" << slave->info.hostname() << ") "
-                << "exited with result " << result;
+                << "exited with status " << status;
+
+      // TODO(benh): What about a status update that is on it's way
+      // from the slave but got re-ordered on the wire? By sending
+      // this status updates here we are not allowing possibly
+      // finished tasks to reach the scheduler appropriately. In
+      // stead, it seems like perhaps the right thing to do is to have
+      // the slave be responsible for sending those status updates,
+      // and have the master (or better yet ... and soon ... the
+      // scheduler) decide that a task is dead only when a slave lost
+      // has occured.
 
       // Tell the framework which tasks have been lost.
-      foreachpaircopy (_, Task* task, framework->tasks) {
-        if (task->slave_id() == slave->slaveId &&
+      foreachvalue (Task* task, utils::copy(framework->tasks)) {
+        if (task->slave_id() == slave->id &&
             task->executor_id() == executorId) {
-          MSG<M2F_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(task->framework_id());
-          TaskStatus* status = out.mutable_status();
+          StatusUpdateMessage message;
+          StatusUpdate* update = message.mutable_update();
+          update->mutable_framework_id()->MergeFrom(task->framework_id());
+          update->mutable_executor_id()->MergeFrom(task->executor_id());
+          update->mutable_slave_id()->MergeFrom(task->slave_id());
+          TaskStatus* status = update->mutable_status();
           status->mutable_task_id()->MergeFrom(task->task_id());
-          status->mutable_slave_id()->MergeFrom(task->slave_id());
           status->set_state(TASK_LOST);
-          send(framework->pid, out);
+          update->set_timestamp(elapsedTime());
+	  update->set_uuid(UUID::random().toBytes());
+          send(framework->pid, message);
 
           LOG(INFO) << "Removing task " << task->task_id()
                     << " of framework " << frameworkId
                     << " because of lost executor";
 
-          removeTask(task, TRR_EXECUTOR_LOST);
+          removeTask(framework, slave, task, TRR_EXECUTOR_LOST);
         }
       }
 
@@ -1119,16 +1011,17 @@ void Master::activatedSlaveHostnamePort(
 }
 
 
-void Master::deactivatedSlaveHostnamePort(const string& hostname, uint16_t port)
+void Master::deactivatedSlaveHostnamePort(const string& hostname,
+                                          uint16_t port)
 {
   if (slaveHostnamePorts.count(hostname, port) > 0) {
     // Look for a connected slave and remove it.
-    foreachpair (_, Slave* slave, slaves) {
+    foreachvalue (Slave* slave, slaves) {
       if (slave->info.hostname() == hostname && slave->pid.port == port) {
-        LOG(WARNING) << "Removing slave " << slave->slaveId << " at "
+        LOG(WARNING) << "Removing slave " << slave->id << " at "
 		     << hostname << ":" << port
                      << " because it has been deactivated";
-	send(slave->pid, process::TERMINATE);
+	send(slave->pid, TERMINATE);
         removeSlave(slave);
         break;
       }
@@ -1144,21 +1037,26 @@ void Master::deactivatedSlaveHostnamePor
 void Master::timerTick()
 {
   // Check which framework filters can be expired.
-  foreachpair (_, Framework* framework, frameworks) {
+  foreachvalue (Framework* framework, frameworks) {
     framework->removeExpiredFilters(elapsedTime());
   }
 
   // Do allocations!
   allocator->timerTick();
+
+  // Scheduler another timer tick!
+  delay(1.0, self(), &Master::timerTick);
 }
 
 
-void Master::frameworkExpired(const FrameworkID& frameworkId)
+void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
+                                      double reregisteredTime)
 {
-  Framework* framework = lookupFramework(frameworkId);
-  if (framework != NULL) {
-    LOG(INFO) << "Framework failover timer expired, removing "
-              << framework;
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL && !framework->active &&
+      framework->reregisteredTime == reregisteredTime) {
+    LOG(INFO) << "Framework failover timeout, removing framework "
+              << framework->id;
     removeFramework(framework);
   }
 }
@@ -1166,340 +1064,120 @@ void Master::frameworkExpired(const Fram
 
 void Master::exited()
 {
-  // TODO(benh): Could we get PROCESS_EXIT from a network partition?
-  LOG(INFO) << "Process exited: " << from();
-  if (pidToFrameworkId.count(from()) > 0) {
-    const FrameworkID& frameworkId = pidToFrameworkId[from()];
-    Framework* framework = lookupFramework(frameworkId);
-    if (framework != NULL) {
-      LOG(INFO) << "Framework " << frameworkId << " disconnected";
+  foreachvalue (Framework* framework, frameworks) {
+    if (framework->pid == from()) {
+      LOG(INFO) << "Framework " << framework->id << " disconnected";
+
+//       removeFramework(framework);
 
       // Stop sending offers here for now.
       framework->active = false;
 
+      // Delay dispatching a message to ourselves for the timeout.
+      delay(FRAMEWORK_FAILOVER_TIMEOUT, self(),
+            &Master::frameworkFailoverTimeout,
+            framework->id, framework->reregisteredTime);
+
       // Remove the framework's slot offers.
-      foreachcopy (SlotOffer* offer, framework->slotOffers) {
-        removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+      foreach (Offer* offer, utils::copy(framework->offers)) {
+        removeOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
       }
-
-//       framework->failoverTimer =
-//         new FrameworkFailoverTimer(self(), frameworkId);
-//       link(spawn(framework->failoverTimer));
-      removeFramework(framework);
+      return;
     }
-  } else if (pidToSlaveId.count(from()) > 0) {
-    const SlaveID& slaveId = pidToSlaveId[from()];
-    Slave* slave = lookupSlave(slaveId);
-    if (slave != NULL) {
-      LOG(INFO) << slave << " disconnected";
+  }
+
+  foreachvalue (Slave* slave, slaves) {
+    if (slave->pid == from()) {
+      LOG(INFO) << "Slave " << slave->id << " disconnected";
       removeSlave(slave);
-    }
-  } else {
-    foreachpair (_, Framework* framework, frameworks) {
-      if (framework->failoverTimer != NULL &&
-          framework->failoverTimer->self() == from()) {
-        LOG(ERROR) << "Bad framework failover timer, removing "
-                   << framework;
-        removeFramework(framework);
-        break;
-      }
+      return;
     }
   }
 }
 
 
-Promise<HttpResponse> Master::http_info_json(const HttpRequest& request)
+OfferID Master::makeOffer(Framework* framework,
+                          const vector<SlaveResources>& resources)
 {
-  LOG(INFO) << "HTTP request for '/master/info.json'";
-
-  ostringstream out;
-
-  out <<
-    "{" <<
-    "\"built_date\":\"" << build::DATE << "\"," <<
-    "\"build_user\":\"" << build::USER << "\"," <<
-    "\"start_time\":\"" << startTime << "\"," <<
-    "\"pid\":\"" << self() << "\"" <<
-    "}";
+  const OfferID& offerId = newOfferId();
 
-  HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
-  response.body = out.str().data();
-  return response;
-}
+  Offer* offer = new Offer(offerId, framework->id, resources);
 
+  offers[offer->id] = offer;
+  framework->addOffer(offer);
 
-Promise<HttpResponse> Master::http_frameworks_json(const HttpRequest& request)
-{
-  LOG(INFO) << "HTTP request for '/master/frameworks.json'";
+  // Update the resource information within each of the slave objects. Gross!
+  foreach (const SlaveResources& r, resources) {
+    r.slave->offers.insert(offer);
+    r.slave->resourcesOffered += r.resources;
+  }
 
-  ostringstream out;
+  LOG(INFO) << "Sending offer " << offer->id
+            << " to framework " << framework->id;
 
-  out << "[";
+  ResourceOfferMessage message;
+  message.mutable_offer_id()->MergeFrom(offerId);
 
-  foreachpair (_, Framework* framework, frameworks) {
-    out <<
-      "{" <<
-      "\"id\":\"" << framework->frameworkId << "\"," <<
-      "\"name\":\"" << framework->info.name() << "\"," <<
-      "\"user\":\"" << framework->info.user() << "\""
-      "},";
-  }
+  foreach (const SlaveResources& r, resources) {
+    SlaveOffer* offer = message.add_offers();
+    offer->mutable_slave_id()->MergeFrom(r.slave->id);
+    offer->set_hostname(r.slave->info.hostname());
+    offer->mutable_resources()->MergeFrom(r.resources);
 
-  // Backup the put pointer to overwrite the last comma (hack).
-  if (frameworks.size() > 0) {
-    long pos = out.tellp();
-    out.seekp(pos - 1);
+    message.add_pids(r.slave->pid);
   }
 
-  out << "]";
+  send(framework->pid, message);
 
-  HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
-  response.body = out.str().data();
-  return response;
+  return offerId;
 }
 
 
-Promise<HttpResponse> Master::http_slaves_json(const HttpRequest& request)
+// Process a resource offer reply (for a non-cancelled offer) by launching
+// the desired tasks (if the offer contains a valid set of tasks) and
+// reporting any unused resources to the allocator.
+void Master::processOfferReply(Offer* offer,
+                               const vector<TaskDescription>& tasks,
+                               const Params& params)
 {
-  LOG(INFO) << "HTTP request for '/master/slaves.json'";
-
-  ostringstream out;
+  LOG(INFO) << "Received reply for " << offer;
 
-  out << "[";
+  Framework* framework = getFramework(offer->frameworkId);
+  CHECK(framework != NULL);
 
-  foreachpair (_, Slave* slave, slaves) {
-    // TODO(benh): Send all of the resources (as JSON).
-    Resources resources(slave->info.resources());
-    Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
-    Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
-    out <<
-      "{" <<
-      "\"id\":\"" << slave->slaveId << "\"," <<
-      "\"hostname\":\"" << slave->info.hostname() << "\"," <<
-      "\"cpus\":" << cpus.value() << "," <<
-      "\"mem\":" << mem.value() <<
-      "},";
+  // Count resources in the offer.
+  hashmap<Slave*, Resources> resourcesOffered;
+  foreach (const SlaveResources& r, offer->resources) {
+    resourcesOffered[r.slave] = r.resources;
   }
 
-  // Backup the put pointer to overwrite the last comma (hack).
-  if (slaves.size() > 0) {
-    long pos = out.tellp();
-    out.seekp(pos - 1);
-  }
+  // Count used resources and check that its tasks are valid.
+  hashmap<Slave*, Resources> resourcesUsed;
+  foreach (const TaskDescription& task, tasks) {
+    // Check whether the task is on a valid slave.
+    Slave* slave = getSlave(task.slave_id());
+    if (slave == NULL || resourcesOffered.count(slave) == 0) {
+      terminateFramework(framework, 0, "Invalid slave in offer reply");
+      return;
+    }
 
-  out << "]";
+    // Check whether or not the resources for the task are valid.
+    // TODO(benh): In the future maybe we can also augment the
+    // protobuf to deal with fragmentation purposes by providing some
+    // sort of minimum amount of resources required per task.
 
-  HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
-  response.body = out.str().data();
-  return response;
-}
+    if (task.resources().size() == 0) {
+      terminateFramework(framework, 0, "Invalid resources for task");
+      return;
+    }
 
-
-Promise<HttpResponse> Master::http_tasks_json(const HttpRequest& request)
-{
-  LOG(INFO) << "HTTP request for '/master/tasks.json'";
-
-  ostringstream out;
-
-  out << "[";
-
-  foreachpair (_, Framework* framework, frameworks) {
-    foreachpair (_, Task* task, framework->tasks) {
-      // TODO(benh): Send all of the resources (as JSON).
-      Resources resources(task->resources());
-      Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
-      Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
-      const string& state =
-        TaskState_descriptor()->FindValueByNumber(task->state())->name();
-      out <<
-        "{" <<
-        "\"task_id\":\"" << task->task_id() << "\"," <<
-        "\"framework_id\":\"" << task->framework_id() << "\"," <<
-        "\"slave_id\":\"" << task->slave_id() << "\"," <<
-        "\"name\":\"" << task->name() << "\"," <<
-        "\"state\":\"" << state << "\"," <<
-        "\"cpus\":" << cpus.value() << "," <<
-        "\"mem\":" << mem.value() <<
-        "},";
-    }
-  }
-
-  // Backup the put pointer to overwrite the last comma (hack).
-  if (frameworks.size() > 0) {
-    long pos = out.tellp();
-    out.seekp(pos - 1);
-  }
-
-  out << "]";
-
-  HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
-  response.body = out.str().data();
-  return response;
-}
-
-
-Promise<HttpResponse> Master::http_stats_json(const HttpRequest& request)
-{
-  LOG(INFO) << "Http request for '/master/stats.json'";
-
-  ostringstream out;
-
-  out <<
-    "{" <<
-    "\"uptime\":" << elapsedTime() - startTime << "," <<
-    "\"total_schedulers\":" << frameworks.size() << "," <<
-    "\"active_schedulers\":" << getActiveFrameworks().size() << "," <<
-    "\"activated_slaves\":" << slaveHostnamePorts.size() << "," <<
-    "\"connected_slaves\":" << slaves.size() << "," <<
-    "\"launched_tasks\":" << statistics.launched_tasks << "," <<
-    "\"finished_tasks\":" << statistics.finished_tasks << "," <<
-    "\"killed_tasks\":" << statistics.killed_tasks << "," <<
-    "\"failed_tasks\":" << statistics.failed_tasks << "," <<
-    "\"lost_tasks\":" << statistics.lost_tasks << "," <<
-    "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
-    "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
-    "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
-    "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
-    "}";
-
-  HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
-  response.body = out.str().data();
-  return response;
-}
-
-
-Promise<HttpResponse> Master::http_vars(const HttpRequest& request)
-{
-  LOG(INFO) << "HTTP request for '/master/vars'";
-
-  ostringstream out;
-
-  out <<
-    "build_date " << build::DATE << "\n" <<
-    "build_user " << build::USER << "\n" <<
-    "build_flags " << build::FLAGS << "\n";
-
-  // Also add the configuration values.
-  foreachpair (const string& key, const string& value, conf.getMap()) {
-    out << key << " " << value << "\n";
-  }
-
-  out <<
-    "uptime " << elapsedTime() - startTime << "\n" <<
-    "total_schedulers " << frameworks.size() << "\n" <<
-    "active_schedulers " << getActiveFrameworks().size() << "\n" <<
-    "activated_slaves " << slaveHostnamePorts.size() << "\n" <<
-    "connected_slaves " << slaves.size() << "\n" <<
-    "launched_tasks " << statistics.launched_tasks << "\n" <<
-    "finished_tasks " << statistics.finished_tasks << "\n" <<
-    "killed_tasks " << statistics.killed_tasks << "\n" <<
-    "failed_tasks " << statistics.failed_tasks << "\n" <<
-    "lost_tasks " << statistics.lost_tasks << "\n" <<
-    "valid_status_updates " << statistics.valid_status_updates << "\n" <<
-    "invalid_status_updates " << statistics.invalid_status_updates << "\n" <<
-    "valid_framework_messages " << statistics.valid_framework_messages << "\n" <<
-    "invalid_framework_messages " << statistics.invalid_framework_messages << "\n";
-
-  HttpOKResponse response;
-  response.headers["Content-Type"] = "text/plain";
-  response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
-  response.body = out.str().data();
-  return response;
-}
-
-
-OfferID Master::makeOffer(Framework* framework,
-                          const vector<SlaveResources>& resources)
-{
-  const OfferID& offerId = newOfferId();
-
-  SlotOffer* offer = new SlotOffer(offerId, framework->frameworkId, resources);
-
-  slotOffers[offer->offerId] = offer;
-  framework->addOffer(offer);
-
-  // Update the resource information within each of the slave objects. Gross!
-  foreach (const SlaveResources& r, resources) {
-    r.slave->slotOffers.insert(offer);
-    r.slave->resourcesOffered += r.resources;
-  }
-
-  LOG(INFO) << "Sending offer " << offer->offerId
-            << " to framework " << framework->frameworkId;
-
-  MSG<M2F_RESOURCE_OFFER> out;
-  out.mutable_offer_id()->MergeFrom(offerId);
-
-  foreach (const SlaveResources& r, resources) {
-    SlaveOffer* offer = out.add_offers();
-    offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
-    offer->set_hostname(r.slave->info.hostname());
-    offer->mutable_resources()->MergeFrom(r.resources);
-
-    out.add_pids(r.slave->pid);
-  }
-
-  send(framework->pid, out);
-
-  return offerId;
-}
-
-
-// Process a resource offer reply (for a non-cancelled offer) by launching
-// the desired tasks (if the offer contains a valid set of tasks) and
-// reporting any unused resources to the allocator.
-void Master::processOfferReply(SlotOffer* offer,
-                               const vector<TaskDescription>& tasks,
-                               const Params& params)
-{
-  LOG(INFO) << "Received reply for " << offer;
-
-  Framework* framework = lookupFramework(offer->frameworkId);
-  CHECK(framework != NULL);
-
-  // Count resources in the offer.
-  unordered_map<Slave*, Resources> resourcesOffered;
-  foreach (const SlaveResources& r, offer->resources) {
-    resourcesOffered[r.slave] = r.resources;
-  }
-
-  // Count used resources and check that its tasks are valid.
-  unordered_map<Slave*, Resources> resourcesUsed;
-  foreach (const TaskDescription& task, tasks) {
-    // Check whether the task is on a valid slave.
-    Slave* slave = lookupSlave(task.slave_id());
-    if (slave == NULL || resourcesOffered.count(slave) == 0) {
-      terminateFramework(framework, 0, "Invalid slave in offer reply");
-      return;
-    }
-
-    // Check whether or not the resources for the task are valid.
-    // TODO(benh): In the future maybe we can also augment the
-    // protobuf to deal with fragmentation purposes by providing some
-    // sort of minimum amount of resources required per task.
-
-    if (task.resources().size() == 0) {
-      terminateFramework(framework, 0, "Invalid resources for task");
-      return;
-    }
-
-    foreach (const Resource& resource, task.resources()) {
-      if (!Resources::isAllocatable(resource)) {
-        // TODO(benh): Also send back the invalid resources as a string?
-        terminateFramework(framework, 0, "Invalid resources for task");
-        return;
-      }
-    }
+    foreach (const Resource& resource, task.resources()) {
+      if (!Resources::isAllocatable(resource)) {
+        // TODO(benh): Also send back the invalid resources as a string?
+        terminateFramework(framework, 0, "Invalid resources for task");
+        return;
+      }
+    }
 
     resourcesUsed[slave] += task.resources();
   }
@@ -1513,12 +1191,12 @@ void Master::processOfferReply(SlotOffer
   }
 
   // Check that there are no duplicate task IDs.
-  unordered_set<TaskID> idsInResponse;
+  hashset<TaskID> idsInResponse;
   foreach (const TaskDescription& task, tasks) {
     if (framework->tasks.count(task.task_id()) > 0 ||
         idsInResponse.count(task.task_id()) > 0) {
-      terminateFramework(framework, 0, "Duplicate task ID: " +
-                         lexical_cast<string>(task.task_id()));
+      string error = "Duplicate task ID: " + task.task_id().value();
+      terminateFramework(framework, 0, error);
       return;
     }
     idsInResponse.insert(task.task_id());
@@ -1531,11 +1209,18 @@ void Master::processOfferReply(SlotOffer
 
   // Get out the timeout for left over resources (if exists), and use
   // that to calculate the expiry timeout.
-  int timeout = DEFAULT_REFUSAL_TIMEOUT;
+  double timeout = DEFAULT_REFUSAL_TIMEOUT;
 
   for (int i = 0; i < params.param_size(); i++) {
     if (params.param(i).key() == "timeout") {
-      timeout = lexical_cast<int>(params.param(i).value());
+      try {
+        timeout = boost::lexical_cast<double>(params.param(i).value());
+      } catch (boost::bad_lexical_cast&) {
+        string error = "Failed to convert value '" +
+          params.param(i).value() + "' for key 'timeout' to an integer";
+        terminateFramework(framework, 0, error);
+        return;
+      }
       break;
     }
   }
@@ -1566,7 +1251,7 @@ void Master::processOfferReply(SlotOffer
   }
   
   // Return the resources left to the allocator.
-  removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
+  removeOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
 }
 
 
@@ -1575,7 +1260,7 @@ void Master::launchTask(Framework* frame
   // The invariant right now is that launchTask is called only for
   // TaskDescriptions where the slave is still valid (see the code
   // above in processOfferReply).
-  Slave* slave = lookupSlave(task.slave_id());
+  Slave* slave = getSlave(task.slave_id());
   CHECK(slave != NULL);
 
   // Determine the executor ID for this task.
@@ -1584,7 +1269,7 @@ void Master::launchTask(Framework* frame
     : framework->info.executor().executor_id();
 
   Task* t = new Task();
-  t->mutable_framework_id()->MergeFrom(framework->frameworkId);
+  t->mutable_framework_id()->MergeFrom(framework->id);
   t->mutable_executor_id()->MergeFrom(executorId);
   t->set_state(TASK_STARTING);
   t->set_name(task.name());
@@ -1599,79 +1284,28 @@ void Master::launchTask(Framework* frame
 
   LOG(INFO) << "Launching " << t << " on " << slave;
 
-  MSG<M2S_RUN_TASK> out;
-  out.mutable_framework()->MergeFrom(framework->info);
-  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-  out.set_pid(framework->pid);
-  out.mutable_task()->MergeFrom(task);
-  send(slave->pid, out);
-
-  statistics.launched_tasks++;
-}
-
-
-// Terminate a framework, sending it a particular error message
-// TODO: Make the error codes and messages programmer-friendly
-void Master::terminateFramework(Framework* framework,
-                                int32_t code,
-                                const string& message)
-{
-  LOG(INFO) << "Terminating " << framework << " due to error: " << message;
-
-  MSG<M2F_ERROR> out;
-  out.set_code(code);
-  out.set_message(message);
-  send(framework->pid, out);
+  RunTaskMessage message;
+  message.mutable_framework()->MergeFrom(framework->info);
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  message.set_pid(framework->pid);
+  message.mutable_task()->MergeFrom(task);
+  send(slave->pid, message);
 
-  removeFramework(framework);
-}
-
-
-// Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeSlotOffer(SlotOffer* offer,
-                             OfferReturnReason reason,
-                             const vector<SlaveResources>& resourcesUnused)
-{
-  // Remove from slaves.
-  foreach (SlaveResources& r, offer->resources) {
-    CHECK(r.slave != NULL);
-    r.slave->resourcesOffered -= r.resources;
-    r.slave->slotOffers.erase(offer);
-  }
-    
-  // Remove from framework
-  Framework *framework = lookupFramework(offer->frameworkId);
-  CHECK(framework != NULL);
-  framework->removeOffer(offer);
-
-  // Also send framework a rescind message unless the reason we are
-  // removing the offer is that the framework replied to it
-  if (reason != ORR_FRAMEWORK_REPLIED) {
-    MSG<M2F_RESCIND_OFFER> out;
-    out.mutable_offer_id()->MergeFrom(offer->offerId);
-    send(framework->pid, out);
-  }
-  
-  // Tell the allocator about the unused resources.
-  allocator->offerReturned(offer, reason, resourcesUnused);
-  
-  // Delete it
-  slotOffers.erase(offer->offerId);
-  delete offer;
+  stats.tasks[TASK_STARTING]++;
 }
 
 
 void Master::addFramework(Framework* framework)
 {
-  CHECK(frameworks.count(framework->frameworkId) == 0);
+  CHECK(frameworks.count(framework->id) == 0);
+
+  frameworks[framework->id] = framework;
 
-  frameworks[framework->frameworkId] = framework;
-  pidToFrameworkId[framework->pid] = framework->frameworkId;
   link(framework->pid);
 
-  MSG<M2F_REGISTER_REPLY> out;
-  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-  send(framework->pid, out);
+  FrameworkRegisteredMessage message;
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  send(framework->pid, message);
 
   allocator->frameworkAdded(framework);
 }
@@ -1683,103 +1317,115 @@ void Master::failoverFramework(Framework
 {
   const UPID& oldPid = framework->pid;
 
-  // Remove the framework's slot offers (if they weren't removed before)..
+  // Remove the framework's slot offers (if they weren't removed before).
   // TODO(benh): Consider just reoffering these to the new framework.
-  foreachcopy (SlotOffer* offer, framework->slotOffers) {
-    removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    removeOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
   }
 
-  MSG<M2F_ERROR> out;
-  out.set_code(1);
-  out.set_message("Framework failover");
-  send(oldPid, out);
+  {
+    FrameworkErrorMessage message;
+    message.set_code(1);
+    message.set_message("Framework failover");
+    send(oldPid, message);
+  }
 
   // TODO(benh): unlink(oldPid);
-  pidToFrameworkId.erase(oldPid);
-  pidToFrameworkId[newPid] = framework->frameworkId;
 
   framework->pid = newPid;
   link(newPid);
 
-  // Kill the failover timer.
-  if (framework->failoverTimer != NULL) {
-    process::post(framework->failoverTimer->self(), process::TERMINATE);
-    process::wait(framework->failoverTimer->self());
-    delete framework->failoverTimer;
-    framework->failoverTimer = NULL;
-  }
-
   // Make sure we can get offers again.
   framework->active = true;
 
-  MSG<M2F_REGISTER_REPLY> reply;
-  reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
-  send(newPid, reply);
+  framework->reregisteredTime = elapsedTime();
+
+  FrameworkRegisteredMessage message;
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  send(newPid, message);
+}
+
+
+void Master::terminateFramework(Framework* framework,
+                                int32_t code,
+                                const string& error)
+{
+  LOG(INFO) << "Terminating " << framework << " due to error: " << error;
+
+  FrameworkErrorMessage message;
+  message.set_code(code);
+  message.set_message(error);
+  send(framework->pid, message);
+
+  removeFramework(framework);
 }
 
 
-// Kill all of a framework's tasks, delete the framework object, and
-// reschedule slot offers for slots that were assigned to this framework
 void Master::removeFramework(Framework* framework)
-{ 
+{
   framework->active = false;
   // TODO: Notify allocator that a framework removal is beginning?
   
-  // Tell slaves to kill the framework
-  foreachpair (_, Slave *slave, slaves) {
-    MSG<M2S_KILL_FRAMEWORK> out;
-    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-    send(slave->pid, out);
+  // Tell slaves to shutdown the framework.
+  foreachvalue (Slave* slave, slaves) {
+    ShutdownFrameworkMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id);
+    send(slave->pid, message);
   }
 
   // Remove pointers to the framework's tasks in slaves
-  foreachpaircopy (_, Task *task, framework->tasks) {
-    Slave *slave = lookupSlave(task->slave_id());
+  foreachvalue (Task* task, utils::copy(framework->tasks)) {
+    Slave* slave = getSlave(task->slave_id());
     CHECK(slave != NULL);
-    removeTask(task, TRR_FRAMEWORK_LOST);
+    removeTask(framework, slave, task, TRR_FRAMEWORK_LOST);
   }
   
   // Remove the framework's slot offers (if they weren't removed before).
-  foreachcopy (SlotOffer* offer, framework->slotOffers) {
-    removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    removeOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
   }
 
   // TODO(benh): Similar code between removeFramework and
   // failoverFramework needs to be shared!
 
   // TODO(benh): unlink(framework->pid);
-  pidToFrameworkId.erase(framework->pid);
 
   // Delete it.
-  frameworks.erase(framework->frameworkId);
+  frameworks.erase(framework->id);
   allocator->frameworkRemoved(framework);
   delete framework;
 }
 
 
-void Master::addSlave(Slave* slave)
+void Master::addSlave(Slave* slave, bool reregister)
 {
   CHECK(slave != NULL);
 
-  slaves[slave->slaveId] = slave;
-  pidToSlaveId[slave->pid] = slave->slaveId;
+  slaves[slave->id] = slave;
+
   link(slave->pid);
 
   allocator->slaveAdded(slave);
 
-  MSG<M2S_REGISTER_REPLY> out;
-  out.mutable_slave_id()->MergeFrom(slave->slaveId);
-  send(slave->pid, out);
+  if (!reregister) {
+    SlaveRegisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(slave->pid, message);
+  } else {
+    SlaveReregisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(slave->pid, message);
+  }
 
   // TODO(benh):
   //     // Ask the slaves manager to monitor this slave for us.
-  //     process::dispatch(slavesManager->self(), &SlavesManager::monitor,
-  //                       slave->pid, slave->info, slave->slaveId);
+  //     dispatch(slavesManager->self(), &SlavesManager::monitor,
+  //              slave->pid, slave->info, slave->id);
 
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(slave->pid, slave->info,
-                                      slave->slaveId, slavesManager->self());
-  process::spawn(slave->observer);
+                                      slave->id, slavesManager->self());
+  spawn(slave->observer);
 }
 
 
@@ -1787,7 +1433,7 @@ void Master::readdSlave(Slave* slave, co
 {
   CHECK(slave != NULL);
 
-  addSlave(slave);
+  addSlave(slave, true);
 
   for (int i = 0; i < tasks.size(); i++) {
     Task* task = new Task(tasks[i]);
@@ -1801,20 +1447,20 @@ void Master::readdSlave(Slave* slave, co
     // will add them then. We also tell this slave the current
     // framework pid for this task. Again, we do the same thing
     // if a framework currently isn't registered.
-    Framework* framework = lookupFramework(task->framework_id());
+    Framework* framework = getFramework(task->framework_id());
     if (framework != NULL) {
       framework->addTask(task);
-      MSG<M2S_UPDATE_FRAMEWORK> out;
-      out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-      out.set_pid(framework->pid);
-      send(slave->pid, out);
+      UpdateFrameworkMessage message;
+      message.mutable_framework_id()->MergeFrom(framework->id);
+      message.set_pid(framework->pid);
+      send(slave->pid, message);
     } else {
       // TODO(benh): We should really put a timeout on how long we
       // keep tasks running on a slave that never have frameworks
       // reregister and claim them.
       LOG(WARNING) << "Possibly orphaned task " << task->task_id()
                    << " of framework " << task->framework_id()
-                   << " running on slave " << slave->slaveId;
+                   << " running on slave " << slave->id;
     }
   }
 }
@@ -1828,8 +1474,8 @@ void Master::removeSlave(Slave* slave)
   // TODO: Notify allocator that a slave removal is beginning?
   
   // Remove pointers to slave's tasks in frameworks, and send status updates
-  foreachpaircopy (_, Task* task, slave->tasks) {
-    Framework *framework = lookupFramework(task->framework_id());
+  foreachvalue (Task* task, utils::copy(slave->tasks)) {
+    Framework* framework = getFramework(task->framework_id());
     // A framework might not actually exist because the master failed
     // over and the framework hasn't reconnected. This can be a tricky
     // situation for frameworks that want to have high-availability,
@@ -1837,21 +1483,25 @@ void Master::removeSlave(Slave* slave)
     // status update about this task.  Perhaps in the future what we
     // want to do is create a local Framework object to represent that
     // framework until it fails over. See the TODO above in
-    // S2M_REREGISTER_SLAVE.
+    // Master::reregisterSlave.
     if (framework != NULL) {
-      MSG<M2F_STATUS_UPDATE> out;
-      out.mutable_framework_id()->MergeFrom(task->framework_id());
-      TaskStatus* status = out.mutable_status();
+      StatusUpdateMessage message;
+      StatusUpdate* update = message.mutable_update();
+      update->mutable_framework_id()->MergeFrom(task->framework_id());
+      update->mutable_executor_id()->MergeFrom(task->executor_id());
+      update->mutable_slave_id()->MergeFrom(task->slave_id());
+      TaskStatus* status = update->mutable_status();
       status->mutable_task_id()->MergeFrom(task->task_id());
-      status->mutable_slave_id()->MergeFrom(task->slave_id());
       status->set_state(TASK_LOST);
-      send(framework->pid, out);
+      update->set_timestamp(elapsedTime());
+      update->set_uuid(UUID::random().toBytes());
+      send(framework->pid, message);
     }
-    removeTask(task, TRR_SLAVE_LOST);
+    removeTask(framework, slave, task, TRR_SLAVE_LOST);
   }
 
   // Remove slot offers from the slave; this will also rescind them
-  foreachcopy (SlotOffer* offer, slave->slotOffers) {
+  foreach (Offer* offer, utils::copy(slave->offers)) {
     // Only report resources on slaves other than this one to the allocator
     vector<SlaveResources> otherSlaveResources;
     foreach (const SlaveResources& r, offer->resources) {
@@ -1859,49 +1509,48 @@ void Master::removeSlave(Slave* slave)
         otherSlaveResources.push_back(r);
       }
     }
-    removeSlotOffer(offer, ORR_SLAVE_LOST, otherSlaveResources);
+    removeOffer(offer, ORR_SLAVE_LOST, otherSlaveResources);
   }
   
   // Remove slave from any filters
-  foreachpair (_, Framework* framework, frameworks) {
+  foreachvalue (Framework* framework, frameworks) {
     framework->slaveFilter.erase(slave);
   }
   
   // Send lost-slave message to all frameworks (this helps them re-run
   // previously finished tasks whose output was on the lost slave)
-  foreachpair (_, Framework* framework, frameworks) {
-    MSG<M2F_LOST_SLAVE> out;
-    out.mutable_slave_id()->MergeFrom(slave->slaveId);
-    send(framework->pid, out);
+  foreachvalue (Framework* framework, frameworks) {
+    LostSlaveMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(framework->pid, message);
   }
 
   // TODO(benh):
   //     // Tell the slaves manager to stop monitoring this slave for us.
-  //     process::dispatch(slavesManager->self(), &SlavesManager::forget,
-  //                       slave->pid, slave->info, slave->slaveId);
+  //     dispatch(slavesManager->self(), &SlavesManager::forget,
+  //              slave->pid, slave->info, slave->id);
 
   // Kill the slave observer.
-  process::post(slave->observer->self(), process::TERMINATE);
-  process::wait(slave->observer->self());
+  terminate(slave->observer);
+  wait(slave->observer);
+
   delete slave->observer;
 
   // TODO(benh): unlink(slave->pid);
-  pidToSlaveId.erase(slave->pid);
 
   // Delete it
-  slaves.erase(slave->slaveId);
+  slaves.erase(slave->id);
   allocator->slaveRemoved(slave);
   delete slave;
 }
 
 
 // Remove a task (because it finished, or because we lost the framework or slave)
-void Master::removeTask(Task* task, TaskRemovalReason reason)
+void Master::removeTask(Framework* framework,
+                        Slave* slave,
+                        Task* task,
+                        TaskRemovalReason reason)
 {
-  Framework* framework = lookupFramework(task->framework_id());
-  Slave* slave = lookupSlave(task->slave_id());
-  CHECK(framework != NULL);
-  CHECK(slave != NULL);
   framework->removeTask(task->task_id());
   slave->removeTask(task);
   allocator->taskRemoved(task, reason);
@@ -1909,10 +1558,66 @@ void Master::removeTask(Task* task, Task
 }
 
 
-Allocator* Master::createAllocator()
+void Master::removeOffer(Offer* offer,
+                         OfferReturnReason reason,
+                         const vector<SlaveResources>& resourcesUnused)
+{
+  // Remove from slaves.
+  foreach (SlaveResources& r, offer->resources) {
+    CHECK(r.slave != NULL);
+    r.slave->resourcesOffered -= r.resources;
+    r.slave->offers.erase(offer);
+  }
+    
+  // Remove from framework
+  Framework *framework = getFramework(offer->frameworkId);
+  CHECK(framework != NULL);
+  framework->removeOffer(offer);
+
+  // Also send framework a rescind message unless the reason we are
+  // removing the offer is that the framework replied to it
+  if (reason != ORR_FRAMEWORK_REPLIED) {
+    RescindResourceOfferMessage message;
+    message.mutable_offer_id()->MergeFrom(offer->id);
+    send(framework->pid, message);
+  }
+  
+  // Tell the allocator about the unused resources.
+  allocator->offerReturned(offer, reason, resourcesUnused);
+  
+  // Delete it
+  offers.erase(offer->id);
+  delete offer;
+}
+
+
+Framework* Master::getFramework(const FrameworkID& frameworkId)
 {
-  LOG(INFO) << "Creating \"" << allocatorType << "\" allocator";
-  return AllocatorFactory::instantiate(allocatorType, this);
+  if (frameworks.count(frameworkId) > 0) {
+    return frameworks[frameworkId];
+  } else {
+    return NULL;
+  }
+}
+
+
+Slave* Master::getSlave(const SlaveID& slaveId)
+{
+  if (slaves.count(slaveId) > 0) {
+    return slaves[slaveId];
+  } else {
+    return NULL;
+  }
+}
+
+
+Offer* Master::getOffer(const OfferID& offerId)
+{
+  if (offers.count(offerId) > 0) {
+    return offers[offerId];
+  } else {
+    return NULL;
+  }
 }
 
 
@@ -1921,10 +1626,14 @@ Allocator* Master::createAllocator()
 // and FWID is an increasing integer.
 FrameworkID Master::newFrameworkId()
 {
-  ostringstream oss;
-  oss << masterId << "-" << setw(4) << setfill('0') << nextFrameworkId++;
+  std::ostringstream out;
+
+  out << masterId << "-" << std::setw(4)
+      << std::setfill('0') << nextFrameworkId++;
+
   FrameworkID frameworkId;
-  frameworkId.set_value(oss.str());
+  frameworkId.set_value(out.str());
+
   return frameworkId;
 }
 
@@ -1932,7 +1641,7 @@ FrameworkID Master::newFrameworkId()
 OfferID Master::newOfferId()
 {
   OfferID offerId;
-  offerId.set_value(masterId + "-" + lexical_cast<string>(nextOfferId++));
+  offerId.set_value(masterId + "-" + utils::stringify(nextOfferId++));
   return offerId;
 }
 
@@ -1940,12 +1649,220 @@ OfferID Master::newOfferId()
 SlaveID Master::newSlaveId()
 {
   SlaveID slaveId;
-  slaveId.set_value(masterId + "-" + lexical_cast<string>(nextSlaveId++));
+  slaveId.set_value(masterId + "-" + utils::stringify(nextSlaveId++));
   return slaveId;
 }
 
 
-const Configuration& Master::getConfiguration()
+Promise<HttpResponse> Master::http_info_json(const HttpRequest& request)
 {
-  return conf;
+  LOG(INFO) << "HTTP request for '/master/info.json'";
+
+  std::ostringstream out;
+
+  out <<
+    "{" <<
+    "\"built_date\":\"" << build::DATE << "\"," <<
+    "\"build_user\":\"" << build::USER << "\"," <<
+    "\"start_time\":\"" << startTime << "\"," <<
+    "\"pid\":\"" << self() << "\"" <<
+    "}";
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
+  response.body = out.str().data();
+  return response;
+}
+
+
+Promise<HttpResponse> Master::http_frameworks_json(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/master/frameworks.json'";
+
+  std::ostringstream out;
+
+  out << "[";
+
+  foreachvalue (Framework* framework, frameworks) {
+    out <<
+      "{" <<
+      "\"id\":\"" << framework->id << "\"," <<
+      "\"name\":\"" << framework->info.name() << "\"," <<
+      "\"user\":\"" << framework->info.user() << "\""
+      "},";
+  }
+
+  // Backup the put pointer to overwrite the last comma (hack).
+  if (frameworks.size() > 0) {
+    long pos = out.tellp();
+    out.seekp(pos - 1);
+  }
+
+  out << "]";
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
+  response.body = out.str().data();
+  return response;
 }
+
+
+Promise<HttpResponse> Master::http_slaves_json(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/master/slaves.json'";
+
+  std::ostringstream out;
+
+  out << "[";
+
+  foreachvalue (Slave* slave, slaves) {
+    // TODO(benh): Send all of the resources (as JSON).
+    Resources resources(slave->info.resources());
+    Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
+    Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
+    out <<
+      "{" <<
+      "\"id\":\"" << slave->id << "\"," <<
+      "\"hostname\":\"" << slave->info.hostname() << "\"," <<
+      "\"cpus\":" << cpus.value() << "," <<
+      "\"mem\":" << mem.value() <<
+      "},";
+  }
+
+  // Backup the put pointer to overwrite the last comma (hack).
+  if (slaves.size() > 0) {
+    long pos = out.tellp();
+    out.seekp(pos - 1);
+  }
+
+  out << "]";
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
+  response.body = out.str().data();
+  return response;
+}
+
+
+Promise<HttpResponse> Master::http_tasks_json(const HttpRequest& request)
+{
+  LOG(INFO) << "HTTP request for '/master/tasks.json'";
+
+  std::ostringstream out;
+
+  out << "[";
+
+  foreachvalue (Framework* framework, frameworks) {
+    foreachvalue (Task* task, framework->tasks) {
+      // TODO(benh): Send all of the resources (as JSON).
+      Resources resources(task->resources());
+      Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
+      Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
+      out <<
+        "{" <<
+        "\"task_id\":\"" << task->task_id() << "\"," <<
+        "\"framework_id\":\"" << task->framework_id() << "\"," <<
+        "\"slave_id\":\"" << task->slave_id() << "\"," <<
+        "\"name\":\"" << task->name() << "\"," <<
+        "\"state\":\"" << task->state() << "\"," <<
+        "\"cpus\":" << cpus.value() << "," <<
+        "\"mem\":" << mem.value() <<
+        "},";
+    }
+  }
+
+  // Backup the put pointer to overwrite the last comma (hack).
+  if (frameworks.size() > 0) {
+    long pos = out.tellp();
+    out.seekp(pos - 1);
+  }
+
+  out << "]";
+
+  HttpOKResponse response;
+  response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  response.headers["Content-Length"] = utils::stringify(out.str().size());
+  response.body = out.str().data();
+  return response;
+}
+
+
+Promise<HttpResponse> Master::http_stats_json(const HttpRequest& request)
+{
+  LOG(INFO) << "Http request for '/master/stats.json'";
+
+  std::ostringstream out;
+
+  out << std::setprecision(10);
+
+  out <<
+    "{" <<
+    "\"uptime\":" << elapsedTime() - startTime << "," <<
+    "\"total_schedulers\":" << frameworks.size() << "," <<
+    "\"active_schedulers\":" << getActiveFrameworks().size() << "," <<
+    "\"activated_slaves\":" << slaveHostnamePorts.size() << "," <<
+    "\"connected_slaves\":" << slaves.size() << "," <<
+    "\"started_tasks\":" << stats.tasks[TASK_STARTING] << "," <<
+    "\"finished_tasks\":" << stats.tasks[TASK_FINISHED] << "," <<
+    "\"killed_tasks\":" << stats.tasks[TASK_KILLED] << "," <<
+    "\"failed_tasks\":" << stats.tasks[TASK_FAILED] << "," <<
+    "\"lost_tasks\":" << stats.tasks[TASK_LOST] << "," <<
+    "\"valid_status_updates\":" << stats.validStatusUpdates << "," <<
+    "\"invalid_status_updates\":" << stats.invalidStatusUpdates << "," <<
+    "\"valid_framework_messages\":" << stats.validFrameworkMessages << "," <<
+    "\"invalid_framework_messages\":" << stats.invalidFrameworkMessages <<
+    "}";
+
+  HttpOKResponse response;

[... 50 lines stripped ...]