You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:00:36 UTC

svn commit: r1131711 - in /incubator/mesos/trunk/src: Makefile.in master.cpp slave.cpp third_party/libprocess/process.cpp third_party/libprocess/process.hpp third_party/libprocess/tuple-impl.hpp zookeeper.cpp zookeeper.hpp

Author: benh
Date: Sun Jun  5 05:00:35 2011
New Revision: 1131711

URL: http://svn.apache.org/viewvc?rev=1131711&view=rev
Log:
Updates to keep ZooKeeper (libprocess timing bug) and added support for passing doubles instead of time_t to libprocess functions.

Modified:
    incubator/mesos/trunk/src/Makefile.in
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/third_party/libprocess/process.cpp
    incubator/mesos/trunk/src/third_party/libprocess/process.hpp
    incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
    incubator/mesos/trunk/src/zookeeper.cpp
    incubator/mesos/trunk/src/zookeeper.hpp

Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun  5 05:00:35 2011
@@ -19,13 +19,13 @@ RUBY_HEADERS = @RUBY_HEADERS@
 WITH_WEBUI = @WITH_WEBUI@
 WEBUI_FLAGS = @WEBUI_FLAGS@
 
+WITH_ZOOKEEPER = @WITH_ZOOKEEPER@
+
 LIBEV = third_party/libprocess/third_party/libev-3.8
 
 GLOG = third_party/glog-0.3.0
 GTEST = third_party/gtest-1.4.0-patched
 
-ZOOKEEPER = third_party/zookeeper-3.3.0/src/c
-
 # Ensure that we get better debugging info.
 CFLAGS += -g
 CXXFLAGS += -g
@@ -47,16 +47,11 @@ CFLAGS += -Ithird_party/libprocess
 CXXFLAGS += -Ithird_party/libprocess
 LDFLAGS += -Lthird_party/libprocess
 
-# Add libev to LDFLAGS.
-LDFLAGS += -L$(LIBEV)/.libs
-
-# Add glog and gtest and zookeeper to include and lib paths
+# Add glog and gtest to include and lib paths
 CXXFLAGS += -I$(GLOG)/src
 CXXFLAGS += -I$(GTEST)/include
-CXXFLAGS += -I$(ZOOKEEPER)/include -I$(ZOOKEEPER)/generated -DTHREADED
 LDFLAGS += -L$(GLOG)/.libs
 LDFLAGS += -L$(GTEST)/lib/.libs
-LDFLAGS += -L$(ZOOKEEPER)/.libs
 
 # Add dependency tracking to CFLAGS, CXXFLAGS.
 CFLAGS += -MMD -MP
@@ -70,8 +65,16 @@ CXXFLAGS += -DBUILD_DATE="\"$$(date '+%Y
 CFLAGS += -DBUILD_USER="\"$$USER\""
 CXXFLAGS += -DBUILD_USER="\"$$USER\""
 
+# Add libev to LDFLAGS.
+LDFLAGS += -L$(LIBEV)/.libs
+
 # Add libev, libprocess, pthread, and dl to LIBS.
-LIBS += -lglog -lgtest -lprocess -lev -lpthread -ldl -lzookeeper_mt
+LIBS += -lglog -lgtest -lprocess -lev -lpthread -ldl
+
+# Add zookeeper_st to LIBS if necessary.
+ifeq ($(WITH_ZOOKEEPER),1)
+  LIBS += -lzookeeper_st
+endif
 
 NEXUS_EXES = nexus-master nexus-slave nexus-local nexus-launcher alltests \
 	     test-framework test-executor cpp-test-framework cpp-test-executor \
@@ -92,7 +95,7 @@ NEXUS_LIBS = $(SCHED_LIB) $(EXEC_LIB) $(
 MASTER_OBJ = master.o allocator_factory.o simple_allocator.o
 SLAVE_OBJ = slave.o launcher.o isolation_module_factory.o \
 	    process_based_isolation_module.o
-COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o leader_detector.o url_processor.o ft_messaging.o
+COMMON_OBJ = fatal.o hash_pid.o lock.o messages.o
 EXEC_LIB_OBJ = nexus_exec.o
 SCHED_LIB_OBJ = nexus_sched.o nexus_local.o params.o
 TEST_OBJ = tests/main.o tests/test_master.o tests/test_resources.o
@@ -105,6 +108,11 @@ ifeq ($(OS_NAME),linux)
   SLAVE_OBJ += lxc_isolation_module.o
 endif
 
+# Add zookeeper.cpp if necessary.
+ifeq ($(WITH_ZOOKEEPER),1)
+  COMMON_OBJ += zookeeper.o
+endif
+
 ALL_OBJ = $(MASTER_OBJ) $(SLAVE_OBJ) $(COMMON_OBJ) \
 	  $(SCHED_LIB_OBJ) $(EXEC_LIB_OBJ) $(TEST_OBJ)
 
@@ -133,7 +141,7 @@ $(NEXUS_LIBS): $(COMMON_OBJ) third_party
 $(NEXUS_EXES): $(COMMON_OBJ) third_party/libprocess/libprocess.a
 
 $(SCHED_LIB): $(SCHED_LIB_OBJ) $(MASTER_OBJ) $(SLAVE_OBJ) $(COMMON_OBJ)
-	$(AR) rcs $@ $^ third_party/libprocess/libprocess.a $(GLOG)/.libs/libglog.a $(LIBEV)/.libs/libev.a $(ZOOKEEPER)/.libs/libzookeeper_mt.a
+	$(AR) rcs $@ $^ third_party/libprocess/libprocess.a $(GLOG)/.libs/libglog.a $(LIBEV)/.libs/libev.a
 
 $(EXEC_LIB): $(EXEC_LIB_OBJ) $(COMMON_OBJ)
 	$(AR) rcs $@ $^
@@ -252,7 +260,6 @@ third_party:
 	$(MAKE) -C third_party/libprocess
 	$(MAKE) -C $(GLOG)
 	$(MAKE) -C $(GTEST)
-	$(MAKE) -C $(ZOOKEEPER)
 
 all: third_party $(NEXUS_LIBS) $(NEXUS_EXES) java python ruby
 
@@ -260,7 +267,6 @@ clean:
 	$(MAKE) -C third_party/libprocess clean
 	$(MAKE) -C $(GLOG) clean
 	$(MAKE) -C $(GTEST) clean
-	$(MAKE) -C $(ZOOKEEPER) clean
 	rm -f $(patsubst %.o, %.d, $(ALL_OBJ))
 	rm -f $(ALL_OBJ)
 	rm -f $(NEXUS_LIBS)
@@ -276,11 +282,21 @@ clean:
 	rm -f swig/java/*.class
 	rm -f swig/python/nexus.py
 	rm -f swig/python/nexus.pyc
+	rm -f master_webui.o
+	rm -f master_webui.d
 	rm -f webui/master/swig/master_wrap.h
 	rm -f webui/master/swig/master_wrap.cpp
 	rm -f webui/master/swig/master_wrap.o
+	rm -f webui/master/swig/master_wrap.d
 	rm -f webui/master/swig/master.py
 	rm -f webui/master/swig/master.pyc
-	rm -f TAGS
+	rm -f slave_webui.o
+	rm -f slave_webui.d
+	rm -f webui/slave/swig/slave_wrap.h
+	rm -f webui/slave/swig/slave_wrap.cpp
+	rm -f webui/slave/swig/slave_wrap.o
+	rm -f webui/slave/swig/slave_wrap.d
+	rm -f webui/slave/swig/slave.py
+	rm -f webui/slave/swig/slave.pyc
 
 .PHONY: default third_party all clean java python ruby test

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:00:35 2011
@@ -1,11 +1,15 @@
-#include "allocator.hpp"
-#include "master.hpp"
-#include "master_webui.hpp"
-#include "allocator_factory.hpp"
-#include "ft_messaging.hpp"
+#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
 
 #include <glog/logging.h>
 
+#ifdef USING_ZOOKEEPER
+#include <zookeeper.hpp>
+#endif
+
+#include "allocator.hpp"
+#include "allocator_factory.hpp"
+#include "master.hpp"
+#include "master_webui.hpp"
 
 using std::endl;
 using std::max;
@@ -27,8 +31,75 @@ using namespace nexus::internal;
 using namespace nexus::internal::master;
 
 
+/* List of ZooKeeper host:port pairs (from master_main.cpp/local.cpp). */
+extern string zookeeper;
+
 namespace {
 
+#ifdef USING_ZOOKEEPER
+class MasterWatcher : public Watcher  // Publishes the master's PID at znode /home/nexus/master.
+{
+private:
+  Master *master;  // Master whose PID (master->getPID()) is written to ZooKeeper.
+
+public:
+  void process(ZooKeeper *zk, int type, int state, const string &path)
+  {  // Dispatches on (state, type); the path argument is not used here.
+    // Session is connected: publish this master's PID in ZooKeeper.
+    if ((state == ZOO_CONNECTED_STATE) &&
+	(type == ZOO_SESSION_EVENT)) {
+      // znode holds the master's PID; dirname is the parent path that must exist first.
+      string znode = "/home/nexus/master";
+      string dirname = "/home/nexus";
+      string delimiter = "/";
+      string contents = "";
+
+      int ret;
+      string result;
+
+      // Create each prefix of dirname in turn (/home, then /home/nexus); ZNODEEXISTS is tolerated.
+      size_t index = dirname.find(delimiter, 0);
+      while (index < string::npos) {
+	index = dirname.find(delimiter, index+1);
+	string prefix = dirname.substr(0, index);
+	ret = zk->create(prefix, contents, ZOO_CREATOR_ALL_ACL,
+			 0, &result);
+	if (ret != ZOK && ret != ZNODEEXISTS)
+	  fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+      }
+
+      // Now create the master znode with the ZOO_EPHEMERAL flag, storing the master's PID.
+      ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
+		       ZOO_EPHEMERAL, &result);
+
+      // If the znode already exists, overwrite its value (-1 presumably means "any version" - TODO confirm).
+      if (ret == ZNODEEXISTS)
+	ret = zk->set(znode, master->getPID(), -1);
+
+      if (ret != ZOK)
+	fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+    } else if ((state == ZOO_EXPIRED_SESSION_STATE) &&
+	       (type == ZOO_SESSION_EVENT)) {
+      // TODO(benh): Reconnect if session expires. Note the Zookeeper
+      // C library retries in the case of connection timeouts,
+      // connection loss, etc. Only expired sessions require
+      // explicitly reconnecting.
+	fatal("connection to ZooKeeper expired!");
+    } else if ((state == ZOO_CONNECTING_STATE) &&
+	       (type == ZOO_SESSION_EVENT)) {
+      // The client library automatically reconnects, taking into
+      // account failed servers in the connection string,
+      // appropriately handling the "herd effect", etc.
+      LOG(INFO) << "Lost Zookeeper connection. Retrying (automagically).";
+    } else {  // Any other (state, type) combination is treated as fatal.
+      fatal("unhandled ZooKeeper event!");
+    }
+  }
+
+  MasterWatcher(Master *_master) : master(_master) {}  // Remembers the master to publish.
+};
+#endif
+
 // A process that periodically pings the master to check filter expiries, etc
 class AllocatorTimer : public Tuple<Process>
 {
@@ -111,50 +182,24 @@ public:
 }
 
 
-Master::Master(const string &zk)
-  : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0), 
-    nextSlotOfferId(0), allocatorType("simple"), masterId(0)
-{
-  if (zk != "") {
-    pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
-    if (urlPair.first == UrlProcessor::ZOO) {
-      isFT = true;
-      zkServers = urlPair.second;
-    } else {
-      LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
-      exit(1);
-    }
-  }
-  ftMsg = FTMessaging::getInstance();
-}
+Master::Master()  // Default constructor: takes no ZooKeeper argument.
+  : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),  // All ID counters start at 0.
+    allocatorType("simple")  // Allocator type defaults to "simple".
+{}
 
 
-Master::Master(const string& _allocatorType, const string &zk)
-  : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0), 
-    nextSlotOfferId(0), allocatorType(_allocatorType), masterId(0)
-{
-  if (zk != "") {
-    pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
-    if (urlPair.first == UrlProcessor::ZOO) {
-      isFT = true;
-      zkServers = urlPair.second;
-    } else {
-      LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
-      exit(1);
-    }
-  }
-  ftMsg = FTMessaging::getInstance();
-}
+Master::Master(const string& _allocatorType)  // Same as the default constructor, but with a caller-chosen allocator type.
+  : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),  // All ID counters start at 0.
+    allocatorType(_allocatorType)  // Stored as given; not validated here.
+{}
                    
 
 Master::~Master()
 {
-  if (isFT && leaderDetector != NULL)
-    delete leaderDetector;
   LOG(INFO) << "Shutting down master";
   delete allocator;
   foreachpair (_, Framework *framework, frameworks) {
-    foreachpair(_, TaskInfo *task, framework->tasks)
+    foreachpair(_, Task *task, framework->tasks)
       delete task;
     delete framework;
   }
@@ -172,7 +217,7 @@ state::MasterState * Master::getState()
   std::ostringstream oss;
   oss << self();
   state::MasterState *state =
-    new state::MasterState(BUILD_DATE, BUILD_USER, oss.str(), isFT);
+    new state::MasterState(BUILD_DATE, BUILD_USER, oss.str());
 
   foreachpair (_, Slave *s, slaves) {
     state::Slave *slave = new state::Slave(s->id, s->hostname, s->publicDns,
@@ -185,7 +230,7 @@ state::MasterState * Master::getState()
        f->executorInfo.uri, f->resources.cpus, f->resources.mem,
        f->connectTime);
     state->frameworks.push_back(framework);
-    foreachpair (_, TaskInfo *t, f->tasks) {
+    foreachpair (_, Task *t, f->tasks) {
       state::Task *task = new state::Task(t->id, t->name, t->frameworkId,
           t->slaveId, t->state, t->resources.cpus, t->resources.mem);
       framework->tasks.push_back(task);
@@ -259,43 +304,10 @@ SlotOffer * Master::lookupSlotOffer(Offe
     return NULL;
 }
 
-void Master::updateFrameworkTasks() {
-  foreachpair (SlaveID sid, Slave *slave, slaves) {
-    foreachpair (_, TaskInfo *task, slave->tasks) {
-      updateFrameworkTasks(task);
-    }
-  }
-}
-
-void Master::updateFrameworkTasks(TaskInfo *task) {
-  Framework *fwrk = lookupFramework(task->frameworkId);
-  if (fwrk != NULL) {
-    if (fwrk->tasks.find(task->id) == fwrk->tasks.end()) {
-      fwrk->tasks[task->id] = task;
-      fwrk->resources += task->resources;
-    }
-  }
-}
-
 
 void Master::operator () ()
 {
-  LOG(INFO) << "Master started at nexus://" << self();
-
-  if (isFT) {
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-    ostringstream lpid;
-    lpid << self();
-    leaderDetector = new LeaderDetector(zkServers, true, lpid.str());
-    
-    string myLeaderSeq = leaderDetector->getMySeq();
-    if (myLeaderSeq == "") {
-      LOG(FATAL) << "Cannot proceed since new FT master sequence number could not be fetched from ZK.";
-      exit(1);
-    }
-    masterId = lexical_cast<long>(myLeaderSeq);
-    LOG(INFO) << "Master ID:" << masterId;
-  }
+  LOG(INFO) << "Master started at " << self();
 
   allocator = createAllocator();
   if (!allocator)
@@ -304,16 +316,21 @@ void Master::operator () ()
   link(spawn(new AllocatorTimer(self())));
   //link(spawn(new SharesPrinter(self())));
 
+#ifdef USING_ZOOKEEPER
+  ZooKeeper *zk;
+  if (!zookeeper.empty())
+    zk = new ZooKeeper(zookeeper, 10000, new MasterWatcher(this));
+#endif
+
   while (true) {
     switch (receive()) {
 
     case F2M_REGISTER_FRAMEWORK: {
-      FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
-
+      FrameworkID fid = nextFrameworkId++;
       Framework *framework = new Framework(from(), fid);
       unpack<F2M_REGISTER_FRAMEWORK>(framework->name,
-				     framework->user,
-				     framework->executorInfo);
+                                     framework->user,
+                                     framework->executorInfo);
       LOG(INFO) << "Registering " << framework << " at " << framework->pid;
       frameworks[fid] = framework;
       pidToFid[framework->pid] = fid;
@@ -325,78 +342,13 @@ void Master::operator () ()
       break;
     }
 
-    case F2M_REREGISTER_FRAMEWORK: {
-
-      Framework *framework = new Framework(from());
-      unpack<F2M_REREGISTER_FRAMEWORK>(framework->id,
-                                       framework->name,
-                                       framework->user,
-                                       framework->executorInfo);
-
-      if (framework->id == "") {
-        DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
-        framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
-      }
-
-      LOG(INFO) << "Registering " << framework << " at " << framework->pid;
-      frameworks[framework->id] = framework;
-      pidToFid[framework->pid] = framework->id;
-
-      updateFrameworkTasks();
-
-      link(framework->pid);
-      send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
-      allocator->frameworkAdded(framework);
-      if (framework->executorInfo.uri == "")
-        terminateFramework(framework, 1, "No executor URI given");
-
-       timeval tv;
-       gettimeofday(&tv, NULL);
-       
-       DLOG(INFO) << tv.tv_sec << "." << tv.tv_usec << " STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
-
-      break;
-    }
-
     case F2M_UNREGISTER_FRAMEWORK: {
       FrameworkID fid;
       unpack<F2M_UNREGISTER_FRAMEWORK>(fid);
       LOG(INFO) << "Asked to unregister framework " << fid;
       Framework *framework = lookupFramework(fid);
       if (framework != NULL)
-	removeFramework(framework);
-      break;
-    }
-
-    case F2M_FT_SLOT_OFFER_REPLY: {
-      FrameworkID fid;
-      OfferID oid;
-      vector<TaskDescription> tasks;
-      Params params;
-      string ftId, senderStr;
-      unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
-      PID senderPid;
-      istringstream ss(senderStr);
-      ss >> senderPid;
-      if (!ftMsg->acceptMessageAckTo(senderPid, ftId, senderStr)) {
-        LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
-        break;
-      } 
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL) {
-	SlotOffer *offer = lookupSlotOffer(oid);
-	if (offer != NULL) {
-	  processOfferReply(offer, tasks, params);
-	} else {
-	  // The slot offer is gone, meaning that we rescinded it or that
-	  // the slave was lost; immediately report any tasks in it as lost
-	  foreach (TaskDescription &t, tasks) {
-	    send(framework->pid,
-		 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
-	  }
-	}
-      } else
-        DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
+        removeFramework(framework);
       break;
     }
 
@@ -408,17 +360,17 @@ void Master::operator () ()
       unpack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-	SlotOffer *offer = lookupSlotOffer(oid);
-	if (offer != NULL) {
-	  processOfferReply(offer, tasks, params);
-	} else {
-	  // The slot offer is gone, meaning that we rescinded it or that
-	  // the slave was lost; immediately report any tasks in it as lost
-	  foreach (TaskDescription &t, tasks) {
-	    send(framework->pid,
-		 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
-	  }
-	}
+        SlotOffer *offer = lookupSlotOffer(oid);
+        if (offer != NULL) {
+          processOfferReply(offer, tasks, params);
+        } else {
+          // The slot offer is gone, meaning that we rescinded it or that
+          // the slave was lost; immediately report any tasks in it as lost
+          foreach (TaskDescription &t, tasks) {
+            send(framework->pid,
+                 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+          }
+        }
       }
       break;
     }
@@ -428,9 +380,9 @@ void Master::operator () ()
       unpack<F2M_REVIVE_OFFERS>(fid);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-	LOG(INFO) << "Reviving offers for " << framework;
-	framework->slaveFilter.clear();
-	allocator->offersRevived(framework);
+        LOG(INFO) << "Reviving offers for " << framework;
+        framework->slaveFilter.clear();
+        allocator->offersRevived(framework);
       }
       break;
     }
@@ -441,39 +393,34 @@ void Master::operator () ()
       unpack<F2M_KILL_TASK>(fid, tid);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-	TaskInfo *task = framework->lookupTask(tid);
-	if (task != NULL) {
-	  LOG(INFO) << "Asked to kill " << task << " by its framework";
-	  killTask(task);
-	}
+        Task *task = framework->lookupTask(tid);
+        if (task != NULL) {
+          LOG(INFO) << "Asked to kill " << task << " by its framework";
+          killTask(task);
+        }
       }
       break;
     }
 
-    case F2M_FT_FRAMEWORK_MESSAGE: {
+    case F2M_FRAMEWORK_MESSAGE: {
       FrameworkID fid;
       FrameworkMessage message;
-      string ftId, senderStr;
-      unpack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message);
+      unpack<F2M_FRAMEWORK_MESSAGE>(fid, message);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-	Slave *slave = lookupSlave(message.slaveId);
-	if (slave != NULL) {
-	  LOG(INFO) << "Sending framework message to " << slave;
-	  send(slave->pid, pack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message));
-        } else
-          DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
-      } else
-        DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << message.slaveId;
+        Slave *slave = lookupSlave(message.slaveId);
+        if (slave != NULL) {
+          LOG(INFO) << "Sending framework message to " << slave;
+          send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
+        }
+      }
       break;
     }
 
     case S2M_REGISTER_SLAVE: {
-      string slaveId = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
-
-      Slave *slave = new Slave(from(), slaveId);
+      Slave *slave = new Slave(from(), nextSlaveId++);
       unpack<S2M_REGISTER_SLAVE>(slave->hostname, slave->publicDns,
-	  slave->resources);
+          slave->resources);
       LOG(INFO) << "Registering " << slave << " at " << slave->pid;
       slaves[slave->id] = slave;
       pidToSid[slave->pid] = slave->id;
@@ -483,73 +430,29 @@ void Master::operator () ()
       break;
     }
 
-    case S2M_REREGISTER_SLAVE: {
-      Slave *slave = new Slave(from());
-      vector<TaskInfo> taskVec;
-
-      unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
-      				   slave->resources, taskVec);
-
-      if (slave->id == "") {
-        slave->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
-        DLOG(WARNING) << "Slave re-registered without a SlaveID, generating a new id for it.";
-      }
-
-      foreach(TaskInfo &ti, taskVec) {
-        TaskInfo *tip = new TaskInfo(ti);
-	slave->addTask(tip);
-        updateFrameworkTasks(tip);
-      }
-  
-     //alibandali
-      LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
-      slaves[slave->id] = slave;
-      pidToSid[slave->pid] = slave->id;
-      link(slave->pid);
-      send(slave->pid, pack<M2S_REREGISTER_REPLY>(slave->id));
-      allocator->slaveAdded(slave);
-
-      timeval tv;
-      gettimeofday(&tv, NULL);
-       
-      DLOG(INFO) << tv.tv_sec << "." << tv.tv_usec << " STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
-       
-      break;
-    }
-
     case S2M_UNREGISTER_SLAVE: {
       SlaveID sid;
       unpack<S2M_UNREGISTER_SLAVE>(sid);
       LOG(INFO) << "Asked to unregister slave " << sid;
       Slave *slave = lookupSlave(sid);
       if (slave != NULL)
-	removeSlave(slave);
+        removeSlave(slave);
       break;
     }
 
-    case S2M_FT_STATUS_UPDATE: {
+    case S2M_STATUS_UPDATE: {
       SlaveID sid;
       FrameworkID fid;
       TaskID tid;
       TaskState state;
       string data;
-      string ftId, senderStr;
-
-      unpack<S2M_FT_STATUS_UPDATE>(ftId, senderStr, sid, fid, tid, state, data);
-      DLOG(INFO) << "FT: prepare relay ftId:"<< ftId << " from: "<< senderStr;
+      unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
       if (Slave *slave = lookupSlave(sid)) {
-	if (Framework *framework = lookupFramework(fid)) {
-	  // Pass on the status update to the framework
-
-          DLOG(INFO) << "FT: relaying ftId:"<< ftId << " from: "<< senderStr;
-          send(framework->pid, pack<M2F_FT_STATUS_UPDATE>(ftId, senderStr, tid, state, data));
-
-          if (!ftMsg->acceptMessage(ftId, senderStr)) {
-            LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
-            break;
-          } 
+        if (Framework *framework = lookupFramework(fid)) {
+          // Pass on the status update to the framework
+          send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
           // Update the task state locally
-          TaskInfo *task = slave->lookupTask(fid, tid);
+          Task *task = slave->lookupTask(fid, tid);
           if (task != NULL) {
             LOG(INFO) << "Status update: " << task << " is in state " << state;
             task->state = state;
@@ -560,65 +463,11 @@ void Master::operator () ()
               removeTask(task, TRR_TASK_ENDED);
             }
           }
-	} else
-          DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
-      } else 
-        DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
-      
-      break;
-    }
-
-    case S2M_STATUS_UPDATE: {
-      SlaveID sid;
-      FrameworkID fid;
-      TaskID tid;
-      TaskState state;
-      string data;
-      unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
-      if (Slave *slave = lookupSlave(sid)) {
-	if (Framework *framework = lookupFramework(fid)) {
-	  // Pass on the status update to the framework
-	  send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
-	  // Update the task state locally
-	  TaskInfo *task = slave->lookupTask(fid, tid);
-	  if (task != NULL) {
-	    LOG(INFO) << "Status update: " << task << " is in state " << state;
-	    task->state = state;
-	    // Remove the task if it finished or failed
-	    if (state == TASK_FINISHED || state == TASK_FAILED ||
-		state == TASK_KILLED || state == TASK_LOST) {
-	      LOG(INFO) << "Removing " << task << " because it's done";
-	      removeTask(task, TRR_TASK_ENDED);
-	    }
-	  }
-	} else
-          DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
-      } else
-        DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
-      break;
+        }
+        break;
+      }
     }
       
-    case S2M_FT_FRAMEWORK_MESSAGE: {
-      SlaveID sid;
-      FrameworkID fid;
-      FrameworkMessage message; 
-      string ftId, senderStr;
-      unpack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, sid, fid, message);
-      Slave *slave = lookupSlave(sid);
-      if (slave != NULL) {
-	Framework *framework = lookupFramework(fid);
-	if (framework != NULL) {
-
-	  send(framework->pid, pack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, message));
-
-        } else
-          DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
-      } else
-        DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << sid;
-
-      break;
-    }
-
     case S2M_FRAMEWORK_MESSAGE: {
       SlaveID sid;
       FrameworkID fid;
@@ -626,9 +475,9 @@ void Master::operator () ()
       unpack<S2M_FRAMEWORK_MESSAGE>(sid, fid, message);
       Slave *slave = lookupSlave(sid);
       if (slave != NULL) {
-	Framework *framework = lookupFramework(fid);
-	if (framework != NULL)
-	  send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+        Framework *framework = lookupFramework(fid);
+        if (framework != NULL)
+          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
       }
       break;
     }
@@ -640,18 +489,18 @@ void Master::operator () ()
       unpack<S2M_LOST_EXECUTOR>(sid, fid, status);
       Slave *slave = lookupSlave(sid);
       if (slave != NULL) {
-	Framework *framework = lookupFramework(fid);
-	if (framework != NULL) {
-	  ostringstream oss;
-	  if (status == -1) {
-	    oss << "Executor on " << slave << " (" << slave->hostname
-		<< ") disconnected";
-	  } else {
-	    oss << "Executor on " << slave << " (" << slave->hostname
-		<< ") exited with status " << status;
-	  }
-	  terminateFramework(framework, status, oss.str());
-	}
+        Framework *framework = lookupFramework(fid);
+        if (framework != NULL) {
+          ostringstream oss;
+          if (status == -1) {
+            oss << "Executor on " << slave << " (" << slave->hostname
+                << ") disconnected";
+          } else {
+            oss << "Executor on " << slave << " (" << slave->hostname
+                << ") exited with status " << status;
+          }
+          terminateFramework(framework, status, oss.str());
+        }
       }
       break;
     }
@@ -661,43 +510,19 @@ void Master::operator () ()
       unpack<SH2M_HEARTBEAT>(sid);
       Slave *slave = lookupSlave(sid);
       if (slave != NULL)
-	//LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
-	;
+        //LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
+        ;
       else
-	LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
-		     << " from " << from();
+        LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
+                     << " from " << from();
       break;
     }
 
     case M2M_TIMER_TICK: {
       LOG(INFO) << "Allocator timer tick";
       foreachpair (_, Framework *framework, frameworks)
-	framework->removeExpiredFilters();
+        framework->removeExpiredFilters();
       allocator->timerTick();
-
-      // int cnts = 0;
-      // foreachpair(_, Framework *framework, frameworks) {
-      // 	DLOG(INFO) << (cnts++) << " resourceInUse:" << framework->resources;
-      // }
-      break;
-    }
-
-    case FT_RELAY_ACK: {
-      string ftId;
-      string origPidStr;
-      unpack<FT_RELAY_ACK>(ftId, origPidStr);
-      
-      DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " forwarding it to " << origPidStr;
-            
-      PID origPid;
-      istringstream iss(origPidStr);
-      if (!(iss >> origPid)) {
-        cerr << "FT: Failed to resolve PID for originator: " << origPidStr << endl;
-        break;
-      }
-
-      send(origPid, pack<FT_RELAY_ACK>(ftId, origPidStr));
-
       break;
     }
 
@@ -742,8 +567,7 @@ void Master::operator () ()
 OfferID Master::makeOffer(Framework *framework,
                           const vector<SlaveResources>& resources)
 {
-  OfferID oid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlotOfferId++);
-
+  OfferID oid = nextSlotOfferId++;
   SlotOffer *offer = new SlotOffer(oid, framework->id, resources);
   slotOffers[offer->id] = offer;
   framework->addOffer(offer);
@@ -757,7 +581,7 @@ OfferID Master::makeOffer(Framework *fra
     Params params;
     params.set("cpus", r.resources.cpus);
     params.set("mem", r.resources.mem);
-    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap(), r.slave->pid);
+    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
     offers.push_back(offer);
   }
   send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers));
@@ -856,13 +680,13 @@ void Master::launchTask(Framework *f, co
   Resources res(params.getInt32("cpus", -1),
                 params.getInt64("mem", -1));
   Slave *slave = lookupSlave(t.slaveId);
-  TaskInfo *task = f->addTask(t.taskId, t.name, slave->id, res);
+  Task *task = f->addTask(t.taskId, t.name, slave->id, res);
   LOG(INFO) << "Launching " << task << " on " << slave;
   slave->addTask(task);
   allocator->taskAdded(task);
   send(slave->pid, pack<M2S_RUN_TASK>(
         f->id, t.taskId, f->name, f->user, f->executorInfo,
-        t.name, t.arg, t.params, (string)f->pid));
+        t.name, t.arg, t.params));
 }
 
 
@@ -872,7 +696,7 @@ void Master::rescindOffer(SlotOffer *off
 }
 
 
-void Master::killTask(TaskInfo *task)
+void Master::killTask(Task *task)
 {
   LOG(INFO) << "Killing " << task;
   Framework *framework = lookupFramework(task->frameworkId);
@@ -941,8 +765,8 @@ void Master::removeFramework(Framework *
     send(slave->pid, pack<M2S_KILL_FRAMEWORK>(framework->id));
 
   // Remove pointers to the framework's tasks in slaves
-  unordered_map<TaskID, TaskInfo *> tasksCopy = framework->tasks;
-  foreachpair (_, TaskInfo *task, tasksCopy) {
+  unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
+  foreachpair (_, Task *task, tasksCopy) {
     Slave *slave = lookupSlave(task->slaveId);
     CHECK(slave != NULL);
     removeTask(task, TRR_FRAMEWORK_LOST);
@@ -971,8 +795,8 @@ void Master::removeSlave(Slave *slave)
   // TODO: Notify allocator that a slave removal is beginning?
   
   // Remove pointers to slave's tasks in frameworks, and send status updates
-  unordered_map<pair<FrameworkID, TaskID>, TaskInfo *> tasksCopy = slave->tasks;
-  foreachpair (_, TaskInfo *task, tasksCopy) {
+  unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
+  foreachpair (_, Task *task, tasksCopy) {
     Framework *framework = lookupFramework(task->frameworkId);
     CHECK(framework != NULL);
     send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
@@ -1013,7 +837,7 @@ void Master::removeSlave(Slave *slave)
 
 
 // Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeTask(TaskInfo *task, TaskRemovalReason reason)
+void Master::removeTask(Task *task, TaskRemovalReason reason)
 {
   Framework *framework = lookupFramework(task->frameworkId);
   Slave *slave = lookupSlave(task->slaveId);

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 05:00:35 2011
@@ -1,17 +1,19 @@
+#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
+
 #include <getopt.h>
 
-#include <fstream>
-#include <algorithm>
+#ifdef USING_ZOOKEEPER
+#include <zookeeper.hpp>
+#endif
+
+#include "isolation_module_factory.hpp"
 #include "slave.hpp"
 #include "slave_webui.hpp"
-#include "isolation_module_factory.hpp"
-#include "url_processor.hpp"
-
-#define FT_TIMEOUT 10
 
 using std::list;
 using std::make_pair;
 using std::ostringstream;
+using std::istringstream;
 using std::pair;
 using std::queue;
 using std::string;
@@ -25,13 +27,67 @@ using namespace nexus;
 using namespace nexus::internal;
 using namespace nexus::internal::slave;
 
+
 // There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
 #ifdef __sun__
 #define gethostbyname2(name, _) gethostbyname(name)
 #endif
 
+
+/* List of ZooKeeper host:port pairs (from slave_main.cpp/local.cpp). */
+extern string zookeeper;
+
 namespace {
 
+#ifdef USING_ZOOKEEPER
+class SlaveWatcher : public Watcher
+{
+private:
+  Slave *slave;
+
+public:
+  void process(ZooKeeper *zk, int type, int state, const string &path)
+  {
+    if ((state == ZOO_CONNECTED_STATE) &&
+	((type == ZOO_SESSION_EVENT) || (type == ZOO_CREATED_EVENT))) {
+      // Lookup master PID.
+      string znode = "/home/nexus/master";
+      int ret;
+
+      // Check if znode exists, if not, just return and wait.
+      ret = zk->exists(znode, true, NULL);
+
+      if (ret == ZNONODE)
+	return;
+
+      if (ret != ZOK)
+	fatal("failed to get %s! (%s)", znode.c_str(), zerror(ret));
+
+      string result;
+      ret = zk->get(znode, false, &result, NULL);
+    
+      if (ret != ZOK)
+	fatal("failed to get %s! (%s)", znode.c_str(), zerror(ret));
+
+      PID master;
+
+      istringstream iss(result);
+      if (!(iss >> master))
+	fatal("bad data at %s!", znode.c_str());
+
+      // TODO(benh): HACK! Don't just set field in slave instance!
+      slave->master = master;
+
+      Process::post(slave->getPID(), S2S_GOT_MASTER);
+    } else {
+      fatal("unhandled ZooKeeper event!");
+    }
+  }
+
+  SlaveWatcher(Slave *_slave) : slave(_slave) {}
+};
+#endif
+
 // Periodically sends heartbeats to the master
 class Heart : public Tuple<Process>
 {
@@ -63,49 +119,18 @@ public:
 } /* namespace */
 
 
-Slave::Slave(const string &_master, Resources _resources, bool _local)
-  : leaderDetector(NULL), 
-    resources(_resources), local(_local), id(""),
-    isolationType("process"), isolationModule(NULL), slaveLeaderListener(this, getPID())
-{
-  ftMsg = FTMessaging::getInstance();
-  pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
-  if (urlPair.first == UrlProcessor::ZOO) {
-    isFT = true;
-    zkServers = urlPair.second;
-  } else {
-    isFT = false;
-    istringstream iss(urlPair.second);
-    istringstream iss2(_master);
-    if (!((iss >> master) || (iss2 >> master) )) {
-      cerr << "Failed to resolve master PID " << urlPair.second << endl;
-      exit(1);
-    }
-  }
-}
+Slave::Slave(const PID &_master, Resources _resources, bool _local)
+  : master(_master), resources(_resources), local(_local), id(-1),
+    isolationType("process"), isolationModule(NULL)
+{}
 
 
-Slave::Slave(const string &_master, Resources _resources, bool _local,
-	     const string &_isolationType)
-  : leaderDetector(NULL), 
-    resources(_resources), local(_local), id(""),
-    isolationType(_isolationType), isolationModule(NULL), slaveLeaderListener(this, getPID())
-{
-  ftMsg = FTMessaging::getInstance();
-  pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
-  if (urlPair.first == UrlProcessor::ZOO) {
-    isFT = true;
-    zkServers = urlPair.second;
-  } else {
-    isFT = false;
-    istringstream iss(urlPair.second);
-    istringstream iss2(_master);
-    if (!((iss >> master) || (iss2 >> master) )) {
-      cerr << "Failed to resolve master PID " << urlPair.second << endl;
-      exit(1);
-    }
-  }
-}
+Slave::Slave(const PID &_master, Resources _resources, bool _local,
+    const string& _isolationType)
+  : master(_master), resources(_resources), local(_local), id(-1),
+    isolationType(_isolationType), isolationModule(NULL)
+{}
+
 
 Slave::~Slave() {}
 
@@ -125,7 +150,7 @@ state::SlaveState *Slave::getState()
         f->executorInfo.uri, f->executorStatus, f->resources.cpus,
         f->resources.mem);
     state->frameworks.push_back(framework);
-    foreachpair(_, TaskInfo *t, f->tasks) {
+    foreachpair(_, Task *t, f->tasks) {
       state::Task *task = new state::Task(t->id, t->name, t->state,
           t->resources.cpus, t->resources.mem);
       framework->tasks.push_back(task);
@@ -140,21 +165,6 @@ void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
 
-  if (isFT) {
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-    leaderDetector = new LeaderDetector(zkServers, false, "", NULL);
-    leaderDetector->setListener(&slaveLeaderListener); // use this instead of constructor to avoid race condition
-
-    string leaderPidStr = leaderDetector->getCurrentLeaderPID();
-    string leaderSeq = leaderDetector->getCurrentLeaderSeq();
-    LOG(INFO) << "Detected leader at " << leaderPidStr << " with ephemeral id:" << leaderSeq;
-
-    istringstream iss(leaderPidStr);
-    if (!(iss >> master)) {
-      cerr << "Failed to resolve master PID " << leaderPidStr << endl;
-    }    
-  }
-
   // Get our hostname
   char buf[256];
   gethostname(buf, sizeof(buf));
@@ -169,12 +179,13 @@ void Slave::operator () ()
     publicDns = hostname;
   }
 
-  LOG(INFO) << "Connecting to Nexus master at " << master;
-  link(master);
-
-  ftMsg->setMasterPid(master);
-
-  send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
+#ifdef USING_ZOOKEEPER
+  ZooKeeper *zk;
+  if (!zookeeper.empty())
+    zk = new ZooKeeper(zookeeper, 10000, new SlaveWatcher(this));
+#else
+  send(self(), pack<S2S_GOT_MASTER>());
+#endif
   
   FrameworkID fid;
   TaskID tid;
@@ -184,7 +195,14 @@ void Slave::operator () ()
   string data;
 
   while (true) {
-    switch (receive(FT_TIMEOUT)) {
+    switch (receive()) {
+      case S2S_GOT_MASTER: {
+	LOG(INFO) << "Connecting to Nexus master at " << master;
+	link(master);
+	send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
+	break;
+      }
+
       case M2S_REGISTER_REPLY: {
         unpack<M2S_REGISTER_REPLY>(this->id);
         LOG(INFO) << "Registered with master; given slave ID " << this->id;
@@ -195,22 +213,11 @@ void Slave::operator () ()
         break;
       }
       
-      case M2S_REREGISTER_REPLY: {
-        FrameworkID tmpfid;
-        unpack<M2S_REGISTER_REPLY>(tmpfid);
-        LOG(INFO) << "RE-registered with master; given slave ID " << tmpfid << " had "<< this->id;
-        if (this->id == "")
-          this->id = tmpfid;
-        link(spawn(new Heart(master, getPID(), this->id)));
-        break;
-      }
-      
       case M2S_RUN_TASK: {
-	FrameworkID fid;
-        string fwName, user, taskName, taskArg, fwPidStr;
+        string fwName, user, taskName, taskArg;
         ExecutorInfo execInfo;
         unpack<M2S_RUN_TASK>(fid, tid, fwName, user, execInfo,
-                             taskName, taskArg, params, fwPidStr);
+            taskName, taskArg, params);
         LOG(INFO) << "Got assigned task " << fid << ":" << tid;
         Resources res;
         res.cpus = params.getInt32("cpus", -1);
@@ -218,15 +225,12 @@ void Slave::operator () ()
         Framework *framework = getFramework(fid);
         if (framework == NULL) {
           // Framework not yet created on this node - create it
-          PID fwPid;
-          istringstream ss(fwPidStr);
-          ss >> fwPid;
-          framework = new Framework(fid, fwName, user, execInfo, fwPid);
+          framework = new Framework(fid, fwName, user, execInfo);
           frameworks[fid] = framework;
           isolationModule->frameworkAdded(framework);
           isolationModule->startExecutor(framework);
         }
-        TaskInfo *task = framework->addTask(tid, taskName, res);
+        Task *task = framework->addTask(tid, taskName, res);
         Executor *executor = getExecutor(fid);
         if (executor) {
           send(executor->pid,
@@ -256,24 +260,6 @@ void Slave::operator () ()
         break;
       }
 
-      case M2S_FT_FRAMEWORK_MESSAGE: {
-        string ftId, origPid;
-        unpack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, origPid, fid, message);
-
-        if (!ftMsg->acceptMessageAck(ftId, origPid))
-          break;
-
-        DLOG(INFO) << "FT: Received message with id: " << ftId;
-
-        if (Executor *ex = getExecutor(fid)) {
-          send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
-        }
-        // TODO(matei): If executor is not started, queue framework message?
-        // (It's probably okay to just drop it since frameworks can have
-        // the executor send a message to the master to say when it's ready.)
-        break;
-      }
-
       case M2S_FRAMEWORK_MESSAGE: {
         unpack<M2S_FRAMEWORK_MESSAGE>(fid, message);
         if (Executor *ex = getExecutor(fid)) {
@@ -333,11 +319,7 @@ void Slave::operator () ()
           }
         }
         // Pass on the update to the master
-        if (isFT) {
-          string ftId = ftMsg->getNextId();
-          ftMsg->reliableSend(ftId, pack<S2M_FT_STATUS_UPDATE>(ftId, self(), id, fid, tid, taskState, data));
-        } else
-          send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
+        send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
         break;
       }
 
@@ -345,14 +327,7 @@ void Slave::operator () ()
         unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
         // Set slave ID in case framework omitted it
         message.slaveId = this->id;
-        /*
-        if (isFT) {
-          string ftId = ftMsg->getNextId();
-          ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
-        } else
-          send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
-        */
-        send(getFramework(fid)->fwPid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+        send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
         break;
       }
 
@@ -366,16 +341,12 @@ void Slave::operator () ()
 
         if (from() == master) {
 	  // TODO: Fault tolerance!
-	   if (isFT)
-	     LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected."; 
-	   else 
-	     {
-		LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
-		if (isolationModule != NULL)
-		  delete isolationModule;
-		// TODO: Shut down executors?
-		return;
-	     }
+          LOG(ERROR) << "Master disconnected! Committing suicide ...";
+	  // TODO(matei): Add support for factory style destroy of objects!
+	  if (isolationModule != NULL)
+	    delete isolationModule;
+	  // TODO: Shut down executors?
+	  return;
 	}
 
         foreachpair (_, Executor *ex, executors) {
@@ -400,7 +371,6 @@ void Slave::operator () ()
         return;
       }
 
-
       case S2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by " << from();
 	// TODO(matei): Add support for factory style destroy of objects!
@@ -410,57 +380,6 @@ void Slave::operator () ()
         return;
       }
 
-      case LE_NEWLEADER: {
-        LOG(INFO) << "Slave got notified of new leader " << from();
-	string newLeader;
-        unpack<LE_NEWLEADER>(newLeader);
-	istringstream iss(newLeader);
-	if (!(iss >> master)) {
-	  cerr << "Failed to resolve master PID " << newLeader << endl;
-	  break;
-	}    
-	
-	LOG(INFO) << "Connecting to Nexus master at " << master;
-	link(master);
-
-        ftMsg->setMasterPid(master);
-
-	// reconstruct resourcesInUse for the master
-	// alig: do I need to include queuedTasks in this number? Don't think so.
-	Resources resourcesInUse; 
-	vector<TaskInfo> taskVec;
-
-	foreachpair(_, Framework *framework, frameworks) {
-	  foreachpair(_, TaskInfo *task, framework->tasks) {
-	    resourcesInUse += task->resources;
-	    TaskInfo ti = *task;
-	    ti.slaveId = id;
-	    taskVec.push_back(ti);
-	  }
-	}
-
-        if (id != "") // actual re-register
-          send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
-        else          // slave started before master
-          send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
-	
-	break;
-      }
-    
-      case FT_RELAY_ACK: {
-        string ftId, senderStr;
-        unpack<FT_RELAY_ACK>(ftId, senderStr);
-
-        DLOG(INFO) << "FT: got final ack for " << ftId;
-
-        ftMsg->gotAck(ftId);
-        break;
-      }
-      
-      case PROCESS_TIMEOUT: {
-        ftMsg->sendOutstanding();
-	break;
-      }
       default: {
         LOG(ERROR) << "Received unknown message ID " << msgid()
                    << " from " << from();

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun  5 05:00:35 2011
@@ -1234,7 +1234,7 @@ public:
   }
 
 
-  void receive(Process *process, time_t secs)
+  void receive(Process *process, double secs)
   {
     assert(process != NULL);
     if (secs > 0) {
@@ -1270,7 +1270,7 @@ public:
     }
   }
 #else
-  void receive(Process *process, time_t secs)
+  void receive(Process *process, double secs)
   {
     //cout << "ProcessManager::receive" << endl;
     assert(process != NULL);
@@ -1338,7 +1338,7 @@ public:
   }
 
 
-  void pause(Process *process, time_t secs)
+  void pause(Process *process, double secs)
   {
     assert(process != NULL);
 
@@ -1351,7 +1351,7 @@ public:
     }
   }
 #else
-  void pause(Process *process, time_t secs)
+  void pause(Process *process, double secs)
   {
     assert(process != NULL);
 
@@ -1883,8 +1883,10 @@ static void handle_async(struct ev_loop 
   {
     if (update_timer) {
       if (!timers->empty()) {
-	timer_watcher.repeat = timers->begin()->first - ev_now(loop) > 0 ? : 0;
-	if (timer_watcher.repeat == 0) {
+	timer_watcher.repeat = timers->begin()->first - ev_now(loop);
+	/* If timer has elapsed feed the event immediately. */
+	if (timer_watcher.repeat <= 0) {
+	  timer_watcher.repeat = 0;
 	  ev_feed_event(loop, &timer_watcher, EV_TIMEOUT);
 	} else {
 	  ev_timer_again(loop, &timer_watcher);
@@ -2961,7 +2963,7 @@ void Process::send(const PID &to, MSGID 
 }
 
 
-MSGID Process::receive(time_t secs)
+MSGID Process::receive(double secs)
 {
   //cout << "Process::receive(" << secs << ")" << endl;
   /* Free current message. */
@@ -3036,7 +3038,7 @@ MSGID Process::receive(time_t secs)
 
 
 MSGID Process::call(const PID &to, MSGID id,
-		    const char *data, size_t length, time_t secs)
+		    const char *data, size_t length, double secs)
 {
   send(to, id, data, length);
   return receive(secs);
@@ -3057,7 +3059,7 @@ const char * Process::body(size_t *lengt
 }
 
 
-void Process::pause(time_t secs)
+void Process::pause(double secs)
 {
 #ifdef USE_LITHE
   /* TODO(benh): Handle non-libprocess task/ctx (i.e., proc_thread below). */

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun  5 05:00:35 2011
@@ -177,7 +177,7 @@ protected:
   MSGID receive();
 
   /* Blocks for message at most specified seconds. */
-  MSGID receive(time_t);
+  MSGID receive(double sec);
 
   /* Sends a message to PID and then blocks for a message indefinitely. */
   MSGID call(const PID &, MSGID);
@@ -186,13 +186,13 @@ protected:
   MSGID call(const PID &, MSGID, const char *data, size_t length);
 
   /* Sends, and then blocks for a message at most specified seconds. */
-  MSGID call(const PID &, MSGID, const char *data, size_t length, time_t);
+  MSGID call(const PID &, MSGID, const char *data, size_t length, double secs);
 
   /* Returns pointer and length of body of last dequeued (current) message. */
   const char * body(size_t *length);
 
   /* Blocks at least specified seconds (may block longer). */
-  void pause(time_t);
+  void pause(double secs);
 
   /* Links with the specified PID. */
   PID link(const PID &);

Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun  5 05:00:35 2011
@@ -583,7 +583,7 @@ protected:
     return receive(0);
   }
 
-  MSGID receive(time_t secs)
+  MSGID receive(double secs)
   {
     return Process::receive(secs);
   }
@@ -613,7 +613,7 @@ protected:
   }
 
   template <MSGID ID>
-  MSGID call(const PID &to, const tuple<ID> &r, time_t secs)
+  MSGID call(const PID &to, const tuple<ID> &r, double secs)
   {
     send(to, r);
     return receive(secs);
@@ -740,13 +740,13 @@ protected:
   }
 
   template <MSGID ID>
-  MSGID call(const PID &to, time_t secs)
+  MSGID call(const PID &to, double secs)
   {
     return call(to, pack<ID>(), secs);
   }
 
   template <MSGID ID>
-  MSGID call(const PID &to, typename field<0, ID>::type t0, time_t secs)
+  MSGID call(const PID &to, typename field<0, ID>::type t0, double secs)
   {
     return call(to, pack<ID>(t0), secs);
   }
@@ -755,7 +755,7 @@ protected:
   MSGID call(const PID &to,
 	     typename field<0, ID>::type t0,
 	     typename field<1, ID>::type t1,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1), secs);
   }
@@ -765,7 +765,7 @@ protected:
 	     typename field<0, ID>::type t0,
 	     typename field<1, ID>::type t1,
 	     typename field<2, ID>::type t2,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2), secs);
   }
@@ -776,7 +776,7 @@ protected:
 	     typename field<1, ID>::type t1,
 	     typename field<2, ID>::type t2,
 	     typename field<3, ID>::type t3,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3), secs);
   }
@@ -788,7 +788,7 @@ protected:
 	     typename field<2, ID>::type t2,
 	     typename field<3, ID>::type t3,
 	     typename field<4, ID>::type t4,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3, t4), secs);
   }
@@ -801,7 +801,7 @@ protected:
 	     typename field<3, ID>::type t3,
 	     typename field<4, ID>::type t4,
 	     typename field<5, ID>::type t5,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3, t4, t5), secs);
   }
@@ -815,7 +815,7 @@ protected:
 	     typename field<4, ID>::type t4,
 	     typename field<5, ID>::type t5,
 	     typename field<6, ID>::type t6,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6), secs);
   }
@@ -830,7 +830,7 @@ protected:
 	     typename field<5, ID>::type t5,
 	     typename field<6, ID>::type t6,
 	     typename field<7, ID>::type t7,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6, t7), secs);
   }
@@ -846,7 +846,7 @@ protected:
 	     typename field<6, ID>::type t6,
 	     typename field<7, ID>::type t7,
 	     typename field<8, ID>::type t8,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6, t7, t8), secs);
   }
@@ -863,7 +863,7 @@ protected:
 	     typename field<7, ID>::type t7,
 	     typename field<8, ID>::type t8,
 	     typename field<9, ID>::type t9,
-	     time_t secs)
+	     double secs)
   {
     return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6, t7, t8, t9), secs);
   }

Modified: incubator/mesos/trunk/src/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper.cpp Sun Jun  5 05:00:35 2011
@@ -41,6 +41,7 @@ enum {
   REMOVE, // Perform an asynchronous remove (delete).
   EXISTS, // Perform an asysnchronous exists.
   GET, // Perform an asynchronous get.
+  SET, // Perform an asynchronous set.
 };
 
 
@@ -104,6 +105,18 @@ struct GetCall
 };
 
 
+/* Set "message" for performing ZooKeeper::set. */
+struct SetCall
+{
+  int ret;
+  const string *path;
+  const string *data;
+  int version;
+  PID from;
+  ZooKeeperProcess *zooKeeperProcess;
+};
+
+
 /* PID of singleton instance of WatcherProcessManager. */
 PID manager;
 
@@ -333,12 +346,26 @@ private:
     getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
   }
 
+  static void setCompletion(int ret, const Stat *stat, const void *data)
+  {
+    SetCall *setCall =
+      static_cast<SetCall *>(const_cast<void *>((data)));
+    setCall->ret = ret;
+    setCall->zooKeeperProcess->send(setCall->from, COMPLETED);
+  }
+
   void prepare(int *fd, int *ops, timeval *tv)
   {
     int interest = 0;
 
     int ret = zookeeper_interest(zh, fd, &interest, tv);
 
+    // If in some disconnected state, try again later.
+    if (ret == ZINVALIDSTATE ||
+	ret == ZCONNECTIONLOSS ||
+	ret == ZOPERATIONTIMEOUT)
+      return;
+
     if (ret != ZOK)
       fatal("zookeeper_interest failed! (%s)", zerror(ret));
 
@@ -365,6 +392,11 @@ private:
 
     int ret = zookeeper_process(zh, events);
 
+    // If in some disconnected state, try again later.
+    if (ret == ZINVALIDSTATE ||
+	ret == ZCONNECTIONLOSS)
+      return;
+
     if (ret != ZOK && ret != ZNOTHING)
       fatal("zookeeper_process failed! (%s)", zerror(ret));
   }
@@ -392,7 +424,10 @@ protected:
 
       // TODO(benh): If tv is 0, invoke await and ignore queued messages?
 
-      if (!await(fd, ops, tv, false)) {
+      if (await(fd, ops, tv, false)) {
+	// No enqueued messages and either data available on fd or timer expired.
+	process(fd, ops);
+      } else {
 	// TODO(benh): Don't handle incoming "calls" until we are connected!
 	switch (receive()) {
 	  case CREATE: {
@@ -452,6 +487,22 @@ protected:
 	    }
 	    break;
 	  }
+	  case SET: {
+	    SetCall *setCall =
+	      *reinterpret_cast<SetCall **>(const_cast<char *>(body(NULL)));
+	    setCall->from = from();
+	    setCall->zooKeeperProcess = this;
+	    int ret = zoo_aset(zh, setCall->path->c_str(),
+			       setCall->data->data(),
+			       setCall->data->size(),
+			       setCall->version,
+			       setCompletion, setCall);
+	    if (ret != ZOK) {
+	      setCall->ret = ret;
+	      send(setCall->from, COMPLETED);
+	    }
+	    break;
+	  }
 	  case TERMINATE: {
 	    return;
 	  }
@@ -459,8 +510,6 @@ protected:
 	    fatal("unexpected interruption during await");
 	  }
 	}
-      } else {
-	process(fd, ops);
       }
     }
   }
@@ -673,6 +722,46 @@ int ZooKeeper::get(const string &path,
 }
 
 
+int ZooKeeper::set(const string &path,
+		   const string &data,
+		   int version)
+{
+  SetCall setCall;
+  setCall.path = &path;
+  setCall.data = &data;
+  setCall.version = version;
+
+  class SetCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *zooKeeperProcess;
+    SetCall *setCall;
+
+  protected:
+    void operator () ()
+    {
+      if (call(zooKeeperProcess->getPID(),
+	       SET,
+	       reinterpret_cast<char *>(&setCall),
+	       sizeof(SetCall *)) != COMPLETED)
+	setCall->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    SetCallProcess(ZooKeeperProcess *_zooKeeperProcess,
+		   SetCall *_setCall)
+      : zooKeeperProcess(_zooKeeperProcess),
+	setCall(_setCall)
+    {}
+  } setCallProcess(static_cast<ZooKeeperProcess *>(impl),
+		   &setCall);
+
+  Process::wait(Process::spawn(&setCallProcess));
+
+  return setCall.ret;
+}
+
+
 const char * ZooKeeper::error(int ret) const
 {
   return zerror(ret);

Modified: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun  5 05:00:35 2011
@@ -230,6 +230,26 @@ public:
 	  Stat *stat);
 
   /**
+   * \brief sets the data associated with a node.
+   * 
+   * \param path the name of the node. Expressed as a file name with slashes 
+   * separating ancestors of the node.
+   * \param data the data to be written to the node.
+   * \param version the expected version of the node. The function will fail if 
+   * the actual version of the node does not match the expected version. If -1 is 
+   * used the version check will not take place. 
+   * \return the return code for the function call.
+   * ZOK operation completed successfully
+   * ZNONODE the node does not exist.
+   * ZNOAUTH the client does not have permission.
+   * ZBADVERSION expected version does not match actual version.
+   * ZBADARGUMENTS - invalid input parameters
+   * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+   * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+   */
+  int set(const std::string &path, const std::string &data, int version);
+
+  /**
    * \brief return a string describing the last error.
    * 
    * \return string corresponding to the last error