Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:19:53 UTC

svn commit: r1132287 [1/2] - in /incubator/mesos/trunk/src: common/ detector/ local/ master/ messaging/ sched/ slave/ tests/

Author: benh
Date: Sun Jun  5 09:19:53 2011
New Revision: 1132287

URL: http://svn.apache.org/viewvc?rev=1132287&view=rev
Log:
Added a "SlavesManager" to help coordinate which slaves are an active part of a Mesos cluster. This includes ZooKeeper support for when using ZooKeeper as the persistant backing store of active slaves.

Modified:
    incubator/mesos/trunk/src/common/resources.cpp
    incubator/mesos/trunk/src/common/tokenize.cpp
    incubator/mesos/trunk/src/common/tokenize.hpp
    incubator/mesos/trunk/src/common/zookeeper.cpp
    incubator/mesos/trunk/src/detector/detector.cpp
    incubator/mesos/trunk/src/local/local.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/messaging/messages.cpp
    incubator/mesos/trunk/src/messaging/messages.hpp
    incubator/mesos/trunk/src/messaging/messages.proto
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/master_test.cpp

Modified: incubator/mesos/trunk/src/common/resources.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/resources.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/resources.cpp (original)
+++ incubator/mesos/trunk/src/common/resources.cpp Sun Jun  5 09:19:53 2011
@@ -569,7 +569,7 @@ Resource Resources::parse(const string& 
   if (index == 0) {
     // This is a ranges.
     Resource::Ranges ranges;
-    const vector<string>& tokens = tokenize(temp, "[]-,\n");
+    const vector<string>& tokens = tokenize::tokenize(temp, "[]-,\n");
     if (tokens.size() % 2 != 0) {
       LOG(FATAL) << "Error parsing value for " << name
                  << ", expecting one or more \"ranges\"";
@@ -596,7 +596,7 @@ Resource Resources::parse(const string& 
     if (index == 0) {
       // This is a set.
       Resource::Set set;
-      const vector<string>& tokens = tokenize(temp, "{},\n");
+      const vector<string>& tokens = tokenize::tokenize(temp, "{},\n");
       for (int i = 0; i < tokens.size(); i++) {
         set.add_item(tokens[i]);
       }
@@ -633,10 +633,10 @@ Resources Resources::parse(const string&
   // Tokenize and parse the value of "resources".
   Resources resources;
 
-  vector<string> tokens = tokenize(s, ";\n");
+  vector<string> tokens = tokenize::tokenize(s, ";\n");
 
   for (int i = 0; i < tokens.size(); i++) {
-    const vector<string>& pairs = tokenize(tokens[i], ":");
+    const vector<string>& pairs = tokenize::tokenize(tokens[i], ":");
     if (pairs.size() != 2) {
       LOG(FATAL) << "bad value for resources, missing ':' within " << pairs[0];
     }
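
The tokenize calls above are now qualified with the new tokenize namespace. A small usage sketch (not code from this commit) of that tokenization on a resources string, assuming common/tokenize.hpp from this tree is on the include path:

#include <iostream>
#include <string>
#include <vector>

#include "common/tokenize.hpp"

int main()
{
  // "cpus:4;mem:1024" splits into "cpus:4" and "mem:1024" on ';',
  // and each of those splits into a name/value pair on ':'.
  std::vector<std::string> tokens = tokenize::tokenize("cpus:4;mem:1024", ";\n");
  for (size_t i = 0; i < tokens.size(); i++) {
    std::vector<std::string> pair = tokenize::tokenize(tokens[i], ":");
    if (pair.size() != 2) {
      std::cerr << "bad value, missing ':' within " << pair[0] << std::endl;
      continue;
    }
    std::cout << pair[0] << " = " << pair[1] << std::endl;
  }
  return 0;
}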

Modified: incubator/mesos/trunk/src/common/tokenize.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/tokenize.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/tokenize.cpp (original)
+++ incubator/mesos/trunk/src/common/tokenize.cpp Sun Jun  5 09:19:53 2011
@@ -1,10 +1,15 @@
+#include <sstream>
+
+#include "foreach.hpp"
 #include "tokenize.hpp"
 
+using std::map;
 using std::string;
+using std::ostringstream;
 using std::vector;
 
 
-namespace mesos { namespace internal {
+namespace tokenize {
 
 vector<string> tokenize(const string& s, const string& delims)
 {
@@ -30,4 +35,24 @@ vector<string> tokenize(const string& s,
   }
 }
 
-}} /* namespace mesos { namespace internal { */
+
+map<string, vector<string> > pairs(const string& s, const string& delim1, const string& delim2)
+{
+  map<string, vector<string> > result;
+
+  const vector<string>& tokens = tokenize(s, delim1);
+  foreach (const string& token, tokens) {
+    const vector<string>& pairs = tokenize(token, delim2);
+    if (pairs.size() != 2) {
+      ostringstream out;
+      out << "failed to split '" << token << "' with '" << delim2 << "'";
+      throw TokenizeException(out.str());
+    }
+
+    result[pairs[0]].push_back(pairs[1]);
+  }
+
+  return result;
+}
+
+} // namespace tokenize {

Modified: incubator/mesos/trunk/src/common/tokenize.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/tokenize.hpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/tokenize.hpp (original)
+++ incubator/mesos/trunk/src/common/tokenize.hpp Sun Jun  5 09:19:53 2011
@@ -1,18 +1,33 @@
 #ifndef __TOKENIZE_HPP__
 #define __TOKENIZE_HPP__
 
+#include <map>
+#include <stdexcept>
 #include <string>
 #include <vector>
 
 
-namespace mesos { namespace internal {
+namespace tokenize {
+
+class TokenizeException : public std::runtime_error
+{
+public:
+  TokenizeException(const std::string& what) : std::runtime_error(what) {}
+};
+
 
 /**
  * Utility function to tokenize a string based on some delimiters.
  */
-std::vector<std::string> tokenize(const std::string& s, const std::string& delims);
+std::vector<std::string> tokenize(const std::string& s,
+                                  const std::string& delims);
+
+
+std::map<std::string, std::vector<std::string> > pairs(const std::string& s,
+                                                       const std::string& delim1,
+                                                       const std::string& delim2);
 
-}} /* namespace mesos { namespace internal { */
+} // namespace tokenize {
 
 #endif // __TOKENIZE_HPP__
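
A usage sketch (not code from this commit) for the new tokenize::pairs() helper declared above, mirroring how the SlavesManager HTTP handlers in master.cpp call it with "," and "=" as delimiters; it assumes common/tokenize.hpp from this tree is on the include path:

#include <iostream>
#include <map>
#include <string>
#include <vector>

#include "common/tokenize.hpp"

int main()
{
  try {
    // "hostname=foo.example.com,port=5051" becomes
    // { "hostname" -> ["foo.example.com"], "port" -> ["5051"] }.
    std::map<std::string, std::vector<std::string> > result =
      tokenize::pairs("hostname=foo.example.com,port=5051", ",", "=");

    std::cout << "hostname: " << result["hostname"].front() << std::endl;
    std::cout << "port: " << result["port"].front() << std::endl;
  } catch (const tokenize::TokenizeException& e) {
    // Thrown when a token does not split into exactly two parts.
    std::cerr << "parse error: " << e.what() << std::endl;
  }

  return 0;
}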
 

Modified: incubator/mesos/trunk/src/common/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/zookeeper.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/common/zookeeper.cpp Sun Jun  5 09:19:53 2011
@@ -331,7 +331,7 @@ private:
   {
     ZooKeeperProcess* zooKeeperProcess = static_cast<ZooKeeperProcess*>(ctx);
     process::dispatch(zooKeeperProcess->pid, &WatcherProcess::event,
-                      zooKeeperProcess->zk, type, state, path);
+                      zooKeeperProcess->zk, type, state, string(path));
   }
 
   static void voidCompletion(int ret, const void *data)
@@ -355,8 +355,10 @@ private:
     Promise<int> promise = (*args).get<0>();
     string* result = (*args).get<1>();
 
-    if (result != NULL && value != NULL) {
-      result->assign(value);
+    if (ret == 0) {
+      if (result != NULL) {
+        result->assign(value);
+      }
     }
 
     promise.set(ret);
@@ -373,8 +375,10 @@ private:
     Promise<int> promise = (*args).get<0>();
     Stat *stat_result = (*args).get<1>();
 
-    if (stat_result != NULL && stat != NULL) {
-      *stat_result = *stat;
+    if (ret == 0) {
+      if (stat_result != NULL) {
+        *stat_result = *stat;
+      }
     }
 
     promise.set(ret);
@@ -392,12 +396,14 @@ private:
     string* result = (*args).get<1>();
     Stat* stat_result = (*args).get<2>();
 
-    if (result != NULL && value != NULL && value_len > 0) {
-      result->assign(value, value_len);
-    }
+    if (ret == 0) {
+      if (result != NULL) {
+        result->assign(value, value_len);
+      }
 
-    if (stat_result != NULL && stat != NULL) {
-      *stat_result = *stat;
+      if (stat_result != NULL) {
+        *stat_result = *stat;
+      }
     }
 
     promise.set(ret);
@@ -414,9 +420,11 @@ private:
     Promise<int> promise = (*args).get<0>();
     vector<string>* results = (*args).get<1>();
 
-    if (results != NULL && values != NULL) {
-      for (int i = 0; i < values->count; i++) {
-	results->push_back(values->data[i]);
+    if (ret == 0) {
+      if (results != NULL) {
+        for (int i = 0; i < values->count; i++) {
+          results->push_back(values->data[i]);
+        }
       }
     }
 

Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun  5 09:19:53 2011
@@ -10,16 +10,15 @@
 
 #include "common/fatal.hpp"
 #include "common/foreach.hpp"
+#ifdef WITH_ZOOKEEPER
+#include "common/zookeeper.hpp"
+#endif
 
 #include "messaging/messages.hpp"
 
 #include "detector.hpp"
 #include "url_processor.hpp"
 
-#ifdef WITH_ZOOKEEPER
-#include "zookeeper.hpp"
-#endif
-
 using namespace mesos;
 using namespace mesos::internal;
 
@@ -71,8 +70,8 @@ public:
    * for slaves and frameworks)
    * @param quiet verbosity logging level for undelying ZooKeeper library
    */
-  ZooKeeperMasterDetector(const std::string& servers,
-			  const std::string& znode,
+  ZooKeeperMasterDetector(const string& servers,
+			  const string& znode,
 			  const UPID& pid,
 			  bool contend = false,
 			  bool quiet = false);
@@ -82,19 +81,18 @@ public:
   /** 
    * ZooKeeper watcher callback.
    */
-  virtual void process(ZooKeeper *zk, int type, int state,
-		       const std::string &path);
+  virtual void process(ZooKeeper *zk, int type, int state, const string &path);
 
 private:
   /**
    * @param s sequence id
    */
-  void setId(const std::string &s);
+  void setId(const string &s);
 
   /**
    * @return current sequence id if contending to be a master
    */
-  std::string getId();
+  string getId();
 
   /**
    * Attempts to detect a master.
@@ -105,10 +103,10 @@ private:
    * @param seq sequence id of a master
    * @return PID corresponding to a master
    */
-  UPID lookupMasterPID(const std::string &seq) const;
+  UPID lookupMasterPID(const string &seq) const;
 
-  std::string servers;
-  std::string znode;
+  string servers;
+  string znode;
   UPID pid;
   bool contend;
   bool reconnect;
@@ -116,9 +114,9 @@ private:
   ZooKeeper *zk;
 
   // Our sequence string if contending to be a master.
-  std::string mySeq;
+  string mySeq;
 
-  std::string currentMasterSeq;
+  string currentMasterSeq;
   UPID currentMasterPID;
 
   // Reconnect timer.
@@ -130,7 +128,7 @@ private:
 MasterDetector::~MasterDetector() {}
 
 
-MasterDetector* MasterDetector::create(const std::string &url,
+MasterDetector* MasterDetector::create(const string &url,
                                        const UPID &pid,
                                        bool contend,
                                        bool quiet)
@@ -154,15 +152,21 @@ MasterDetector* MasterDetector::create(c
       // TODO(benh): Consider actually using the chroot feature of
       // ZooKeeper, rather than just using it's syntax.
       size_t index = urlPair.second.find("/");
-      if (index == string::npos)
+      if (index == string::npos) {
 	fatal("expecting chroot path for ZooKeeper");
+      }
+
+      const string &servers = urlPair.second.substr(0, index);
+
       const string &znode = urlPair.second.substr(index);
-      if (znode == "/")
+      if (znode == "/") {
 	fatal("expecting chroot path for ZooKeeper ('/' is not supported)");
-      const string &servers = urlPair.second.substr(0, index);
+      }
+
       detector = new ZooKeeperMasterDetector(servers, znode, pid, contend, quiet);
 #else
-      fatal("ZooKeeper not supported in this build");
+      fatal("Cannot detect masters with 'zoo://', "
+            "ZooKeeper is not supported in this build");
 #endif // #ifdef WITH_ZOOKEEPER
       break;
     }
@@ -435,8 +439,8 @@ void ZooKeeperMasterDetector::process(Zo
 
     reconnect = true;
   } else {
-    LOG(INFO) << "Unimplemented watch event: (state is "
-	      << state << " and type is " << type << ")";
+    LOG(WARNING) << "Unimplemented watch event: (state is "
+                 << state << " and type is " << type << ")";
   }
 }
 

Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Sun Jun  5 09:19:53 2011
@@ -55,10 +55,12 @@ static MasterDetector* detector = NULL;
 
 void registerOptions(Configurator* configurator)
 {
-  configurator->addOption<int>("slaves", 's', "Number of slaves", 1);
   Logging::registerOptions(configurator);
   Master::registerOptions(configurator);
   Slave::registerOptions(configurator);
+  configurator->addOption<int>("num_slaves",
+                               "Number of slaves to create for local cluster",
+                               1);
 }
 
 
@@ -69,7 +71,8 @@ PID<Master> launch(int numSlaves,
                    bool quiet)
 {
   Configuration conf;
-  conf.set("slaves", numSlaves);
+  conf.set("slaves", "*");
+  conf.set("num_slaves", numSlaves);
   conf.set("quiet", quiet);
 
   stringstream out;
@@ -83,16 +86,18 @@ PID<Master> launch(int numSlaves,
 PID<Master> launch(const Configuration& conf,
                    bool initLogging)
 {
-  int numSlaves = conf.get<int>("slaves", 1);
+  int numSlaves = conf.get<int>("num_slaves", 1);
   bool quiet = conf.get<bool>("quiet", false);
 
-  if (master != NULL)
+  if (master != NULL) {
     fatal("can only launch one local cluster at a time (for now)");
+  }
 
   if (initLogging) {
     pthread_once(&glog_initialized, initialize_glog);
-    if (!quiet)
+    if (!quiet) {
       google::SetStderrLogging(google::INFO);
+    }
   }
 
   master = new Master(conf);

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 09:19:53 2011
@@ -4,7 +4,12 @@
 
 #include <google/protobuf/descriptor.h>
 
+#include "config/config.hpp"
+
 #include "common/date_utils.hpp"
+#ifdef WITH_ZOOKEEPER
+#include "common/zookeeper.hpp"
+#endif
 
 #include "allocator.hpp"
 #include "allocator_factory.hpp"
@@ -15,10 +20,13 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::master;
 
+using boost::bad_lexical_cast;
 using boost::lexical_cast;
 using boost::unordered_map;
 using boost::unordered_set;
 
+using process::HttpInternalServerErrorResponse;
+using process::HttpNotFoundResponse;
 using process::HttpOKResponse;
 using process::HttpResponse;
 using process::HttpRequest;
@@ -28,6 +36,7 @@ using process::Promise;
 using process::UPID;
 
 using std::endl;
+using std::map;
 using std::max;
 using std::ostringstream;
 using std::pair;
@@ -54,7 +63,7 @@ protected:
     while (true) {
       receive(1);
       if (name() == process::TIMEOUT) {
-        dispatch(master, &Master::timerTick);
+        process::dispatch(master, &Master::timerTick);
       } else if (name() == process::EXITED) {
 	return;
       }
@@ -118,13 +127,711 @@ private:
   const PID<Master> master;
 };
 
+} // namespace {
+
+
+#ifdef WITH_ZOOKEEPER
+
+// Forward declaration of watcher.
+class ZooKeeperSlavesManagerStorageWatcher;
+
+
+class ZooKeeperSlavesManagerStorage : public SlavesManagerStorage
+{
+public:
+  ZooKeeperSlavesManagerStorage(const string& _servers,
+                               const string& _znode,
+                               const PID<SlavesManager>& _slavesManager);
+
+  virtual ~ZooKeeperSlavesManagerStorage();
+
+  virtual Promise<bool> add(const string& hostname, uint16_t port);
+  virtual Promise<bool> remove(const string& hostname, uint16_t port);
+  virtual Promise<bool> activate(const string& hostname, uint16_t port);
+  virtual Promise<bool> deactivate(const string& hostname, uint16_t port);
+
+  Promise<bool> connected();
+  Promise<bool> reconnecting();
+  Promise<bool> reconnected();
+  Promise<bool> expired();
+  Promise<bool> updated(const string& path);
+
+private:
+  const string servers;
+  const string znode;
+  const PID<SlavesManager> slavesManager;
+  ZooKeeper* zk;
+  ZooKeeperSlavesManagerStorageWatcher* watcher;
+};
+
+
+class ZooKeeperSlavesManagerStorageWatcher : public Watcher
+{
+public:
+  ZooKeeperSlavesManagerStorageWatcher(const PID<ZooKeeperSlavesManagerStorage>& _pid)
+    : pid(_pid), reconnect(false) {}
+
+  virtual ~ZooKeeperSlavesManagerStorageWatcher() {}
+
+  virtual void process(ZooKeeper* zk, int type, int state, const string& path)
+  {
+    if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+      // Check if this is a reconnect.
+      if (!reconnect) {
+        // Initial connect.
+        process::dispatch(pid, &ZooKeeperSlavesManagerStorage::connected);
+      } else {
+        // Reconnected.
+        process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnected);
+      }
+    } else if ((state == ZOO_EXPIRED_SESSION_STATE) && (type == ZOO_SESSION_EVENT)) {
+      // Session expiration. Let the manager take care of it.
+      process::dispatch(pid, &ZooKeeperSlavesManagerStorage::expired);
+
+      // If this watcher is reused, the next connect won't be a reconnect.
+      reconnect = false;
+    } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
+      // Let the manager deal with file changes.
+      process::dispatch(pid, &ZooKeeperSlavesManagerStorage::updated, path);
+    } else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
+      // The client library automatically reconnects, taking into
+      // account failed servers in the connection string,
+      // appropriately handling the "herd effect", etc.
+      reconnect = true;
+      process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnecting);
+    } else {
+      LOG(WARNING) << "Unimplemented watch event: (state is "
+                   << state << " and type is " << type << ")";
+    }
+  }
+
+private:
+  const PID<ZooKeeperSlavesManagerStorage> pid;
+  bool reconnect;
+};
+
+
+ZooKeeperSlavesManagerStorage::ZooKeeperSlavesManagerStorage(const string& _servers,
+                                                             const string& _znode,
+                                                             const PID<SlavesManager>& _slavesManager)
+  : servers(_servers), znode(_znode), slavesManager(_slavesManager)
+{
+  PID<ZooKeeperSlavesManagerStorage> pid(*this);
+  watcher = new ZooKeeperSlavesManagerStorageWatcher(pid);
+  zk = new ZooKeeper(servers, 10000, watcher);
+}
+
+
+ZooKeeperSlavesManagerStorage::~ZooKeeperSlavesManagerStorage()
+{
+  delete zk;
+  delete watcher;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::add(const string& hostname, uint16_t port)
+{
+  int ret;
+  string result;
+  Stat stat;
+
+  ret = zk->get(znode + "/active", true, &result, &stat);
+
+  if (ret != ZOK) {
+    LOG(WARNING) << "Failed to get '" << znode + "/active"
+                 << "' in ZooKeeper! (" << zk->error(ret) << ")";
+    return false;
+  }
+
+  ostringstream out;
+
+  if (result.size() == 0) {
+    out << hostname << ":" << port;
+  } else {
+    out << "," << hostname << ":" << port;
+  }
+
+  result += out.str();
+
+  // Set the data in the znode.
+  ret = zk->set(znode + "/active", result, stat.version);
+
+  if (ret != ZOK) {
+    LOG(WARNING) << "Could not add slave " << hostname << ":" << port
+                 << " to '" << znode + "/active' in ZooKeeper! ("
+                 << zk->error(ret) << ")";
+    return false;
+  }
+
+  return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::remove(const string& hostname, uint16_t port)
+{
+  string files[] = { "/active", "/inactive" };
+
+  foreach (const string& file, files) {
+    int ret;
+    string result;
+    Stat stat;
+
+    ret = zk->get(znode + file, true, &result, &stat);
+
+    if (ret != ZOK) {
+      LOG(WARNING) << "Failed to get '" << znode + file
+                   << "' in ZooKeeper! (" << zk->error(ret) << ")";
+      return false;
+    }
+
+    ostringstream out;
+    out << hostname << ":" << port;
+
+    size_t index = result.find(out.str());
+
+    if (index != string::npos) {
+      if (index == 0) {
+        result.erase(index, out.str().size() + 1);
+      } else {
+        result.erase(index - 1, out.str().size() + 1);
+      }
+
+      // Set the data in the znode.
+      ret = zk->set(znode + file, result, stat.version);
+
+      if (ret != ZOK) {
+        LOG(WARNING) << "Could not remove slave " << hostname << ":" << port
+                     << " to '" << znode + file << "' in ZooKeeper! ("
+                     << zk->error(ret) << ")";
+        return false;
+      }
+    }
+  }
+
+  return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::activate(const string& hostname, uint16_t port)
+{
+  fatal("unimplemented");
+  return false;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::deactivate(const string& hostname, uint16_t port)
+{
+  fatal("unimplemented");
+  return false;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::connected()
+{
+  int ret;
+
+  static const string delimiter = "/";
+
+  // Assume the znode that was created does not end with a "/".
+  CHECK(znode.at(znode.length() - 1) != '/');
+
+  // Create directory path znodes as necessary.
+  size_t index = znode.find(delimiter, 0);
+
+  while (index < string::npos) {
+    // Get out the prefix to create.
+    index = znode.find(delimiter, index + 1);
+    string prefix = znode.substr(0, index);
+
+    // Create the node (even if it already exists).
+    ret = zk->create(prefix, "", ZOO_OPEN_ACL_UNSAFE, 0, NULL);
+
+    if (ret != ZOK && ret != ZNODEEXISTS) {
+      // Okay, consider this a failure (maybe we lost our connection
+      // to ZooKeeper), increment the failure count, log the issue,
+      // and perhaps try again when ZooKeeper issues get sorted out.
+      LOG(WARNING) << "Failed to create '" << znode
+                   << "' in ZooKeeper! (" << zk->error(ret) << ")";
+      return false;
+    }
+  }
+
+  // Now make sure the 'active' znode is created.
+  ret = zk->create(znode + "/active", "", ZOO_OPEN_ACL_UNSAFE, 0, NULL);
+
+  if (ret != ZOK && ret != ZNODEEXISTS) {
+    LOG(WARNING) << "Failed to create '" << znode + "/active"
+                 << "' in ZooKeeper! (" << zk->error(ret) << ")";
+    return false;
+  }
+
+  // Now make sure the 'inactive' znode is created.
+  ret = zk->create(znode + "/inactive", "", ZOO_OPEN_ACL_UNSAFE, 0, NULL);
+
+  if (ret != ZOK && ret != ZNODEEXISTS) {
+    LOG(WARNING) << "Failed to create '" << znode + "/inactive"
+                 << "' in ZooKeeper! (" << zk->error(ret) << ")";
+    return false;
+  }
+
+  // Reconcile what's in the znodes versus what we have in memory
+  // (this also puts watches on these znodes).
+  updated(znode + "/active");
+  updated(znode + "/inactive");
+
+  return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::reconnecting()
+{
+  LOG(INFO) << "ZooKeeperSlavesManagerStorage is attempting to reconnect";
+  return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::reconnected()
+{
+  LOG(INFO) << "ZooKeeperSlavesManagerStorage has reconnected";
+
+  // Reconcile what's in the znodes versus what we have in memory
+  // (this also puts watches on these znodes).
+  updated(znode + "/active");
+  updated(znode + "/inactive");
+
+  return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::expired()
+{
+  LOG(WARNING) << "ZooKeeperSlavesManagerStorage session expired!";
+
+  CHECK(zk != NULL);
+  delete zk;
+
+  zk = new ZooKeeper(servers, 10000, watcher);
+
+  // TODO(benh): Put mechanisms in place such that reconnects may
+  // fail (or just take too long).
+
+  return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::updated(const string& path)
+{
+  LOG(INFO) << "Slave information at '" << path
+            << "' in ZooKeeper has been updated ... propogating changes";
+
+  int ret;
+  string result;
+
+  if (path == znode + "/active") {
+    ret = zk->get(znode + "/active", true, &result, NULL);
+
+    if (ret != ZOK) {
+      LOG(WARNING) << "Failed to get '" << znode + "/active"
+                   << "' in ZooKeeper! (" << zk->error(ret) << ")";
+      return false;
+    }
+
+    // Parse what's in ZooKeeper into hostname port pairs.
+    map<string, set<uint16_t> > active;
+
+    const vector<string>& tokens = tokenize::tokenize(result, ",");
+    foreach (const string& token, tokens) {
+      const vector<string>& pairs = tokenize::tokenize(token, ":");
+      if (pairs.size() != 2) {
+        LOG(WARNING) << "Bad data in '" << znode + "/active"
+                     << "', could not parse " << token;
+        return false;
+      }
+
+      try {
+        active[pairs[0]].insert(lexical_cast<uint16_t>(pairs[1]));
+      } catch (const bad_lexical_cast&) {
+        LOG(WARNING) << "Bad data in '" << znode + "/active"
+                     << "', could not parse " << token;
+        return false;
+      }
+    }
+
+    process::dispatch(slavesManager, &SlavesManager::updateActive, active);
+  } else if (path == znode + "/inactive") {
+    ret = zk->get(znode + "/inactive", true, &result, NULL);
+
+    if (ret != ZOK) {
+      LOG(WARNING) << "Failed to get '" << znode + "/inactive"
+                   << "' in ZooKeeper! (" << zk->error(ret) << ")";
+      return false;
+    }
+
+    // Parse what's in ZooKeeper into hostname port pairs.
+    map<string, set<uint16_t> > inactive;
+
+    const vector<string>& tokens = tokenize::tokenize(result, ",");
+    foreach (const string& token, tokens) {
+      const vector<string>& pairs = tokenize::tokenize(token, ":");
+      if (pairs.size() != 2) {
+        LOG(WARNING) << "Bad data in '" << znode + "/inactive"
+                     << "', could not parse " << token;
+        return false;
+      }
+
+      try {
+        inactive[pairs[0]].insert(lexical_cast<uint16_t>(pairs[1]));
+      } catch (const bad_lexical_cast&) {
+        LOG(WARNING) << "Bad data in '" << znode + "/inactive"
+                     << "', could not parse " << token;
+        return false;
+      }
+    }
+
+    process::dispatch(slavesManager, &SlavesManager::updateInactive, inactive);
+  } else {
+    LOG(WARNING) << "Not expecting changes to path '"
+                 << path << "' in ZooKeeper";
+    return false;
+  }
+
+  return true;
+}
+
+#endif // WITH_ZOOKEEPER
+
+
+SlavesManager::SlavesManager(const Configuration& conf,
+                             const PID<Master>& _master)
+  : process::Process<SlavesManager>("slaves"),
+    master(_master)
+{
+  // Create the slave manager storage based on configuration.
+  const string& slaves = conf.get<string>("slaves", "*");
+
+  // Check if 'slaves' starts with "zoo://".
+  string zoo = "zoo://";
+  size_t index = slaves.find(zoo);
+  if (index == 0) {
+#ifdef WITH_ZOOKEEPER
+    // TODO(benh): Consider actually using the chroot feature of
+    // ZooKeeper, rather than just using its syntax.
+    string temp = slaves.substr(zoo.size());
+    index = temp.find("/");
+    if (index == string::npos) {
+      fatal("Expecting chroot path for ZooKeeper");
+    }
+
+    const string& servers = temp.substr(0, index);
+
+    const string& znode = temp.substr(index);
+    if (znode == "/") {
+      fatal("Expecting chroot path for ZooKeeper ('/' is not supported)");
+    }
+
+    storage = new ZooKeeperSlavesManagerStorage(servers, znode, self());
+    process::spawn(storage);
+#else
+    fatal("Cannot get active/inactive slave information using 'zoo://',"
+          " ZooKeeper is not supported in this build");
+#endif // WITH_ZOOKEEPER
+  } else {
+    // Parse 'slaves' as initial hostname:port pairs.
+    if (slaves != "*") {
+      const vector<string>& tokens = tokenize::tokenize(slaves, ",");
+      foreach (const string& token, tokens) {
+        const vector<string>& pairs = tokenize::tokenize(token, ":");
+        if (pairs.size() != 2) {
+          fatal("Failed to parse \"%s\" in option 'slaves'", token.c_str());
+        }
+
+        try {
+          active[pairs[0]].insert(lexical_cast<uint16_t>(pairs[1]));
+        } catch (const bad_lexical_cast&) {
+          fatal("Failed to parse \"%s\" in option 'slaves'", token.c_str());
+        }
+      }
+    }
+
+    storage = new SlavesManagerStorage();
+    process::spawn(storage);
+  }
+
+  // Set up our HTTP endpoints.
+  install("add", &SlavesManager::add);
+  install("remove", &SlavesManager::remove);
+}
+
+
+SlavesManager::~SlavesManager()
+{
+  // TODO(benh): Terminate and deallocate 'storage'.
+}
+
+
+void SlavesManager::registerOptions(Configurator* configurator)
+{
+  configurator->addOption<string>("slaves",
+                                  "Initial slaves that should be "
+                                  "considered part of this cluster "
+                                  "(or if using ZooKeeper a URL)", "*");
+}
+
+
+bool SlavesManager::add(const string& hostname, uint16_t port)
+{
+  // Make sure this slave is not currently deactivated or activated.
+  if (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0) {
+    LOG(WARNING) << "Attempted to add deactivated slave at "
+                 << hostname << ":" << port;
+    return false;
+  } else if (active.count(hostname) == 0 || active[hostname].count(port) == 0) {
+    // Get the storage system to persist the addition.
+    bool result = process::call(storage->self(), &SlavesManagerStorage::add,
+                                hostname, port);
+    if (result) {
+      LOG(INFO) << "Adding slave at " << hostname << ":" << port;
+      // Tell the master that this slave is now active (so it can
+      // allow the slave to register).
+      process::dispatch(master, &Master::activatedSlaveHostnamePort,
+                        hostname, port);
+      active[hostname].insert(port);
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+bool SlavesManager::remove(const string& hostname, uint16_t port)
+{
+  // Make sure the slave is currently activated or deactivated.
+  if ((active.count(hostname) > 0 && active[hostname].count(port) > 0) ||
+      (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0)) {
+    // Get the storage system to persist the removal.
+    bool result = process::call(storage->self(), &SlavesManagerStorage::remove,
+                                hostname, port);
+    if (result) {
+      LOG(INFO) << "Removing slave at " << hostname << ":" << port;
+      if (active.count(hostname) > 0 && active[hostname].count(port) > 0) {
+        process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+                          hostname, port);
+        active[hostname].erase(port);
+        if (active[hostname].size() == 0) active.erase(hostname);
+      }
+
+      if (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0) {
+        inactive[hostname].erase(port);
+        if (inactive[hostname].size() == 0) inactive.erase(hostname);
+      }
+
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+bool SlavesManager::activate(const string& hostname, uint16_t port)
+{
+  // Make sure the slave is currently deactivated.
+  if (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0) {
+    // Get the storage system to persist the activation.
+    bool result = process::call(storage->self(), &SlavesManagerStorage::activate,
+                                hostname, port);
+    if (result) {
+      LOG(INFO) << "Activating slave at " << hostname << ":" << port;
+      process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+                        hostname, port);
+      active[hostname].insert(port);
+      inactive[hostname].erase(port);
+      if (inactive[hostname].size() == 0) inactive.erase(hostname);
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+bool SlavesManager::deactivate(const string& hostname, uint16_t port)
+{
+  // Make sure the slave is currently activated.
+  if (active.count(hostname) > 0 && active[hostname].count(port) > 0) {
+    // Get the storage system to persist the deactivation.
+    bool result = process::call(storage->self(), &SlavesManagerStorage::deactivate,
+                                hostname, port);
+    if (result) {
+      LOG(INFO) << "Deactivating slave at " << hostname << ":" << port;
+      process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+                        hostname, port);
+      active[hostname].erase(port);
+      if (active[hostname].size() == 0) active.erase(hostname);
+      inactive[hostname].insert(port);
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+void SlavesManager::updateActive(const map<string, set<uint16_t> >& _updated)
+{
+  // TODO(benh): Remove this unnecessary copy. The code below uses the
+  // [] operator to make it easier to read, but [] can't be used on
+  // something that is const, hence the copy. Ugh.
+  map<string, set<uint16_t> > updated = _updated;
+
+  // Loop through the current active slave hostname:port pairs and
+  // remove all that are not found in updated.
+  foreachpair (const string& hostname, _, active) {
+    if (updated.count(hostname) == 0) {
+      foreach (uint16_t port, active[hostname]) {
+        LOG(INFO) << "Removing slave at " << hostname << ":" << port;
+        process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+                          hostname, port);
+      }
+      active.erase(hostname);
+    } else {
+      foreach (uint16_t port, active[hostname]) {
+        if (updated[hostname].count(port) == 0) {
+          LOG(INFO) << "Removing slave at " << hostname << ":" << port;
+          process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+                            hostname, port);
+          active[hostname].erase(port);
+          if (active[hostname].size() == 0) active.erase(hostname);
+        }
+      }
+    }
+  }
+
+  // Now loop through the updated slave hostname:port pairs and add
+  // all that are not found in active.
+  foreachpair (const string& hostname, _, updated) {
+    if (active.count(hostname) == 0) {
+      foreach (uint16_t port, updated[hostname]) {
+        LOG(INFO) << "Adding slave at " << hostname << ":" << port;
+        process::dispatch(master, &Master::activatedSlaveHostnamePort,
+                          hostname, port);
+        active[hostname].insert(port);
+      }
+    } else {
+      foreach (uint16_t port, updated[hostname]) {
+        if (active[hostname].count(port) == 0) {
+          LOG(INFO) << "Adding slave at " << hostname << ":" << port;
+          process::dispatch(master, &Master::activatedSlaveHostnamePort,
+                            hostname, port);
+          active[hostname].insert(port);
+        }
+      }
+    }
+  }
+}
+
+
+void SlavesManager::updateInactive(const map<string, set<uint16_t> >& updated)
+{
+  inactive = updated;
+}
+
+
+Promise<HttpResponse> SlavesManager::add(const HttpRequest& request)
+{
+  // Parse the query to get out the slave hostname and port.
+  string hostname = "";
+  uint16_t port = 0;
+
+  map<string, vector<string> > pairs = tokenize::pairs(request.query, ",", "=");
+
+  if (pairs.size() != 2) {
+    LOG(WARNING) << "Malformed query string when trying to add a slave";
+    return HttpNotFoundResponse();
+  }
+
+  // Make sure there is at least a 'hostname=' and 'port='.
+  if (pairs.count("hostname") == 0) {
+    LOG(WARNING) << "Missing 'hostname' in query string"
+                 << " when trying to add a slave";
+    return HttpNotFoundResponse();
+  } else if (pairs.count("port") == 0) {
+    LOG(WARNING) << "Missing 'port' in query string"
+                 << " when trying to add a slave";
+    return HttpNotFoundResponse();
+  }
+
+  hostname = pairs["hostname"].front();
+
+  // Check that 'port' is valid.
+  try {
+    port = lexical_cast<uint16_t>(pairs["port"].front());
+  } catch (const bad_lexical_cast&) {
+    LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+                 << "'  when trying to add a slave";
+    return HttpNotFoundResponse();
+  }
+
+  LOG(INFO) << "Asked to add slave at " << hostname << ":" << port;
+
+  if (add(hostname, port)) {
+    return HttpOKResponse();
+  } else {
+    return HttpInternalServerErrorResponse();
+  }
+}
+
+
+Promise<HttpResponse> SlavesManager::remove(const HttpRequest& request)
+{
+  // Parse the query to get out the slave hostname and port.
+  string hostname = "";
+  uint16_t port = 0;
+
+  // TODO(benh): Don't use tokenize::pairs to get better errors?
+  map<string, vector<string> > pairs = tokenize::pairs(request.query, ",", "=");
+
+  if (pairs.size() != 2) {
+    LOG(WARNING) << "Malformed query string when trying to remove a slave";
+    return HttpNotFoundResponse();
+  }
+
+  // Make sure there is at least a 'hostname=' and 'port='.
+  if (pairs.count("hostname") == 0) {
+    LOG(WARNING) << "Missing 'hostname' in query string"
+                 << " when trying to remove a slave";
+    return HttpNotFoundResponse();
+  } else if (pairs.count("port") == 0) {
+    LOG(WARNING) << "Missing 'port' in query string"
+                 << " when trying to remove a slave";
+    return HttpNotFoundResponse();
+  }
+
+  hostname = pairs["hostname"].front();
+
+  // Check that 'port' is valid.
+  try {
+    port = lexical_cast<uint16_t>(pairs["port"].front());
+  } catch (const bad_lexical_cast&) {
+    LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+                 << "'  when trying to remove a slave";
+    return HttpNotFoundResponse();
+  }
+
+  LOG(INFO) << "Asked to remove slave at " << hostname << ":" << port;
+
+  if (remove(hostname, port)) {
+    return HttpOKResponse();
+  } else {
+    return HttpInternalServerErrorResponse();
+  }
 }
 
 
 Master::Master()
   : MesosProcess<Master>("master"),
-    active(false),
-    nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+    active(false), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
 {
   allocatorType = "simple";
   initialize();
@@ -133,8 +840,8 @@ Master::Master()
 
 Master::Master(const Configuration& _conf)
   : MesosProcess<Master>("master"),
-    active(false),
-    conf(_conf), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+    active(false), conf(_conf), nextFrameworkId(0), nextSlaveId(0),
+    nextOfferId(0)
 {
   allocatorType = conf.get("allocator", "simple");
   initialize();
@@ -145,26 +852,25 @@ Master::~Master()
 {
   LOG(INFO) << "Shutting down master";
 
-  delete allocator;
-
-  foreachpair (_, Framework *framework, frameworks) {
-    foreachpair(_, Task *task, framework->tasks)
-      delete task;
-    delete framework;
+  foreachpaircopy (_, Framework* framework, frameworks) {
+    removeFramework(framework);
   }
 
-  foreachpair (_, Slave *slave, slaves) {
-    delete slave;
+  foreachpaircopy (_, Slave* slave, slaves) {
+    removeSlave(slave);
   }
 
-  foreachpair (_, SlotOffer *offer, slotOffers) {
-    delete offer;
-  }
+  delete allocator;
+
+  CHECK(slotOffers.size() == 0);
+
+  // TODO(benh): Terminate and delete slave manager!
 }
 
 
 void Master::registerOptions(Configurator* configurator)
 {
+  SlavesManager::registerOptions(configurator);
   configurator->addOption<string>("allocator", 'a', "Allocation module name",
                                   "simple");
   configurator->addOption<bool>("root_submissions",
@@ -332,11 +1038,16 @@ void Master::operator () ()
   masterId = DateUtils::currentDate() + "-" + msg.token();
   LOG(INFO) << "Master ID: " << masterId;
 
+  // Setup slave manager.
+  slavesManager = new SlavesManager(conf, self());
+  process::spawn(slavesManager);
+
   // Create the allocator (we do this after the constructor because it
   // leaks 'this').
   allocator = createAllocator();
-  if (!allocator)
+  if (!allocator) {
     LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
+  }
 
   link(spawn(new AllocatorTimer(self())));
   //link(spawn(new SharesPrinter(self())));
@@ -425,9 +1136,6 @@ void Master::initialize()
           &ExitedExecutorMessage::executor_id,
           &ExitedExecutorMessage::result);
 
-  install(SH2M_HEARTBEAT, &Master::slaveHeartbeat,
-          &HeartbeatMessage::slave_id);
-
   install(process::EXITED, &Master::exited);
 
   // Install HTTP request handlers.
@@ -738,23 +1446,10 @@ void Master::statusUpdateAck(const Frame
 
 void Master::registerSlave(const SlaveInfo& slaveInfo)
 {
-  Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsed());
-
-  LOG(INFO) << "Registering slave " << slave->slaveId
-            << " at " << slave->pid;
-
-  slaves[slave->slaveId] = slave;
-  pidToSlaveId[slave->pid] = slave->slaveId;
-  link(slave->pid);
-
-  allocator->slaveAdded(slave);
-
-  MSG<M2S_REGISTER_REPLY> out;
-  out.mutable_slave_id()->MergeFrom(slave->slaveId);
-  out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
-  send(slave->pid, out);
+  addSlave(slaveInfo, newSlaveId(), from());
 }
 
+
 void Master::reregisterSlave(const SlaveID& slaveId,
                              const SlaveInfo& slaveInfo,
                              const vector<Task>& tasks)
@@ -772,43 +1467,35 @@ void Master::reregisterSlave(const Slave
       LOG(ERROR) << "Slave re-registered with in use SlaveID!";
       send(from(), process::TERMINATE);
     } else {
-      Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsed());
-
-      slaves[slave->slaveId] = slave;
-      pidToSlaveId[slave->pid] = slave->slaveId;
-      link(slave->pid);
-
-      MSG<M2S_REREGISTER_REPLY> out;
-      out.mutable_slave_id()->MergeFrom(slave->slaveId);
-      out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
-      send(slave->pid, out);
-
-      for (int i = 0; i < tasks.size(); i++) {
-        Task* task = new Task(tasks[i]);
-
-        // Add the task to the slave.
-        slave->addTask(task);
-
-        // Try and add the task to the framework too, but since the
-        // framework might not yet be connected we won't be able to
-        // add them. However, when the framework connects later we
-        // will add them then. We also tell this slave the current
-        // framework pid for this task. Again, we do the same thing if
-        // a framework currently isn't registered.
-        Framework* framework = lookupFramework(task->framework_id());
-        if (framework != NULL) {
-          framework->addTask(task);
-          MSG<M2S_UPDATE_FRAMEWORK> out;
-          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-          out.set_pid(framework->pid);
-          send(slave->pid, out);
-        } else {
-          // TODO(benh): We should really put a timeout on how long we
-          // keep tasks running on a slave that never have frameworks
-          // reregister and claim them.
-          LOG(WARNING) << "Possibly orphaned task " << task->task_id()
-                       << " of framework " << task->framework_id()
-                       << " running on slave " << slaveId;
+      Slave* slave = addSlave(slaveInfo, slaveId, from());
+      if (slave != NULL) {
+        for (int i = 0; i < tasks.size(); i++) {
+          Task* task = new Task(tasks[i]);
+
+          // Add the task to the slave.
+          slave->addTask(task);
+
+          // Try and add the task to the framework too, but since the
+          // framework might not yet be connected we won't be able to
+          // add them. However, when the framework connects later we
+          // will add them then. We also tell this slave the current
+          // framework pid for this task. Again, we do the same thing
+          // if a framework currently isn't registered.
+          Framework* framework = lookupFramework(task->framework_id());
+          if (framework != NULL) {
+            framework->addTask(task);
+            MSG<M2S_UPDATE_FRAMEWORK> out;
+            out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+            out.set_pid(framework->pid);
+            send(slave->pid, out);
+          } else {
+            // TODO(benh): We should really put a timeout on how long we
+            // keep tasks running on a slave that never have frameworks
+            // reregister and claim them.
+            LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+                         << " of framework " << task->framework_id()
+                         << " running on slave " << slaveId;
+          }
         }
       }
     }
@@ -941,28 +1628,36 @@ void Master::exitedExecutor(const SlaveI
 }
 
 
-void Master::slaveHeartbeat(const SlaveID& slaveId)
+void Master::activatedSlaveHostnamePort(const string& hostname, uint16_t port)
 {
-  Slave* slave = lookupSlave(slaveId);
-  if (slave != NULL) {
-    slave->lastHeartbeat = elapsed();
-  } else {
-    LOG(WARNING) << "Received heartbeat for UNKNOWN slave "
-                 << slaveId << " from " << from();
-  }
+  LOG(INFO) << "Master now considering a slave at "
+            << hostname << ":" << port << " as active";
+  slaveHostnamePorts[hostname].insert(port);
 }
 
 
-void Master::timerTick()
+void Master::deactivatedSlaveHostnamePort(const string& hostname, uint16_t port)
 {
-  foreachpaircopy (_, Slave* slave, slaves) {
-    if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
-      LOG(INFO) << "Slave " << slave->slaveId
-                << " missing heartbeats ... considering disconnected";
-      removeSlave(slave);
+  if (slaveHostnamePorts.count(hostname) > 0 &&
+      slaveHostnamePorts[hostname].count(port) > 0) {
+    // Look for a connected slave and remove it.
+    foreachpair (_, Slave* slave, slaves) {
+      if (slave->info.hostname() == hostname && slave->pid.port == port) {
+        LOG(WARNING) << "Removing slave " << slave->slaveId
+                     << " because it has been deactivated";
+        removeSlave(slave);
+        break;
+      }
     }
+
+    slaveHostnamePorts[hostname].erase(port);
+    if (slaveHostnamePorts[hostname].size() == 0) slaveHostnamePorts.erase(hostname);
   }
+}
+
 
+void Master::timerTick()
+{
   // Check which framework filters can be expired.
   foreachpair (_, Framework* framework, frameworks) {
     framework->removeExpiredFilters(elapsed());
@@ -1053,604 +1748,6 @@ Promise<HttpResponse> Master::vars(const
 }
 
 
-//   while (true) {
-//     switch (receive()) {
-
-//     case NEW_MASTER_DETECTED: {
-//       const MSG<NEW_MASTER_DETECTED>& msg = message();
-
-//       // Check and see if we are (1) still waiting to be the active
-//       // master, (2) newly active master, (3) no longer active master,
-//       // or (4) still active master.
-//       PID pid(msg.pid());
-
-//       if (pid != self() && !active) {
-// 	LOG(INFO) << "Waiting to be master!";
-//       } else if (pid == self() && !active) {
-// 	LOG(INFO) << "Acting as master!";
-// 	active = true;
-//       } else if (pid != self() && active) {
-// 	LOG(FATAL) << "No longer active master ... committing suicide!";
-//       } else if (pid == self() && active) {
-// 	LOG(INFO) << "Still acting as master!";
-//       }
-//       break;
-//     }
-
-//     case NO_MASTER_DETECTED: {
-//       if (active) {
-// 	LOG(FATAL) << "No longer active master ... committing suicide!";
-//       } else {
-// 	LOG(FATAL) << "No master detected (?) ... committing suicide!";
-//       }
-//       break;
-//     }
-
-//     case MASTER_DETECTION_FAILURE: {
-//       LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
-//       break;
-//     }
-
-//     case F2M_REGISTER_FRAMEWORK: {
-//       const MSG<F2M_REGISTER_FRAMEWORK>& msg = message();
-
-//       Framework *framework =
-//         new Framework(msg.framework(), newFrameworkId(), from(), elapsed());
-
-//       LOG(INFO) << "Registering " << framework << " at " << framework->pid;
-
-//       if (framework->info.executor().uri() == "") {
-//         LOG(INFO) << framework << " registering without an executor URI";
-//         MSG<M2F_ERROR> out;
-//         out.set_code(1);
-//         out.set_message("No executor URI given");
-//         send(from(), out);
-//         delete framework;
-//         break;
-//       }
-
-//       bool rootSubmissions = conf.get<bool>("root_submissions", true);
-//       if (framework->info.user() == "root" && rootSubmissions == false) {
-//         LOG(INFO) << framework << " registering as root, but "
-//                   << "root submissions are disabled on this cluster";
-//         MSG<M2F_ERROR> out;
-//         out.set_code(1);
-//         out.set_message("User 'root' is not allowed to run frameworks");
-//         send(from(), out);
-//         delete framework;
-//         break;
-//       }
-
-//       addFramework(framework);
-//       break;
-//     }
-
-//     case F2M_REREGISTER_FRAMEWORK: {
-//       const MSG<F2M_REREGISTER_FRAMEWORK> &msg = message();
-
-//       if (msg.framework_id() == "") {
-//         LOG(ERROR) << "Framework re-registering without an id!";
-//         MSG<M2F_ERROR> out;
-//         out.set_code(1);
-//         out.set_message("Missing framework id");
-//         send(from(), out);
-//         break;
-//       }
-
-//       if (msg.framework().executor().uri() == "") {
-//         LOG(INFO) << "Framework " << msg.framework_id() << " re-registering "
-//                   << "without an executor URI";
-//         MSG<M2F_ERROR> out;
-//         out.set_code(1);
-//         out.set_message("No executor URI given");
-//         send(from(), out);
-//         break;
-//       }
-
-//       LOG(INFO) << "Re-registering framework " << msg.framework_id()
-//                 << " at " << from();
-
-//       if (frameworks.count(msg.framework_id()) > 0) {
-//         // Using the "generation" of the scheduler allows us to keep a
-//         // scheduler that got partitioned but didn't die (in ZooKeeper
-//         // speak this means didn't lose their session) and then
-//         // eventually tried to connect to this master even though
-//         // another instance of their scheduler has reconnected. This
-//         // might not be an issue in the future when the
-//         // master/allocator launches the scheduler can get restarted
-//         // (if necessary) by the master and the master will always
-//         // know which scheduler is the correct one.
-//         if (msg.generation() == 0) {
-//           LOG(INFO) << "Framework " << msg.framework_id() << " failed over";
-//           failoverFramework(frameworks[msg.framework_id()], from());
-//           // TODO: Should we check whether the new scheduler has given
-//           // us a different framework name, user name or executor info?
-//         } else {
-//           LOG(INFO) << "Framework " << msg.framework_id()
-//                     << " re-registering with an already used id "
-//                     << " and not failing over!";
-//           MSG<M2F_ERROR> out;
-//           out.set_code(1);
-//           out.set_message("Framework id in use");
-//           send(from(), out);
-//           break;
-//         }
-//       } else {
-//         // We don't have a framework with this ID, so we must be a newly
-//         // elected Mesos master to which either an existing scheduler or a
-//         // failed-over one is connecting. Create a Framework object and add
-//         // any tasks it has that have been reported by reconnecting slaves.
-//         Framework *framework =
-//           new Framework(msg.framework(), msg.framework_id(), from(), elapsed());
-
-//         // TODO(benh): Check for root submissions like above!
-
-//         addFramework(framework);
-//         // Add any running tasks reported by slaves for this framework.
-//         foreachpair (const SlaveID& slaveId, Slave *slave, slaves) {
-//           foreachpair (_, Task *task, slave->tasks) {
-//             if (framework->frameworkId == task->framework_id()) {
-//               framework->addTask(task);
-//             }
-//           }
-//         }
-//       }
-
-//       CHECK(frameworks.count(msg.framework_id()) > 0);
-
-//       // Broadcast the new framework pid to all the slaves. We have to
-//       // broadcast because an executor might be running on a slave but
-//       // it currently isn't running any tasks. This could be a
-//       // potential scalability issue ...
-//       foreachpair (_, Slave *slave, slaves) {
-//         MSG<M2S_UPDATE_FRAMEWORK> out;
-//         out.mutable_framework_id()->MergeFrom(msg.framework_id());
-//         out.set_pid(from());
-//         send(slave->pid, out);
-//       }
-//       break;
-//     }
-
-//     case F2M_UNREGISTER_FRAMEWORK: {
-//       const MSG<F2M_UNREGISTER_FRAMEWORK>& msg = message();
-
-//       LOG(INFO) << "Asked to unregister framework " << msg.framework_id();
-
-//       Framework *framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-//         if (framework->pid == from())
-//           removeFramework(framework);
-//         else
-//           LOG(WARNING) << from() << " tried to unregister framework; "
-//                        << "expecting " << framework->pid;
-//       }
-//       break;
-//     }
-
-//     case F2M_RESOURCE_OFFER_REPLY: {
-//       const MSG<F2M_RESOURCE_OFFER_REPLY>& msg = message();
-
-//       Framework *framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-
-//         // Copy out the task descriptions (could optimize).
-//         vector<TaskDescription> tasks;
-//         for (int i = 0; i < msg.task_size(); i++) {
-//           tasks.push_back(msg.task(i));
-//         }
-
-//         SlotOffer *offer = lookupSlotOffer(msg.offer_id());
-//         if (offer != NULL) {
-//           processOfferReply(offer, tasks, msg.params());
-//         } else {
-//           // The slot offer is gone, meaning that we rescinded it, it
-//           // has already been replied to, or that the slave was lost;
-//           // immediately report any tasks in it as lost (it would
-//           // probably be better to have better error messages here).
-//           foreach (const TaskDescription &task, tasks) {
-//             MSG<M2F_STATUS_UPDATE> out;
-//             out.mutable_framework_id()->MergeFrom(msg.framework_id());
-//             TaskStatus *status = out.mutable_status();
-//             status->mutable_task_id()->MergeFrom(task.task_id());
-//             status->mutable_slave_id()->MergeFrom(task.slave_id());
-//             status->set_state(TASK_LOST);
-//             send(framework->pid, out);
-//           }
-//         }
-//       }
-//       break;
-//     }
-
-//     case F2M_REVIVE_OFFERS: {
-//       const MSG<F2M_REVIVE_OFFERS>& msg = message();
-
-//       Framework *framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-//         LOG(INFO) << "Reviving offers for " << framework;
-//         framework->slaveFilter.clear();
-//         allocator->offersRevived(framework);
-//       }
-//       break;
-//     }
-
-//     case F2M_KILL_TASK: {
-//       const MSG<F2M_KILL_TASK>& msg = message();
-
-//       LOG(INFO) << "Asked to kill task " << msg.task_id()
-// 		<< " of framework " << msg.framework_id();
-
-//       Framework *framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-//         Task *task = framework->lookupTask(msg.task_id());
-//         if (task != NULL) {
-//           killTask(task);
-// 	} else {
-// 	  LOG(ERROR) << "Cannot kill task " << msg.task_id()
-// 		     << " of framework " << msg.framework_id()
-// 		     << " because it cannot be found";
-//           MSG<M2F_STATUS_UPDATE> out;
-//           out.mutable_framework_id()->MergeFrom(task->framework_id());
-//           TaskStatus *status = out.mutable_status();
-//           status->mutable_task_id()->MergeFrom(task->task_id());
-//           status->mutable_slave_id()->MergeFrom(task->slave_id());
-//           status->set_state(TASK_LOST);
-//           send(framework->pid, out);
-//         }
-//       }
-//       break;
-//     }
-
-//     case F2M_FRAMEWORK_MESSAGE: {
-//       const MSG<F2M_FRAMEWORK_MESSAGE>& msg = message();
-
-//       Framework *framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-//         Slave *slave = lookupSlave(msg.message().slave_id());
-//         if (slave != NULL) {
-//           MSG<M2S_FRAMEWORK_MESSAGE> out;
-//           out.mutable_framework_id()->MergeFrom(msg.framework_id());
-//           out.mutable_message()->MergeFrom(msg.message());
-//           send(slave->pid, out);
-//         }
-//       }
-//       break;
-//     }
-
-//     case F2M_STATUS_UPDATE_ACK: {
-//       const MSG<F2M_STATUS_UPDATE_ACK>& msg = message();
-
-//       Framework *framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-//         Slave *slave = lookupSlave(msg.slave_id());
-//         if (slave != NULL) {
-//           MSG<M2S_STATUS_UPDATE_ACK> out;
-//           out.mutable_framework_id()->MergeFrom(msg.framework_id());
-//           out.mutable_slave_id()->MergeFrom(msg.slave_id());
-//           out.mutable_task_id()->MergeFrom(msg.task_id());
-//           send(slave->pid, out);
-//         }
-//       }
-//       break;
-//     }
-
-//     case S2M_REGISTER_SLAVE: {
-//       const MSG<S2M_REGISTER_SLAVE>& msg = message();
-
-//       Slave* slave = new Slave(msg.slave(), newSlaveId(), from(), elapsed());
-
-//       LOG(INFO) << "Registering slave " << slave->slaveId
-//                 << " at " << slave->pid;
-
-//       slaves[slave->slaveId] = slave;
-//       pidToSlaveId[slave->pid] = slave->slaveId;
-//       link(slave->pid);
-
-//       allocator->slaveAdded(slave);
-
-//       MSG<M2S_REGISTER_REPLY> out;
-//       out.mutable_slave_id()->MergeFrom(slave->slaveId);
-//       out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
-//       send(slave->pid, out);
-//       break;
-//     }
-
-//     case S2M_REREGISTER_SLAVE: {
-//       const MSG<S2M_REREGISTER_SLAVE>& msg = message();
-
-//       LOG(INFO) << "Re-registering " << msg.slave_id() << " at " << from();
-
-//       if (msg.slave_id() == "") {
-//         LOG(ERROR) << "Slave re-registered without a SlaveID!";
-//         send(from(), TERMINATE);
-//         break;
-//       }
-
-//       // TODO(benh): Once we support handling session expiration, we
-//       // will want to handle having a slave re-register with us when
-//       // we already have them recorded (i.e., the below if statement
-//       // will evaluate to true).
-
-//       if (lookupSlave(msg.slave_id()) != NULL) {
-//         LOG(ERROR) << "Slave re-registered with in use SlaveID!";
-//         send(from(), TERMINATE);
-//         break;
-//       }
-
-//       Slave* slave = new Slave(msg.slave(), msg.slave_id(), from(), elapsed());
-
-//       slaves[slave->slaveId] = slave;
-//       pidToSlaveId[slave->pid] = slave->slaveId;
-//       link(slave->pid);
-
-//       MSG<M2S_REREGISTER_REPLY> out;
-//       out.mutable_slave_id()->MergeFrom(slave->slaveId);
-//       out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
-//       send(slave->pid, out);
-
-//       for (int i = 0; i < msg.task_size(); i++) {
-//         Task *task = new Task(msg.task(i));
-//         slave->addTask(task);
-
-//         // Tell this slave the current framework pid for this task.
-//         Framework *framework = lookupFramework(task->framework_id());
-//         if (framework != NULL) {
-//           framework->addTask(task);
-//           MSG<M2S_UPDATE_FRAMEWORK> out;
-//           out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-//           out.set_pid(framework->pid);
-//           send(slave->pid, out);
-//         }
-//       }
-
-//       // TODO(benh|alig): We should put a timeout on how long we keep
-//       // tasks running that never have frameworks reregister that
-//       // claim them.a
-//       break;
-//     }
-
-//     case S2M_UNREGISTER_SLAVE: {
-//       const MSG<S2M_UNREGISTER_SLAVE>& msg = message();
-
-//       LOG(INFO) << "Asked to unregister slave " << msg.slave_id();
-
-//       // TODO(benh): Check that only the slave is asking to unregister?
-
-//       Slave *slave = lookupSlave(msg.slave_id());
-//       if (slave != NULL)
-//         removeSlave(slave);
-//       break;
-//     }
-
-//     case S2M_STATUS_UPDATE: {
-//       const MSG<S2M_STATUS_UPDATE>& msg = message();
-
-//       const TaskStatus& status = msg.status();
-
-//       LOG(INFO) << "Status update: task " << status.task_id()
-// 		<< " of framework " << msg.framework_id()
-// 		<< " is now in state "
-// 		<< TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
-//       Slave* slave = lookupSlave(status.slave_id());
-//       if (slave != NULL) {
-//         Framework* framework = lookupFramework(msg.framework_id());
-//         if (framework != NULL) {
-// 	  // Pass on the (transformed) status update to the framework.
-//           MSG<M2F_STATUS_UPDATE> out;
-//           out.mutable_framework_id()->MergeFrom(msg.framework_id());
-//           out.mutable_status()->MergeFrom(status);
-//           send(framework->pid, out);
-
-//           // Lookup the task and see if we need to update anything locally.
-//           Task *task = slave->lookupTask(msg.framework_id(), status.task_id());
-//           if (task != NULL) {
-//             task->set_state(status.state());
-//             // Remove the task if necessary.
-//             if (status.state() == TASK_FINISHED ||
-//                 status.state() == TASK_FAILED ||
-//                 status.state() == TASK_KILLED ||
-//                 status.state() == TASK_LOST) {
-//               removeTask(task, TRR_TASK_ENDED);
-//             }
-//           } else {
-// 	    LOG(WARNING) << "Status update error: couldn't lookup "
-//                          << "task " << status.task_id();
-// 	  }
-//         } else {
-//           LOG(WARNING) << "Status update error: couldn't lookup "
-//                        << "framework " << msg.framework_id();
-//         }
-//       } else {
-//         LOG(WARNING) << "Status update error: couldn't lookup slave "
-//                      << status.slave_id();
-//       }
-//       break;
-//     }
-
-//     case S2M_FRAMEWORK_MESSAGE: {
-//       const MSG<S2M_FRAMEWORK_MESSAGE>& msg = message();
-
-//       Slave *slave = lookupSlave(msg.message().slave_id());
-//       if (slave != NULL) {
-//         Framework *framework = lookupFramework(msg.framework_id());
-//         if (framework != NULL) {
-//           MSG<M2S_FRAMEWORK_MESSAGE> out;
-//           out.mutable_framework_id()->MergeFrom(msg.framework_id());
-//           out.mutable_message()->MergeFrom(msg.message());
-//           send(framework->pid, out);
-//         }
-//       }
-//       break;
-//     }
-
-//     case S2M_EXITED_EXECUTOR: {
-//       const MSG<S2M_EXITED_EXECUTOR>&msg = message();
-
-//       Slave *slave = lookupSlave(msg.slave_id());
-//       if (slave != NULL) {
-//         Framework *framework = lookupFramework(msg.framework_id());
-//         if (framework != NULL) {
-//           LOG(INFO) << "Executor " << msg.executor_id()
-//                     << " of framework " << framework->frameworkId
-//                     << " on slave " << slave->slaveId
-//                     << " (" << slave->info.hostname() << ") "
-//                     << "exited with status " << msg.status();
-
-//           // Tell the framework which tasks have been lost.
-//           foreachpaircopy (_, Task* task, framework->tasks) {
-//             if (task->slave_id() == slave->slaveId &&
-//                 task->executor_id() == msg.executor_id()) {
-//               MSG<M2F_STATUS_UPDATE> out;
-//               out.mutable_framework_id()->MergeFrom(task->framework_id());
-//               TaskStatus *status = out.mutable_status();
-//               status->mutable_task_id()->MergeFrom(task->task_id());
-//               status->mutable_slave_id()->MergeFrom(task->slave_id());
-//               status->set_state(TASK_LOST);
-//               send(framework->pid, out);
-
-//               LOG(INFO) << "Removing " << task << " because of lost executor";
-
-//               removeTask(task, TRR_EXECUTOR_LOST);
-//             }
-//           }
-
-//           // TODO(benh): Send the framework it's executor's exit
-//           // status? Or maybe at least have something like
-//           // M2F_EXECUTOR_LOST?
-//         }
-//       }
-//       break;
-//     }
-
-//     case SH2M_HEARTBEAT: {
-//       const MSG<SH2M_HEARTBEAT>& msg = message();
-
-//       Slave *slave = lookupSlave(msg.slave_id());
-//       if (slave != NULL) {
-//         slave->lastHeartbeat = elapsed();
-//       } else {
-//         LOG(WARNING) << "Received heartbeat for UNKNOWN slave "
-//                      << msg.slave_id() << " from " << from();
-//       }
-//       break;
-//     }
-
-//     case M2M_TIMER_TICK: {
-//       foreachpaircopy (_, Slave *slave, slaves) {
-// 	if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
-// 	  LOG(INFO) << slave
-//                     << " missing heartbeats ... considering disconnected";
-// 	  removeSlave(slave);
-// 	}
-//       }
-
-//       // Check which framework filters can be expired.
-//       foreachpair (_, Framework *framework, frameworks)
-//         framework->removeExpiredFilters(elapsed());
-
-//       // Do allocations!
-//       allocator->timerTick();
-//       break;
-//     }
-
-//     case M2M_FRAMEWORK_EXPIRED: {
-//       const MSG<M2M_FRAMEWORK_EXPIRED>&msg = message();
-
-//       Framework* framework = lookupFramework(msg.framework_id());
-//       if (framework != NULL) {
-// 	LOG(INFO) << "Framework failover timer expired, removing "
-// 		  << framework;
-// 	removeFramework(framework);
-//       }
-//       break;
-//     }
-
-//     case PROCESS_EXIT: {
-//       // TODO(benh): Could we get PROCESS_EXIT from a network partition?
-//       LOG(INFO) << "Process exited: " << from();
-//       if (pidToFrameworkId.count(from()) > 0) {
-//         const FrameworkID& frameworkId = pidToFrameworkId[from()];
-//         Framework* framework = lookupFramework(frameworkId);
-//         if (framework != NULL) {
-//           LOG(INFO) << framework << " disconnected";
-
-// 	  // Stop sending offers here for now.
-// 	  framework->active = false;
-
-//           // Remove the framework's slot offers.
-//           foreachcopy (SlotOffer* offer, framework->slotOffers) {
-//             removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
-//           }
-
-//    	  framework->failoverTimer = new FrameworkFailoverTimer(self(), frameworkId);
-//    	  link(spawn(framework->failoverTimer));
-// // 	  removeFramework(framework);
-//         }
-//       } else if (pidToSlaveId.count(from()) > 0) {
-//         const SlaveID& slaveId = pidToSlaveId[from()];
-//         Slave* slave = lookupSlave(slaveId);
-//         if (slave != NULL) {
-//           LOG(INFO) << slave << " disconnected";
-//           removeSlave(slave);
-//         }
-//       } else {
-// 	foreachpair (_, Framework *framework, frameworks) {
-// 	  if (framework->failoverTimer != NULL &&
-// 	      framework->failoverTimer->self() == from()) {
-// 	    LOG(ERROR) << "Bad framework failover timer, removing "
-// 		       << framework;
-// 	    removeFramework(framework);
-// 	    break;
-// 	  }
-// 	}
-//       }
-//       break;
-//     }
-
-//     case M2M_GET_STATE: {
-//       state::MasterState *state = getState();
-//       MSG<M2M_GET_STATE_REPLY> out;
-//       out.set_pointer((char *) &state, sizeof(state));
-//       send(from(), out);
-//       break;
-//     }
-    
-//     case M2M_SHUTDOWN: {
-//       LOG(INFO) << "Asked to shut down by " << from();
-//       foreachpair (_, Slave *slave, slaves)
-//         send(slave->pid, TERMINATE);
-//       return;
-//     }
-
-//     case vars: {
-//       LOG(INFO) << "HTTP request for 'vars'";
-
-//       ostringstream out;
-
-//       out <<
-//         "build_date " << build::DATE << "\n" <<
-//         "build_user " <<  build::USER << "\n" <<
-//         "build_flags " <<  build::FLAGS << "\n" <<
-//         "frameworks_count " << frameworks.size() << "\n";
-
-//       // Also add the configuration values.
-//       foreachpair (const string& key, const string& value, conf.getMap()) {
-//         out << key << " " << value << "\n";
-//       }
-
-//       Process::send(from(), "response", out.str().data(), out.str().size());
-//       break;
-//     }
-
-//     default:
-//       LOG(ERROR) << "Received unknown message (" << msgid()
-//                  << ") from " << from();
-//       break;
-//     }
-//   }
-// }
-
-
 OfferID Master::makeOffer(Framework* framework,
                           const vector<SlaveResources>& resources)
 {
@@ -1845,7 +1942,7 @@ void Master::launchTask(Framework* frame
 // TODO: Make the error codes and messages programmer-friendly
 void Master::terminateFramework(Framework* framework,
                                 int32_t code,
-                                const std::string& message)
+                                const string& message)
 {
   LOG(INFO) << "Terminating " << framework << " due to error: " << message;
 
@@ -1981,13 +2078,60 @@ void Master::removeFramework(Framework* 
   // TODO(benh): unlink(framework->pid);
   pidToFrameworkId.erase(framework->pid);
 
-  // Delete it
+  // Delete it.
   frameworks.erase(framework->frameworkId);
   allocator->frameworkRemoved(framework);
   delete framework;
 }
 
 
+Slave* Master::addSlave(const SlaveInfo& slaveInfo,
+                        const SlaveID& slaveId,
+                        const UPID& pid)
+{
+  // TODO(benh): Start a reverse lookup to ensure IP maps to hostname.
+
+  Slave* slave = NULL;
+
+  // Check whether all slaves are allowed, or at least whether this slave
+  // is in the allocated set.
+  bool allocated = (conf.get<string>("slaves", "*") == "*") ||
+    (slaveHostnamePorts.count(slaveInfo.hostname()) > 0 &&
+     slaveHostnamePorts[slaveInfo.hostname()].count(pid.port) > 0);
+
+  if (allocated) {
+    slave = new Slave(slaveInfo, slaveId, pid, elapsed());
+
+    LOG(INFO) << "Registering slave " << slave->slaveId
+              << " at " << slave->pid;
+
+    slaves[slave->slaveId] = slave;
+    pidToSlaveId[slave->pid] = slave->slaveId;
+    link(slave->pid);
+
+    allocator->slaveAdded(slave);
+
+    MSG<M2S_REGISTER_REPLY> out;
+    out.mutable_slave_id()->MergeFrom(slave->slaveId);
+    send(slave->pid, out);
+
+    // TODO(benh):
+    //     // Ask the slaves manager to monitor this slave for us.
+    //     process::dispatch(slavesManager->self(), &SlavesManager::monitor,
+    //                       slave->pid, slave->info, slave->slaveId);
+
+    // Set up an observer for the slave.
+    slave->observer = new SlaveObserver(slave->pid, slave->info,
+                                        slave->slaveId, slavesManager->self());
+    process::spawn(slave->observer);
+  } else {
+    LOG(WARNING) << "Cannot add slave at " << slaveInfo.hostname()
+                 << " because not in allocated set of slaves!";
+  }
+
+  return slave;
+}
+
+
 // Lose all of a slave's tasks and delete the slave object
 void Master::removeSlave(Slave* slave)
 { 
@@ -2042,6 +2186,16 @@ void Master::removeSlave(Slave* slave)
     send(framework->pid, out);
   }
 
+  // TODO(benh):
+  //     // Tell the slaves manager to stop monitoring this slave for us.
+  //     process::dispatch(slavesManager->self(), &SlavesManager::forget,
+  //                       slave->pid, slave->info, slave->slaveId);
+
+  // Kill the slave observer.
+  process::post(slave->observer->self(), process::TERMINATE);
+  process::wait(slave->observer->self());
+  delete slave->observer;
+
   // TODO(benh): unlink(slave->pid);
   pidToSlaveId.erase(slave->pid);
 

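The whitelist check in Master::addSlave() above depends on slaveHostnamePorts having been populated from the "slaves" configuration option, which this part of the diff does not show. A minimal sketch of that parsing, assuming a semicolon-separated list of "hostname:port" entries and assuming tokenize::tokenize(str, delims) splits on any of the given delimiter characters and returns a std::vector<std::string> (the helper name parseSlaves is invented for illustration), could look like:

#include <stdint.h>

#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

#include "common/tokenize.hpp"

// Hypothetical helper (invented name): turn a whitelist string like
// "host1:5051;host2:5051" into the hostname -> ports map consulted by
// Master::addSlave(). A value of "*" means every slave is allowed.
std::map<std::string, std::set<uint16_t> > parseSlaves(const std::string& value)
{
  std::map<std::string, std::set<uint16_t> > slaves;

  if (value == "*") {
    return slaves; // Empty map; addSlave() treats "*" as "allow everything".
  }

  // Assumed: tokenize::tokenize splits on any of the delimiter characters.
  std::vector<std::string> entries = tokenize::tokenize(value, ";\n");

  for (size_t i = 0; i < entries.size(); i++) {
    std::vector<std::string> pair = tokenize::tokenize(entries[i], ":");
    if (pair.size() == 2) {
      uint16_t port = 0;
      std::istringstream in(pair[1]);
      in >> port;
      slaves[pair[0]].insert(port);
    }
  }

  return slaves;
}
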
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun  5 09:19:53 2011
@@ -26,6 +26,7 @@
 #include "common/fatal.hpp"
 #include "common/foreach.hpp"
 #include "common/resources.hpp"
+#include "common/tokenize.hpp"
 #include "common/type_utils.hpp"
 
 #include "configurator/configurator.hpp"
@@ -92,6 +93,8 @@ struct SlotOffer;
 struct Framework;
 struct Slave;
 class Allocator;
+class Master;
+class SlavesHttpServer;
 
 
 // Resources offered on a particular slave.
@@ -105,6 +108,60 @@ struct SlaveResources
 };
 
 
+// A resource offer.
+struct SlotOffer
+{
+  OfferID offerId;
+  FrameworkID frameworkId;
+  std::vector<SlaveResources> resources;
+
+  SlotOffer(const OfferID& _offerId,
+            const FrameworkID& _frameworkId,
+            const std::vector<SlaveResources>& _resources)
+    : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
+};
+
+
+class SlavesManagerStorage : public process::Process<SlavesManagerStorage>
+{
+public:
+  virtual process::Promise<bool> add(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Promise<bool> remove(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Promise<bool> activate(const std::string& hostname, uint16_t port) { return true; }
+  virtual process::Promise<bool> deactivate(const std::string& hostname, uint16_t port) { return true; }
+};
+
+
+class SlavesManager : public process::Process<SlavesManager>
+{
+public:
+  SlavesManager(const Configuration& conf, const process::PID<Master>& _master);
+
+  virtual ~SlavesManager();
+
+  static void registerOptions(Configurator* configurator);
+
+  bool add(const std::string& hostname, uint16_t port);
+  bool remove(const std::string& hostname, uint16_t port);
+  bool activate(const std::string& hostname, uint16_t port);
+  bool deactivate(const std::string& hostname, uint16_t port);
+
+  void updateActive(const std::map<std::string, std::set<uint16_t> >& updated);
+  void updateInactive(const std::map<std::string, std::set<uint16_t> >& updated);
+
+private:
+  process::Promise<process::HttpResponse> add(const process::HttpRequest& request);
+  process::Promise<process::HttpResponse> remove(const process::HttpRequest& request);
+
+  const process::PID<Master> master;
+
+  std::map<std::string, std::set<uint16_t> > active;
+  std::map<std::string, std::set<uint16_t> > inactive;
+
+  SlavesManagerStorage* storage;
+};
+
+
 class Master : public MesosProcess<Master>
 {
 public:
@@ -113,7 +170,7 @@ public:
   
   virtual ~Master();
 
-  static void registerOptions(Configurator* conf);
+  static void registerOptions(Configurator* configurator);
 
   process::Promise<state::MasterState*> getState();
   
@@ -159,7 +216,8 @@ public:
                       const FrameworkID& frameworkId,
                       const ExecutorID& executorId,
                       int32_t result);
-  void slaveHeartbeat(const SlaveID& slaveId);
+  void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
+  void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
   void timerTick();
   void frameworkExpired(const FrameworkID& frameworkId);
   void exited();
@@ -209,6 +267,10 @@ protected:
   // reschedule slot offers for slots that were assigned to this framework
   void removeFramework(Framework* framework);
 
+  // Add a slave.
+  Slave* addSlave(const SlaveInfo& slaveInfo, const SlaveID& slaveId,
+                  const process::UPID& pid);
+
   // Lose all of a slave's tasks and delete the slave object
   void removeSlave(Slave* slave);
 
@@ -221,7 +283,11 @@ protected:
   const Configuration& getConfiguration();
 
 private:
-  Configuration conf;
+  const Configuration conf;
+
+  SlavesManager* slavesManager;
+
+  boost::unordered_map<std::string, boost::unordered_set<uint16_t> > slaveHostnamePorts;
 
   boost::unordered_map<FrameworkID, Framework*> frameworks;
   boost::unordered_map<SlaveID, Slave*> slaves;
@@ -247,6 +313,138 @@ private:
 };
 
 
+const double SLAVE_PONG_TIMEOUT = 15.0;
+const int MAX_SLAVE_TIMEOUTS = 5;
+
+
+class SlaveObserver : public process::Process<SlaveObserver>
+{
+public:
+  SlaveObserver(const process::UPID& _slave,
+                const SlaveInfo& _slaveInfo,
+                const SlaveID& _slaveId,
+                const process::PID<SlavesManager>& _slavesManager)
+    : slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId),
+      slavesManager(_slavesManager), timeouts(0), pinged(false) {}
+
+  virtual ~SlaveObserver() {}
+
+protected:
+  virtual void operator () ()
+  {
+    // Send a ping some interval after we heard the last pong. If we
+    // don't hear a pong back, increment the number of timeouts for the
+    // slave and try to send another ping. If we accumulate too many
+    // missed pongs in a row, consider the slave dead.
+    do {
+      std::cout.precision(15);
+      std::cout << self() << " elapsed: " << elapsed() << std::endl;
+      receive(SLAVE_PONG_TIMEOUT);
+      if (name() == PONG) {
+        timeouts = 0;
+        pinged = false;
+      } else if (name() == process::TIMEOUT) {
+        if (pinged) {
+          timeouts++;
+          pinged = false;
+          std::cout << self() << " missing slave PONG, timeouts = " << timeouts << std::endl;
+        }
+
+        send(slave, PING);
+        pinged = true;
+      } else if (name() == process::TERMINATE) {
+        return;
+      } 
+    } while (timeouts < MAX_SLAVE_TIMEOUTS);
+
+    // Tell the slaves manager to deactivate the slave; this will take
+    // care of updating the master too.
+    while (!process::call(slavesManager, &SlavesManager::deactivate,
+                          slaveInfo.hostname(), slave.port)) {
+      LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
+    }
+  }
+
+private:
+  const process::UPID slave;
+  const SlaveInfo slaveInfo;
+  const SlaveID slaveId;
+  const process::PID<SlavesManager> slavesManager;
+  int timeouts;
+  bool pinged;
+};
+
+
+// A connected slave.
+struct Slave
+{
+  SlaveInfo info;
+  SlaveID slaveId;
+  process::UPID pid;
+
+  bool active; // Turns false when slave is being removed
+  double connectTime;
+  double lastHeartbeat;
+  
+  Resources resourcesOffered; // Resources currently in offers
+  Resources resourcesInUse;   // Resources currently used by tasks
+
+  boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
+  boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
+
+  SlaveObserver* observer;
+  
+  Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
+        const process::UPID& _pid, double time)
+    : info(_info), slaveId(_slaveId), pid(_pid), active(true),
+      connectTime(time), lastHeartbeat(time) {}
+
+  ~Slave() {}
+
+  Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
+  {
+    foreachpair (_, Task* task, tasks) {
+      if (task->framework_id() == frameworkId && task->task_id() == taskId) {
+        return task;
+      }
+    }
+
+    return NULL;
+  }
+
+  void addTask(Task* task)
+  {
+    std::pair<FrameworkID, TaskID> key =
+      std::make_pair(task->framework_id(), task->task_id());
+    CHECK(tasks.count(key) == 0);
+    tasks[key] = task;
+    foreach (const Resource& resource, task->resources()) {
+      resourcesInUse += resource;
+    }
+  }
+  
+  void removeTask(Task* task)
+  {
+    std::pair<FrameworkID, TaskID> key =
+      std::make_pair(task->framework_id(), task->task_id());
+    CHECK(tasks.count(key) > 0);
+    tasks.erase(key);
+    foreach (const Resource& resource, task->resources()) {
+      resourcesInUse -= resource;
+    }
+  }
+  
+  Resources resourcesFree()
+  {
+    Resources resources;
+    foreach (const Resource& resource, info.resources()) {
+      resources += resource;
+    }
+    return resources - (resourcesOffered + resourcesInUse);
+  }
+};
+
+
 class FrameworkFailoverTimer : public process::Process<FrameworkFailoverTimer>
 {
 public:
@@ -275,20 +473,6 @@ private:
 };
 
 
-// A resource offer.
-struct SlotOffer
-{
-  OfferID offerId;
-  FrameworkID frameworkId;
-  std::vector<SlaveResources> resources;
-
-  SlotOffer(const OfferID& _offerId,
-            const FrameworkID& _frameworkId,
-            const std::vector<SlaveResources>& _resources)
-    : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
-};
-
-
 // A connected framework.
 struct Framework
 {
@@ -388,74 +572,6 @@ struct Framework
 };
 
 
-// A connected slave.
-struct Slave
-{
-  SlaveInfo info;
-  SlaveID slaveId;
-  process::UPID pid;
-
-  bool active; // Turns false when slave is being removed
-  double connectTime;
-  double lastHeartbeat;
-  
-  Resources resourcesOffered; // Resources currently in offers
-  Resources resourcesInUse;   // Resources currently used by tasks
-
-  boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
-  boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
-  
-  Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
-        const process::UPID& _pid, double time)
-    : info(_info), slaveId(_slaveId), pid(_pid), active(true),
-      connectTime(time), lastHeartbeat(time) {}
-
-  ~Slave() {}
-
-  Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
-  {
-    foreachpair (_, Task* task, tasks) {
-      if (task->framework_id() == frameworkId && task->task_id() == taskId) {
-        return task;
-      }
-    }
-
-    return NULL;
-  }
-
-  void addTask(Task* task)
-  {
-    std::pair<FrameworkID, TaskID> key =
-      std::make_pair(task->framework_id(), task->task_id());
-    CHECK(tasks.count(key) == 0);
-    tasks[key] = task;
-    foreach (const Resource& resource, task->resources()) {
-      resourcesInUse += resource;
-    }
-  }
-  
-  void removeTask(Task* task)
-  {
-    std::pair<FrameworkID, TaskID> key =
-      std::make_pair(task->framework_id(), task->task_id());
-    CHECK(tasks.count(key) > 0);
-    tasks.erase(key);
-    foreach (const Resource& resource, task->resources()) {
-      resourcesInUse -= resource;
-    }
-  }
-  
-  Resources resourcesFree()
-  {
-    Resources resources;
-    foreach (const Resource& resource, info.resources()) {
-      resources += resource;
-    }
-    return resources - (resourcesOffered + resourcesInUse);
-  }
-};
-
-
 // Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc.
 
 inline std::ostream& operator << (std::ostream& stream, const SlotOffer *o)

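The SlaveObserver added above replaces the old heartbeat handling: it pings the slave, waits up to SLAVE_PONG_TIMEOUT seconds for a PONG, and after MAX_SLAVE_TIMEOUTS consecutive misses asks the SlavesManager to deactivate the slave. The slave-side counterpart is not shown in this part of the commit; a minimal sketch of a process that satisfies the observer, using the same libprocess idioms already seen in the diff (receive(), name(), from(), send()) and the PING/PONG identifiers added below in messaging/messages.{hpp,cpp}, might look like the following (the class name, include paths, and how it would be wired into the real slave are assumptions):

#include <process.hpp>

#include "messaging/messages.hpp"

namespace mesos { namespace internal {

// Hypothetical sketch: reply to every PING with a PONG so the master's
// SlaveObserver never accumulates missed-pong timeouts.
class PingPongResponder : public process::Process<PingPongResponder>
{
protected:
  virtual void operator () ()
  {
    while (true) {
      receive();
      if (name() == PING) {
        // The observer resets its timeout count when the PONG arrives.
        send(from(), PONG);
      } else if (name() == process::TERMINATE) {
        return;
      }
    }
  }
};

}} // namespace mesos { namespace internal {
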
Modified: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun  5 09:19:53 2011
@@ -1,76 +1,80 @@
 #include "messaging/messages.hpp"
 
-#define DEFINE_MESSAGE(name)                    \
+#define ALLOCATE_MESSAGE(name)                    \
   char name[] = #name
 
 namespace mesos { namespace internal {
 
 // From framework to master.
-DEFINE_MESSAGE(F2M_REGISTER_FRAMEWORK);
-DEFINE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
-DEFINE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
-DEFINE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
-DEFINE_MESSAGE(F2M_REVIVE_OFFERS);
-DEFINE_MESSAGE(F2M_KILL_TASK);
-DEFINE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(F2M_STATUS_UPDATE_ACK);
+ALLOCATE_MESSAGE(F2M_REGISTER_FRAMEWORK);
+ALLOCATE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
+ALLOCATE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
+ALLOCATE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
+ALLOCATE_MESSAGE(F2M_REVIVE_OFFERS);
+ALLOCATE_MESSAGE(F2M_KILL_TASK);
+ALLOCATE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(F2M_STATUS_UPDATE_ACK);
 
 // From master to framework.
-DEFINE_MESSAGE(M2F_REGISTER_REPLY);
-DEFINE_MESSAGE(M2F_RESOURCE_OFFER);
-DEFINE_MESSAGE(M2F_RESCIND_OFFER);
-DEFINE_MESSAGE(M2F_STATUS_UPDATE);
-DEFINE_MESSAGE(M2F_LOST_SLAVE);
-DEFINE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(M2F_ERROR);
+ALLOCATE_MESSAGE(M2F_REGISTER_REPLY);
+ALLOCATE_MESSAGE(M2F_RESOURCE_OFFER);
+ALLOCATE_MESSAGE(M2F_RESCIND_OFFER);
+ALLOCATE_MESSAGE(M2F_STATUS_UPDATE);
+ALLOCATE_MESSAGE(M2F_LOST_SLAVE);
+ALLOCATE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(M2F_ERROR);
 
 // From slave to master.
-DEFINE_MESSAGE(S2M_REGISTER_SLAVE);
-DEFINE_MESSAGE(S2M_REREGISTER_SLAVE);
-DEFINE_MESSAGE(S2M_UNREGISTER_SLAVE);
-DEFINE_MESSAGE(S2M_STATUS_UPDATE);
-DEFINE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(S2M_EXITED_EXECUTOR);
+ALLOCATE_MESSAGE(S2M_REGISTER_SLAVE);
+ALLOCATE_MESSAGE(S2M_REREGISTER_SLAVE);
+ALLOCATE_MESSAGE(S2M_UNREGISTER_SLAVE);
+ALLOCATE_MESSAGE(S2M_STATUS_UPDATE);
+ALLOCATE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(S2M_EXITED_EXECUTOR);
 
 // From slave heart to master.
-DEFINE_MESSAGE(SH2M_HEARTBEAT);
+ALLOCATE_MESSAGE(SH2M_HEARTBEAT);
 
 // From master to slave.
-DEFINE_MESSAGE(M2S_REGISTER_REPLY);
-DEFINE_MESSAGE(M2S_REREGISTER_REPLY);
-DEFINE_MESSAGE(M2S_RUN_TASK);
-DEFINE_MESSAGE(M2S_KILL_TASK);
-DEFINE_MESSAGE(M2S_KILL_FRAMEWORK);
-DEFINE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(M2S_UPDATE_FRAMEWORK);
-DEFINE_MESSAGE(M2S_STATUS_UPDATE_ACK);
+ALLOCATE_MESSAGE(M2S_REGISTER_REPLY);
+ALLOCATE_MESSAGE(M2S_REREGISTER_REPLY);
+ALLOCATE_MESSAGE(M2S_RUN_TASK);
+ALLOCATE_MESSAGE(M2S_KILL_TASK);
+ALLOCATE_MESSAGE(M2S_KILL_FRAMEWORK);
+ALLOCATE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(M2S_UPDATE_FRAMEWORK);
+ALLOCATE_MESSAGE(M2S_STATUS_UPDATE_ACK);
 
 // From executor to slave.
-DEFINE_MESSAGE(E2S_REGISTER_EXECUTOR);
-DEFINE_MESSAGE(E2S_STATUS_UPDATE);
-DEFINE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(E2S_REGISTER_EXECUTOR);
+ALLOCATE_MESSAGE(E2S_STATUS_UPDATE);
+ALLOCATE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
 
 // From slave to executor.
-DEFINE_MESSAGE(S2E_REGISTER_REPLY);
-DEFINE_MESSAGE(S2E_RUN_TASK);
-DEFINE_MESSAGE(S2E_KILL_TASK);
-DEFINE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(S2E_KILL_EXECUTOR);
+ALLOCATE_MESSAGE(S2E_REGISTER_REPLY);
+ALLOCATE_MESSAGE(S2E_RUN_TASK);
+ALLOCATE_MESSAGE(S2E_KILL_TASK);
+ALLOCATE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(S2E_KILL_EXECUTOR);
 
 #ifdef __sun__
 // From projd to slave.
-DEFINE_MESSAGE(PD2S_REGISTER_PROJD);
-DEFINE_MESSAGE(PD2S_PROJD_READY);
+ALLOCATE_MESSAGE(PD2S_REGISTER_PROJD);
+ALLOCATE_MESSAGE(PD2S_PROJD_READY);
 
 // From slave to projd.
-DEFINE_MESSAGE(S2PD_UPDATE_RESOURCES);
-DEFINE_MESSAGE(S2PD_KILL_ALL);
+ALLOCATE_MESSAGE(S2PD_UPDATE_RESOURCES);
+ALLOCATE_MESSAGE(S2PD_KILL_ALL);
 #endif // __sun__
 
 // From master detector to processes.
-DEFINE_MESSAGE(GOT_MASTER_TOKEN);
-DEFINE_MESSAGE(NEW_MASTER_DETECTED);
-DEFINE_MESSAGE(NO_MASTER_DETECTED);
-DEFINE_MESSAGE(MASTER_DETECTION_FAILURE);
+ALLOCATE_MESSAGE(GOT_MASTER_TOKEN);
+ALLOCATE_MESSAGE(NEW_MASTER_DETECTED);
+ALLOCATE_MESSAGE(NO_MASTER_DETECTED);
+ALLOCATE_MESSAGE(MASTER_DETECTION_FAILURE);
+
+// Generic messages.
+ALLOCATE_MESSAGE(PING);
+ALLOCATE_MESSAGE(PONG);
 
 }} // namespace mesos { namespace internal {

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 09:19:53 2011
@@ -75,9 +75,6 @@ MESSAGE(S2M_STATUS_UPDATE, StatusUpdateM
 MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
 MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
 
-// From slave heart to master.
-MESSAGE(SH2M_HEARTBEAT, HeartbeatMessage);
-
 // From master to slave.
 MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
 MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
@@ -116,6 +113,10 @@ MESSAGE(NEW_MASTER_DETECTED, NewMasterDe
 MESSAGE(NO_MASTER_DETECTED);
 MESSAGE(MASTER_DETECTION_FAILURE);
 
+// Generic messages.
+MESSAGE(PING);
+MESSAGE(PONG);
+
 
 // Type conversions helpful for changing between protocol buffer types
 // and standard C++ types (for parameters).

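As the renames above show, each message identifier comes in a pair: messages.hpp declares it with MESSAGE(...) (optionally binding it to a protocol buffer type), and messages.cpp allocates storage for the identifier string with ALLOCATE_MESSAGE(...). As a purely hypothetical illustration (E2S_PING_EXAMPLE is invented and not part of the tree), adding a new message would touch both files:

// In messaging/messages.hpp (picks up the MESSAGE macro defined there):
MESSAGE(E2S_PING_EXAMPLE);

// In messaging/messages.cpp (uses the ALLOCATE_MESSAGE macro shown above):
ALLOCATE_MESSAGE(E2S_PING_EXAMPLE);
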
Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun  5 09:19:53 2011
@@ -131,7 +131,6 @@ message ReregisterSlaveMessage {
 
 message SlaveRegisteredMessage {
   required SlaveID slave_id = 1;
-  required double heartbeat_interval = 2;
 }