Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:34:19 UTC

svn commit: r1131794 - in /incubator/mesos/trunk: include/ src/ src/tests/ src/third_party/libprocess/

Author: benh
Date: Sun Jun  5 05:34:18 2011
New Revision: 1131794

URL: http://svn.apache.org/viewvc?rev=1131794&view=rev
Log:
Factoring out MasterDetector and basic implementation of scheduler failover.

Modified:
    incubator/mesos/trunk/include/nexus_sched.hpp
    incubator/mesos/trunk/src/Makefile.in
    incubator/mesos/trunk/src/local.cpp
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/master.hpp
    incubator/mesos/trunk/src/master_detector.cpp
    incubator/mesos/trunk/src/master_detector.hpp
    incubator/mesos/trunk/src/master_main.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_local.cpp
    incubator/mesos/trunk/src/nexus_local.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/process_based_isolation_module.cpp
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/slave.hpp
    incubator/mesos/trunk/src/slave_main.cpp
    incubator/mesos/trunk/src/tests/test_master.cpp
    incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
    incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp
    incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp
    incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
    incubator/mesos/trunk/src/url_processor.cpp
    incubator/mesos/trunk/src/zookeeper.hpp

Modified: incubator/mesos/trunk/include/nexus_sched.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_sched.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_sched.hpp (original)
+++ incubator/mesos/trunk/include/nexus_sched.hpp Sun Jun  5 05:34:18 2011
@@ -11,7 +11,7 @@ namespace nexus {
 
 class SchedulerDriver;
 
-namespace internal { class SchedulerProcess; }
+namespace internal { class SchedulerProcess; class MasterDetector; }
 
 
 /**
@@ -77,7 +77,10 @@ public:
 class NexusSchedulerDriver : public SchedulerDriver
 {
 public:
-  NexusSchedulerDriver(Scheduler* sched, const std::string& master);
+  NexusSchedulerDriver(Scheduler* sched,
+		       const std::string& url,
+		       FrameworkID fid = "");
+
   virtual ~NexusSchedulerDriver();
 
   // Lifecycle methods
@@ -102,12 +105,16 @@ private:
   // Internal utility method to report an error to the scheduler
   void error(int code, const std::string& message);
 
-  std::string master;
   Scheduler* sched;
+  std::string url;
+  FrameworkID fid;
 
   // LibProcess process for communicating with master
   internal::SchedulerProcess* process;
 
+  // Coordination between masters
+  internal::MasterDetector* detector;
+
   // Are we currently registered with the master
   bool running;
   

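The new FrameworkID parameter above lets a restarted scheduler reconnect under the id it was previously assigned. A minimal sketch of the call site this enables (the runFramework helper and the persisted-id handling are illustrative, not part of this commit; driver.run() returning an int follows the nexus_sched.cpp changes further below):

    #include <string>

    #include <nexus_sched.hpp>

    using namespace nexus;

    // Sketch: 'sched' is any Scheduler implementation; 'savedFid' is the
    // framework id persisted from a previous run ("" on first launch).
    int runFramework(Scheduler* sched, const std::string& url,
                     const std::string& savedFid)
    {
      // With an empty id the driver registers fresh (F2M_REGISTER_FRAMEWORK);
      // with a saved id it reregisters (F2M_REREGISTER_FRAMEWORK) and the
      // master treats the reconnect as a failover if the id is still in use.
      NexusSchedulerDriver driver(sched, url, savedFid);
      return driver.run();
    }
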
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun  5 05:34:18 2011
@@ -53,11 +53,9 @@ LDFLAGS += -Lthird_party/libprocess
 # Add libev to LDFLAGS.
 LDFLAGS += -L$(LIBEV)/.libs
 
-# Add glog and gtest to include and lib paths
-CXXFLAGS += -I$(GLOG)/src
-CXXFLAGS += -I$(GTEST)/include
-LDFLAGS += -L$(GLOG)/.libs
-LDFLAGS += -L$(GTEST)/lib/.libs
+# Add glog and gtest to include and lib paths.
+CXXFLAGS += -I$(GLOG)/src -I$(GTEST)/include
+LDFLAGS += -L$(GLOG)/.libs -L$(GTEST)/lib/.libs
 
 # Add local ZooKeeper to include and lib paths if necessary.
 ifeq ($(LOCAL_ZOOKEEPER),yes)
@@ -102,7 +100,7 @@ NEXUS_LIBS = $(SCHED_LIB) $(EXEC_LIB) $(
 MASTER_OBJ = master.o allocator_factory.o simple_allocator.o
 SLAVE_OBJ = slave.o launcher.o isolation_module_factory.o \
 	    process_based_isolation_module.o
-COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o master_detector.o url_processor.o ft_messaging.o zookeeper.o
+COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o master_detector.o url_processor.o zookeeper.o
 EXEC_LIB_OBJ = nexus_exec.o
 SCHED_LIB_OBJ = nexus_sched.o nexus_local.o params.o
 TEST_OBJ = tests/main.o tests/test_master.o tests/test_resources.o

Modified: incubator/mesos/trunk/src/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local.cpp (original)
+++ incubator/mesos/trunk/src/local.cpp Sun Jun  5 05:34:18 2011
@@ -9,6 +9,8 @@ using std::cerr;
 using std::endl;
 using std::string;
 
+using namespace nexus::internal;
+
 
 int main (int argc, char **argv)
 {
@@ -60,7 +62,7 @@ int main (int argc, char **argv)
     }
   }
 
-  const PID &master = run_nexus(slaves, cpus, mem, true, quiet);
+  const PID &master = local::launch(slaves, cpus, mem, true, quiet);
 
   Process::wait(master);
 

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:34:18 2011
@@ -109,49 +109,15 @@ public:
 }
 
 
-Master::Master(const string &zk)
-  : isFT(false), masterDetector(NULL), nextFrameworkId(0), nextSlaveId(0), 
-    nextSlotOfferId(0), allocatorType("simple"), masterId(0)
-{
-  if (zk != "") {
-    pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
-    if (urlPair.first == UrlProcessor::ZOO) {
-      isFT = true;
-      zkServers = urlPair.second;
-    } else {
-      LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
-      exit(1);
-    }
-  }
-}
-
-
-Master::Master(const string& _allocatorType, const string &zk)
-  : isFT(false), masterDetector(NULL), nextFrameworkId(0), nextSlaveId(0), 
-    nextSlotOfferId(0), allocatorType(_allocatorType), masterId(0)
-{
-  if (zk != "") {
-    pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
-    if (urlPair.first == UrlProcessor::ZOO) {
-      isFT = true;
-      zkServers = urlPair.second;
-    } else {
-      LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
-      exit(1);
-    }
-  }
-}
+Master::Master(const string& _allocatorType)
+  : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
+    allocatorType(_allocatorType), masterId(0) {}
                    
 
 Master::~Master()
 {
   LOG(INFO) << "Shutting down master";
 
-  if (masterDetector != NULL) {
-    delete masterDetector;
-    masterDetector = NULL;
-  }
-
   delete allocator;
 
   foreachpair (_, Framework *framework, frameworks) {
@@ -175,7 +141,7 @@ state::MasterState * Master::getState()
   std::ostringstream oss;
   oss << self();
   state::MasterState *state =
-    new state::MasterState(BUILD_DATE, BUILD_USER, oss.str(), isFT);
+    new state::MasterState(BUILD_DATE, BUILD_USER, oss.str());
 
   foreachpair (_, Slave *s, slaves) {
     state::Slave *slave = new state::Slave(s->id, s->hostname, s->publicDns,
@@ -285,28 +251,22 @@ void Master::operator () ()
 {
   LOG(INFO) << "Master started at nexus://" << self();
 
-  if (isFT) {
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-    masterDetector = new MasterDetector(zkServers, ZNODE, self(), true);
-  } else {
-    send(self(), pack<GOT_MASTER_SEQ>("0"));
-  }
-
-  // Don't do anything until we get a sequence identifier.
-  bool waitingForSeq = true;
-  do {
-    switch (receive()) {
-      case GOT_MASTER_SEQ: {
-	string mySeq;
-	unpack<GOT_MASTER_SEQ>(mySeq);
-	masterId = lexical_cast<long>(mySeq);
-	LOG(INFO) << "Master ID:" << masterId;
-	waitingForSeq = false;
-	break;
-      }
+  // Don't do anything until we get an identifier.
+  while (true) {
+    if (receive() == GOT_MASTER_ID) {
+      string id;
+      unpack<GOT_MASTER_ID>(id);
+      masterId = lexical_cast<long>(id);
+      LOG(INFO) << "Master ID:" << masterId;
+      break;
+    } else {
+      LOG(INFO) << "Oops! We're dropping a message since "
+		<< "we haven't received an identifier yet!";
     }
-  } while (waitingForSeq);
+  }
 
+  // Create the allocator (we do this after the constructor because it
+  // leaks 'this').
   allocator = createAllocator();
   if (!allocator)
     LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
@@ -321,18 +281,18 @@ void Master::operator () ()
       // TODO(benh): We might have been the master, but then got
       // partitioned, and now we are finding out once we reconnect
       // that we are no longer the master, so we should just die.
-      LOG(INFO) << "new master detected ... maybe it's us!";
+      LOG(INFO) << "New master detected ... maybe it's us!";
       break;
     }
 
     case NO_MASTER_DETECTED: {
-      LOG(INFO) << "no master detected ... maybe we're next!";
+      LOG(INFO) << "No master detected ... maybe we're next!";
       break;
     }
 
     case F2M_REGISTER_FRAMEWORK: {
-      FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
-
+      FrameworkID fid = lexical_cast<string>(masterId) + "-"
+	+ lexical_cast<string>(nextFrameworkId++);
       Framework *framework = new Framework(from(), fid);
       unpack<F2M_REGISTER_FRAMEWORK>(framework->name,
 				     framework->user,
@@ -349,19 +309,32 @@ void Master::operator () ()
     }
 
     case F2M_REREGISTER_FRAMEWORK: {
-
       Framework *framework = new Framework(from());
+      bool failover;
       unpack<F2M_REREGISTER_FRAMEWORK>(framework->id,
                                        framework->name,
                                        framework->user,
-                                       framework->executorInfo);
+                                       framework->executorInfo,
+				       failover);
 
       if (framework->id == "") {
-        DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
-        framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+	LOG(ERROR) << "Framework reconnect/failover without an id!";
+	send(framework->pid, pack<M2F_ERROR>(1, "Missing framework id"));
+	break;
+      }
+
+      LOG(INFO) << "Reregistering " << framework << " at " << framework->pid;
+
+      if (frameworks[framework->id] != NULL) {
+	if (failover) {
+	  terminateFramework(frameworks[framework->id], 1, "Failover");
+	} else {
+	  LOG(INFO) << "Framework reregistering with an already used id!";
+	  send(framework->pid, pack<M2F_ERROR>(1, "Framework id in use"));
+	  break;
+	}
       }
 
-      LOG(INFO) << "Registering " << framework << " at " << framework->pid;
       frameworks[framework->id] = framework;
       pidToFid[framework->pid] = framework->id;
 
@@ -372,12 +345,6 @@ void Master::operator () ()
       allocator->frameworkAdded(framework);
       if (framework->executorInfo.uri == "")
         terminateFramework(framework, 1, "No executor URI given");
-
-       timeval tv;
-       gettimeofday(&tv, NULL);
-       
-       DLOG(INFO) << tv.tv_sec << "." << tv.tv_usec << " STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
-
       break;
     }
 
@@ -473,8 +440,11 @@ void Master::operator () ()
 	slave->addTask(tip);
         updateFrameworkTasks(tip);
       }
+
+      // TODO(benh|alig): We should put a timeout on how long we keep
+      // tasks running that never have frameworks reregister that
+      // claim them.
   
-     //alibandali
       LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
       slaves[slave->id] = slave;
       pidToSid[slave->pid] = slave->id;
@@ -511,15 +481,13 @@ void Master::operator () ()
       DLOG(INFO) << "FT: prepare relay seq:"<< seq() << " from: "<< from();
       if (Slave *slave = lookupSlave(sid)) {
 	if (Framework *framework = lookupFramework(fid)) {
-	  // Pass on the status update to the framework
-
+	  // Pass on the status update to the framework.
           forward(framework->pid);
-
           if (duplicate()) {
             LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << seq();
             break;
-          } 
-          // Update the task state locally
+          }
+          // Update the task state locally.
           Task *task = slave->lookupTask(fid, tid);
           if (task != NULL) {
             LOG(INFO) << "Status update: " << task << " is in state " << state;

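Condensed, the reregistration handling added above makes a three-way decision. A self-contained restatement of that branch (stand-in types; the real code uses Framework* and sends M2F_ERROR or calls terminateFramework as shown in the diff):

    #include <map>
    #include <string>

    enum ReregisterAction {
      REJECT_MISSING_ID,   // M2F_ERROR "Missing framework id"
      REJECT_ID_IN_USE,    // M2F_ERROR "Framework id in use"
      FAILOVER_EXISTING,   // terminate the old instance, adopt the new pid
      ADD_NEW              // id not currently registered (e.g. old master died)
    };

    ReregisterAction onReregister(const std::map<std::string, bool>& frameworks,
                                  const std::string& fid, bool failover)
    {
      if (fid == "")
        return REJECT_MISSING_ID;
      if (frameworks.count(fid) > 0)
        return failover ? FAILOVER_EXISTING : REJECT_ID_IN_USE;
      return ADD_NEW;
    }
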
Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun  5 05:34:18 2011
@@ -29,7 +29,7 @@
 #include "resources.hpp"
 #include "master_detector.hpp"
 #include "task.hpp"
-#include "url_processor.hpp"
+
 
 namespace nexus { namespace internal { namespace master {
 
@@ -263,9 +263,6 @@ enum TaskRemovalReason
 class Master : public Tuple<ReliableProcess>
 {
 protected:
-  bool isFT;
-  string zkServers;
-  MasterDetector *masterDetector;
   unordered_map<FrameworkID, Framework *> frameworks;
   unordered_map<SlaveID, Slave *> slaves;
   unordered_map<OfferID, SlotOffer *> slotOffers;
@@ -277,16 +274,13 @@ protected:
   long nextSlaveId;         // Used to give each slave a unique ID.
   long nextSlotOfferId; // Used to give each slot offer a unique ID.
 
-
   string allocatorType;
   Allocator *allocator;
 
   long masterId; // Used to differentiate different master in FT mode, will be ephemeral id
 
 public:
-  Master(const string &zk = "");
-
-  Master(const string& _allocatorType, const string &zk = "");
+  Master(const string& _allocatorType = "simple");
   
   ~Master();
 

Modified: incubator/mesos/trunk/src/master_detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master_detector.cpp (original)
+++ incubator/mesos/trunk/src/master_detector.cpp Sun Jun  5 05:34:18 2011
@@ -11,34 +11,277 @@
 
 #include <boost/lexical_cast.hpp>
 
+#include "config.hpp"
 #include "fatal.hpp"
+#include "foreach.hpp"
 #include "master_detector.hpp"
 #include "messages.hpp"
+#include "url_processor.hpp"
+#include "zookeeper.hpp"
 
 using namespace nexus;
 using namespace nexus::internal;
 
+using namespace std;
+
 using boost::lexical_cast;
 
 
-MasterDetector::MasterDetector(const string &_servers, const string &_znode,
-			       const PID &_pid, bool _contend)
+class ZooKeeperMasterDetector : public MasterDetector, public Watcher
+{
+public:
+  /**
+   * Uses ZooKeeper for both detecting masters and contending to be a
+   * master.
+   *
+   * @param server comma separated list of server host:port pairs
+   *
+   * @param znode top-level "ZooKeeper node" (directory) to use
+   * @param pid libprocess pid to send messages/updates to (and to
+   * use for contending to be a master)
+   * @param contend true if should contend to be master (not needed
+   * for slaves and frameworks)
+   * @param quiet verbosity logging level for undelying ZooKeeper library
+   */
+  ZooKeeperMasterDetector(const std::string &servers,
+			  const std::string &znode,
+			  const PID &pid,
+			  bool contend = false,
+			  bool quiet = false);
+
+  virtual ~ZooKeeperMasterDetector();
+
+  /** 
+   * ZooKeeper watcher callback.
+   */
+  virtual void process(ZooKeeper *zk, int type, int state,
+		       const std::string &path);
+
+  /**
+   * @return unique id of the current master
+   */
+  virtual std::string getCurrentMasterId();
+
+  /**
+   * @return libprocess PID of the current master
+   */
+  virtual PID getCurrentMasterPID();
+
+private:
+  /**
+  * TODO(alig): Comment this object.
+  */
+  void setId(const std::string &s);
+
+  /**
+  * TODO(alig): Comment this object.
+  */
+  std::string getId();
+
+  /**
+  * TODO(alig): Comment this object.
+  */
+  void detectMaster();
+
+  /**
+  * TODO(alig): Comment this object.
+  */
+  PID lookupMasterPID(const std::string &seq) const;
+
+  std::string servers;
+  std::string znode;
+  PID pid;
+  bool contend;
+  bool reconnect;
+
+  ZooKeeper *zk;
+
+  // Our sequence string if contending to be a master.
+  std::string mySeq;
+
+  std::string currentMasterSeq;
+  PID currentMasterPID;
+};
+
+
+
+MasterDetector::~MasterDetector() {}
+
+
+MasterDetector * MasterDetector::create(const std::string &url,
+					const PID &pid,
+					bool contend,
+					bool quiet)
+{
+  if (url == "")
+    if (contend)
+      return new BasicMasterDetector(pid);
+    else
+      fatal("cannot use specified url to detect master");
+
+  MasterDetector *detector = NULL;
+
+  // Parse the url.
+  pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(url);
+
+  switch (urlPair.first) {
+    // ZooKeeper URL.
+    case UrlProcessor::ZOO: {
+      const string &servers = urlPair.second;
+      detector = new ZooKeeperMasterDetector(servers, ZNODE, pid, contend, quiet);
+      break;
+    }
+
+    // Nexus URL or libprocess pid.
+    case UrlProcessor::NEXUS:
+    case UrlProcessor::UNKNOWN: {
+      if (contend) {
+	// TODO(benh): Wierdnesses like this makes it seem like there
+	// should be a separate elector and detector. In particular,
+	// it doesn't make sense to pass a libprocess pid and attempt
+	// to contend (at least not right now).
+	fatal("cannot contend to be a master with specified url");
+      } else {
+	PID master = make_pid(urlPair.second.c_str());
+	if (!master)
+	  fatal("cannot use specified url to detect master");
+	detector = new BasicMasterDetector(master, pid);
+      }
+      break;
+    }
+  }
+
+  return detector;
+}
+
+
+void MasterDetector::destroy(MasterDetector *detector)
+{
+  if (detector != NULL)
+    delete detector;
+}
+
+
+BasicMasterDetector::BasicMasterDetector(const PID &_master)
+  : master(_master)
+{
+  // Send a master id.
+  {
+    const string &s =
+      Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>("0"));
+    Process::post(master, GOT_MASTER_ID, s.data(), s.size());
+  }
+
+  // Elect the master.
+  {
+    const string &s =
+      Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+    Process::post(master, NEW_MASTER_DETECTED, s.data(), s.size());
+  }
+}
+
+
+BasicMasterDetector::BasicMasterDetector(const PID &_master,
+					 const PID &pid,
+					 bool elect)
+  : master(_master)
+{
+  if (elect) {
+    // Send a master id.
+    {
+      const string &s =
+	Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>("0"));
+      Process::post(master, GOT_MASTER_ID, s.data(), s.size());
+    }
+
+    // Elect the master.
+    {
+      const string &s =
+	Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+      Process::post(master, NEW_MASTER_DETECTED, s.data(), s.size());
+    }
+  }
+
+  // Tell the pid about the master.
+  const string &s =
+    Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+  Process::post(pid, NEW_MASTER_DETECTED, s.data(), s.size());
+}
+
+
+BasicMasterDetector::BasicMasterDetector(const PID &_master,
+					 const vector<PID> &pids,
+					 bool elect)
+  : master(_master)
+{
+  if (elect) {
+    // Send a master id.
+    {
+      const string &s =
+	Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>("0"));
+      Process::post(master, GOT_MASTER_ID, s.data(), s.size());
+    }
+
+    // Elect the master.
+    {
+      const string &s =
+	Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+      Process::post(master, NEW_MASTER_DETECTED, s.data(), s.size());
+    }
+  }
+
+  // Tell each pid about the master.
+  foreach (const PID &pid, pids) {
+    const string &s =
+      Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+    Process::post(pid, NEW_MASTER_DETECTED, s.data(), s.size());
+  }
+}
+
+
+BasicMasterDetector::~BasicMasterDetector() {}
+
+
+string BasicMasterDetector::getCurrentMasterId()
+{
+  return "0";
+}
+
+
+PID BasicMasterDetector::getCurrentMasterPID()
+{
+  return master;
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const string &_servers,
+						 const string &_znode,
+						 const PID &_pid,
+						 bool _contend,
+						 bool quiet)
   : servers(_servers), znode(_znode), pid(_pid),
     contend(_contend), reconnect(false)
 {
+  // Set verbosity level for underlying ZooKeeper library logging.
+  // TODO(benh): Put this in the C++ API.
+  zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
+
+  // Start up the ZooKeeper connection!
   zk = new ZooKeeper(servers, 10000, this);
 }
 
 
-MasterDetector::~MasterDetector()
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
 {
-  if (zk != NULL)
+  if (zk != NULL) {
     delete zk;
+    zk = NULL;
+  }
 }
 
 
-void MasterDetector::process(ZooKeeper *zk, int type, int state,
-			     const string &path)
+void ZooKeeperMasterDetector::process(ZooKeeper *zk, int type, int state,
+				      const string &path)
 {
   int ret;
   string result;
@@ -85,12 +328,12 @@ void MasterDetector::process(ZooKeeper *
 		"Make sure ZooKeeper is running on: %s",
 		zk->error(ret), servers.c_str());
 
-	setMySeq(result);
-	LOG(INFO) << "Created ephemeral/sequence:" << getMySeq();
+	setId(result);
+	LOG(INFO) << "Created ephemeral/sequence:" << getId();
 
 	const string &s =
-	  Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_SEQ>(getMySeq()));
-	Process::post(pid, GOT_MASTER_SEQ, s.data(), s.size());
+	  Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>(getId()));
+	Process::post(pid, GOT_MASTER_ID, s.data(), s.size());
       }
 
       // Now determine who the master is (it may be us).
@@ -139,7 +382,25 @@ void MasterDetector::process(ZooKeeper *
 }
 
 
-void MasterDetector::detectMaster()
+void ZooKeeperMasterDetector::setId(const string &s)
+{
+  string seq = s;
+  // Converts "/path/to/znode/000000131" to "000000131".
+  int pos;
+  if ((pos = seq.find_last_of('/')) != string::npos) {  
+    mySeq = seq.erase(0, pos + 1);
+  } else
+    mySeq = "";
+}
+
+
+string ZooKeeperMasterDetector::getId() 
+{
+  return mySeq;
+}
+
+
+void ZooKeeperMasterDetector::detectMaster()
 {
   vector<string> results;
 
@@ -184,7 +445,7 @@ void MasterDetector::detectMaster()
 }
 
 
-PID MasterDetector::lookupMasterPID(const string &seq) const
+PID ZooKeeperMasterDetector::lookupMasterPID(const string &seq) const
 {
   CHECK(!seq.empty());
 
@@ -203,11 +464,13 @@ PID MasterDetector::lookupMasterPID(cons
 }
 
 
-string MasterDetector::getCurrentMasterSeq() const {
+string ZooKeeperMasterDetector::getCurrentMasterId()
+{
   return currentMasterSeq;
 }
 
 
-PID MasterDetector::getCurrentMasterPID() const {
+PID ZooKeeperMasterDetector::getCurrentMasterPID()
+{
   return currentMasterPID;
 }

Modified: incubator/mesos/trunk/src/master_detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master_detector.hpp (original)
+++ incubator/mesos/trunk/src/master_detector.hpp Sun Jun  5 05:34:18 2011
@@ -7,100 +7,100 @@
 #include <climits>
 #include <cstdlib>
 
-#include "zookeeper.hpp"
-
-using namespace std;
 
+namespace nexus { namespace internal {
 
 /**
- * Implements functionality for a) detecting masters b) contending to be a master.
+ * Implements functionality for:
+ *   a) detecting masters
+ *   b) contending to be a master
  */
-class MasterDetector : public Watcher {
+class MasterDetector
+{
+public:
+  virtual ~MasterDetector() = 0;
+
+  /**
+   * Creates a master detector that, given the specified url, knows
+   * how to connect to the master, or contend to be a master. The
+   * master detector sends messages to the specified pid when a new
+   * master is elected, a master is lost, etc.
+   *
+   * @param url string possibly containing zoo://, zoofile://, nexus://
+   * @param pid libprocess pid to both receive our messages and be
+   * used if we should contend
+   * @param contend true if should contend to be master
+   * @return instance of MasterDetector
+   */
+  static MasterDetector * create(const std::string &url,
+				 const PID &pid,
+				 bool contend = false,
+				 bool quiet = true);
+
+  /**
+   * Cleans up and deallocates the detector.
+   */
+  static void destroy(MasterDetector *detector);
+
+  /**
+   * @return unique id of the current master
+   */
+  virtual std::string getCurrentMasterId() = 0;
+
+  /**
+   * @return libprocess PID of the current master
+   */
+  virtual PID getCurrentMasterPID() = 0;
+};
+
+
+class BasicMasterDetector : public MasterDetector
+{
 public:
-  /** 
-   * Connects to ZooKeeper and possibly contends for master.
-   * 
-   * @param server comma separated list of zookeeper server host:port
-   *   pairs
-   * @param znode ZooKeeper node (directory)
-   * @param pid libprocess pid that we should send messages to (and to
-   *   contend)
-   * @param contend true if should contend to be master (not needed
-   *   for slaves and frameworks)
-   */
-  MasterDetector(const string &server, const string &znode,
-		 const PID &_pid, bool contend = false);
-
-  ~MasterDetector();
-
-  /** 
-   * ZooKeeper watcher.
-   */
-  virtual void process(ZooKeeper *zk, int type, int state, const string &path);
-
-  /** 
-   * @return ZooKeeper unique sequence number of the current master.
-   */
-  string getCurrentMasterSeq() const;
-
-  /** 
-   * @return libprocess PID of the current master. 
-   */
-  PID getCurrentMasterPID() const;
-
-  /**
-   * @return Unique ZooKeeper sequence number (only if contending for master, otherwise "").
-   */
-  string getMySeq() const
-  {
-    return mySeq;
-  }
-
-  /**
-   * Adjusts ZooKeepers level of debugging output.
-   * @param quiet true makes ZK quiet, whereas false makes ZK output DEBUG messages
-   */ 
-  static void setQuiet(bool quiet)
-  {
-    zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
-  }
-
-private: 
-  void setMySeq(const string &s)
-  {
-    string seq = s;
-    // Converts "/path/to/znode/000000131" to "000000131".
-    int pos;
-    if ((pos = seq.find_last_of('/')) != string::npos) {  
-      mySeq = seq.erase(0, pos + 1);
-    } else
-      mySeq = "";
-  }
-
-  /*
-  * TODO(alig): Comment this object.
-  */
-  void detectMaster();
-
-  /*
-  * TODO(alig): Comment this object.
-  */
-  PID lookupMasterPID(const string &seq) const;
-
-  string servers;
-  string znode;
-  PID pid;
-  bool contend;
-  bool reconnect;
+  /**
+   * Create a new master detector where the specified pid contends to
+   * be the master and gets elected by default.
+   *
+   * @param master libprocess pid to send messages/updates and be the
+   * master
+   */
+  BasicMasterDetector(const PID &master);
+
+  /**
+   * Create a new master detector where the 'master' pid is 
+   * the master (no contending).
+   *
+   * @param master libprocess pid to send messages/updates and be the
+   * master
+   * @param pid/pids libprocess pids to send messages/updates to regarding
+   * the master
+   * @param elect if true then contend and elect the specified master
+   */
+  BasicMasterDetector(const PID &master,
+		      const PID &pid,
+		      bool elect = false);
+
+  BasicMasterDetector(const PID &master,
+		      const std::vector<PID> &pids,
+		      bool elect = false);
 
-  ZooKeeper *zk;
+  virtual ~BasicMasterDetector();
 
-  // Our sequence number if contending.
-  string mySeq;
+  /**
+   * @return unique id of the current master
+   */
+  virtual std::string getCurrentMasterId();
 
-  string currentMasterSeq;
-  PID currentMasterPID;
+  /**
+   * @return libprocess PID of the current master
+   */
+  virtual PID getCurrentMasterPID();
+
+private:
+  PID master;
 };
 
+}} /* namespace nexus { namespace internal { */
+
 #endif /* __MASTER_DETECTOR_HPP__ */
 

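With the detector factored behind this interface, callers no longer touch ZooKeeper directly. A sketch of typical usage from a master (contending) versus a slave or scheduler (detect-only); the wireUpDetection helper is illustrative and the libprocess header name is assumed:

    #include <string>

    #include <process.hpp>            // libprocess PID (assumed header name)

    #include "master_detector.hpp"

    using namespace nexus::internal;

    // 'pid' is the libprocess pid of an already-spawned Master, Slave or
    // SchedulerProcess, as in master_main.cpp below.
    void wireUpDetection(const PID &pid, const std::string &url, bool isMaster)
    {
      // Only a master contends for leadership; slaves and schedulers detect.
      MasterDetector *detector =
        MasterDetector::create(url, pid, /* contend */ isMaster, /* quiet */ true);

      // The process behind 'pid' now receives GOT_MASTER_ID (when contending),
      // NEW_MASTER_DETECTED and NO_MASTER_DETECTED messages.

      // ... run until shutdown ...

      MasterDetector::destroy(detector);
    }
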
Modified: incubator/mesos/trunk/src/master_main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_main.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master_main.cpp (original)
+++ incubator/mesos/trunk/src/master_main.cpp Sun Jun  5 05:34:18 2011
@@ -3,7 +3,6 @@
 
 #include "master.hpp"
 #include "master_webui.hpp"
-#include "url_processor.hpp"
 
 using std::cerr;
 using std::endl;
@@ -14,14 +13,14 @@ using namespace nexus::internal::master;
 void usage(const char* programName)
 {
   cerr << "Usage: " << programName
+       << " [--url URL]"
        << " [--port PORT]"
        << " [--allocator ALLOCATOR]"
-       << " [--zookeeper ZOO_SERVERS]"
        << " [--quiet]" << endl
        << endl
-       << "ZOO_SERVERS is a url of the form: "
-       << "zoo://host1:port1,host2:port2,... or "
-       << "zoofile://file where file contains a host:port pair per line"
+       << "URL (used for contending to be a master) may be one of:" << endl
+       << "  zoo://host1:port1,host2:port2,..." << endl
+       << "  zoofile://file where file contains a host:port pair per line"
        << endl;
 }
 
@@ -34,31 +33,30 @@ int main (int argc, char **argv)
   }
 
   option options[] = {
+    {"url", required_argument, 0, 'u'},
     {"allocator", required_argument, 0, 'a'},
     {"port", required_argument, 0, 'p'},
-    {"zookeeper", required_argument, 0, 'z'},
     {"quiet", no_argument, 0, 'q'},
   };
 
-  bool isFT = false;
-  string zookeeper = "";
-  bool quiet = false;
+  string url = "";
   string allocator = "simple";
+  string znode = ZNODE;
+  bool quiet = false;
 
   int opt;
   int index;
-  while ((opt = getopt_long(argc, argv, "a:p:z:q", options, &index)) != -1) {
+  while ((opt = getopt_long(argc, argv, "u:a:p:q", options, &index)) != -1) {
     switch (opt) {
+      case 'u':
+        url = optarg;
+        break;
       case 'a':
         allocator = optarg;
         break;
       case 'p':
         setenv("LIBPROCESS_PORT", optarg, 1);
         break;
-      case 'z':
-	isFT = true;
-        zookeeper = optarg;
-	break;
       case 'q':
         quiet = true;
         break;
@@ -71,22 +69,22 @@ int main (int argc, char **argv)
     }
   }
 
-  if (!quiet)
-    google::SetStderrLogging(google::INFO);
-  else if (isFT)
-    MasterDetector::setQuiet(true);
-
+  // TODO(benh): Don't log to /tmp behind a sys admin's back!
   FLAGS_log_dir = "/tmp";
   FLAGS_logbufsecs = 1;
   google::InitGoogleLogging(argv[0]);
 
+  if (!quiet)
+    google::SetStderrLogging(google::INFO);
+
   LOG(INFO) << "Build: " << BUILD_DATE << " by " << BUILD_USER;
   LOG(INFO) << "Starting Nexus master";
-  if (isFT)
-    LOG(INFO) << "Nexus in fault-tolerant mode";
-  Master *master = new Master(allocator, zookeeper);
+
+  Master *master = new Master(allocator);
   PID pid = Process::spawn(master);
 
+  MasterDetector *detector = MasterDetector::create(url, pid, true, quiet);
+
 #ifdef NEXUS_WEBUI
   if (chdir(dirname(argv[0])) != 0)
     fatalerror("could not change into %s for running webui", dirname(argv[0]));
@@ -94,5 +92,8 @@ int main (int argc, char **argv)
 #endif
   
   Process::wait(pid);
+
+  MasterDetector::destroy(detector);
+
   return 0;
 }

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 05:34:18 2011
@@ -5,6 +5,7 @@
 #include <string>
 #include <vector>
 
+#include <reliable.hpp>
 #include <tuple.hpp>
 
 #include <nexus.hpp>
@@ -20,8 +21,9 @@ namespace nexus { namespace internal {
 
 enum MessageType {
   /* From framework to master. */
-  F2M_REGISTER_FRAMEWORK = PROCESS_MSGID,
+  F2M_REGISTER_FRAMEWORK = RELIABLE_MSGID,
   F2M_REREGISTER_FRAMEWORK,
+  F2M_FAILOVER_FRAMEWORK,
   F2M_UNREGISTER_FRAMEWORK,
   F2M_SLOT_OFFER_REPLY,
   F2M_REVIVE_OFFERS,
@@ -55,9 +57,9 @@ enum MessageType {
   SH2M_HEARTBEAT,
 
   /* From master detector to processes */
+  GOT_MASTER_ID,
   NEW_MASTER_DETECTED,
   NO_MASTER_DETECTED,
-  GOT_MASTER_SEQ,
   
   /* From master to slave. */
   M2S_REGISTER_REPLY,
@@ -125,7 +127,8 @@ TUPLE(F2M_REREGISTER_FRAMEWORK,
       (std::string /*fid*/,
        std::string /*name*/,
        std::string /*user*/,
-       ExecutorInfo));
+       ExecutorInfo,
+       bool /*failover*/));
 
 TUPLE(F2M_UNREGISTER_FRAMEWORK,
       (FrameworkID));
@@ -238,8 +241,8 @@ TUPLE(NEW_MASTER_DETECTED,
 TUPLE(NO_MASTER_DETECTED,
       ());
 
-TUPLE(GOT_MASTER_SEQ,
-      (std::string /* seq */));
+TUPLE(GOT_MASTER_ID,
+      (std::string /* id */));
   
 TUPLE(M2S_REGISTER_REPLY,
       (SlaveID));

Modified: incubator/mesos/trunk/src/nexus_local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_local.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_local.cpp (original)
+++ incubator/mesos/trunk/src/nexus_local.cpp Sun Jun  5 05:34:18 2011
@@ -15,16 +15,8 @@ using nexus::internal::slave::Slave;
 using namespace nexus::internal;
 
 
-/* TODO(benh): Remove this dependency! */
-/* List of ZooKeeper host:port pairs. */
-std::string zookeeper = "";
-
-
 namespace {
 
-static Master* currentMaster = NULL;
-static vector<Slave*>* currentSlaves = NULL;
-
 static pthread_once_t glog_initialized = PTHREAD_ONCE_INIT;
 
 
@@ -35,47 +27,62 @@ void initialize_glog() {
 } /* namespace { */
 
 
-PID run_nexus(int slaves, int32_t cpus, int64_t mem,
-              bool ownLogging, bool quiet)
+namespace nexus { namespace internal { namespace local {
+
+static Master *master = NULL;
+static vector<Slave*> *slaves = NULL;
+static MasterDetector *detector = NULL;
+
+
+PID launch(int numSlaves, int32_t cpus, int64_t mem,
+	   bool initLogging, bool quiet)
 {
-  if (currentMaster != NULL)
-    fatal("Call to run_nexus while it is already running");
-  
-  if (ownLogging) {
+  if (master != NULL)
+    fatal("can only launch one local cluster at a time (for now)");
+
+  if (initLogging) {
     pthread_once(&glog_initialized, initialize_glog);
     if (!quiet)
       google::SetStderrLogging(google::INFO);
   }
 
-  currentMaster = new Master();
+  master = new Master();
+  slaves = new vector<Slave*>();
 
-  Process::spawn(currentMaster);
+  PID pid = Process::spawn(master);
 
-  currentSlaves = new vector<Slave*>();
+  vector<PID> pids;
 
-  for (int i = 0; i < slaves; i++) {
-    Slave* s = new Slave(currentMaster->getPID(), Resources(cpus, mem), true);
-    currentSlaves->push_back(s);
-    Process::spawn(s);
+  for (int i = 0; i < numSlaves; i++) {
+    Slave* slave = new Slave(Resources(cpus, mem), true);
+    slaves->push_back(slave);
+    pids.push_back(Process::spawn(slave));
   }
 
-  return currentMaster->getPID();
+  detector = new BasicMasterDetector(pid, pids, true);
+
+  return pid;
 }
 
 
-void kill_nexus()
+void shutdown()
 {
-  Process::post(currentMaster->getPID(), M2M_SHUTDOWN);
+  Process::post(master->getPID(), M2M_SHUTDOWN);
 
-  Process::wait(currentMaster->getPID());
-  delete currentMaster;
-  currentMaster = NULL;
-
-  for (int i = 0; i < currentSlaves->size(); i++) {
-    Process::wait(currentSlaves->at(i));
-    delete currentSlaves->at(i);
+  Process::wait(master->getPID());
+  delete master;
+  master = NULL;
+
+  for (int i = 0; i < slaves->size(); i++) {
+    Process::wait(slaves->at(i));
+    delete slaves->at(i);
   }
 
-  delete currentSlaves;
-  currentSlaves = NULL;
+  delete slaves;
+  slaves = NULL;
+
+  delete detector;
+  detector = NULL;
 }
+
+}}} /* namespace nexus { namespace internal { namespace local { */

Modified: incubator/mesos/trunk/src/nexus_local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_local.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_local.hpp (original)
+++ incubator/mesos/trunk/src/nexus_local.hpp Sun Jun  5 05:34:18 2011
@@ -6,8 +6,13 @@
 #include "master.hpp"
 #include "slave.hpp"
 
-PID run_nexus(int slaves, int32_t cpus, int64_t mem,
-              bool ownLogging, bool quiet);
-void kill_nexus();
+namespace nexus { namespace internal { namespace local {
+
+PID launch(int numSlaves, int32_t cpus, int64_t mem,
+	   bool initLogging, bool quiet);
+
+void shutdown();
+
+}}}
 
 #endif /* LOCAL_HPP */

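The renamed local::launch/local::shutdown pair replaces run_nexus/kill_nexus. A minimal sketch of spinning up an in-process cluster with the new names, mirroring what local.cpp and the "local"/"localquiet" driver URLs do:

    #include "nexus_local.hpp"

    using namespace nexus::internal;

    int main()
    {
      // One master plus one slave with 1 CPU and 1 GB of memory; glog is
      // initialized and left verbose. launch() also wires up a
      // BasicMasterDetector so the master and the slave find each other.
      PID master = local::launch(1, 1, 1073741824, true, false);

      // ... run frameworks against 'master' ...

      // Post M2M_SHUTDOWN, wait for the master and slaves, free everything.
      local::shutdown();
      return 0;
    }
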
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 05:34:18 2011
@@ -26,7 +26,6 @@
 #include "messages.hpp"
 #include "nexus_local.hpp"
 #include "nexus_sched.hpp"
-#include "url_processor.hpp"
 #include "master_detector.hpp"
 
 #define REPLY_TIMEOUT 20
@@ -48,14 +47,6 @@ using namespace nexus::internal;
 
 namespace nexus { namespace internal {
 
-/**
- * TODO(benh): Update this comment.
- * Scheduler process, responsible for interacting with the master
- * and responding to Nexus API calls from schedulers. In order to
- * allow a message to be sent back to the master we allow friend
- * functions to invoke 'send'. Therefore, care must be done to insure
- * any synchronization necessary is performed.
- */
 
 class RbReply : public Tuple<Process>
 {    
@@ -94,127 +85,107 @@ private:
   const TaskID tid;
 };
 
+
+/**
+ * TODO(benh): Update this comment.
+ * Scheduler process, responsible for interacting with the master
+ * and responding to Nexus API calls from schedulers. In order to
+ * allow a message to be sent back to the master we allow friend
+ * functions to invoke 'send'. Therefore, care must be done to insure
+ * any synchronization necessary is performed.
+ */
 class SchedulerProcess : public Tuple<ReliableProcess>
 {
 public:
   friend class nexus::NexusSchedulerDriver;
 
 private:
-  PID master;
   NexusSchedulerDriver* driver;
   Scheduler* sched;
   FrameworkID fid;
   string frameworkName;
   ExecutorInfo execInfo;
-  bool isFT;
-  string zkServers;
-  MasterDetector *masterDetector;
 
-  unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
-  unordered_map<SlaveID, PID> savedSlavePids;
+  PID master;
 
   volatile bool terminate;
 
+  unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
+  unordered_map<SlaveID, PID> savedSlavePids;
+
   unordered_map<TaskID, RbReply *> rbReplies;
 
 public:
-  SchedulerProcess(const string &_master,
-                   NexusSchedulerDriver* _driver,
+  SchedulerProcess(NexusSchedulerDriver* _driver,
                    Scheduler* _sched,
+		   FrameworkID _fid,
                    const string& _frameworkName,
                    const ExecutorInfo& _execInfo)
     : driver(_driver),
       sched(_sched),
-      fid(""),
-      terminate(false),
+      fid(_fid),
       frameworkName(_frameworkName),
       execInfo(_execInfo),
-      masterDetector(NULL)
-{
-  pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
-  if (urlPair.first == UrlProcessor::ZOO) {
-    isFT = true;
-    zkServers = urlPair.second;
-    //  } else if (urlPair.first == UrlProcessor::NEXUS) {
-  } else {
-    isFT = false; 
-    if (urlPair.first == UrlProcessor::NEXUS)
-      master = make_pid(urlPair.second.c_str());
-    else
-      master = make_pid(_master.c_str());
-    
-    if (!master)
-    {
-      cerr << "Scheduler failed to resolve master PID " << urlPair.second << endl;
-      exit(1);
-    }
-  }
-}
+      master(PID()),
+      terminate(false) {}
 
-~SchedulerProcess()
-{
-  if (masterDetector != NULL) {
-    delete masterDetector;
-    masterDetector = NULL;
-  }
-}
+  ~SchedulerProcess() {}
 
 protected:
   void operator () ()
   {
-    // Get username of current user
+    // Get username of current user.
     struct passwd* passwd;
     if ((passwd = getpwuid(getuid())) == NULL)
       fatal("failed to get username information");
-    string user(passwd->pw_name);
 
-    if (isFT) {
-      LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-      masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
-    } else {
-      send(self(), pack<NEW_MASTER_DETECTED>("0", master));
-    }
+    const string user(passwd->pw_name);
 
     while(true) {
       // Rather than send a message to this process when it is time to
-      // complete, we set a flag that gets re-read. Sending a message
+      // terminate, we set a flag that gets re-read. Sending a message
       // requires some sort of matching or priority reads that
       // libprocess currently doesn't support. Note that this field is
-      // only read by this process (after setting it in the
-      // destructor), so we don't need to protect it in any way. In
-      // fact, using a lock to protect it (or for providing atomicity
-      // for cleanup, for example), might lead to deadlock with the
-      // client code because we already use a lock in SchedulerDriver. That
-      // being said, for now we make terminate 'volatile' to guarantee
-      // that each read is getting a fresh copy.
+      // only read by this process, so we don't need to protect it in
+      // any way. In fact, using a lock to protect it (or for
+      // providing atomicity for cleanup, for example), might lead to
+      // deadlock with the client code because we already use a lock
+      // in SchedulerDriver. That being said, for now we make
+      // terminate 'volatile' to guarantee that each read is getting a
+      // fresh copy.
       // TODO(benh): Do a coherent read so as to avoid using 'volatile'.
       if (terminate)
         return;
 
+      // TODO(benh): We need to break the receive every so often to
+      // check if 'terminate' has been set. It would be better to just
+      // send a message rather than have a timeout (see the comment
+      // above for why sending a message will still require us to use
+      // the terminate flag).
       switch(receive(2)) {
-      // TODO(benh): We need to break the receive loop every so often
-      // to check if 'terminate' has been set .. but rather than use a
-      // timeout in receive, it would be nice to send a message, but
-      // see above.
 
       case NEW_MASTER_DETECTED: {
 	string masterSeq;
 	PID masterPid;
 	unpack<NEW_MASTER_DETECTED>(masterSeq, masterPid);
 
-	LOG(INFO) << "New master at " << masterPid
-		  << " with ephemeral id:" << masterSeq;
+	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+
+	// Connect as a failover if this is the first master we are
+        // being told about a master AND we already have an id.
+        bool failover = !master && fid != "";
 
         redirect(master, masterPid);
 	master = masterPid;
 	link(master);
 
-	if (fid.empty()) {
+	if (fid == "") {
 	  // Touched for the very first time.
-	  send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));	  
+	  send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
 	} else {
-	  // Not the first time.
-	  send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
+	  // Not the first time, or failing over.
+	  send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user,
+						      execInfo, failover));
 	}
 	break;
       }
@@ -260,17 +231,13 @@ protected:
 	// Remove the offer since we saved all the PIDs we might use.
         savedOffers.erase(oid);
 
-        if (isFT) {
-          foreach(const TaskDescription &task, tasks) {
-            RbReply *rr = new RbReply(self(), task.taskId);
-            rbReplies[task.taskId] = rr;
-            link(spawn(rr));
-          }
-        }
+	foreach(const TaskDescription &task, tasks) {
+	  RbReply *rr = new RbReply(self(), task.taskId);
+	  rbReplies[task.taskId] = rr;
+	  link(spawn(rr));
+	}
         
         send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
-
-	
         break;
       }
 
@@ -352,12 +319,8 @@ protected:
       }
 
       case PROCESS_EXIT: {
-	if (isFT) {
-	  LOG(WARNING) << "Connection to master lost .. waiting for new master.";
-	} else {
-	  const char* message = "Connection to master failed";
-	  invoke(bind(&Scheduler::error, sched, driver, -1, message));
-	}
+	// TODO(benh): Don't wait for a new master forever.
+	LOG(WARNING) << "Connection to master lost .. waiting for new master.";
         break;
       }
 
@@ -416,17 +379,19 @@ ExecutorInfo Scheduler::getExecutorInfo(
 
 
 // Default implementation of Scheduler::error that logs to stderr
-void Scheduler::error(SchedulerDriver* s, int code, const string &message)
+void Scheduler::error(SchedulerDriver* driver, int code, const string &message)
 {
   cerr << "Nexus error: " << message
        << " (error code: " << code << ")" << endl;
-  s->stop();
+  driver->stop();
 }
 
 
 NexusSchedulerDriver::NexusSchedulerDriver(Scheduler* _sched,
-                               const string &_master)
-  : sched(_sched), master(_master), running(false), process(NULL)
+					   const string &_url,
+					   FrameworkID _fid)
+  : sched(_sched), url(_url), fid(_fid), running(false),
+    process(NULL), detector(NULL)
 {
   // Create mutex and condition variable
   pthread_mutexattr_t attr;
@@ -458,56 +423,58 @@ NexusSchedulerDriver::~NexusSchedulerDri
   // to the user somehow.
   Process::wait(process);
   delete process;
+
+  MasterDetector::destroy(detector);
+
+  // Check and see if we need to shutdown a local cluster.
+  if (url == "local" || url == "localquiet")
+    local::shutdown();
 }
 
 
-void NexusSchedulerDriver::start()
+int NexusSchedulerDriver::start()
 {
   Lock lock(&mutex);
 
   if (running) {
-    error(1, "cannot call start - scheduler is already running");
-    return;
+    //error(1, "cannot call start - scheduler is already running");
+    return - 1;
   }
 
-  PID pid;
-  
-  string initServer = master;
-
-  if (master == string("localquiet")) {
-    // TODO(benh): Look up resources in environment variables.
-    pid = run_nexus(1, 1, 1073741824, true, true);
-    ostringstream ss;
-    ss << pid;
-    initServer = ss.str();
-  } else if (master == string("local")) {
-    // TODO(benh): Look up resources in environment variables.
-    pid = run_nexus(1, 1, 1073741824, true, false);
-    ostringstream ss;
-    ss << pid;
-    initServer = ss.str();
-  } 
-
   const string& frameworkName = sched->getFrameworkName(this);
   const ExecutorInfo& executorInfo = sched->getExecutorInfo(this);
 
-  process = new SchedulerProcess(initServer, this, sched, frameworkName, executorInfo);
-  
-  Process::spawn(process);
+  process = new SchedulerProcess(this, sched, fid, frameworkName, executorInfo);
+  PID pid = Process::spawn(process);
+
+  // Check and see if we need to launch a local cluster.
+  if (url == "local") {
+    // TODO(benh): Get number of slaves and resources per slave from
+    // command line (or environment or configuration?).
+    PID master = local::launch(1, 1, 1073741824, true, false);
+    detector = new BasicMasterDetector(master, pid);
+  } else if (url == "localquiet") {
+    // TODO(benh): Get number of slaves and resources per slave from
+    // command line (or environment?).
+    PID master = local::launch(1, 1, 1073741824, true, true);
+    detector = new BasicMasterDetector(master, pid);
+  } else {
+    detector = MasterDetector::create(url, pid, false, true);
+  }
 
   running = true;
-}
 
+  return 0;
+}
 
 
-void NexusSchedulerDriver::stop()
+int NexusSchedulerDriver::stop()
 {
   Lock lock(&mutex);
 
   if (!running) {
     // Don't issue an error (could lead to an infinite loop).
-    // TODO(benh): It would be much cleaner to return success or failure!
-    return;
+    return -1;
   }
 
   // TODO(benh): Do a Process::post instead?
@@ -519,97 +486,112 @@ void NexusSchedulerDriver::stop()
   running = false;
 
   pthread_cond_signal(&cond);
+
+  return 0;
 }
 
 
-void NexusSchedulerDriver::join()
+int NexusSchedulerDriver::join()
 {
   Lock lock(&mutex);
   while (running)
     pthread_cond_wait(&cond, &mutex);
+
+  return 0;
 }
 
 
-void NexusSchedulerDriver::run()
+int NexusSchedulerDriver::run()
 {
-  start();
-  join();
+  int ret = start();
+  return ret != 0 ? ret : join();
 }
 
 
-void NexusSchedulerDriver::killTask(TaskID tid)
+int NexusSchedulerDriver::killTask(TaskID tid)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    error(1, "cannot call killTask - scheduler is not running");
-    return;
+    //error(1, "cannot call killTask - scheduler is not running");
+    return -1;
   }
 
   // TODO(benh): Do a Process::post instead?
 
   process->send(process->master,
                 process->pack<F2M_KILL_TASK>(process->fid, tid));
+
+  return 0;
 }
 
 
-void NexusSchedulerDriver::replyToOffer(OfferID offerId,
-                                        const vector<TaskDescription> &tasks,
-                                        const string_map &params)
+int NexusSchedulerDriver::replyToOffer(OfferID offerId,
+				       const vector<TaskDescription> &tasks,
+				       const string_map &params)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    error(1, "cannot call replyToOffer - scheduler is not running");
-    return;
+    //error(1, "cannot call replyToOffer - scheduler is not running");
+    return -1;
   }
 
   // TODO(benh): Do a Process::post instead?
   
   process->send(process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
+
+  return 0;
 }
 
 
-void NexusSchedulerDriver::reviveOffers()
+int NexusSchedulerDriver::reviveOffers()
 {
   Lock lock(&mutex);
 
   if (!running) {
-    error(1, "cannot call reviveOffers - scheduler is not running");
-    return;
+    //error(1, "cannot call reviveOffers - scheduler is not running");
+    return -1;
   }
 
   // TODO(benh): Do a Process::post instead?
 
   process->send(process->master,
                 process->pack<F2M_REVIVE_OFFERS>(process->fid));
+
+  return 0;
 }
 
 
-void NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
+int NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    error(1, "cannot call sendFrameworkMessage - scheduler is not running");
-    return;
+    //error(1, "cannot call sendFrameworkMessage - scheduler is not running");
+    return -1;
   }
 
+  // TODO(benh): Do a Process::post instead?
+
   process->send(process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message));
+
+  return 0;
 }
 
 
-void NexusSchedulerDriver::sendHints(const string_map& hints)
+int NexusSchedulerDriver::sendHints(const string_map& hints)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    error(1, "cannot call sendHints - scheduler is not running");
-    return;
+    //error(1, "cannot call sendHints - scheduler is not running");
+    return -1;
   }
 
   // TODO(*): Send the hints; for now, we do nothing
-  error(1, "sendHints is not yet implemented");
+  //error(1, "sendHints is not yet implemented");
+  return -1;
 }
 
 

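Since the driver methods above now return an int (0 on success, -1 otherwise) instead of routing every misuse through Scheduler::error, framework code is expected to check return values. A small illustrative helper:

    #include <string>

    #include <glog/logging.h>

    #include <nexus_sched.hpp>

    using namespace nexus;

    // Sketch: start a driver and block until it stops, reporting failures
    // via the return code rather than Scheduler::error.
    int launchAndWait(Scheduler* sched, const std::string& url)
    {
      NexusSchedulerDriver driver(sched, url);

      if (driver.start() != 0) {
        LOG(ERROR) << "Failed to start scheduler driver";
        return -1;
      }

      // killTask, replyToOffer, reviveOffers, sendFrameworkMessage and
      // sendHints follow the same 0 / -1 convention.
      return driver.join();
    }
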
Modified: incubator/mesos/trunk/src/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/process_based_isolation_module.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/process_based_isolation_module.cpp Sun Jun  5 05:34:18 2011
@@ -118,7 +118,7 @@ ProcessBasedIsolationModule::Reaper::Rea
   : module(m)
 {}
 
-  
+
 void ProcessBasedIsolationModule::Reaper::operator () ()
 {
   link(module->slave->getPID());

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 05:34:18 2011
@@ -2,16 +2,20 @@
 
 #include <fstream>
 #include <algorithm>
+
+#include "isolation_module_factory.hpp"
 #include "slave.hpp"
 #include "slave_webui.hpp"
-#include "isolation_module_factory.hpp"
-#include "url_processor.hpp"
 
-#define FT_TIMEOUT 10
+// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
+#ifdef __sun__
+#define gethostbyname2(name, _) gethostbyname(name)
+#endif
 
 using std::list;
 using std::make_pair;
 using std::ostringstream;
+using std::istringstream;
 using std::pair;
 using std::queue;
 using std::string;
@@ -25,10 +29,6 @@ using namespace nexus;
 using namespace nexus::internal;
 using namespace nexus::internal::slave;
 
-// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
-#ifdef __sun__
-#define gethostbyname2(name, _) gethostbyname(name)
-#endif
 
 namespace {
 
@@ -63,61 +63,18 @@ public:
 } /* namespace */
 
 
-Slave::Slave(const string &_master, Resources _resources, bool _local)
-  : masterDetector(NULL), resources(_resources), local(_local), id(""),
-    isolationType("process"), isolationModule(NULL)
-{
-  pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
-  if (urlPair.first == UrlProcessor::ZOO) {
-    isFT = true;
-    zkServers = urlPair.second;
-  } else 
-  {
-    isFT = false;
-    if (urlPair.first == UrlProcessor::NEXUS)
-      master = make_pid(urlPair.second.c_str());
-    else
-      master = make_pid(_master.c_str());
-    
-    if (!master)
-    {
-      cerr << "Slave failed to resolve master PID " << urlPair.second << endl;
-      exit(1);
-    }
-  }
-}
-
+Slave::Slave(Resources _resources, bool _local, const string &_isolationType)
+  : id(""), resources(_resources), local(_local),
+    isolationType(_isolationType), isolationModule(NULL) {}
 
-Slave::Slave(const string &_master, Resources _resources, bool _local,
-	     const string &_isolationType)
-  : masterDetector(NULL), resources(_resources), local(_local), id(""),
-    isolationType(_isolationType), isolationModule(NULL)
-{
-  pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
-  if (urlPair.first == UrlProcessor::ZOO) {
-    isFT = true;
-    zkServers = urlPair.second;
-  } else {
-    isFT = false;
-    if (urlPair.first == UrlProcessor::NEXUS)
-      master = make_pid(urlPair.second.c_str());
-    else
-      master = make_pid(_master.c_str());
-    
-    if (!master)
-    {
-      cerr << "Slave failed to resolve master PID " << urlPair.second << endl;
-      exit(1);
-    }
-  }
-}
 
 Slave::~Slave()
 {
-  if (masterDetector != NULL) {
-    delete masterDetector;
-    masterDetector = NULL;
-  }
+  // TODO(matei): Add support for factory style destroy of objects!
+  if (isolationModule != NULL)
+    delete isolationModule;
+
+  // TODO(benh): Shut down and free executors?
 }
 
 
@@ -151,13 +108,6 @@ void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
 
-  if (isFT) {
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-    masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
-  } else {
-    send(self(), pack<NEW_MASTER_DETECTED>("0", master));
-  }
-
   // Get our hostname
   char buf[256];
   gethostname(buf, sizeof(buf));
@@ -171,7 +121,7 @@ void Slave::operator () ()
   if (!publicDns) {
     publicDns = hostname;
   }
-  
+
   FrameworkID fid;
   TaskID tid;
   TaskState taskState;
@@ -186,8 +136,7 @@ void Slave::operator () ()
 	PID masterPid;
 	unpack<NEW_MASTER_DETECTED>(masterSeq, masterPid);
 
-	LOG(INFO) << "New master at " << masterPid
-		  << " with ephemeral id:" << masterSeq;
+	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
 
         redirect(master, masterPid);
 	master = masterPid;
@@ -329,7 +278,6 @@ void Slave::operator () ()
           isolationModule->resourcesChanged(fw);
           // Tell executor that it's registered and give it its queued tasks
           send(from(), pack<S2E_REGISTER_REPLY>(this->id,
-                                                hostname,
                                                 fw->name,
                                                 fw->executorInfo.initArg));
           sendQueuedTasks(fw);
@@ -351,12 +299,12 @@ void Slave::operator () ()
             isolationModule->resourcesChanged(fw);
           }
         }
-        // Pass on the update to the master
-        if (isFT) {
-          string msg = tupleToString(pack<S2M_FT_STATUS_UPDATE>(id, fid, tid, taskState, data));
-          rsend(master, S2M_FT_STATUS_UPDATE, msg.c_str(), sizeof(msg.c_str()));
-        } else
-          send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
+
+        // Pass on the update to the master.
+	const string &msg =
+	  tupleToString(pack<S2M_FT_STATUS_UPDATE>(id, fid, tid, taskState,
+						   data));
+	rsend(master, S2M_FT_STATUS_UPDATE, msg.data(), msg.size());
         break;
       }
 
@@ -377,56 +325,42 @@ void Slave::operator () ()
         LOG(INFO) << "Process exited: " << from();
 
         if (from() == master) {
-	  // TODO: Fault tolerance!
-	  if (isFT) {
-	    LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected."; 
-	  } else {
-	    LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
-	    if (isolationModule != NULL)
-	      delete isolationModule;
-	    // TODO: Shut down executors?
-	    return;
+	  LOG(WARNING) << "Master disconnected! "
+		       << "Waiting for a new master to be elected.";
+	  // TODO(benh): After so long waiting for a master, commit suicide.
+	} else {
+	  // Check if an executor has exited.
+	  foreachpair (_, Executor *ex, executors) {
+	    if (from() == ex->pid) {
+	      LOG(INFO) << "Executor for framework " << ex->frameworkId
+			<< " disconnected";
+	      Framework *framework = getFramework(fid);
+	      if (framework != NULL) {
+		send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
+		killFramework(framework);
+	      }
+	      break;
+	    }
 	  }
 	}
 
-        foreachpair (_, Executor *ex, executors) {
-          if (from() == ex->pid) {
-            LOG(INFO) << "Executor for framework " << ex->frameworkId
-                      << " disconnected";
-	    Framework *framework = getFramework(fid);
-	    if (framework != NULL) {
-	      send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
-	      killFramework(framework);
-	    }
-            break;
-          }
-        }
-
         break;
       }
 
       case M2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by master: " << from();
-	// TODO(matei): Add support for factory style destroy of objects!
-	if (isolationModule != NULL)
-	  delete isolationModule;
-        // TODO: Shut down executors?
         return;
       }
 
-
       case S2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by " << from();
-	// TODO(matei): Add support for factory style destroy of objects!
-	if (isolationModule != NULL)
-	  delete isolationModule;
-        // TODO: Shut down executors?
         return;
       }
 
       case PROCESS_TIMEOUT: {
 	break;
       }
+
       default: {
         LOG(ERROR) << "Received unknown message ID " << msgid()
                    << " from " << from();
@@ -512,6 +446,8 @@ void Slave::killFramework(Framework *fw)
 
 
 // Called by isolation module when an executor process exits
+// TODO(benh): Make this callback be a message so that we can avoid
+// race conditions.
 void Slave::executorExited(FrameworkID fid, int status)
 {
   if (Framework *f = getFramework(fid)) {

Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun  5 05:34:18 2011
@@ -165,8 +165,6 @@ public:
   typedef unordered_map<FrameworkID, Executor*> ExecutorMap;
   
   bool isFT;
-  string zkServers;
-  MasterDetector *masterDetector;
   PID master;
   SlaveID id;
   Resources resources;
@@ -177,10 +175,8 @@ public:
   IsolationModule *isolationModule;
 
 public:
-  Slave(const string &_master, Resources resources, bool _local);
-
-  Slave(const string &_master, Resources resources, bool _local,
-        const string& isolationType);
+  Slave(Resources resources, bool local,
+	const string& isolationType = "process");
 
   virtual ~Slave();
 

Modified: incubator/mesos/trunk/src/slave_main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave_main.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave_main.cpp (original)
+++ incubator/mesos/trunk/src/slave_main.cpp Sun Jun  5 05:34:18 2011
@@ -8,19 +8,19 @@ using namespace std;
 using namespace nexus::internal::slave;
 
 
-/* List of ZooKeeper host:port pairs. */
-string zookeeper = "";
-
-
 void usage(const char *programName)
 {
   cerr << "Usage: " << programName
+       << " [--url URL]"
        << " [--cpus NUM]"
        << " [--mem NUM]"
        << " [--isolation TYPE]"
-       << " [--zookeeper host:port]"
-       << " [--quiet]"
-       << " <master_pid>"
+       << " [--quiet]" << endl
+       << endl
+       << "URL (used for connecting to a master) may be one of:" << endl
+       << "  nexus://id@host:port" << endl
+       << "  zoo://host1:port1,host2:port2,..." << endl
+       << "  zoofile://file where file contains a host:port pair per line"
        << endl;
 }
 
@@ -33,22 +33,26 @@ int main(int argc, char **argv)
   }
 
   option options[] = {
+    {"url", required_argument, 0, 'u'},
     {"cpus", required_argument, 0, 'c'},
     {"mem", required_argument, 0, 'm'},
     {"isolation", required_argument, 0, 'i'},
-    {"zookeeper", required_argument, 0, 'z'},
     {"quiet", no_argument, 0, 'q'},
   };
 
+  string url = "";
   Resources resources(1, 1 * Gigabyte);
   string isolation = "process";
   bool quiet = false;
 
   int opt;
   int index;
-  while ((opt = getopt_long(argc, argv, "c:m:i:z:q", options, &index)) != -1) {
+  while ((opt = getopt_long(argc, argv, "u:c:m:i:q", options, &index)) != -1) {
     switch (opt) {
-      case 'c':
+      case 'u':
+        url = optarg;
+        break;
+      case 'c': 
 	resources.cpus = atoi(optarg);
         break;
       case 'm':
@@ -57,15 +61,6 @@ int main(int argc, char **argv)
       case 'i':
 	isolation = optarg;
         break;
-      case 'z':
-#ifndef USING_ZOOKEEPER
-	cerr << "--zookeeper not supported in this build" << endl;
-	usage(argv[0]);
-	exit(1);
-#else
-	zookeeper = optarg;
-#endif
-        break;
       case 'q':
         quiet = true;
         break;
@@ -80,36 +75,19 @@ int main(int argc, char **argv)
 
   if (!quiet)
     google::SetStderrLogging(google::INFO);
-  else
-    LeaderDetector::setQuiet(true);
 
+  FLAGS_log_dir = "/tmp";
   FLAGS_logbufsecs = 1;
   google::InitGoogleLogging(argv[0]);
 
-  // Check that we either have zookeeper as an argument or exactly one
-  // non-option argument (i.e., the master PID).
-  if (zookeeper.empty() && optind != argc - 1) {
-    usage(argv[0]);
-    exit(1);
-  }
-
-  // Resolve the master PID.
-  PID master;
-
-  if (zookeeper.empty()) {
-    istringstream iss(argv[optind]);
-    if (!(iss >> master)) {
-      cerr << "Failed to resolve master PID " << argv[optind] << endl;
-      exit(1);
-    }
-  }
-
   LOG(INFO) << "Build: " << BUILD_DATE << " by " << BUILD_USER;
   LOG(INFO) << "Starting Nexus slave";
 
-  Slave* slave = new Slave(master, resources, false, isolation);
+  Slave* slave = new Slave(resources, false, isolation);
   PID pid = Process::spawn(slave);
 
+  MasterDetector *detector = MasterDetector::create(url, pid, false, quiet);
+
 #ifdef NEXUS_WEBUI
   if (chdir(dirname(argv[0])) != 0)
     fatalerror("could not change into %s for running webui", dirname(argv[0]));
@@ -117,5 +95,8 @@ int main(int argc, char **argv)
 #endif
 
   Process::wait(pid);
+
+  MasterDetector::destroy(detector);
+
   return 0;
 }
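
A minimal sketch of the new startup sequence (not itself part of this change;
it only rearranges the calls shown in slave_main.cpp above, and the zoofile
path is made up for illustration): the slave no longer resolves the master
itself, a MasterDetector built from the --url value does that and sends the
slave process a NEW_MASTER_DETECTED message.

    // --url could also be nexus://id@host:port or zoo://host1:port1,...
    string url = "zoofile:///path/to/zoo-hosts";   // one host:port per line

    Slave* slave = new Slave(Resources(1, 1 * Gigabyte), false, "process");
    PID pid = Process::spawn(slave);

    // Watches for master elections/changes and notifies `pid'.
    MasterDetector* detector = MasterDetector::create(url, pid, false, quiet);

    Process::wait(pid);
    MasterDetector::destroy(detector);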

Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun  5 05:34:18 2011
@@ -65,26 +65,26 @@ public:
 TEST(MasterTest, NoopFrameworkWithOneSlave)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
   NoopScheduler sched(1);
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_TRUE(sched.registeredCalled);
   EXPECT_EQ(1, sched.offersGotten);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, NoopFrameworkWithMultipleSlaves)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(10, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
   NoopScheduler sched(10);
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_TRUE(sched.registeredCalled);
   EXPECT_EQ(1, sched.offersGotten);
-  kill_nexus();
+  local::shutdown();
 }
 
 
@@ -122,7 +122,7 @@ public:
 TEST(MasterTest, DuplicateTaskIdsInResponse)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "1";
@@ -134,14 +134,14 @@ TEST(MasterTest, DuplicateTaskIdsInRespo
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Duplicate task ID: 1", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, TooMuchMemoryInTask)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "1";
@@ -151,14 +151,14 @@ TEST(MasterTest, TooMuchMemoryInTask)
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, TooMuchCpuInTask)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "4";
@@ -168,14 +168,14 @@ TEST(MasterTest, TooMuchCpuInTask)
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, TooLittleCpuInTask)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "0";
@@ -185,14 +185,14 @@ TEST(MasterTest, TooLittleCpuInTask)
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Invalid task size: <0 CPUs, 1073741824 MEM>", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, TooLittleMemoryInTask)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "1";
@@ -202,14 +202,14 @@ TEST(MasterTest, TooLittleMemoryInTask)
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Invalid task size: <1 CPUs, 1 MEM>", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, TooMuchMemoryAcrossTasks)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "1";
@@ -220,14 +220,14 @@ TEST(MasterTest, TooMuchMemoryAcrossTask
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, TooMuchCpuAcrossTasks)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+  PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
   vector<TaskDescription> tasks;
   map<string, string> params;
   params["cpus"] = "2";
@@ -238,14 +238,14 @@ TEST(MasterTest, TooMuchCpuAcrossTasks)
   NexusSchedulerDriver driver(&sched, master);
   driver.run();
   EXPECT_EQ("Too many resources accepted", sched.errorMessage);
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, ResourcesReofferedAfterReject)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(10, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
 
   NoopScheduler sched1(10);
   NexusSchedulerDriver driver1(&sched1, master);
@@ -259,14 +259,14 @@ TEST(MasterTest, ResourcesReofferedAfter
   EXPECT_TRUE(sched2.registeredCalled);
   EXPECT_EQ(1, sched2.offersGotten);
 
-  kill_nexus();
+  local::shutdown();
 }
 
 
 TEST(MasterTest, ResourcesReofferedAfterBadResponse)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
-  PID master = run_nexus(1, 2, 1 * Gigabyte, false, false);
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
 
   vector<TaskDescription> tasks;
   map<string, string> params;
@@ -284,7 +284,7 @@ TEST(MasterTest, ResourcesReofferedAfter
   EXPECT_TRUE(sched2.registeredCalled);
   EXPECT_EQ(1, sched2.offersGotten);
 
-  kill_nexus();
+  local::shutdown();
 }
 
 
@@ -324,9 +324,11 @@ TEST(MasterTest, SlaveLost)
   Master m;
   PID master = Process::spawn(&m);
 
-  Slave s(master, Resources(2, 1 * Gigabyte), true);
+  Slave s(Resources(2, 1 * Gigabyte), true);
   PID slave = Process::spawn(&s);
 
+  BasicMasterDetector detector(master, slave, true);
+
   SlaveLostScheduler sched(slave);
 
   NexusSchedulerDriver driver(&sched, master);
@@ -342,3 +344,80 @@ TEST(MasterTest, SlaveLost)
 
 
 /* TODO(benh): Test lost slave due to missing heartbeats. */
+
+
+class FailoverScheduler : public Scheduler
+{
+public:
+  bool registeredCalled;
+  
+  FailoverScheduler() : registeredCalled(false) {}
+
+  virtual ~FailoverScheduler() {}
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+    return ExecutorInfo("noexecutor", "");
+  }
+
+  virtual void registered(SchedulerDriver *d, FrameworkID fid) {
+    LOG(INFO) << "FailoverScheduler registered";
+    registeredCalled = true;
+    d->stop();
+  }
+};
+
+
+class FailingScheduler : public Scheduler
+{
+public:
+  FailoverScheduler *failoverSched;
+  PID master;
+  NexusSchedulerDriver *driver;
+  string errorMessage;
+
+  FailingScheduler(FailoverScheduler *_failoverSched, const PID &_master)
+    : failoverSched(_failoverSched), master(_master) {}
+
+  virtual ~FailingScheduler() {
+    delete driver;
+  }
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+    return ExecutorInfo("noexecutor", "");
+  }
+
+  virtual void registered(SchedulerDriver*, FrameworkID fid) {
+    LOG(INFO) << "FailingScheduler registered";
+    driver = new NexusSchedulerDriver(failoverSched, master, fid);
+    driver->start();
+  }
+
+  virtual void error(SchedulerDriver* d,
+                     int code,
+                     const std::string& message) {
+    errorMessage = message;
+    d->stop();
+  }
+};
+
+
+TEST(MasterTest, SchedulerFailover)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+
+  FailoverScheduler failoverSched;
+  FailingScheduler failingSched(&failoverSched, master);
+
+  NexusSchedulerDriver driver(&failingSched, master);
+  driver.run();
+
+  EXPECT_EQ("Failover", failingSched.errorMessage);
+
+  failingSched.driver->join();
+
+  EXPECT_TRUE(failoverSched.registeredCalled);
+
+  local::shutdown();
+}

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun  5 05:34:18 2011
@@ -161,7 +161,7 @@ void ReliableProcess::rsend(const PID &t
 }
 
 
-MSGID ReliableProcess::receive(time_t secs)
+MSGID ReliableProcess::receive(double secs)
 {
   // Record sequence number for current (now old) _reliable_ message
   // and also free the message.

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun  5 05:34:18 2011
@@ -22,7 +22,7 @@ class ReliableProcess : public Process
 {
 public:
   ReliableProcess();
-  ~ReliableProcess();
+  virtual ~ReliableProcess();
 
 protected:
   /**
@@ -74,7 +74,7 @@ protected:
   virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
 
   /* Blocks for a message for at most the specified number of seconds. */
-  virtual MSGID receive(time_t);
+  virtual MSGID receive(double secs);
 
   /**
   * Redirect unacknowledged messages to be sent to a different PID.

Modified: incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp Sun Jun  5 05:34:18 2011
@@ -9,6 +9,12 @@ namespace process { namespace serializat
 
 /* TODO(*): Check stream errors! Report errors! Ahhhh! */
 
+void serializer::operator & (const bool &b)
+{
+  stream.put(b ? 1 : 0);
+}
+
+
 void serializer::operator & (const int32_t &i)
 {
   uint32_t netInt = htonl((uint32_t) i);
@@ -52,6 +58,12 @@ void serializer::operator & (const PID &
 }
 
 
+void deserializer::operator & (bool &b)
+{
+  b = stream.get();
+}
+
+
 void deserializer::operator & (int32_t &i)
 {
   uint32_t netInt;

Modified: incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp Sun Jun  5 05:34:18 2011
@@ -14,6 +14,7 @@ struct serializer
 
   serializer(std::ostringstream& s) : stream(s) {}
 
+  void operator & (const bool &);
   void operator & (const int32_t &);
   void operator & (const int64_t &);
   void operator & (const size_t &);
@@ -27,6 +28,7 @@ struct deserializer
 
   deserializer(std::istringstream &s) : stream(s) {}
 
+  void operator & (bool &);
   void operator & (int32_t &);
   void operator & (int64_t &);
   void operator & (size_t &);
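
A quick round-trip sketch of the new bool support added to the serializer and
deserializer above (assumed usage, not part of the diff); booleans travel as
a single byte through the same operator& interface as the integer types:

    std::ostringstream out;
    process::serialization::serializer s(out);
    s & true;                            // written as one byte (1 or 0)

    std::istringstream in(out.str());
    process::serialization::deserializer d(in);
    bool b;
    d & b;                               // b == true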

Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun  5 05:34:18 2011
@@ -171,7 +171,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -186,7 +186,7 @@ protected:
     const char *data;
     size_t length;
     
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -204,7 +204,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -223,7 +223,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -244,7 +244,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -267,7 +267,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -292,7 +292,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -319,7 +319,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -348,7 +348,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -379,7 +379,7 @@ protected:
     const char *data;
     size_t length;
 
-    data = Process::body(&length);
+    data = P::body(&length);
     std::string s(data, length);
     std::istringstream is(s);
     process::serialization::deserializer d(is);
@@ -455,7 +455,7 @@ protected:
 
     std::string data = os.str();
 
-    Process::send(to, ID, data.data(), data.size());
+    P::send(to, ID, data.data(), data.size());
   }
 
   template <MSGID ID>
@@ -585,7 +585,7 @@ protected:
 
   MSGID receive(double secs)
   {
-    return Process::receive(secs);
+    return P::receive(secs);
   }
 
   template <MSGID ID>
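
The Process:: to P:: changes above make sense if Tuple is a mixin templated
on its base process type; a rough sketch of the assumed shape (the full class
is not shown in this diff, so take the details as illustrative):

    template <typename P = Process>
    class Tuple : public P
    {
    protected:
      MSGID receive(double secs)
      {
        return P::receive(secs);         // resolves against the actual base
      }

      // The pack/unpack helpers likewise call P::body() and P::send() so the
      // same tuple machinery works on top of Process or ReliableProcess.
    };

    // e.g.: class SchedulerProcess : public Tuple<ReliableProcess> { ... };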

Modified: incubator/mesos/trunk/src/url_processor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/url_processor.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/url_processor.cpp (original)
+++ incubator/mesos/trunk/src/url_processor.cpp Sun Jun  5 05:34:18 2011
@@ -53,8 +53,7 @@ pair<UrlProcessor::URLType, string> UrlP
 
   } else {
 
-    LOG(WARNING) << "Could not parse master/zoo URL";
-    return pair<UrlProcessor::URLType, string>(UrlProcessor::UNKNOWN, "");
+    return pair<UrlProcessor::URLType, string>(UrlProcessor::UNKNOWN, url);
 
   }
 }

Modified: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun  5 05:34:18 2011
@@ -7,9 +7,7 @@
  *
  * To provide for varying underlying implementations the pimpl idiom
  * (also known as the compiler-firewall, bridge pattern, etc) was used
- * for the ZooKeeper class. While the Watcher class may need some to
- * support varying underlying implementation details, we choose for
- * now to keep it a cleaner interface.
+ * for the ZooKeeper class.
  *
  * Author: Benjamin Hindman <be...@berkeley.edu>
 */
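
For reference, a generic sketch of the pimpl (compiler-firewall) idiom the
comment above refers to; the real ZooKeeper class is not shown in this diff,
so the names and signatures here are illustrative only:

    class ZooKeeperImpl;                 // defined only in the .cpp file

    class ZooKeeper
    {
    public:
      ZooKeeper(const std::string& servers, Watcher* watcher);
      ~ZooKeeper();

    private:
      ZooKeeperImpl* impl;               // implementation details hidden here
    };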