You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC

svn commit: r1236485 [3/7] - in /incubator/mesos/trunk: ./ include/mesos/ src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/ src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/ third_party/libprocess/include/pr...

Modified: incubator/mesos/trunk/src/zookeeper/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/zookeeper.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/zookeeper.cpp Fri Jan 27 01:25:13 2012
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-#include <assert.h>
-
 #include <glog/logging.h>
 
 #include <iostream>
@@ -32,12 +30,6 @@
 
 #include "zookeeper/zookeeper.hpp"
 
-// DO NOT REMOVE! Removing this will require also changing which
-// ZooKeeper library get's used for linking, right now the Makefile is
-// assuming the multithreaded library will get used ...
-#define USE_THREADED_ZOOKEEPER
-
-using boost::cref;
 using boost::tuple;
 
 using process::Future;
@@ -45,9 +37,6 @@ using process::PID;
 using process::Process;
 using process::Promise;
 
-using std::cerr;
-using std::cout;
-using std::endl;
 using std::map;
 using std::string;
 using std::vector;
@@ -110,7 +99,7 @@ public:
    if (processes.count(watcher) > 0) {
       WatcherProcess* process = processes[watcher];
       processes.erase(watcher);
-      process::post(process->self(), process::TERMINATE);
+      process::terminate(process->self());
       process::wait(process->self());
       delete process;
       return true;
@@ -151,7 +140,9 @@ Watcher::Watcher()
   while (initializing);
 
   WatcherProcess* process =
-    process::call(manager->self(), &WatcherProcessManager::create, this);
+    process::dispatch(manager->self(),
+                      &WatcherProcessManager::create,
+                      this).get();
 
   if (process == NULL) {
     fatal("failed to initialize Watcher");
@@ -161,15 +152,12 @@ Watcher::Watcher()
 
 Watcher::~Watcher()
 {
-  process::call(manager->self(), &WatcherProcessManager::destroy, this);
+  process::dispatch(manager->self(), &WatcherProcessManager::destroy, this)
+    .await();
 }
 
 
-#ifndef USE_THREADED_ZOOKEEPER
-class ZooKeeperImpl : public Process<ZooKeeperImpl>
-#else
 class ZooKeeperImpl
-#endif // USE_THREADED_ZOOKEEPER
 {
 public:
   ZooKeeperImpl(ZooKeeper* zk,
@@ -179,11 +167,13 @@ public:
     : zk(zk), servers(servers), timeout(timeout), watcher(watcher)
   {
     if (watcher == NULL) {
-      fatalerror("cannot instantiate ZooKeeper with NULL watcher");
+      LOG(FATAL) << "Cannot instantiate ZooKeeper with NULL watcher";
     }
 
     // Lookup PID of the WatcherProcess associated with the Watcher.
-    pid = call(manager->self(), &WatcherProcessManager::lookup, watcher);
+    pid = process::dispatch(manager->self(),
+                            &WatcherProcessManager::lookup,
+                            watcher).get();
 
     // N.B. The Watcher and thus WatcherProcess may already be gone,
     // in which case, each dispatch to the WatcherProcess that we do
@@ -193,7 +183,7 @@ public:
 
     zh = zookeeper_init(servers.c_str(), event, timeout.value, NULL, this, 0);
     if (zh == NULL) {
-      fatalerror("failed to create ZooKeeper (zookeeper_init)");
+      PLOG(FATAL) << "Failed to create ZooKeeper, zookeeper_init";
     }
   }
 
@@ -201,222 +191,155 @@ public:
   {
     int ret = zookeeper_close(zh);
     if (ret != ZOK) {
-      fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
+      LOG(FATAL) << "Failed to cleanup ZooKeeper, zookeeper_close: "
+                 << zerror(ret);
     }
   }
 
-  Promise<int> authenticate(const string& scheme, const string& credentials)
+  Future<int> authenticate(const string& scheme, const string& credentials)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
+
+    Future<int> future = promise->future();
 
-    tuple<Promise<int> >* args = new tuple<Promise<int> >(promise);
+    tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);
 
     int ret = zoo_add_auth(zh, scheme.c_str(), credentials.data(),
                            credentials.size(), voidCompletion, args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
+    return future;
   }
 
-  Promise<int> create(const string& path, const string& data,
-                      const ACL_vector& acl, int flags, string* result)
+  Future<int> create(const string& path, const string& data,
+                     const ACL_vector& acl, int flags, string* result)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
+
+    Future<int> future = promise->future();
 
-    tuple<Promise<int>, string*>* args =
-      new tuple<Promise<int>, string*>(promise, result);
+    tuple<Promise<int>*, string*>* args =
+      new tuple<Promise<int>*, string*>(promise, result);
 
     int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
                           flags, stringCompletion, args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
+    return future;
   }
 
-  Promise<int> remove(const string& path, int version)
+  Future<int> remove(const string& path, int version)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
+
+    Future<int> future = promise->future();
 
-    tuple<Promise<int> >* args = new tuple<Promise<int> >(promise);
+    tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);
 
     int ret = zoo_adelete(zh, path.c_str(), version, voidCompletion, args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
+    return future;
   }
 
-  Promise<int> exists(const string& path, bool watch, Stat* stat)
+  Future<int> exists(const string& path, bool watch, Stat* stat)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
 
-    tuple<Promise<int>, Stat*>* args =
-      new tuple<Promise<int>, Stat*>(promise, stat);
+    Future<int> future = promise->future();
+
+    tuple<Promise<int>*, Stat*>* args =
+      new tuple<Promise<int>*, Stat*>(promise, stat);
 
     int ret = zoo_aexists(zh, path.c_str(), watch, statCompletion, args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
+    return future;
   }
 
-  Promise<int> get(const string& path, bool watch, string* result, Stat* stat)
+  Future<int> get(const string& path, bool watch, string* result, Stat* stat)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
+
+    Future<int> future = promise->future();
 
-    tuple<Promise<int>, string*, Stat*>* args =
-      new tuple<Promise<int>, string*, Stat*>(promise, result, stat);
+    tuple<Promise<int>*, string*, Stat*>* args =
+      new tuple<Promise<int>*, string*, Stat*>(promise, result, stat);
 
     int ret = zoo_aget(zh, path.c_str(), watch, dataCompletion, args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
+    return future;
   }
 
-  Promise<int> getChildren(const string& path, bool watch,
-                           vector<string>* results)
+  Future<int> getChildren(const string& path,
+                          bool watch,
+                          vector<string>* results)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
 
-    tuple<Promise<int>, vector<string>*>* args =
-      new tuple<Promise<int>, vector<string>*>(promise, results);
+    Future<int> future = promise->future();
+
+    tuple<Promise<int>*, vector<string>*>* args =
+      new tuple<Promise<int>*, vector<string>*>(promise, results);
 
     int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
                                 args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
+    return future;
   }
 
-  Promise<int> set(const string& path, const string& data, int version)
+  Future<int> set(const string& path, const string& data, int version)
   {
-    Promise<int> promise;
+    Promise<int>* promise = new Promise<int>();
+
+    Future<int> future = promise->future();
 
-    tuple<Promise<int>, Stat*>* args =
-      new tuple<Promise<int>, Stat*>(promise, NULL);
+    tuple<Promise<int>*, Stat*>* args =
+      new tuple<Promise<int>*, Stat*>(promise, NULL);
 
     int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
                        version, statCompletion, args);
 
     if (ret != ZOK) {
-      promise.set(ret);
+      delete promise;
       delete args;
+      return ret;
     }
 
-    return promise;
-  }
-
-#ifndef USE_THREADED_ZOOKEEPER
-protected:
-  virtual void operator () ()
-  {
-    while (true) {
-      int fd;
-      int ops;
-      timeval tv;
-
-      prepare(&fd, &ops, &tv);
-
-      double secs = tv.tv_sec + (tv.tv_usec * 1e-6);
-
-      // Cause await to return immediately if the file descriptor is
-      // not valid (for example because the connection timed out) and
-      // secs is 0 because that will block indefinitely.
-      if (fd == -1 && secs == 0) {
-	secs = -1;
-      }
-
-      if (poll(fd, ops, secs, false)) {
-	// Either timer expired (might be 0) or data became available on fd.
-	process(fd, ops);
-      } else {
-        // Okay, a message must have been received. Handle only one
-        // message at a time so as not to delay any necessary internal
-        // processing.
-        serve(0, true);
-        if (name() == process::TERMINATE) {
-          return;
-        } else if (name() != process::NOTHING) {
-          fatal("unexpected interruption of 'poll'");
-        }
-      }
-    }
-  }
-
-  bool prepare(int* fd, int* ops, timeval* tv)
-  {
-    int interest = 0;
-
-    int ret = zookeeper_interest(zh, fd, &interest, tv);
-
-    // If in some disconnected state, try again later.
-    if (ret == ZINVALIDSTATE ||
-        ret == ZCONNECTIONLOSS ||
-	ret == ZOPERATIONTIMEOUT) {
-      return false;
-    }
-
-    if (ret != ZOK) {
-      fatal("zookeeper_interest failed! (%s)", zerror(ret));
-    }
-
-    *ops = 0;
-
-    if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
-      *ops |= RDWR;
-    } else if (interest & ZOOKEEPER_READ) {
-      *ops |= RDONLY;
-    } else if (interest & ZOOKEEPER_WRITE) {
-      *ops |= WRONLY;
-    }
-
-    return true;
-  }
-
-  void process(int fd, int ops)
-  {
-    int events = 0;
-
-    if (ready(fd, RDONLY)) {
-      events |= ZOOKEEPER_READ;
-    } if (ready(fd, WRONLY)) {
-      events |= ZOOKEEPER_WRITE;
-    }
-
-    int ret = zookeeper_process(zh, events);
-
-    // If in some disconnected state, try again later.
-    if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
-      return;
-    }
-
-    if (ret != ZOK && ret != ZNOTHING) {
-      fatal("zookeeper_process failed! (%s)", zerror(ret));
-    }
+    return future;
   }
-#endif // USE_THREADED_ZOOKEEPER
 
 private:
   static void event(zhandle_t* zh, int type, int state,
@@ -430,23 +353,24 @@ private:
 
   static void voidCompletion(int ret, const void *data)
   {
-    const tuple<Promise<int> >* args =
-      reinterpret_cast<const tuple<Promise<int> >*>(data);
+    const tuple<Promise<int>*>* args =
+      reinterpret_cast<const tuple<Promise<int>*>*>(data);
 
-    Promise<int> promise = (*args).get<0>();
+    Promise<int>* promise = (*args).get<0>();
 
-    promise.set(ret);
+    promise->set(ret);
 
+    delete promise;
     delete args;
   }
 
 
   static void stringCompletion(int ret, const char* value, const void* data)
   {
-    const tuple<Promise<int>, string*> *args =
-      reinterpret_cast<const tuple<Promise<int>, string*>*>(data);
+    const tuple<Promise<int>*, string*> *args =
+      reinterpret_cast<const tuple<Promise<int>*, string*>*>(data);
 
-    Promise<int> promise = (*args).get<0>();
+    Promise<int>* promise = (*args).get<0>();
     string* result = (*args).get<1>();
 
     if (ret == 0) {
@@ -455,18 +379,19 @@ private:
       }
     }
 
-    promise.set(ret);
+    promise->set(ret);
 
+    delete promise;
     delete args;
   }
 
 
   static void statCompletion(int ret, const Stat* stat, const void* data)
   {
-    const tuple<Promise<int>, Stat*>* args =
-      reinterpret_cast<const tuple<Promise<int>, Stat*>*>(data);
+    const tuple<Promise<int>*, Stat*>* args =
+      reinterpret_cast<const tuple<Promise<int>*, Stat*>*>(data);
 
-    Promise<int> promise = (*args).get<0>();
+    Promise<int>* promise = (*args).get<0>();
     Stat *stat_result = (*args).get<1>();
 
     if (ret == 0) {
@@ -475,8 +400,9 @@ private:
       }
     }
 
-    promise.set(ret);
+    promise->set(ret);
 
+    delete promise;
     delete args;
   }
 
@@ -484,10 +410,10 @@ private:
   static void dataCompletion(int ret, const char* value, int value_len,
 			     const Stat* stat, const void* data)
   {
-    const tuple<Promise<int>, string*, Stat*>* args =
-      reinterpret_cast<const tuple<Promise<int>, string*, Stat*>*>(data);
+    const tuple<Promise<int>*, string*, Stat*>* args =
+      reinterpret_cast<const tuple<Promise<int>*, string*, Stat*>*>(data);
 
-    Promise<int> promise = (*args).get<0>();
+    Promise<int>* promise = (*args).get<0>();
     string* result = (*args).get<1>();
     Stat* stat_result = (*args).get<2>();
 
@@ -501,8 +427,9 @@ private:
       }
     }
 
-    promise.set(ret);
+    promise->set(ret);
 
+    delete promise;
     delete args;
   }
 
@@ -510,10 +437,10 @@ private:
   static void stringsCompletion(int ret, const String_vector* values,
 				const void* data)
   {
-    const tuple<Promise<int>, vector<string>*>* args =
-      reinterpret_cast<const tuple<Promise<int>, vector<string>*>*>(data);
+    const tuple<Promise<int>*, vector<string>*>* args =
+      reinterpret_cast<const tuple<Promise<int>*, vector<string>*>*>(data);
 
-    Promise<int> promise = (*args).get<0>();
+    Promise<int>* promise = (*args).get<0>();
     vector<string>* results = (*args).get<1>();
 
     if (ret == 0) {
@@ -524,8 +451,9 @@ private:
       }
     }
 
-    promise.set(ret);
+    promise->set(ret);
 
+    delete promise;
     delete args;
   }
 
@@ -548,18 +476,11 @@ ZooKeeper::ZooKeeper(const string& serve
                      Watcher* watcher)
 {
   impl = new ZooKeeperImpl(this, servers, timeout, watcher);
-#ifndef USE_THREADED_ZOOKEEPER
-  process::spawn(impl);
-#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 ZooKeeper::~ZooKeeper()
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  process::post(impl->self(), process::TERMINATE);
-  process::wait(impl->self());
-#endif // USE_THREADED_ZOOKEEPER
   delete impl;
 }
 
@@ -578,87 +499,45 @@ int64_t ZooKeeper::getSessionId()
 
 int ZooKeeper::authenticate(const string& scheme, const string& credentials)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::authenticate,
-                       cref(scheme), cref(credentials));
-#else
-  Promise<int> promise = impl->authenticate(scheme, credentials);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->authenticate(scheme, credentials).get();
 }
 
 
 int ZooKeeper::create(const string& path, const string& data,
                       const ACL_vector& acl, int flags, string* result)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::create,
-                       cref(path), cref(data), cref(acl), flags, result);
-#else
-  Promise<int> promise = impl->create(path, data, acl, flags, result);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->create(path, data, acl, flags, result).get();
 }
 
 
 int ZooKeeper::remove(const string& path, int version)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::remove,
-                       cref(path), version);
-#else
-  Promise<int> promise = impl->remove(path, version);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->remove(path, version).get();
 }
 
 
 int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::exists,
-                       cref(path), watch, stat);
-#else
-  Promise<int> promise = impl->exists(path, watch, stat);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->exists(path, watch, stat).get();
 }
 
 
 int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::get,
-                       cref(path), watch, result, stat);
-#else
-  Promise<int> promise = impl->get(path, watch, result, stat);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->get(path, watch, result, stat).get();
 }
 
 
 int ZooKeeper::getChildren(const string& path, bool watch,
                            vector<string>* results)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::getChildren,
-                       cref(path), watch, results);
-#else
-  Promise<int> promise = impl->getChildren(path, watch, results);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->getChildren(path, watch, results).get();
 }
 
 
 int ZooKeeper::set(const string& path, const string& data, int version)
 {
-#ifndef USE_THREADED_ZOOKEEPER
-  return process::call(impl->self(), &ZooKeeperImpl::set,
-                       cref(path), cref(data), version);
-#else
-  Promise<int> promise = impl->set(path, data, version);
-  return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+  return impl->set(path, data, version).get();
 }
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/Makefile.in?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/third_party/libprocess/Makefile.in Fri Jan 27 01:25:13 2012
@@ -74,8 +74,7 @@ GMOCK_OBJ = gmock-all.o
 
 GMOCK_LIB = gmock.a
 
-LIBPROCESS_OBJ = src/process.o src/pid.o src/fatal.o src/tokenize.o	\
-                 src/latch.o src/timer.o
+LIBPROCESS_OBJ = src/process.o src/pid.o src/latch.o
 
 LIBPROCESS_LIB = libprocess.a
 

Added: incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,25 @@
+#ifndef __PROCESS_CLOCK_HPP__
+#define __PROCESS_CLOCK_HPP__
+
+namespace process {
+
+// Forward declarations.
+class ProcessBase;
+
+class Clock
+{
+public:
+  static double now();
+  static double now(ProcessBase* process);
+  static void pause();
+  static bool paused();
+  static void resume();
+  static void advance(double secs);
+  static void update(double secs);
+  static void update(ProcessBase* process, double secs);
+  static void order(ProcessBase* from, ProcessBase* to);
+};
+
+} // namespace process {
+
+#endif // __PROCESS_CLOCK_HPP__

Added: incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,272 @@
+#ifndef __PROCESS_DEFER_HPP__
+#define __PROCESS_DEFER_HPP__
+
+#include <process/deferred.hpp>
+#include <process/dispatch.hpp>
+
+namespace process {
+
+// The defer mechanism is very similar to the dispatch mechanism (see
+// dispatch.hpp), however, rather than scheduling the method to get
+// invoked, the defer mechanism returns a 'deferred' object that when
+// invoked does the underlying dispatch. Similar to dispatch, we
+// provide the C++11 variadic template definitions first, and then use
+// Boost preprocessor macros to provide the actual definitions.
+
+// First, definitions of defer for methods returning void:
+//
+// template <typename T, typename ...P>
+// deferred<void(void)> defer(const PID<T>& pid,
+//                            void (T::*method)(P...),
+//                            P... p)
+// {
+//   void (*dispatch)(const PID<T>&, void (T::*)(P...), P...) =
+//     &process::template dispatch<T, P...>;
+//
+//   return deferred<void(void)>(
+//       std::tr1::bind(dispatch, pid, method, std::forward<P>(p)...));
+// }
+
+template <typename T>
+deferred<void(void)> defer(
+    const PID<T>& pid,
+    void (T::*method)(void))
+{
+  void (*dispatch)(const PID<T>&, void (T::*)(void)) =
+    &process::template dispatch<T>;
+
+  return deferred<void(void)>(
+      std::tr1::bind(dispatch, pid, method));
+}
+
+template <typename T>
+deferred<void(void)> defer(
+    const Process<T>& process,
+    void (T::*method)(void))
+{
+  return defer(process.self(), method);
+}
+
+template <typename T>
+deferred<void(void)> defer(
+    const Process<T>* process,
+    void (T::*method)(void))
+{
+  return defer(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<void(void)> defer(const PID<T>& pid,                         \
+                             void (T::*method)(ENUM_PARAMS(N, P)),      \
+                             ENUM_BINARY_PARAMS(N, A, a))               \
+  {                                                                     \
+    void (*dispatch)(const PID<T>&, void (T::*)(ENUM_PARAMS(N, P)), ENUM_PARAMS(N, A)) = \
+      &process::template dispatch<T, ENUM_PARAMS(N, P), ENUM_PARAMS(N, A)>; \
+                                                                        \
+    return deferred<void(void)>(                                        \
+        std::tr1::bind(dispatch, pid, method, ENUM_PARAMS(N, a)));      \
+  }                                                                     \
+                                                                        \
+  template <typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<void(void)> defer(const Process<T>& process,                 \
+                             void (T::*method)(ENUM_PARAMS(N, P)),      \
+                             ENUM_BINARY_PARAMS(N, A, a))               \
+  {                                                                     \
+    return defer(process.self(), method, ENUM_PARAMS(N, a));            \
+  }                                                                     \
+                                                                        \
+  template <typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<void(void)> defer(const Process<T>* process,                 \
+                             void (T::*method)(ENUM_PARAMS(N, P)),      \
+                             ENUM_BINARY_PARAMS(N, A, a))               \
+  {                                                                     \
+    return defer(process->self(), method, ENUM_PARAMS(N, a));           \
+  }
+
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+
+// Next, definitions of defer for methods returning future:
+//
+// template <typename R, typename T, typename ...P>
+// deferred<Future<R>(void)> defer(const PID<T>& pid,
+//                                 Future<R> (T::*method)(P...),
+//                                 P... p)
+// {
+//   Future<R> (*dispatch)(const PID<T>&, Future<R> (T::*)(P...), P...) =
+//     &process::template dispatch<R, T, P...>;
+//
+//   return deferred<Future<R>(void)>(
+//       std::tr1::bind(dispatch, pid, method, std::forward<P>(p)...));
+// }
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+    const PID<T>& pid,
+    Future<R> (T::*method)(void))
+{
+  Future<R> (*dispatch)(const PID<T>&, Future<R> (T::*)(void)) =
+    &process::template dispatch<R, T>;
+
+  return deferred<Future<R>(void)>(
+      std::tr1::bind(dispatch, pid, method));
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+    const Process<T>& process,
+    Future<R> (T::*method)(void))
+{
+  return defer(process.self(), method);
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+    const Process<T>* process,
+    Future<R> (T::*method)(void))
+{
+  return defer(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<Future<R>(void)> defer(                                      \
+      const PID<T>& pid,                                                \
+      Future<R> (T::*method)(ENUM_PARAMS(N, P)),                       \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    Future<R> (*dispatch)(const PID<T>&, Future<R> (T::*)(ENUM_PARAMS(N, P)), ENUM_PARAMS(N, A)) = \
+      &process::template dispatch<R, T, ENUM_PARAMS(N, P), ENUM_PARAMS(N, A)>; \
+                                                                        \
+    return deferred<Future<R>(void)>(                                   \
+        std::tr1::bind(dispatch, pid, method, ENUM_PARAMS(N, a)));      \
+  }                                                                     \
+                                                                        \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<Future<R>(void)> defer(                                      \
+      const Process<T>& process,                                        \
+      Future<R> (T::*method)(ENUM_PARAMS(N, P)),                       \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    return defer(process.self(), method, ENUM_PARAMS(N, a));            \
+  }                                                                     \
+                                                                        \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<Future<R>(void)> defer(                                      \
+      const Process<T>* process,                                        \
+      Future<R> (T::*method)(ENUM_PARAMS(N, P)),                       \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    return defer(process->self(), method, ENUM_PARAMS(N, a));           \
+  }
+
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+
+// Next, definitions of defer for methods returning a value:
+//
+// template <typename R, typename T, typename ...P>
+// deferred<Future<R>(void)> defer(const PID<T>& pid,
+//                                 R (T::*method)(P...),
+//                                 P... p)
+// {
+//   Future<R> (*dispatch)(const PID<T>&, R (T::*)(P...), P...) =
+//     &process::template dispatch<R, T, P...>;
+//
+//   return deferred<Future<R>(void)>(
+//       std::tr1::bind(dispatch, pid, method, std::forward<P>(p)...));
+// }
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+    const PID<T>& pid,
+    R (T::*method)(void))
+{
+  Future<R> (*dispatch)(const PID<T>&, R (T::*)(void)) =
+    &process::template dispatch<R, T>;
+
+  return deferred<Future<R>(void)>(
+      std::tr1::bind(dispatch, pid, method));
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+    const Process<T>& process,
+    R (T::*method)(void))
+{
+  return defer(process.self(), method);
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+    const Process<T>* process,
+    R (T::*method)(void))
+{
+  return defer(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<Future<R>(void)> defer(                                      \
+      const PID<T>& pid,                                                \
+      R (T::*method)(ENUM_PARAMS(N, P)),                                \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    Future<R> (*dispatch)(const PID<T>&, R (T::*)(ENUM_PARAMS(N, P)), ENUM_PARAMS(N, A)) = \
+      &process::template dispatch<R, T, ENUM_PARAMS(N, P), ENUM_PARAMS(N, A)>; \
+                                                                        \
+    return deferred<Future<R>(void)>(                                   \
+        std::tr1::bind(dispatch, pid, method, ENUM_PARAMS(N, a)));      \
+  }                                                                     \
+                                                                        \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<Future<R>(void)> defer(                                      \
+      const Process<T>& process,                                        \
+      R (T::*method)(ENUM_PARAMS(N, P)),                                \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    return defer(process.self(), method, ENUM_PARAMS(N, a));            \
+  }                                                                     \
+                                                                        \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  deferred<Future<R>(void)> defer(                                      \
+      const Process<T>* process,                                        \
+      R (T::*method)(ENUM_PARAMS(N, P)),                                \
+      ENUM_BINARY_PARAMS(N, A, a))                                      \
+  {                                                                     \
+    return defer(process->self(), method, ENUM_PARAMS(N, a));           \
+  }
+
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+} // namespace process {
+
+#endif // __PROCESS_DEFER_HPP__

Added: incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,76 @@
+#ifndef __PROCESS_DEFERRED_HPP__
+#define __PROCESS_DEFERRED_HPP__
+
+#include <tr1/functional>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/preprocessor.hpp>
+
+namespace process {
+
+// Acts like a function call but runs within an asynchronous execution
+// context such as an Executor or a ProcessBase (since only an
+// executor or the 'defer' routines are allowed to create them).
+//
+// A 'deferred<F>' is a std::tr1::function<F> (public base) whose only
+// declared constructor is private. Wrapping a std::tr1::function<F>
+// in a 'deferred<F>' is therefore limited to the friends listed
+// below: the Executor class and the 'defer' overloads that take a
+// 'const PID<T>&'.
+template <typename F>
+struct deferred : std::tr1::function<F>
+{
+private:
+  // Only an Executor and the 'defer' routines can create these.
+  friend class Executor;
+
+  // Friend: 'defer' for a zero-argument method returning void; yields
+  // a 'deferred<void(void)>'.
+  template <typename T>
+  friend deferred<void(void)> defer(
+      const PID<T>& pid,
+      void (T::*method)(void));
+
+  // Friends: 'defer' for void-returning methods that take parameters.
+  // P names the method's parameter types and A the types of the
+  // arguments supplied to 'defer'; one declaration is generated per
+  // arity by the REPEAT_FROM_TO expansion below.
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  friend deferred<void(void)> defer(const PID<T>& pid,                  \
+                                    void (T::*method)(ENUM_PARAMS(N, P)), \
+                                    ENUM_BINARY_PARAMS(N, A, a));
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+  // Friend: 'defer' for a zero-argument method returning Future<R>;
+  // yields a 'deferred<Future<R>(void)>'.
+  template <typename R, typename T>
+  friend deferred<Future<R>(void)> defer(
+      const PID<T>& pid,
+      Future<R> (T::*method)(void));
+
+  // Friends: 'defer' for Future<R>-returning methods that take
+  // parameters (same P/A convention and arity expansion as above).
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  friend deferred<Future<R>(void)> defer(                               \
+      const PID<T>& pid,                                                \
+      Future<R> (T::*method)(ENUM_PARAMS(N, P)),                        \
+      ENUM_BINARY_PARAMS(N, A, a));
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+  // Friend: 'defer' for a zero-argument method returning a plain R.
+  // Note the result type is still 'deferred<Future<R>(void)>', i.e.
+  // invoking the deferred yields a Future<R> rather than an R.
+  template <typename R, typename T>
+  friend deferred<Future<R>(void)> defer(
+      const PID<T>& pid,
+      R (T::*method)(void));
+
+  // Friends: 'defer' for plain R-returning methods that take
+  // parameters (same P/A convention and arity expansion as above).
+#define TEMPLATE(Z, N, DATA)                                            \
+  template <typename R,                                                 \
+            typename T,                                                 \
+            ENUM_PARAMS(N, typename P),                                 \
+            ENUM_PARAMS(N, typename A)>                                 \
+  friend deferred<Future<R>(void)> defer(                               \
+      const PID<T>& pid,                                                \
+      R (T::*method)(ENUM_PARAMS(N, P)),                                \
+      ENUM_BINARY_PARAMS(N, A, a));
+  REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+  // Initializes the std::tr1::function<F> base as a copy of 'f'.
+  // Private so that only the friends declared above can wrap a
+  // function object in a 'deferred'.
+  deferred(const std::tr1::function<F>& f) : std::tr1::function<F>(f) {}
+};
+
+} // namespace process {
+
+#endif // __PROCESS_DEFERRED_HPP__