You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:08:03 UTC

svn commit: r1132253 [1/5] - in /incubator/mesos/trunk: src/ src/detector/ src/exec/ src/master/ src/messaging/ src/sched/ src/slave/ src/tests/ third_party/libprocess/ third_party/libprocess/third_party/ry-http-parser-1c3624a/

Author: benh
Date: Sun Jun  5 09:08:02 2011
New Revision: 1132253

URL: http://svn.apache.org/viewvc?rev=1132253&view=rev
Log:
This is a rather large, rambling commit. My apologies; one thing led
to the next. This commit adds a host of new features, most of them
instigated by changes in libprocess.

Libprocess:
* Changed libprocess message IDs to message names (strings).
* Changed the internal protocol used by libprocess to HTTP.
* Added support for sending data directly to a browser (Proxy).

Mesos:
* Updated code to use new message names.
* Added "handler" routines so that messages could be handled as
  function callbacks (more like RPC).
* Added an example in the master of how to get an HTTP message and
  respond.

Added:
    incubator/mesos/trunk/src/messaging/messages.cpp
    incubator/mesos/trunk/third_party/libprocess/decoder.hpp
    incubator/mesos/trunk/third_party/libprocess/encoder.hpp
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/.gitignore
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/CONTRIBUTIONS
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/LICENSE-MIT
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/README.md
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/http_parser.c
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/http_parser.h
    incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/test.c
    incubator/mesos/trunk/third_party/libprocess/tokenize.cpp
    incubator/mesos/trunk/third_party/libprocess/tokenize.hpp
Modified:
    incubator/mesos/trunk/src/Makefile.in
    incubator/mesos/trunk/src/detector/detector.cpp
    incubator/mesos/trunk/src/detector/zookeeper.cpp
    incubator/mesos/trunk/src/exec/exec.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/master/webui.cpp
    incubator/mesos/trunk/src/messaging/messages.hpp
    incubator/mesos/trunk/src/messaging/messages.proto
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
    incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
    incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
    incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/slave/webui.cpp
    incubator/mesos/trunk/src/tests/master_test.cpp
    incubator/mesos/trunk/src/tests/utils.hpp
    incubator/mesos/trunk/third_party/libprocess/Makefile.in
    incubator/mesos/trunk/third_party/libprocess/pid.cpp
    incubator/mesos/trunk/third_party/libprocess/pid.hpp
    incubator/mesos/trunk/third_party/libprocess/process.cpp
    incubator/mesos/trunk/third_party/libprocess/process.hpp

Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun  5 09:08:02 2011
@@ -134,7 +134,8 @@ SWIG_WEBUI_OBJ = $(MASTER_SWIG_WEBUI_OBJ
 COMMON_OBJ = common/fatal.o common/lock.o detector/detector.o		\
 	     detector/url_processor.o configurator/configurator.o	\
 	     common/string_utils.o common/logging.o			\
-	     common/date_utils.o
+	     common/date_utils.o common/tokenize.o common/resources.o	\
+	     messaging/messages.o
 
 ifeq ($(WITH_ZOOKEEPER),1)
   COMMON_OBJ += detector/zookeeper.o
@@ -204,6 +205,14 @@ MESOS_JAVA_JAR = $(LIBDIR)/java/mesos.ja
 
 MESOS_PYTHON_LIB = $(LIBDIR)/python/_mesos.so
 
+MESOS_PYTHON_LIB_OBJ = python/native/module.o \
+                       python/native/proxy_scheduler.o \
+                       python/native/mesos_scheduler_driver_impl.o \
+                       python/native/proxy_executor.o \
+                       python/native/mesos_executor_driver_impl.o
+
+MESOS_PYTHON_FILE = $(LIBDIR)/python/mesos.py
+
 # We copy all the webui files into the bin directory.
 WEBUI_FILES = $(BINDIR)/webui/bottle-0.8.3		\
               $(BINDIR)/webui/common/webui_lib.py	\
@@ -326,7 +335,7 @@ $(MESOS_PROJD_EXE): $(SRCDIR)/slave/proj
 
 java: $(MESOS_JAVA_LIB) $(MESOS_JAVA_JAR)
 
-$(MESOS_JAVA_JAR): $(SRCDIR)/java/src/mesos/*.java | $(LIBDIR)/java
+$(MESOS_JAVA_JAR): $(SRCDIR)/java/src/mesos/*.java @top_srcdir@/include/mesos.proto | $(LIBDIR)/java
 ifdef JAVA_HOME
 	mkdir -p @top_builddir@/$(PROTOBUF)/java/src/main/java
 	$(PROTOC) --java_out=@top_builddir@/$(PROTOBUF)/java/src/main/java -I@top_srcdir@/$(PROTOBUF)/src @top_srcdir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto
@@ -341,21 +350,32 @@ ifdef JAVA_HOME
 endif
 
 $(MESOS_JAVA_LIB_OBJ): %.o: $(SRCDIR)/%.cpp $(MESOS_JAVA_JAR)
-	$(CXX) -c $(CXXFLAGS) -Ijava/jni -I$(JAVA_HEADERS) -I$(JAVA_HEADERS)/$(OS_NAME) -o $@ $<
+	$(CXX) -c $(CXXFLAGS) -Ijava/jni -I$(JAVA_HOME)/include -I$(JAVA_HOME)/include/$(OS_NAME) -I $(JAVA_HEADERS) -o $@ $<
 
 $(MESOS_JAVA_LIB): $(MESOS_JAVA_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/java
 ifdef JAVA_HOME
 	$(CXX) $(CXXFLAGS) -shared -o $@ $(MESOS_JAVA_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(LIBS)
 endif
 
-python: $(MESOS_PYTHON_LIB)
+python: $(MESOS_PYTHON_LIB) $(MESOS_PYTHON_FILE) $(MESOS_PYTHON_PROTOBUFS)
+
+$(MESOS_PYTHON_LIB_OBJ): %.o: $(SRCDIR)/%.cpp
+ifdef PYTHON_HEADERS
+	mkdir -p python/native
+	$(CXX) -c $(CXXFLAGS) -Ipython/native -I$(PYTHON_HEADERS) -o $@ $<
+endif
+
+$(MESOS_PYTHON_LIB): $(MESOS_PYTHON_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/python
+ifdef PYTHON_HEADERS
+	$(CXX) $(CXXFLAGS) -shared -o $@ $(MESOS_PYTHON_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(PYTHON_LDFLAGS) $(LIBS)
+endif
 
-$(MESOS_PYTHON_LIB): $(SRCDIR)/swig/mesos.i $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/python
+$(MESOS_PYTHON_FILE): $(SRCDIR)/python/src/mesos.py @top_srcdir@/include/mesos.proto | $(LIBDIR)/python
 ifdef PYTHON_HEADERS
-	mkdir -p swig/python
-	$(SWIG) -c++ -python -threads -I@top_srcdir@/include -o swig/python/mesos_wrap.cpp -outdir swig/python $(SRCDIR)/swig/mesos.i
-	$(CXX) $(CXXFLAGS) -I$(PYTHON_HEADERS) -shared -o $@ swig/python/mesos_wrap.cpp $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(PYTHON_LDFLAGS) $(LIBS)
-	cp -r swig/python/mesos.py $(LIBDIR)/python/mesos.py
+	cp -r @top_srcdir@/$(PROTOBUF)/python @top_builddir@/$(PROTOBUF)/python
+	cp $< $@
+	$(PROTOC) --python_out=@top_builddir@/$(PROTOBUF)/python -I@top_srcdir@/$(PROTOBUF)/src @top_srcdir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto
+	$(PROTOC) --python_out=$(LIBDIR)/python -I@top_srcdir@/include @top_srcdir@/include/mesos.proto
 endif
 
 $(WEBUI_FILES): $(BINDIR)/%: $(SRCDIR)/% | $(WEBUI_DIRECTORIES)
@@ -375,8 +395,7 @@ $(CONFDIR)/deploy-env.sh: | $(SRCDIR)/co
 test: all
 	$(MAKE) -C tests test
 
-#all: $(MESOS_LIBS) $(MESOS_EXES) java python $(WEBUI_FILES) $(CONF_FILES) $(DEPLOY_FILES)
-all: $(MESOS_LIBS) $(MESOS_EXES) java $(WEBUI_FILES) $(CONF_FILES) $(DEPLOY_FILES)
+all: $(MESOS_LIBS) $(MESOS_EXES) java python $(WEBUI_FILES) $(CONF_FILES) $(DEPLOY_FILES)
 	$(MAKE) -C examples
 	$(MAKE) -C tests
 

Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun  5 09:08:02 2011
@@ -42,17 +42,11 @@ public:
 protected:
   virtual void operator () ()
   {
-    switch (receive(120)) {
-      case PROCESS_TIMEOUT: {
-	LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
-		   << "(automagically) reconnect";
-	MesosProcess::post(pid, MASTER_DETECTION_FAILURE);
-	break; 
-      }
-
-      default: {
-	break;
-      }
+    receive(120);
+    if (name() == TIMEOUT) {
+      LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
+                 << "(automagically) reconnect";
+      MesosProcess::post(pid, MASTER_DETECTION_FAILURE);
     }
   }
 
@@ -217,14 +211,14 @@ BasicMasterDetector::BasicMasterDetector
 {
   // Send a master token.
   {
-    Message<GOT_MASTER_TOKEN> msg;
+    MSG<GOT_MASTER_TOKEN> msg;
     msg.set_token("0");
     MesosProcess::post(master, msg);
   }
 
   // Elect the master.
   {
-    Message<NEW_MASTER_DETECTED> msg;
+    MSG<NEW_MASTER_DETECTED> msg;
     msg.set_pid(master);
     MesosProcess::post(master, msg);
   }
@@ -239,21 +233,21 @@ BasicMasterDetector::BasicMasterDetector
   if (elect) {
     // Send a master token.
     {
-      Message<GOT_MASTER_TOKEN> msg;
+      MSG<GOT_MASTER_TOKEN> msg;
       msg.set_token("0");
       MesosProcess::post(master, msg);
     }
 
     // Elect the master.
     {
-      Message<NEW_MASTER_DETECTED> msg;
+      MSG<NEW_MASTER_DETECTED> msg;
       msg.set_pid(master);
       MesosProcess::post(master, msg);
     }
   }
 
   // Tell the pid about the master.
-  Message<NEW_MASTER_DETECTED> msg;
+  MSG<NEW_MASTER_DETECTED> msg;
   msg.set_pid(master);
   MesosProcess::post(pid, msg);
 }
@@ -267,14 +261,14 @@ BasicMasterDetector::BasicMasterDetector
   if (elect) {
     // Send a master token.
     {
-      Message<GOT_MASTER_TOKEN> msg;
+      MSG<GOT_MASTER_TOKEN> msg;
       msg.set_token("0");
       MesosProcess::post(master, msg);
     }
 
     // Elect the master.
     {
-      Message<NEW_MASTER_DETECTED> msg;
+      MSG<NEW_MASTER_DETECTED> msg;
       msg.set_pid(master);
       MesosProcess::post(master, msg);
     }
@@ -282,7 +276,7 @@ BasicMasterDetector::BasicMasterDetector
 
   // Tell each pid about the master.
   foreach (const PID &pid, pids) {
-    Message<NEW_MASTER_DETECTED> msg;
+    MSG<NEW_MASTER_DETECTED> msg;
     msg.set_pid(master);
     MesosProcess::post(pid, msg);
   }
@@ -326,7 +320,7 @@ ZooKeeperMasterDetector::~ZooKeeperMaste
 {
   // Kill the timer (if running), and then the actual ZooKeeper instance.
   if (timer != NULL) {
-    Process::post(timer->self(), PROCESS_MSGID);
+    Process::post(timer->self(), TERMINATE);
     Process::wait(timer->self());
     delete timer;
     timer = NULL;
@@ -390,7 +384,7 @@ void ZooKeeperMasterDetector::process(Zo
 	setId(result);
 	LOG(INFO) << "Created ephemeral/sequence:" << getId();
 
-        Message<GOT_MASTER_TOKEN> msg;
+        MSG<GOT_MASTER_TOKEN> msg;
         msg.set_token(getId());
         MesosProcess::post(pid, msg);
       }
@@ -403,7 +397,7 @@ void ZooKeeperMasterDetector::process(Zo
 
       // Kill the reconnect timer.
       if (timer != NULL) {
-	Process::post(timer->self(), PROCESS_MSGID);
+	Process::post(timer->self(), TERMINATE);
 	Process::wait(timer->self());
 	delete timer;
 	timer = NULL;
@@ -510,7 +504,7 @@ void ZooKeeperMasterDetector::detectMast
     if (currentMasterPID == PID()) {
       MesosProcess::post(pid, NO_MASTER_DETECTED);
     } else {
-      Message<NEW_MASTER_DETECTED> msg;
+      MSG<NEW_MASTER_DETECTED> msg;
       msg.set_pid(currentMasterPID);
       MesosProcess::post(pid, msg);
     }

Modified: incubator/mesos/trunk/src/detector/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/zookeeper.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/detector/zookeeper.cpp Sun Jun  5 09:08:02 2011
@@ -5,293 +5,150 @@
 
 #include <process.hpp>
 
+#include <boost/tuple/tuple.hpp>
+
 #include "zookeeper.hpp"
 
 #include "common/fatal.hpp"
 
+using boost::cref;
+using boost::tuple;
+
+using std::cerr;
+using std::cout;
+using std::endl;
 using std::map;
 using std::string;
 using std::vector;
 
 
-/* Forward (and first) declaration of ZooKeeperProcess. */
-class ZooKeeperProcess;
-
-
-enum {
-  /* Generic messages: */
-  TERMINATE = PROCESS_MSGID, // Terminate process.
-
-  /* WatcherProcessManager messages: */
-  OK, // Generic success response.
-  ERROR, // Generic failure response.
-  REGISTER, // Register WatcherProcess.
-  UNREGISTER, // Unregister WatcherProcess.
-  LOOKUP_PROCESS, // Lookup WatcherProcess associated with Watcher.
-  LOOKUP_PID, // Lookup WatcherProcess PID associated with Watcher.
-
-  /* WatcherProcess messages: */
-  EVENT, // Invoke Watcher::process callback.
-
-  /* ZooKeeperProcess messages: */
-  COMPLETED, // After an asynchronous "call" has completed.
-  CREATE, // Perform an asynchronous create.
-  REMOVE, // Perform an asynchronous remove (delete).
-  EXISTS, // Perform an asysnchronous exists.
-  GET, // Perform an asynchronous get.
-  GET_CHILDREN, // Perform an asynchronous get_children.
-  SET, // Perform an asynchronous set.
-};
-
+/* Singleton instance of WatcherProcessManager. */
+class WatcherProcessManager;
 
-/* Event "message" for invoking Watcher. */
-struct Event
-{
-  ZooKeeper *zk;
-  int type;
-  int state;
-  string path;
-};
-
-
-/* Create "message" for performing ZooKeeper::create. */
-struct CreateCall
-{
-  int ret;
-  const string *path;
-  const string *data;
-  const ACL_vector *acl;
-  int flags;
-  string *result;
-  PID from;
-  ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Remove "message" for performing ZooKeeper::remove. */
-struct RemoveCall
-{
-  int ret;
-  const string *path;
-  int version;
-  PID from;
-  ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Exists "message" for performing ZooKeeper::exists. */
-struct ExistsCall
-{
-  int ret;
-  const string *path;
-  bool watch;
-  Stat *stat;
-  PID from;
-  ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Get "message" for performing ZooKeeper::get. */
-struct GetCall
-{
-  int ret;
-  const string *path;
-  bool watch;
-  string *result;
-  Stat *stat;
-  PID from;
-  ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* GetChildren "message" for performing ZooKeeper::getChildren. */
-struct GetChildrenCall
-{
-  int ret;
-  const string *path;
-  bool watch;
-  vector<string> *results;
-  PID from;
-  ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Set "message" for performing ZooKeeper::set. */
-struct SetCall
-{
-  int ret;
-  const string *path;
-  const string *data;
-  int version;
-  PID from;
-  ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* PID of singleton instance of WatcherProcessManager. */
-PID manager;
+WatcherProcessManager *manager;
 
 
 /**
  * In order to make callbacks on Watcher, we create a proxy
- * WatcherProcess. The ZooKeeperProcess (defined below) stores the PID
- * of the WatcherProcess associated with a watcher and sends it an
- * event "message" which it uses to invoke Watcher::process. Care
- * needed to be taken to assure that a WatcherProcess was only valid
- * as long as a Watcher was valid. This was done by ensuring that the
- * WatcherProcess object created gets cleaned up in ~Watcher(). We
- * wanted to keep the Watcher interface clean and simple, so rather
- * than add a member in Watcher that points to a WatcherProcess
- * instance (or points to a WatcherImpl), we choose to create a
- * WatcherProcessManager that stores the Watcher and WatcherProcess
- * associations. The WatcherProcessManager is akin to having a shared
- * dictionary or hashtable and using locks to access it rathe then
- * sending and receiving messages. Their is probably a performance hit
- * here, but it would be interesting to see how bad the perforamnce is
- * across a range of low and high-contention states.
+ * WatcherProcess. The ZooKeeperProcess (defined below) dispatches
+ * "events" to the WatcherProcess which then invokes
+ * Watcher::process. Care needed to be taken to assure that a
+ * WatcherProcess was only valid as long as a Watcher was valid. This
+ * was done by ensuring that the WatcherProcess object created gets
+ * "terminated" in ~Watcher(). A pointer to the WatcherProcess might
+ * still be valid in ZooKeeperProcess (see below), but any messages
+ * sent to it will get dropped because it is no longer running. The
+ * ZooKeeperProcess is responsible for actually deleting the
+ * WatcherProcess. We wanted to keep the Watcher interface clean and
+ * simple, so rather than add a member in Watcher that points to a
+ * WatcherProcess instance (or points to a WatcherImpl), we choose to
+ * create a WatcherProcessManager that stores the Watcher and
+ * WatcherProcess associations. The WatcherProcessManager is akin to
+ * having a shared dictionary or hashtable and using locks to access
+ * it rather than sending and receiving messages. There is probably a
+ * performance hit here, but it would be interesting to see how bad
+ * the performance is across a range of low and high-contention
+ * states.
  */
 class WatcherProcess : public Process
 {
-  friend class WatcherProcessManager;
+public:
+  WatcherProcess(Watcher *_watcher) : watcher(_watcher) {}
 
-private:
-  Watcher *watcher;
+  void event(ZooKeeper *zk, int type, int state, const char *path)
+  {
+    watcher->process(zk, type, state, path);
+  }
 
 protected:
-  void operator () ()
+  virtual void operator () ()
   {
-    WatcherProcess *process = this;
-    send(manager, REGISTER,
-         reinterpret_cast<char *>(&process), sizeof(process));
-    if (receive() != OK)
-      fatal("failed to setup underlying watcher mechanism");
-    while (true) {
-      switch (receive()) {
-      case EVENT: {
-	Event *event =
-	  *reinterpret_cast<Event **>(const_cast<char *>(body(NULL)));
-	watcher->process(event->zk, event->type, event->state, event->path);
-	delete event;
-	break;
-      }
-      case TERMINATE:
-	send(manager, UNREGISTER,
-             reinterpret_cast<char *>(&process), sizeof(process));
-        if (receive() != OK)
-	  fatal("failed to cleanup underlying watcher mechanism");
-	return;
-      }
-    }
+    do serve();
+    while (name() != TERMINATE);
   }
 
-public:
-  WatcherProcess(Watcher *_watcher) : watcher(_watcher) {}
+private:
+  Watcher *watcher;
 };
 
 
 class WatcherProcessManager : public Process
 {
-private:
-  map<Watcher *, WatcherProcess *> watchers;
+public:
+  Result<WatcherProcess *> create(Watcher *watcher)
+  {
+    WatcherProcess *process = new WatcherProcess(watcher);
+    spawn(process);
+    processes[watcher] = process;
+    return process;
+  }
 
-protected:
-  void operator () ()
+  void destroy(Watcher *watcher)
   {
-    while (true) {
-      switch (receive()) {
-        case REGISTER: {
-	  WatcherProcess *process =
-	    *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
-	  Watcher *watcher = process->watcher;
-	  assert(watchers.find(watcher) == watchers.end());
-	  watchers[watcher] = process;
-	  send(from(), OK);
-	  break;
-	}
-        case UNREGISTER: {
-	  WatcherProcess *process =
-	    *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
-	  Watcher *watcher = process->watcher;
-	  assert(watchers.find(watcher) != watchers.end());
-	  watchers.erase(watcher);
-	  send(from(), OK);
-	  break;
-	}
-        case LOOKUP_PROCESS: {
-	  Watcher *watcher =
-	    *reinterpret_cast<Watcher **>(const_cast<char *>(body(NULL)));
-	  if (watchers.find(watcher) != watchers.end()) {
-	    WatcherProcess *process = watchers[watcher];
-	    send(from(), OK, reinterpret_cast<char *>(&process), sizeof(process));
-	  } else {
-	    send(from(), ERROR);
-	  }
-	  break;
-	}
-        case LOOKUP_PID: {
-	  Watcher *watcher =
-	    *reinterpret_cast<Watcher **>(const_cast<char *>(body(NULL)));
-	  if (watchers.find(watcher) != watchers.end()) {
-	    WatcherProcess *process = watchers[watcher];
-	    const PID &pid = process->self();
-	    send(from(), OK, reinterpret_cast<const char *>(&pid), sizeof(pid));
-	  } else {
-	    send(from(), ERROR);
-	  }
-	  break;
-	}
-      }
+    if (processes.count(watcher) > 0) {
+      WatcherProcess *process = processes[watcher];
+      processes.erase(watcher);
+      send(process->self(), TERMINATE);
+      wait(process->self());
+      delete process;
+    }
+  }
+
+  Result<bool> terminate(Watcher *watcher)
+  {
+    if (processes.count(watcher) > 0) {
+      WatcherProcess *process = processes[watcher];
+      send(process->self(), TERMINATE);
+      wait(process->self());
+      return true;
+    }
+
+    return false;
+  }
+
+  Result<WatcherProcess *> lookup(Watcher *watcher)
+  {
+    if (processes.count(watcher) > 0) {
+      return processes[watcher];
+    } else {
+      return NULL;
     }
   }
+
+private:
+  map<Watcher *, WatcherProcess *> processes;
 };
 
 
 Watcher::Watcher()
 {
-  // Confirm we have allocated the WatcherProcessManager.
+  // Confirm we have created the WatcherProcessManager.
   static volatile bool initialized = false;
   static volatile bool initializing = true;
 
   // Confirm everything is initialized.
   if (!initialized) {
     if (__sync_bool_compare_and_swap(&initialized, false, true)) {
-	manager = Process::spawn(new WatcherProcessManager());
-	initializing = false;
-      }
+      manager = new WatcherProcessManager();
+      Process::spawn(manager);
+      initializing = false;
     }
+  }
 
   while (initializing);
 
-  Process::spawn(new WatcherProcess(this));
+  WatcherProcess *process =
+    Process::call(manager, &WatcherProcessManager::create, this);
+
+  if (process == NULL) {
+    fatal("failed to initialize Watcher");
+  }
 }
 
 
 Watcher::~Watcher()
 {
-  class WatcherProcessWaiter : public Process
-  {
-  private:
-    Watcher *watcher;
-
-  protected:
-    void operator () ()
-    {
-      send(manager, LOOKUP_PROCESS,
-           reinterpret_cast<char *>(&watcher), sizeof(watcher));
-      if (receive() != OK)
-	fatal("failed to deallocate resources associated with Watcher");
-      WatcherProcess *process =
-	*reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
-      send(process->self(), TERMINATE);
-      wait(process->self());
-      delete process;
-    }
-
-  public:
-    WatcherProcessWaiter(Watcher *_watcher) : watcher(_watcher) {}
-  } watcherProcessWaiter(this);
-
-  Process::wait(Process::spawn(&watcherProcessWaiter));
+  Process::call(manager, &WatcherProcessManager::terminate, this);
 }
 
 
@@ -300,159 +157,158 @@ class ZooKeeperImpl : public Process {};
 
 class ZooKeeperProcess : public ZooKeeperImpl
 {
-  friend class ZooKeeper;
-
-private:
-  ZooKeeper *zk; // ZooKeeper instance.
-  string hosts; // ZooKeeper host:port pairs.
-  int timeout; // ZooKeeper session timeout.
-  zhandle_t *zh; // ZooKeeper connection handle.
-  
-  Watcher *watcher; // Associated Watcher instance. 
-  PID watcherProcess; // PID of WatcherProcess that invokes Watcher.
-
-  static void watch(zhandle_t *zh, int type, int state,
-		    const char *path, void *context)
-  {
-    ZooKeeperProcess *zooKeeperProcess =
-      static_cast<ZooKeeperProcess*>(context);
-    Event *event = new Event();
-    event->zk = zooKeeperProcess->zk;
-    event->type = type;
-    event->state = state;
-    event->path = path;
-    zooKeeperProcess->send(zooKeeperProcess->watcherProcess,
-			   EVENT,
-			   reinterpret_cast<char *>(&event),
-			   sizeof(Event *));
-  }
-  
-  static void createCompletion(int ret, const char *value, const void *data)
+public:
+  ZooKeeperProcess(ZooKeeper *_zk, const string &_hosts, int _timeout,
+                   Watcher *_watcher)
+    : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher),
+      watcherProcess(NULL)
   {
-    CreateCall *createCall =
-      static_cast<CreateCall *>(const_cast<void *>((data)));
-    createCall->ret = ret;
-    if (createCall->result != NULL && value != NULL)
-      createCall->result->assign(value);
-    createCall->zooKeeperProcess->send(createCall->from, COMPLETED);
-  }
+    if (watcher == NULL) {
+      fatalerror("cannot instantiate ZooKeeper with NULL watcher");
+    }
 
-  static void removeCompletion(int ret, const void *data)
-  {
-    RemoveCall *removeCall =
-      static_cast<RemoveCall *>(const_cast<void *>((data)));
-    removeCall->ret = ret;
-    removeCall->zooKeeperProcess->send(removeCall->from, COMPLETED);
+    zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0);
+    if (zh == NULL) {
+      fatalerror("failed to create ZooKeeper (zookeeper_init)");
+    }
   }
 
-  static void existsCompletion(int ret, const Stat *stat, const void *data)
+  ~ZooKeeperProcess()
   {
-    ExistsCall *existsCall =
-      static_cast<ExistsCall *>(const_cast<void *>((data)));
-    existsCall->ret = ret;
-    if (existsCall->stat != NULL && stat != NULL)
-      *(existsCall->stat) = *(stat);      
-    existsCall->zooKeeperProcess->send(existsCall->from, COMPLETED);
+    int ret = zookeeper_close(zh);
+    if (ret != ZOK) {
+      fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
+    }
+
+    if (watcherProcess != NULL) {
+      Process::dispatch(manager, &WatcherProcessManager::destroy, watcher);
+    }
   }
 
-  static void getCompletion(int ret, const char *value, int value_len,
-			    const Stat *stat, const void *data)
+public:
+  Result<int> create(const string &path, const string &data,
+                     const ACL_vector &acl, int flags, string *result)
   {
-    GetCall *getCall =
-      static_cast<GetCall *>(const_cast<void *>((data)));
-    getCall->ret = ret;
-    if (getCall->result != NULL && value != NULL && value_len > 0)
-      getCall->result->assign(value, value_len);
-    if (getCall->stat != NULL && stat != NULL)
-      *(getCall->stat) = *(stat);
-    getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
+    Promise<int> promise;
+
+    tuple<Promise<int>, string *> *args =
+      new tuple<Promise<int>, string *>(promise, result);
+
+    int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
+                          flags, stringCompletion, args);
+
+    if (ret != ZOK) {
+      promise.set(ret);
+      delete args;
+    }
+
+    return promise;
   }
 
-  static void getChildrenCompletion(int ret, const String_vector *results,
-				    const void *data)
+  Result<int> remove(const string &path, int version)
   {
-    GetChildrenCall *getChildrenCall =
-      static_cast<GetChildrenCall *>(const_cast<void *>((data)));
-    getChildrenCall->ret = ret;
-    if (getChildrenCall->results != NULL && results != NULL) {
-      for (int i = 0; i < results->count; i++) {
-	getChildrenCall->results->push_back(results->data[i]);
-      }
+    Promise<int> promise;
+
+    tuple<Promise<int> > *args = new tuple<Promise<int> >(promise);
+
+    int ret = zoo_adelete(zh, path.c_str(), version, voidCompletion, args);
+
+    if (ret != ZOK) {
+      promise.set(ret);
+      delete args;
     }
-    getChildrenCall->zooKeeperProcess->send(getChildrenCall->from, COMPLETED);
+
+    return promise;
   }
 
-  static void setCompletion(int ret, const Stat *stat, const void *data)
+  Result<int> exists(const string &path, bool watch, Stat *stat)
   {
-    SetCall *setCall =
-      static_cast<SetCall *>(const_cast<void *>((data)));
-    setCall->ret = ret;
-    setCall->zooKeeperProcess->send(setCall->from, COMPLETED);
+    Promise<int> promise;
+
+    tuple<Promise<int>, Stat *> *args =
+      new tuple<Promise<int>, Stat *>(promise, stat);
+
+    int ret = zoo_aexists(zh, path.c_str(), watch, statCompletion, args);
+
+    if (ret != ZOK) {
+      promise.set(ret);
+      delete args;
+    }
+
+    return promise;
   }
 
-  bool prepare(int *fd, int *ops, timeval *tv)
+  Result<int> get(const string &path, bool watch, string *result, Stat *stat)
   {
-    int interest = 0;
+    Promise<int> promise;
 
-    int ret = zookeeper_interest(zh, fd, &interest, tv);
+    tuple<Promise<int>, string *, Stat *> *args =
+      new tuple<Promise<int>, string *, Stat *>(promise, result, stat);
 
-    // If in some disconnected state, try again later.
-    if (ret == ZINVALIDSTATE ||
-	ret == ZCONNECTIONLOSS ||
-	ret == ZOPERATIONTIMEOUT)
-      return false;
+    int ret = zoo_aget(zh, path.c_str(), watch, dataCompletion, args);
 
-    if (ret != ZOK)
-      fatal("zookeeper_interest failed! (%s)", zerror(ret));
+    if (ret != ZOK) {
+      promise.set(ret);
+      delete args;
+    }
 
-    *ops = 0;
+    return promise;
+  }
 
-    if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
-      *ops |= RDWR;
-    } else if (interest & ZOOKEEPER_READ) {
-      *ops |= RDONLY;
-    } else if (interest & ZOOKEEPER_WRITE) {
-      *ops |= WRONLY;
+  Result<int> getChildren(const string &path, bool watch,
+                          vector<string> *results)
+  {
+    Promise<int> promise;
+
+    tuple<Promise<int>, vector<string> *> *args =
+      new tuple<Promise<int>, vector<string> *>(promise, results);
+
+    int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
+                                args);
+
+    if (ret != ZOK) {
+      promise.set(ret);
+      delete args;
     }
 
-    return true;
+    return promise;
   }
 
-  void process(int fd, int ops)
+  Result<int> set(const string &path, const string &data, int version)
   {
-    int events = 0;
+    Promise<int> promise;
 
-    if (ready(fd, RDONLY)) {
-      events |= ZOOKEEPER_READ;
-    } if (ready(fd, WRONLY)) {
-      events |= ZOOKEEPER_WRITE;
-    }
+    tuple<Promise<int>, Stat *> *args =
+      new tuple<Promise<int>, Stat *>(promise, NULL);
 
-    int ret = zookeeper_process(zh, events);
+    int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
+                       version, statCompletion, args);
 
-    // If in some disconnected state, try again later.
-    if (ret == ZINVALIDSTATE ||
-	ret == ZCONNECTIONLOSS)
-      return;
+    if (ret != ZOK) {
+      promise.set(ret);
+      delete args;
+    }
 
-    if (ret != ZOK && ret != ZNOTHING)
-      fatal("zookeeper_process failed! (%s)", zerror(ret));
+    return promise;
   }
 
 protected:
-  void operator () ()
+  virtual void operator () ()
   {
-    // Lookup and cache the WatcherProcess PID associated with our
-    // Watcher _before_ we yield control via calling zookeeper_process
-    // so that Watcher callbacks can occur.
-    send(manager, LOOKUP_PID,
-         reinterpret_cast<char *>(&watcher), sizeof(watcher));
-    if (receive() != OK)
-      fatal("failed to setup underlying ZooKeeper mechanisms");
+    // Lookup and cache the WatcherProcess associated with our Watcher
+    // before we yield control via calling zookeeper_process so that
+    // Watcher callbacks can occur.
+    watcherProcess = call(manager, &WatcherProcessManager::lookup, watcher);
+
+    // A NULL WatcherProcess means that our Watcher must not have been
+    // constructed correctly. Note that a Watcher might have already
+    // been cleaned up (and therefore the WatcherProcess terminated),
+    // but we are responsible for actually calling destroy and thus
+    // cleaning up the WatcherProcess, so it shouldn't be NULL here.
+    if (watcherProcess == NULL)
+      fatal("cannot use Watcher (has it been constructed?)");
 
     // TODO(benh): Link with WatcherProcess?
-    watcherProcess =
-      *reinterpret_cast<PID *>(const_cast<char *>(body(NULL)));
 
     while (true) {
       int fd;
@@ -466,405 +322,262 @@ protected:
       // Cause await to return immediately if the file descriptor is
       // not valid (for example because the connection timed out) and
       // secs is 0 because that will block indefinitely.
-      if (fd == -1 && secs == 0)
+      if (fd == -1 && secs == 0) {
 	secs = -1;
+      }
 
       if (await(fd, ops, secs, false)) {
 	// Either timer expired (might be 0) or data became available on fd.
 	process(fd, ops);
       } else {
-	// TODO(benh): Don't handle incoming "calls" until we are connected!
-	switch (receive()) {
-	  case CREATE: {
-	    CreateCall *createCall =
-	      *reinterpret_cast<CreateCall **>(const_cast<char *>(body(NULL)));
-	    createCall->from = from();
-	    createCall->zooKeeperProcess = this;
-	    int ret = zoo_acreate(zh, createCall->path->c_str(),
-				  createCall->data->data(),
-				  createCall->data->size(),
-				  createCall->acl, createCall->flags,
-				  createCompletion, createCall);
-	    if (ret != ZOK) {
-	      createCall->ret = ret;
-	      send(createCall->from, COMPLETED);
-	    }
-	    break;
-	  }
-	  case REMOVE: {
-	    RemoveCall *removeCall =
-	      *reinterpret_cast<RemoveCall **>(const_cast<char *>(body(NULL)));
-	    removeCall->from = from();
-	    removeCall->zooKeeperProcess = this;
-	    int ret = zoo_adelete(zh, removeCall->path->c_str(),
-				  removeCall->version,
-				  removeCompletion, removeCall);
-	    if (ret != ZOK) {
-	      removeCall->ret = ret;
-	      send(removeCall->from, COMPLETED);
-	    }
-	    break;
-	  }
-	  case EXISTS: {
-	    ExistsCall *existsCall =
-	      *reinterpret_cast<ExistsCall **>(const_cast<char *>(body(NULL)));
-	    existsCall->from = from();
-	    existsCall->zooKeeperProcess = this;
-	    int ret = zoo_aexists(zh, existsCall->path->c_str(),
-				  existsCall->watch,
-				  existsCompletion, existsCall);
-	    if (ret != ZOK) {
-	      existsCall->ret = ret;
-	      send(existsCall->from, COMPLETED);
-	    }
-	    break;
-	  }
-	  case GET: {
-	    GetCall *getCall =
-	      *reinterpret_cast<GetCall **>(const_cast<char *>(body(NULL)));
-	    getCall->from = from();
-	    getCall->zooKeeperProcess = this;
-	    int ret = zoo_aget(zh, getCall->path->c_str(), getCall->watch,
-			       getCompletion, getCall);
-	    if (ret != ZOK) {
-	      getCall->ret = ret;
-	      send(getCall->from, COMPLETED);
-	    }
-	    break;
-	  }
-	  case GET_CHILDREN: {
-	    GetChildrenCall *getChildrenCall =
-	      *reinterpret_cast<GetChildrenCall **>(const_cast<char *>(body(NULL)));
-	    getChildrenCall->from = from();
-	    getChildrenCall->zooKeeperProcess = this;
-	    int ret = zoo_aget_children(zh, getChildrenCall->path->c_str(),
-					getChildrenCall->watch,
-					getChildrenCompletion,
-					getChildrenCall);
-	    if (ret != ZOK) {
-	      getChildrenCall->ret = ret;
-	      send(getChildrenCall->from, COMPLETED);
-	    }
-	    break;
-	  }
-	  case SET: {
-	    SetCall *setCall =
-	      *reinterpret_cast<SetCall **>(const_cast<char *>(body(NULL)));
-	    setCall->from = from();
-	    setCall->zooKeeperProcess = this;
-	    int ret = zoo_aset(zh, setCall->path->c_str(),
-			       setCall->data->data(),
-			       setCall->data->size(),
-			       setCall->version,
-			       setCompletion, setCall);
-	    if (ret != ZOK) {
-	      setCall->ret = ret;
-	      send(setCall->from, COMPLETED);
-	    }
-	    break;
-	  }
-	  case TERMINATE: {
-	    return;
-	  }
-	  default: {
-	    fatal("unexpected interruption during await");
-	  }
-	}
+        // Okay, a message must have been received. Handle only one
+        // message at a time so as not to delay any necessary internal
+        // processing.
+        serve(0, false);
+        if (name() == TERMINATE) {
+          return;
+        } else {
+          fatal("unexpected interruption during await");
+        }
       }
     }
   }
 
-public:
-  ZooKeeperProcess(ZooKeeper *_zk,
-		   const string &_hosts,
-		   int _timeout,
-		   Watcher *_watcher)
-    : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher)
+private:
+  static void watch(zhandle_t *zh, int type, int state,
+		    const char *path, void *ctx)
   {
-    zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0);
-    if (zh == NULL)
-      fatalerror("failed to create ZooKeeper (zookeeper_init)");
+    ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess*>(ctx);
+    Process::dispatch(zooKeeperProcess->watcherProcess, &WatcherProcess::event,
+                      zooKeeperProcess->zk, type, state, path);
   }
 
-  ~ZooKeeperProcess()
+  static void voidCompletion(int ret, const void *data)
   {
-    int ret = zookeeper_close(zh);
-    if (ret != ZOK)
-      fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
-  }
-};
+    const tuple<Promise<int> > *args =
+      reinterpret_cast<const tuple<Promise<int> > *>(data);
 
+    Promise<int> promise = (*args).get<0>();
 
+    promise.set(ret);
 
-ZooKeeper::ZooKeeper(const string &hosts, int timeout, Watcher *watcher)
-{
-  impl = new ZooKeeperProcess(this, hosts, timeout, watcher);
-  Process::spawn(impl);
-}
+    delete args;
+  }
 
 
-ZooKeeper::~ZooKeeper()
-{
-  Process::post(impl->self(), TERMINATE);
-  Process::wait(impl->self());
-  delete impl;
-}
+  static void stringCompletion(int ret, const char *value, const void *data)
+  {
+    const tuple<Promise<int>, string *> *args =
+      reinterpret_cast<const tuple<Promise<int>, string *> *>(data);
 
+    Promise<int> promise = (*args).get<0>();
+    string *result = (*args).get<1>();
 
-int ZooKeeper::getState()
-{
-  ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess *>(impl);
-  return zoo_state(zooKeeperProcess->zh);
-}
+    if (result != NULL && value != NULL) {
+      result->assign(value);
+    }
 
+    promise.set(ret);
 
-int ZooKeeper::create(const string &path,
-		      const string &data,
-		      const ACL_vector &acl,
-		      int flags,
-		      string *result)
-{
-  CreateCall createCall;
-  createCall.path = &path;
-  createCall.data = &data;
-  createCall.acl = &acl;
-  createCall.flags = flags;
-  createCall.result = result;
-
-  class CreateCallProcess : public Process
-  {
-  private:
-    ZooKeeperProcess *zooKeeperProcess;
-    CreateCall *createCall;
-
-  protected:
-    void operator () ()
-    {
-      send(zooKeeperProcess->self(),
-           CREATE,
-           reinterpret_cast<char *>(&createCall),
-           sizeof(CreateCall *));
-      if (receive() != COMPLETED)
-	createCall->ret = ZSYSTEMERROR;
-    }
-
-  public:
-    CreateCallProcess(ZooKeeperProcess *_zooKeeperProcess,
-		      CreateCall *_createCall)
-      : zooKeeperProcess(_zooKeeperProcess),
-	createCall(_createCall)
-    {}
-  } createCallProcess(static_cast<ZooKeeperProcess *>(impl),
-		      &createCall);
+    delete args;
+  }
 
-  Process::wait(Process::spawn(&createCallProcess));
 
-  return createCall.ret;
-}
+  static void statCompletion(int ret, const Stat *stat, const void *data)
+  {
+    const tuple<Promise<int>, Stat *> *args =
+      reinterpret_cast<const tuple<Promise<int>, Stat *> *>(data);
 
+    Promise<int> promise = (*args).get<0>();
+    Stat *stat_result = (*args).get<1>();
 
+    if (stat_result != NULL && stat != NULL) {
+      *stat_result = *stat;
+    }
 
-int ZooKeeper::remove(const string &path, int version)
-{
-  RemoveCall removeCall;
-  removeCall.path = &path;
-  removeCall.version = version;
+    promise.set(ret);
+
+    delete args;
+  }
 
-  class RemoveCallProcess : public Process
+  static void dataCompletion(int ret, const char *value, int value_len,
+                             const Stat *stat, const void *data)
   {
-  private:
-    ZooKeeperProcess *zooKeeperProcess;
-    RemoveCall *removeCall;
+    const tuple<Promise<int>, string *, Stat *> *args =
+      reinterpret_cast<const tuple<Promise<int>, string *, Stat *> *>(data);
+
+    Promise<int> promise = (*args).get<0>();
+    string *result = (*args).get<1>();
+    Stat *stat_result = (*args).get<2>();
 
-  protected:
-    void operator () ()
-    {
-      send(zooKeeperProcess->self(),
-           REMOVE,
-           reinterpret_cast<char *>(&removeCall),
-           sizeof(RemoveCall *));
-      if (receive() != COMPLETED)
-	removeCall->ret = ZSYSTEMERROR;
+    if (result != NULL && value != NULL && value_len > 0) {
+      result->assign(value, value_len);
     }
 
-  public:
-    RemoveCallProcess(ZooKeeperProcess *_zooKeeperProcess,
-		      RemoveCall *_removeCall)
-      : zooKeeperProcess(_zooKeeperProcess),
-	removeCall(_removeCall)
-    {}
-  } removeCallProcess(static_cast<ZooKeeperProcess *>(impl),
-		      &removeCall);
+    if (stat_result != NULL && stat != NULL) {
+      *stat_result = *stat;
+    }
 
-  Process::wait(Process::spawn(&removeCallProcess));
+    promise.set(ret);
 
-  return removeCall.ret;
-}
+    delete args;
+  }
 
+  static void stringsCompletion(int ret, const String_vector *values,
+                                const void *data)
+  {
+    const tuple<Promise<int>, vector<string> *> *args =
+      reinterpret_cast<const tuple<Promise<int>, vector<string> *> *>(data);
 
-int ZooKeeper::exists(const string &path,
-		      bool watch,
-		      Stat *stat)
-{
-  ExistsCall existsCall;
-  existsCall.path = &path;
-  existsCall.watch = watch;
-  existsCall.stat = stat;
+    Promise<int> promise = (*args).get<0>();
+    vector<string> *results = (*args).get<1>();
+
+    if (results != NULL && values != NULL) {
+      for (int i = 0; i < values->count; i++) {
+	results->push_back(values->data[i]);
+      }
+    }
+
+    promise.set(ret);
 
-  class ExistsCallProcess : public Process
+    delete args;
+  }
+
+  bool prepare(int *fd, int *ops, timeval *tv)
   {
-  private:
-    ZooKeeperProcess *zooKeeperProcess;
-    ExistsCall *existsCall;
+    int interest = 0;
 
-  protected:
-    void operator () ()
-    {
-      send(zooKeeperProcess->self(),
-           EXISTS,
-           reinterpret_cast<char *>(&existsCall),
-           sizeof(ExistsCall *));
-      if (receive() != COMPLETED)
-	existsCall->ret = ZSYSTEMERROR;
-    }
+    int ret = zookeeper_interest(zh, fd, &interest, tv);
 
-  public:
-    ExistsCallProcess(ZooKeeperProcess *_zooKeeperProcess,
-		      ExistsCall *_existsCall)
-      : zooKeeperProcess(_zooKeeperProcess),
-	existsCall(_existsCall)
-    {}
-  } existsCallProcess(static_cast<ZooKeeperProcess *>(impl),
-		      &existsCall);
+    // If in some disconnected state, try again later.
+    if (ret == ZINVALIDSTATE ||
+        ret == ZCONNECTIONLOSS ||
+	ret == ZOPERATIONTIMEOUT) {
+      return false;
+    }
 
-  Process::wait(Process::spawn(&existsCallProcess));
+    if (ret != ZOK) {
+      fatal("zookeeper_interest failed! (%s)", zerror(ret));
+    }
 
-  return existsCall.ret;
-}
+    *ops = 0;
 
+    if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+      *ops |= RDWR;
+    } else if (interest & ZOOKEEPER_READ) {
+      *ops |= RDONLY;
+    } else if (interest & ZOOKEEPER_WRITE) {
+      *ops |= WRONLY;
+    }
 
-int ZooKeeper::get(const string &path,
-		   bool watch,
-		   string *result,
-		   Stat *stat)
-{
-  GetCall getCall;
-  getCall.path = &path;
-  getCall.watch = watch;
-  getCall.result = result;
-  getCall.stat = stat;
+    return true;
+  }
 
-  class GetCallProcess : public Process
+  void process(int fd, int ops)
   {
-  private:
-    ZooKeeperProcess *zooKeeperProcess;
-    GetCall *getCall;
+    int events = 0;
+
+    if (ready(fd, RDONLY)) {
+      events |= ZOOKEEPER_READ;
+    } if (ready(fd, WRONLY)) {
+      events |= ZOOKEEPER_WRITE;
+    }
+
+    int ret = zookeeper_process(zh, events);
+
+    // If in some disconnected state, try again later.
+    if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
+      return;
+    }
 
-  protected:
-    void operator () ()
-    {
-      send(zooKeeperProcess->self(),
-           GET,
-           reinterpret_cast<char *>(&getCall),
-           sizeof(GetCall *));
-      if (receive() != COMPLETED)
-	getCall->ret = ZSYSTEMERROR;
+    if (ret != ZOK && ret != ZNOTHING) {
+      fatal("zookeeper_process failed! (%s)", zerror(ret));
     }
+  }
+
+private:
+  friend class ZooKeeper;
+
+  ZooKeeper *zk; // ZooKeeper instance.
+  string hosts; // ZooKeeper host:port pairs.
+  int timeout; // ZooKeeper session timeout.
+  zhandle_t *zh; // ZooKeeper connection handle.
+  
+  Watcher *watcher; // Associated Watcher instance. 
+  WatcherProcess *watcherProcess; // PID of WatcherProcess that invokes Watcher.
+};
 
-  public:
-    GetCallProcess(ZooKeeperProcess *_zooKeeperProcess,
-		   GetCall *_getCall)
-      : zooKeeperProcess(_zooKeeperProcess),
-	getCall(_getCall)
-    {}
-  } getCallProcess(static_cast<ZooKeeperProcess *>(impl),
-		   &getCall);
 
-  Process::wait(Process::spawn(&getCallProcess));
 
-  return getCall.ret;
+ZooKeeper::ZooKeeper(const string &hosts, int timeout, Watcher *watcher)
+{
+  impl = new ZooKeeperProcess(this, hosts, timeout, watcher);
+  Process::spawn(impl);
 }
 
 
-int ZooKeeper::getChildren(const string &path,
-			   bool watch,
-			   vector<string> *results)
+ZooKeeper::~ZooKeeper()
 {
-  GetChildrenCall getChildrenCall;
-  getChildrenCall.path = &path;
-  getChildrenCall.watch = watch;
-  getChildrenCall.results = results;
+  Process::post(impl->self(), TERMINATE);
+  Process::wait(impl->self());
+  delete impl;
+}
 
-  class GetChildrenCallProcess : public Process
-  {
-  private:
-    ZooKeeperProcess *zooKeeperProcess;
-    GetChildrenCall *getChildrenCall;
 
-  protected:
-    void operator () ()
-    {
-      send(zooKeeperProcess->self(),
-           GET_CHILDREN,
-           reinterpret_cast<char *>(&getChildrenCall),
-           sizeof(GetChildrenCall *));
-      if (receive() != COMPLETED)
-	getChildrenCall->ret = ZSYSTEMERROR;
-    }
+int ZooKeeper::getState()
+{
+  ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess *>(impl);
+  return zoo_state(zooKeeperProcess->zh);
+}
 
-  public:
-    GetChildrenCallProcess(ZooKeeperProcess *_zooKeeperProcess,
-			   GetChildrenCall *_getChidlrenCall)
-      : zooKeeperProcess(_zooKeeperProcess),
-	getChildrenCall(_getChidlrenCall)
-    {}
-  } getChildrenCallProcess(static_cast<ZooKeeperProcess *>(impl),
-		   &getChildrenCall);
 
-  Process::wait(Process::spawn(&getChildrenCallProcess));
+int ZooKeeper::create(const string &path, const string &data,
+                      const ACL_vector &acl, int flags, string *result)
+{
+  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+  return Process::call(process, &ZooKeeperProcess::create,
+                       cref(path), cref(data), cref(acl), flags, result);
+}
+
 
-  return getChildrenCall.ret;
+int ZooKeeper::remove(const string &path, int version)
+{
+  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+  return Process::call(process, &ZooKeeperProcess::remove,
+                       cref(path), version);
 }
 
 
-int ZooKeeper::set(const string &path,
-		   const string &data,
-		   int version)
+int ZooKeeper::exists(const string &path, bool watch, Stat *stat)
 {
-  SetCall setCall;
-  setCall.path = &path;
-  setCall.data = &data;
-  setCall.version = version;
+  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+  return Process::call(process, &ZooKeeperProcess::exists,
+                       cref(path), watch, stat);
+}
 
-  class SetCallProcess : public Process
-  {
-  private:
-    ZooKeeperProcess *zooKeeperProcess;
-    SetCall *setCall;
 
-  protected:
-    void operator () ()
-    {
-      send(zooKeeperProcess->self(),
-           SET,
-           reinterpret_cast<char *>(&setCall),
-           sizeof(SetCall *));
-      if (receive() != COMPLETED)
-	setCall->ret = ZSYSTEMERROR;
-    }
+int ZooKeeper::get(const string &path, bool watch, string *result, Stat *stat)
+{
+  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+  return Process::call(process, &ZooKeeperProcess::get,
+                       cref(path), watch, result, stat);
+}
 
-  public:
-    SetCallProcess(ZooKeeperProcess *_zooKeeperProcess,
-		   SetCall *_setCall)
-      : zooKeeperProcess(_zooKeeperProcess),
-	setCall(_setCall)
-    {}
-  } setCallProcess(static_cast<ZooKeeperProcess *>(impl),
-		   &setCall);
 
-  Process::wait(Process::spawn(&setCallProcess));
+int ZooKeeper::getChildren(const string &path, bool watch,
+                           vector<string> *results)
+{
+  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+  return Process::call(process, &ZooKeeperProcess::getChildren,
+                       cref(path), watch, results);
+}
 
-  return setCall.ret;
+
+int ZooKeeper::set(const string &path, const string &data, int version)
+{
+  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+  return Process::call(process, &ZooKeeperProcess::set,
+                       cref(path), cref(data), version);
 }
 
 
@@ -879,7 +592,7 @@ const char * ZooKeeper::error(int ret) c
 // public:
 //   void process(ZooKeeper *zk, int type, int state, const string &path)
 //   {
-//     std::cout << "TestWatcher::process" << std::endl;
+//     cout << "TestWatcher::process" << endl;
 //   }
 // };
 

Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun  5 09:08:02 2011
@@ -1,6 +1,7 @@
 #include <signal.h>
 
-#include <cerrno>
+#include <glog/logging.h>
+
 #include <iostream>
 #include <string>
 #include <sstream>
@@ -25,8 +26,6 @@ using boost::bind;
 using boost::cref;
 using boost::unordered_map;
 
-using std::cerr;
-using std::endl;
 using std::string;
 
 
@@ -47,10 +46,12 @@ public:
 protected:
   virtual void operator () ()
   {
+    VLOG(1) << "Executor started at: " << self();
+
     link(slave);
 
     // Register with slave.
-    Message<E2S_REGISTER_EXECUTOR> out;
+    MSG<E2S_REGISTER_EXECUTOR> out;
     out.mutable_framework_id()->MergeFrom(frameworkId);
     out.mutable_executor_id()->MergeFrom(executorId);
     send(slave, out);
@@ -69,18 +70,24 @@ protected:
 
       switch(receive(2)) {
         case S2E_REGISTER_REPLY: {
-          const Message<S2E_REGISTER_REPLY>& msg = message();
+          const MSG<S2E_REGISTER_REPLY>& msg = message();
+
           slaveId = msg.args().slave_id();
+
+          VLOG(1) << "Executor registered on slave " << slaveId;
+
           invoke(bind(&Executor::init, executor, driver, cref(msg.args())));
           break;
         }
 
         case S2E_RUN_TASK: {
-          const Message<S2E_RUN_TASK>& msg = message();
+          const MSG<S2E_RUN_TASK>& msg = message();
 
           const TaskDescription& task = msg.task();
 
-          Message<E2S_STATUS_UPDATE> out;
+          VLOG(1) << "Executor asked to run a task " << task.task_id();
+
+          MSG<E2S_STATUS_UPDATE> out;
           out.mutable_framework_id()->MergeFrom(frameworkId);
           TaskStatus* status = out.mutable_status();
           status->mutable_task_id()->MergeFrom(task.task_id());
@@ -93,14 +100,20 @@ protected:
         }
 
         case S2E_KILL_TASK: {
-          const Message<S2E_KILL_TASK>& msg = message();
+          const MSG<S2E_KILL_TASK>& msg = message();
+
+          VLOG(1) << "Executor asked to kill task " << msg.task_id();
+
           invoke(bind(&Executor::killTask, executor, driver,
                       cref(msg.task_id())));
           break;
         }
 
         case S2E_FRAMEWORK_MESSAGE: {
-          const Message<S2E_FRAMEWORK_MESSAGE>& msg = message();
+          const MSG<S2E_FRAMEWORK_MESSAGE>& msg = message();
+
+          VLOG(1) << "Executor passed message";
+
           const FrameworkMessage& message = msg.message();
           invoke(bind(&Executor::frameworkMessage, executor, driver,
                       cref(message)));
@@ -108,6 +121,7 @@ protected:
         }
 
         case S2E_KILL_EXECUTOR: {
+          VLOG(1) << "Executor asked to shutdown";
           invoke(bind(&Executor::shutdown, executor, driver));
           if (!local)
             exit(0);
@@ -116,6 +130,8 @@ protected:
         }
 
         case PROCESS_EXIT: {
+          VLOG(1) << "Slave exited, trying to shutdown";
+
           // TODO: Pass an argument to shutdown to tell it this is abnormal?
           invoke(bind(&Executor::shutdown, executor, driver));
 
@@ -135,9 +151,9 @@ protected:
         }
 
         default: {
+          VLOG(1) << "Received unknown message ID " << msgid()
+                  << " from " << from();
           // TODO: Is this serious enough to exit?
-          cerr << "Received unknown message ID " << msgid()
-               << " from " << from() << endl;
           break;
         }
       }
@@ -307,7 +323,7 @@ int MesosExecutorDriver::sendStatusUpdat
   }
 
   // TODO(benh): Do a dispatch to Executor first?
-  Message<E2S_STATUS_UPDATE> out;
+  MSG<E2S_STATUS_UPDATE> out;
   out.mutable_framework_id()->MergeFrom(process->frameworkId);
   out.mutable_status()->MergeFrom(status);
   process->send(process->slave, out);
@@ -335,7 +351,7 @@ int MesosExecutorDriver::sendFrameworkMe
   }
 
   // TODO(benh): Do a dispatch to Executor first?
-  Message<E2S_FRAMEWORK_MESSAGE> out;
+  MSG<E2S_FRAMEWORK_MESSAGE> out;
   out.mutable_framework_id()->MergeFrom(process->frameworkId);
   out.mutable_message()->MergeFrom(message);
   process->send(process->slave, out);

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 09:08:02 2011
@@ -81,7 +81,7 @@ protected:
       receive();
       CHECK(msgid() == M2M_GET_STATE_REPLY);
 
-      const Message<M2M_GET_STATE_REPLY>& msg = message();
+      const MSG<M2M_GET_STATE_REPLY>& msg = message();
 
       state::MasterState *state =
         *(state::MasterState **) msg.pointer().data();
@@ -120,14 +120,16 @@ private:
 
 
 Master::Master()
-  : nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+  : MesosProcess("master"),
+    nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
 {
   allocatorType = "simple";
 }
 
 
 Master::Master(const Configuration& _conf)
-  : conf(_conf), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+  : MesosProcess("master"),
+    conf(_conf), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
 {
   allocatorType = conf.get("allocator", "simple");
 }
@@ -316,7 +318,7 @@ void Master::operator () ()
               << "we haven't received an identifier yet!";  
   }
 
-  const Message<GOT_MASTER_TOKEN>& msg = message();
+  const MSG<GOT_MASTER_TOKEN>& msg = message();
 
   // The master ID is comprised of the current date and some ephemeral
   // token (e.g., determined by ZooKeeper).
@@ -337,7 +339,7 @@ void Master::operator () ()
     switch (receive()) {
 
     case NEW_MASTER_DETECTED: {
-      const Message<NEW_MASTER_DETECTED>& msg = message();
+      const MSG<NEW_MASTER_DETECTED>& msg = message();
 
       // Check and see if we are (1) still waiting to be the active
       // master, (2) newly active master, (3) no longer active master,
@@ -372,7 +374,7 @@ void Master::operator () ()
     }
 
     case F2M_REGISTER_FRAMEWORK: {
-      const Message<F2M_REGISTER_FRAMEWORK>& msg = message();
+      const MSG<F2M_REGISTER_FRAMEWORK>& msg = message();
 
       Framework *framework =
         new Framework(msg.framework(), newFrameworkId(), from(), elapsed());
@@ -381,7 +383,7 @@ void Master::operator () ()
 
       if (framework->info.executor().uri() == "") {
         LOG(INFO) << framework << " registering without an executor URI";
-        Message<M2F_ERROR> out;
+        MSG<M2F_ERROR> out;
         out.set_code(1);
         out.set_message("No executor URI given");
         send(from(), out);
@@ -393,7 +395,7 @@ void Master::operator () ()
       if (framework->info.user() == "root" && rootSubmissions == false) {
         LOG(INFO) << framework << " registering as root, but "
                   << "root submissions are disabled on this cluster";
-        Message<M2F_ERROR> out;
+        MSG<M2F_ERROR> out;
         out.set_code(1);
         out.set_message("User 'root' is not allowed to run frameworks");
         send(from(), out);
@@ -406,11 +408,11 @@ void Master::operator () ()
     }
 
     case F2M_REREGISTER_FRAMEWORK: {
-      const Message<F2M_REREGISTER_FRAMEWORK> &msg = message();
+      const MSG<F2M_REREGISTER_FRAMEWORK> &msg = message();
 
       if (msg.framework_id() == "") {
         LOG(ERROR) << "Framework re-registering without an id!";
-        Message<M2F_ERROR> out;
+        MSG<M2F_ERROR> out;
         out.set_code(1);
         out.set_message("Missing framework id");
         send(from(), out);
@@ -420,7 +422,7 @@ void Master::operator () ()
       if (msg.framework().executor().uri() == "") {
         LOG(INFO) << "Framework " << msg.framework_id() << " re-registering "
                   << "without an executor URI";
-        Message<M2F_ERROR> out;
+        MSG<M2F_ERROR> out;
         out.set_code(1);
         out.set_message("No executor URI given");
         send(from(), out);
@@ -449,7 +451,7 @@ void Master::operator () ()
           LOG(INFO) << "Framework " << msg.framework_id()
                     << " re-registering with an already used id "
                     << " and not failing over!";
-          Message<M2F_ERROR> out;
+          MSG<M2F_ERROR> out;
           out.set_code(1);
           out.set_message("Framework id in use");
           send(from(), out);
@@ -483,7 +485,7 @@ void Master::operator () ()
       // it currently isn't running any tasks. This could be a
       // potential scalability issue ...
       foreachpair (_, Slave *slave, slaves) {
-        Message<M2S_UPDATE_FRAMEWORK> out;
+        MSG<M2S_UPDATE_FRAMEWORK> out;
         out.mutable_framework_id()->MergeFrom(msg.framework_id());
         out.set_pid(from());
         send(slave->pid, out);
@@ -492,7 +494,7 @@ void Master::operator () ()
     }
 
     case F2M_UNREGISTER_FRAMEWORK: {
-      const Message<F2M_UNREGISTER_FRAMEWORK>& msg = message();
+      const MSG<F2M_UNREGISTER_FRAMEWORK>& msg = message();
 
       LOG(INFO) << "Asked to unregister framework " << msg.framework_id();
 
@@ -508,7 +510,7 @@ void Master::operator () ()
     }
 
     case F2M_RESOURCE_OFFER_REPLY: {
-      const Message<F2M_RESOURCE_OFFER_REPLY>& msg = message();
+      const MSG<F2M_RESOURCE_OFFER_REPLY>& msg = message();
 
       Framework *framework = lookupFramework(msg.framework_id());
       if (framework != NULL) {
@@ -528,7 +530,7 @@ void Master::operator () ()
           // immediately report any tasks in it as lost (it would
           // probably be better to have better error messages here).
           foreach (const TaskDescription &task, tasks) {
-            Message<M2F_STATUS_UPDATE> out;
+            MSG<M2F_STATUS_UPDATE> out;
             out.mutable_framework_id()->MergeFrom(msg.framework_id());
             TaskStatus *status = out.mutable_status();
             status->mutable_task_id()->MergeFrom(task.task_id());
@@ -542,7 +544,7 @@ void Master::operator () ()
     }
 
     case F2M_REVIVE_OFFERS: {
-      const Message<F2M_REVIVE_OFFERS>& msg = message();
+      const MSG<F2M_REVIVE_OFFERS>& msg = message();
 
       Framework *framework = lookupFramework(msg.framework_id());
       if (framework != NULL) {
@@ -554,7 +556,7 @@ void Master::operator () ()
     }
 
     case F2M_KILL_TASK: {
-      const Message<F2M_KILL_TASK>& msg = message();
+      const MSG<F2M_KILL_TASK>& msg = message();
 
       LOG(INFO) << "Asked to kill task " << msg.task_id()
 		<< " of framework " << msg.framework_id();
@@ -568,7 +570,7 @@ void Master::operator () ()
 	  LOG(ERROR) << "Cannot kill task " << msg.task_id()
 		     << " of framework " << msg.framework_id()
 		     << " because it cannot be found";
-          Message<M2F_STATUS_UPDATE> out;
+          MSG<M2F_STATUS_UPDATE> out;
           out.mutable_framework_id()->MergeFrom(task->framework_id());
           TaskStatus *status = out.mutable_status();
           status->mutable_task_id()->MergeFrom(task->task_id());
@@ -581,13 +583,13 @@ void Master::operator () ()
     }
 
     case F2M_FRAMEWORK_MESSAGE: {
-      const Message<F2M_FRAMEWORK_MESSAGE>& msg = message();
+      const MSG<F2M_FRAMEWORK_MESSAGE>& msg = message();
 
       Framework *framework = lookupFramework(msg.framework_id());
       if (framework != NULL) {
         Slave *slave = lookupSlave(msg.message().slave_id());
         if (slave != NULL) {
-          Message<M2S_FRAMEWORK_MESSAGE> out;
+          MSG<M2S_FRAMEWORK_MESSAGE> out;
           out.mutable_framework_id()->MergeFrom(msg.framework_id());
           out.mutable_message()->MergeFrom(msg.message());
           send(slave->pid, out);
@@ -597,13 +599,13 @@ void Master::operator () ()
     }
 
     case F2M_STATUS_UPDATE_ACK: {
-      const Message<F2M_STATUS_UPDATE_ACK>& msg = message();
+      const MSG<F2M_STATUS_UPDATE_ACK>& msg = message();
 
       Framework *framework = lookupFramework(msg.framework_id());
       if (framework != NULL) {
         Slave *slave = lookupSlave(msg.slave_id());
         if (slave != NULL) {
-          Message<M2S_STATUS_UPDATE_ACK> out;
+          MSG<M2S_STATUS_UPDATE_ACK> out;
           out.mutable_framework_id()->MergeFrom(msg.framework_id());
           out.mutable_slave_id()->MergeFrom(msg.slave_id());
           out.mutable_task_id()->MergeFrom(msg.task_id());
@@ -614,7 +616,7 @@ void Master::operator () ()
     }
 
     case S2M_REGISTER_SLAVE: {
-      const Message<S2M_REGISTER_SLAVE>& msg = message();
+      const MSG<S2M_REGISTER_SLAVE>& msg = message();
 
       Slave* slave = new Slave(msg.slave(), newSlaveId(), from(), elapsed());
 
@@ -627,7 +629,7 @@ void Master::operator () ()
 
       allocator->slaveAdded(slave);
 
-      Message<M2S_REGISTER_REPLY> out;
+      MSG<M2S_REGISTER_REPLY> out;
       out.mutable_slave_id()->MergeFrom(slave->slaveId);
       out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
       send(slave->pid, out);
@@ -635,7 +637,7 @@ void Master::operator () ()
     }
 
     case S2M_REREGISTER_SLAVE: {
-      const Message<S2M_REREGISTER_SLAVE>& msg = message();
+      const MSG<S2M_REREGISTER_SLAVE>& msg = message();
 
       LOG(INFO) << "Re-registering " << msg.slave_id() << " at " << from();
 
@@ -662,7 +664,7 @@ void Master::operator () ()
       pidToSlaveId[slave->pid] = slave->slaveId;
       link(slave->pid);
 
-      Message<M2S_REREGISTER_REPLY> out;
+      MSG<M2S_REREGISTER_REPLY> out;
       out.mutable_slave_id()->MergeFrom(slave->slaveId);
       out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
       send(slave->pid, out);
@@ -675,7 +677,7 @@ void Master::operator () ()
         Framework *framework = lookupFramework(task->framework_id());
         if (framework != NULL) {
           framework->addTask(task);
-          Message<M2S_UPDATE_FRAMEWORK> out;
+          MSG<M2S_UPDATE_FRAMEWORK> out;
           out.mutable_framework_id()->MergeFrom(framework->frameworkId);
           out.set_pid(framework->pid);
           send(slave->pid, out);
@@ -689,7 +691,7 @@ void Master::operator () ()
     }
 
     case S2M_UNREGISTER_SLAVE: {
-      const Message<S2M_UNREGISTER_SLAVE>& msg = message();
+      const MSG<S2M_UNREGISTER_SLAVE>& msg = message();
 
       LOG(INFO) << "Asked to unregister slave " << msg.slave_id();
 
@@ -702,7 +704,7 @@ void Master::operator () ()
     }
 
     case S2M_STATUS_UPDATE: {
-      const Message<S2M_STATUS_UPDATE>& msg = message();
+      const MSG<S2M_STATUS_UPDATE>& msg = message();
 
       const TaskStatus& status = msg.status();
 
@@ -716,7 +718,7 @@ void Master::operator () ()
         Framework* framework = lookupFramework(msg.framework_id());
         if (framework != NULL) {
 	  // Pass on the (transformed) status update to the framework.
-          Message<M2F_STATUS_UPDATE> out;
+          MSG<M2F_STATUS_UPDATE> out;
           out.mutable_framework_id()->MergeFrom(msg.framework_id());
           out.mutable_status()->MergeFrom(status);
           send(framework->pid, out);
@@ -748,13 +750,13 @@ void Master::operator () ()
     }
 
     case S2M_FRAMEWORK_MESSAGE: {
-      const Message<S2M_FRAMEWORK_MESSAGE>& msg = message();
+      const MSG<S2M_FRAMEWORK_MESSAGE>& msg = message();
 
       Slave *slave = lookupSlave(msg.message().slave_id());
       if (slave != NULL) {
         Framework *framework = lookupFramework(msg.framework_id());
         if (framework != NULL) {
-          Message<M2S_FRAMEWORK_MESSAGE> out;
+          MSG<M2S_FRAMEWORK_MESSAGE> out;
           out.mutable_framework_id()->MergeFrom(msg.framework_id());
           out.mutable_message()->MergeFrom(msg.message());
           send(framework->pid, out);
@@ -764,7 +766,7 @@ void Master::operator () ()
     }
 
     case S2M_EXITED_EXECUTOR: {
-      const Message<S2M_EXITED_EXECUTOR>&msg = message();
+      const MSG<S2M_EXITED_EXECUTOR>&msg = message();
 
       Slave *slave = lookupSlave(msg.slave_id());
       if (slave != NULL) {
@@ -780,7 +782,7 @@ void Master::operator () ()
           foreachpaircopy (_, Task* task, framework->tasks) {
             if (task->slave_id() == slave->slaveId &&
                 task->executor_id() == msg.executor_id()) {
-              Message<M2F_STATUS_UPDATE> out;
+              MSG<M2F_STATUS_UPDATE> out;
               out.mutable_framework_id()->MergeFrom(task->framework_id());
               TaskStatus *status = out.mutable_status();
               status->mutable_task_id()->MergeFrom(task->task_id());
@@ -803,7 +805,7 @@ void Master::operator () ()
     }
 
     case SH2M_HEARTBEAT: {
-      const Message<SH2M_HEARTBEAT>& msg = message();
+      const MSG<SH2M_HEARTBEAT>& msg = message();
 
       Slave *slave = lookupSlave(msg.slave_id());
       if (slave != NULL) {
@@ -834,7 +836,7 @@ void Master::operator () ()
     }
 
     case M2M_FRAMEWORK_EXPIRED: {
-      const Message<M2M_FRAMEWORK_EXPIRED>&msg = message();
+      const MSG<M2M_FRAMEWORK_EXPIRED>&msg = message();
 
       Framework* framework = lookupFramework(msg.framework_id());
       if (framework != NULL) {
@@ -889,7 +891,7 @@ void Master::operator () ()
 
     case M2M_GET_STATE: {
       state::MasterState *state = getState();
-      Message<M2M_GET_STATE_REPLY> out;
+      MSG<M2M_GET_STATE_REPLY> out;
       out.set_pointer((char *) &state, sizeof(state));
       send(from(), out);
       break;
@@ -902,6 +904,13 @@ void Master::operator () ()
       return;
     }
 
+    case vars: {
+      LOG(INFO) << "HTTP request for 'vars'";
+      const string& data = conf.str();
+      Process::send(from(), "response", data.data(), data.size());
+      break;
+    }
+
     default:
       LOG(ERROR) << "Received unknown message (" << msgid()
                  << ") from " << from();
@@ -929,16 +938,16 @@ OfferID Master::makeOffer(Framework *fra
 
   LOG(INFO) << "Sending " << offer << " to " << framework;
 
-  Message<M2F_RESOURCE_OFFER> out;
+  MSG<M2F_RESOURCE_OFFER> out;
   out.mutable_offer_id()->MergeFrom(offerId);
 
   foreach (const SlaveResources& r, resources) {
-    SlaveOffer* offer = out.add_offer();
+    SlaveOffer* offer = out.add_offers();
     offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
     offer->set_hostname(r.slave->info.hostname());
     offer->mutable_resources()->MergeFrom(r.resources);
 
-    out.add_pid(r.slave->pid);
+    out.add_pids(r.slave->pid);
   }
 
   send(framework->pid, out);
@@ -1091,7 +1100,7 @@ void Master::launchTask(Framework* frame
 
   LOG(INFO) << "Launching " << t << " on " << slave;
 
-  Message<M2S_RUN_TASK> out;
+  MSG<M2S_RUN_TASK> out;
   out.mutable_framework()->MergeFrom(framework->info);
   out.mutable_framework_id()->MergeFrom(framework->frameworkId);
   out.set_pid(framework->pid);
@@ -1115,7 +1124,7 @@ void Master::killTask(Task *task)
   Slave *slave = lookupSlave(task->slave_id());
   CHECK(slave != NULL);
 
-  Message<M2S_KILL_TASK> out;
+  MSG<M2S_KILL_TASK> out;
   out.mutable_framework_id()->MergeFrom(framework->frameworkId);
   out.mutable_task_id()->MergeFrom(task->task_id());
   send(slave->pid, out);
@@ -1130,7 +1139,7 @@ void Master::terminateFramework(Framewor
 {
   LOG(INFO) << "Terminating " << framework << " due to error: " << message;
 
-  Message<M2F_ERROR> out;
+  MSG<M2F_ERROR> out;
   out.set_code(code);
   out.set_message(message);
   send(framework->pid, out);
@@ -1159,7 +1168,7 @@ void Master::removeSlotOffer(SlotOffer *
   // Also send framework a rescind message unless the reason we are
   // removing the offer is that the framework replied to it
   if (reason != ORR_FRAMEWORK_REPLIED) {
-    Message<M2F_RESCIND_OFFER> out;
+    MSG<M2F_RESCIND_OFFER> out;
     out.mutable_offer_id()->MergeFrom(offer->offerId);
     send(framework->pid, out);
   }
@@ -1181,7 +1190,7 @@ void Master::addFramework(Framework *fra
   pidToFrameworkId[framework->pid] = framework->frameworkId;
   link(framework->pid);
 
-  Message<M2F_REGISTER_REPLY> out;
+  MSG<M2F_REGISTER_REPLY> out;
   out.mutable_framework_id()->MergeFrom(framework->frameworkId);
   send(framework->pid, out);
 
@@ -1201,7 +1210,7 @@ void Master::failoverFramework(Framework
     removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
   }
 
-  Message<M2F_ERROR> out;
+  MSG<M2F_ERROR> out;
   out.set_code(1);
   out.set_message("Framework failover");
   send(oldPid, out);
@@ -1224,7 +1233,7 @@ void Master::failoverFramework(Framework
   // Make sure we can get offers again.
   framework->active = true;
 
-  Message<M2F_REGISTER_REPLY> reply;
+  MSG<M2F_REGISTER_REPLY> reply;
   reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
   send(newPid, reply);
 }
@@ -1239,7 +1248,7 @@ void Master::removeFramework(Framework *
   
   // Tell slaves to kill the framework
   foreachpair (_, Slave *slave, slaves) {
-    Message<M2S_KILL_FRAMEWORK> out;
+    MSG<M2S_KILL_FRAMEWORK> out;
     out.mutable_framework_id()->MergeFrom(framework->frameworkId);
     send(slave->pid, out);
   }
@@ -1287,7 +1296,7 @@ void Master::removeSlave(Slave *slave)
     // framework until it fails over. See the TODO above in
     // S2M_REREGISTER_SLAVE.
     if (framework != NULL) {
-      Message<M2F_STATUS_UPDATE> out;
+      MSG<M2F_STATUS_UPDATE> out;
       out.mutable_framework_id()->MergeFrom(task->framework_id());
       TaskStatus *status = out.mutable_status();
       status->mutable_task_id()->MergeFrom(task->task_id());
@@ -1317,7 +1326,7 @@ void Master::removeSlave(Slave *slave)
   // Send lost-slave message to all frameworks (this helps them re-run
   // previously finished tasks whose output was on the lost slave)
   foreachpair (_, Framework *framework, frameworks) {
-    Message<M2F_LOST_SLAVE> out;
+    MSG<M2F_LOST_SLAVE> out;
     out.mutable_slave_id()->MergeFrom(slave->slaveId);
     send(framework->pid, out);
   }

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun  5 09:08:02 2011
@@ -12,7 +12,7 @@
 #include <string>
 #include <vector>
 
-#include <reliable.hpp>
+#include <process.hpp>
 
 #include <glog/logging.h>
 
@@ -97,7 +97,7 @@ protected:
     do {
       switch (receive(FRAMEWORK_FAILOVER_TIMEOUT)) {
         case PROCESS_TIMEOUT: {
-          Message<M2M_FRAMEWORK_EXPIRED> msg;
+          MSG<M2M_FRAMEWORK_EXPIRED> msg;
           msg.mutable_framework_id()->set_value(frameworkId.value());
           send(master, msg);
           return;

Modified: incubator/mesos/trunk/src/master/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/webui.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/webui.cpp (original)
+++ incubator/mesos/trunk/src/master/webui.cpp Sun Jun  5 09:08:02 2011
@@ -87,7 +87,7 @@ public:
     receive();
     CHECK(msgid() == M2M_GET_STATE_REPLY);
 
-    const Message<M2M_GET_STATE_REPLY>& msg = message();
+    const MSG<M2M_GET_STATE_REPLY>& msg = message();
 
     masterState =
       *(state::MasterState **) msg.pointer().data();

Added: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132253&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (added)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun  5 09:08:02 2011
@@ -0,0 +1,132 @@
+#include "messaging/messages.hpp"
+
+
+namespace mesos { namespace internal {
+
+// Bidirectional registry between message names and MSGIDs. Both maps are
+// filled in by the registrar objects defined below, i.e. during static
+// initialization of this translation unit.
+//
+// NOTE(review): code in *other* translation units must not consult these
+// maps from its own static initializers: the relative order of dynamic
+// initialization across translation units is unspecified, so the maps may
+// not be constructed or populated yet at that point.
+boost::unordered_map<std::string, MSGID> MesosProcess::ids;
+boost::unordered_map<MSGID, std::string> MesosProcess::names;
+
+
+// Everything below lives in an unnamed namespace (internal linkage) so the
+// helper type and the registrar objects cannot clash with same-named
+// entities in another translation unit (an ODR violation). The names also
+// avoid double underscores, which [lex.name] reserves to the implementation.
+namespace {
+
+// Records the pairing name <-> id in both registry maps at construction.
+// Only namespace-scope instances are created (below), so each pairing is
+// registered once, during static initialization.
+struct InitializeMessage
+{
+  InitializeMessage(const std::string& name, MSGID id)
+  {
+    MesosProcess::ids[name] = id;
+    MesosProcess::names[id] = name;
+  }
+};
+
+
+// The libprocess built-in EXIT and TIMEOUT messages, registered first (as
+// before). Objects within one translation unit are initialized in order of
+// definition, so the maps above already exist at this point.
+// NOTE(review): if EXIT/TIMEOUT are dynamically-initialized globals from
+// another translation unit (e.g. std::string constants in libprocess),
+// reading them here is subject to cross-TU initialization order; confirm.
+InitializeMessage initializeExit(EXIT, PROCESS_EXIT);
+InitializeMessage initializeTimeout(TIMEOUT, PROCESS_TIMEOUT);
+
+
+// Defines a registrar that maps the stringified enumerator to its value,
+// e.g. INITIALIZE_MESSAGE(M2F_ERROR) registers "M2F_ERROR" <-> M2F_ERROR.
+#define INITIALIZE_MESSAGE(ID)                   \
+  InitializeMessage initialize_ ## ID(#ID, ID)
+
+
+INITIALIZE_MESSAGE(F2M_REGISTER_FRAMEWORK);
+INITIALIZE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
+INITIALIZE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
+INITIALIZE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
+INITIALIZE_MESSAGE(F2M_REVIVE_OFFERS);
+INITIALIZE_MESSAGE(F2M_KILL_TASK);
+INITIALIZE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(F2M_STATUS_UPDATE_ACK);
+
+INITIALIZE_MESSAGE(M2F_REGISTER_REPLY);
+INITIALIZE_MESSAGE(M2F_RESOURCE_OFFER);
+INITIALIZE_MESSAGE(M2F_RESCIND_OFFER);
+INITIALIZE_MESSAGE(M2F_STATUS_UPDATE);
+INITIALIZE_MESSAGE(M2F_LOST_SLAVE);
+INITIALIZE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(M2F_ERROR);
+
+INITIALIZE_MESSAGE(S2M_REGISTER_SLAVE);
+INITIALIZE_MESSAGE(S2M_REREGISTER_SLAVE);
+INITIALIZE_MESSAGE(S2M_UNREGISTER_SLAVE);
+INITIALIZE_MESSAGE(S2M_STATUS_UPDATE);
+INITIALIZE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(S2M_EXITED_EXECUTOR);
+
+INITIALIZE_MESSAGE(SH2M_HEARTBEAT);
+
+INITIALIZE_MESSAGE(M2S_REGISTER_REPLY);
+INITIALIZE_MESSAGE(M2S_REREGISTER_REPLY);
+INITIALIZE_MESSAGE(M2S_RUN_TASK);
+INITIALIZE_MESSAGE(M2S_KILL_TASK);
+INITIALIZE_MESSAGE(M2S_KILL_FRAMEWORK);
+INITIALIZE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(M2S_UPDATE_FRAMEWORK);
+INITIALIZE_MESSAGE(M2S_STATUS_UPDATE_ACK);
+INITIALIZE_MESSAGE(M2S_SHUTDOWN);
+
+INITIALIZE_MESSAGE(E2S_REGISTER_EXECUTOR);
+INITIALIZE_MESSAGE(E2S_STATUS_UPDATE);
+INITIALIZE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
+
+INITIALIZE_MESSAGE(S2E_REGISTER_REPLY);
+INITIALIZE_MESSAGE(S2E_RUN_TASK);
+INITIALIZE_MESSAGE(S2E_KILL_TASK);
+INITIALIZE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(S2E_KILL_EXECUTOR);
+
+#ifdef __sun__
+INITIALIZE_MESSAGE(PD2S_REGISTER_PROJD);
+INITIALIZE_MESSAGE(PD2S_PROJD_READY);
+INITIALIZE_MESSAGE(S2PD_UPDATE_RESOURCES);
+INITIALIZE_MESSAGE(S2PD_KILL_ALL);
+#endif // __sun__
+
+INITIALIZE_MESSAGE(M2M_GET_STATE);
+INITIALIZE_MESSAGE(M2M_GET_STATE_REPLY);
+INITIALIZE_MESSAGE(M2M_TIMER_TICK);
+INITIALIZE_MESSAGE(M2M_FRAMEWORK_EXPIRED);
+INITIALIZE_MESSAGE(M2M_SHUTDOWN);
+
+INITIALIZE_MESSAGE(S2S_GET_STATE);
+INITIALIZE_MESSAGE(S2S_GET_STATE_REPLY);
+INITIALIZE_MESSAGE(S2S_SHUTDOWN);
+
+INITIALIZE_MESSAGE(GOT_MASTER_TOKEN);
+INITIALIZE_MESSAGE(NEW_MASTER_DETECTED);
+INITIALIZE_MESSAGE(NO_MASTER_DETECTED);
+INITIALIZE_MESSAGE(MASTER_DETECTION_FAILURE);
+
+// Not a master/slave message: the master answers 'vars' as an HTTP request.
+INITIALIZE_MESSAGE(vars);
+
+// NOTE(review): MESOS_MSGID reads like an end-of-enum marker rather than a
+// real message; confirm it is meant to be registered.
+INITIALIZE_MESSAGE(MESOS_MSGID);
+
+#undef INITIALIZE_MESSAGE
+
+} // namespace {
+
+}} // namespace mesos { namespace internal {