You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC
svn commit: r1236485 [3/7] - in /incubator/mesos/trunk: ./ include/mesos/
src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/
src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/
third_party/libprocess/include/pr...
Modified: incubator/mesos/trunk/src/zookeeper/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/zookeeper.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/zookeeper.cpp Fri Jan 27 01:25:13 2012
@@ -16,8 +16,6 @@
* limitations under the License.
*/
-#include <assert.h>
-
#include <glog/logging.h>
#include <iostream>
@@ -32,12 +30,6 @@
#include "zookeeper/zookeeper.hpp"
-// DO NOT REMOVE! Removing this will require also changing which
-// ZooKeeper library get's used for linking, right now the Makefile is
-// assuming the multithreaded library will get used ...
-#define USE_THREADED_ZOOKEEPER
-
-using boost::cref;
using boost::tuple;
using process::Future;
@@ -45,9 +37,6 @@ using process::PID;
using process::Process;
using process::Promise;
-using std::cerr;
-using std::cout;
-using std::endl;
using std::map;
using std::string;
using std::vector;
@@ -110,7 +99,7 @@ public:
if (processes.count(watcher) > 0) {
WatcherProcess* process = processes[watcher];
processes.erase(watcher);
- process::post(process->self(), process::TERMINATE);
+ process::terminate(process->self());
process::wait(process->self());
delete process;
return true;
@@ -151,7 +140,9 @@ Watcher::Watcher()
while (initializing);
WatcherProcess* process =
- process::call(manager->self(), &WatcherProcessManager::create, this);
+ process::dispatch(manager->self(),
+ &WatcherProcessManager::create,
+ this).get();
if (process == NULL) {
fatal("failed to initialize Watcher");
@@ -161,15 +152,12 @@ Watcher::Watcher()
Watcher::~Watcher()
{
- process::call(manager->self(), &WatcherProcessManager::destroy, this);
+ process::dispatch(manager->self(), &WatcherProcessManager::destroy, this)
+ .await();
}
-#ifndef USE_THREADED_ZOOKEEPER
-class ZooKeeperImpl : public Process<ZooKeeperImpl>
-#else
class ZooKeeperImpl
-#endif // USE_THREADED_ZOOKEEPER
{
public:
ZooKeeperImpl(ZooKeeper* zk,
@@ -179,11 +167,13 @@ public:
: zk(zk), servers(servers), timeout(timeout), watcher(watcher)
{
if (watcher == NULL) {
- fatalerror("cannot instantiate ZooKeeper with NULL watcher");
+ LOG(FATAL) << "Cannot instantiate ZooKeeper with NULL watcher";
}
// Lookup PID of the WatcherProcess associated with the Watcher.
- pid = call(manager->self(), &WatcherProcessManager::lookup, watcher);
+ pid = process::dispatch(manager->self(),
+ &WatcherProcessManager::lookup,
+ watcher).get();
// N.B. The Watcher and thus WatcherProcess may already be gone,
// in which case, each dispatch to the WatcherProcess that we do
@@ -193,7 +183,7 @@ public:
zh = zookeeper_init(servers.c_str(), event, timeout.value, NULL, this, 0);
if (zh == NULL) {
- fatalerror("failed to create ZooKeeper (zookeeper_init)");
+ PLOG(FATAL) << "Failed to create ZooKeeper, zookeeper_init";
}
}
@@ -201,222 +191,155 @@ public:
{
int ret = zookeeper_close(zh);
if (ret != ZOK) {
- fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
+ LOG(FATAL) << "Failed to cleanup ZooKeeper, zookeeper_close: "
+ << zerror(ret);
}
}
- Promise<int> authenticate(const string& scheme, const string& credentials)
+ Future<int> authenticate(const string& scheme, const string& credentials)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
+
+ Future<int> future = promise->future();
- tuple<Promise<int> >* args = new tuple<Promise<int> >(promise);
+ tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);
int ret = zoo_add_auth(zh, scheme.c_str(), credentials.data(),
credentials.size(), voidCompletion, args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
+ return future;
}
- Promise<int> create(const string& path, const string& data,
- const ACL_vector& acl, int flags, string* result)
+ Future<int> create(const string& path, const string& data,
+ const ACL_vector& acl, int flags, string* result)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
+
+ Future<int> future = promise->future();
- tuple<Promise<int>, string*>* args =
- new tuple<Promise<int>, string*>(promise, result);
+ tuple<Promise<int>*, string*>* args =
+ new tuple<Promise<int>*, string*>(promise, result);
int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
flags, stringCompletion, args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
+ return future;
}
- Promise<int> remove(const string& path, int version)
+ Future<int> remove(const string& path, int version)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
+
+ Future<int> future = promise->future();
- tuple<Promise<int> >* args = new tuple<Promise<int> >(promise);
+ tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);
int ret = zoo_adelete(zh, path.c_str(), version, voidCompletion, args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
+ return future;
}
- Promise<int> exists(const string& path, bool watch, Stat* stat)
+ Future<int> exists(const string& path, bool watch, Stat* stat)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
- tuple<Promise<int>, Stat*>* args =
- new tuple<Promise<int>, Stat*>(promise, stat);
+ Future<int> future = promise->future();
+
+ tuple<Promise<int>*, Stat*>* args =
+ new tuple<Promise<int>*, Stat*>(promise, stat);
int ret = zoo_aexists(zh, path.c_str(), watch, statCompletion, args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
+ return future;
}
- Promise<int> get(const string& path, bool watch, string* result, Stat* stat)
+ Future<int> get(const string& path, bool watch, string* result, Stat* stat)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
+
+ Future<int> future = promise->future();
- tuple<Promise<int>, string*, Stat*>* args =
- new tuple<Promise<int>, string*, Stat*>(promise, result, stat);
+ tuple<Promise<int>*, string*, Stat*>* args =
+ new tuple<Promise<int>*, string*, Stat*>(promise, result, stat);
int ret = zoo_aget(zh, path.c_str(), watch, dataCompletion, args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
+ return future;
}
- Promise<int> getChildren(const string& path, bool watch,
- vector<string>* results)
+ Future<int> getChildren(const string& path,
+ bool watch,
+ vector<string>* results)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
- tuple<Promise<int>, vector<string>*>* args =
- new tuple<Promise<int>, vector<string>*>(promise, results);
+ Future<int> future = promise->future();
+
+ tuple<Promise<int>*, vector<string>*>* args =
+ new tuple<Promise<int>*, vector<string>*>(promise, results);
int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
+ return future;
}
- Promise<int> set(const string& path, const string& data, int version)
+ Future<int> set(const string& path, const string& data, int version)
{
- Promise<int> promise;
+ Promise<int>* promise = new Promise<int>();
+
+ Future<int> future = promise->future();
- tuple<Promise<int>, Stat*>* args =
- new tuple<Promise<int>, Stat*>(promise, NULL);
+ tuple<Promise<int>*, Stat*>* args =
+ new tuple<Promise<int>*, Stat*>(promise, NULL);
int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
version, statCompletion, args);
if (ret != ZOK) {
- promise.set(ret);
+ delete promise;
delete args;
+ return ret;
}
- return promise;
- }
-
-#ifndef USE_THREADED_ZOOKEEPER
-protected:
- virtual void operator () ()
- {
- while (true) {
- int fd;
- int ops;
- timeval tv;
-
- prepare(&fd, &ops, &tv);
-
- double secs = tv.tv_sec + (tv.tv_usec * 1e-6);
-
- // Cause await to return immediately if the file descriptor is
- // not valid (for example because the connection timed out) and
- // secs is 0 because that will block indefinitely.
- if (fd == -1 && secs == 0) {
- secs = -1;
- }
-
- if (poll(fd, ops, secs, false)) {
- // Either timer expired (might be 0) or data became available on fd.
- process(fd, ops);
- } else {
- // Okay, a message must have been received. Handle only one
- // message at a time so as not to delay any necessary internal
- // processing.
- serve(0, true);
- if (name() == process::TERMINATE) {
- return;
- } else if (name() != process::NOTHING) {
- fatal("unexpected interruption of 'poll'");
- }
- }
- }
- }
-
- bool prepare(int* fd, int* ops, timeval* tv)
- {
- int interest = 0;
-
- int ret = zookeeper_interest(zh, fd, &interest, tv);
-
- // If in some disconnected state, try again later.
- if (ret == ZINVALIDSTATE ||
- ret == ZCONNECTIONLOSS ||
- ret == ZOPERATIONTIMEOUT) {
- return false;
- }
-
- if (ret != ZOK) {
- fatal("zookeeper_interest failed! (%s)", zerror(ret));
- }
-
- *ops = 0;
-
- if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
- *ops |= RDWR;
- } else if (interest & ZOOKEEPER_READ) {
- *ops |= RDONLY;
- } else if (interest & ZOOKEEPER_WRITE) {
- *ops |= WRONLY;
- }
-
- return true;
- }
-
- void process(int fd, int ops)
- {
- int events = 0;
-
- if (ready(fd, RDONLY)) {
- events |= ZOOKEEPER_READ;
- } if (ready(fd, WRONLY)) {
- events |= ZOOKEEPER_WRITE;
- }
-
- int ret = zookeeper_process(zh, events);
-
- // If in some disconnected state, try again later.
- if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
- return;
- }
-
- if (ret != ZOK && ret != ZNOTHING) {
- fatal("zookeeper_process failed! (%s)", zerror(ret));
- }
+ return future;
}
-#endif // USE_THREADED_ZOOKEEPER
private:
static void event(zhandle_t* zh, int type, int state,
@@ -430,23 +353,24 @@ private:
static void voidCompletion(int ret, const void *data)
{
- const tuple<Promise<int> >* args =
- reinterpret_cast<const tuple<Promise<int> >*>(data);
+ const tuple<Promise<int>*>* args =
+ reinterpret_cast<const tuple<Promise<int>*>*>(data);
- Promise<int> promise = (*args).get<0>();
+ Promise<int>* promise = (*args).get<0>();
- promise.set(ret);
+ promise->set(ret);
+ delete promise;
delete args;
}
static void stringCompletion(int ret, const char* value, const void* data)
{
- const tuple<Promise<int>, string*> *args =
- reinterpret_cast<const tuple<Promise<int>, string*>*>(data);
+ const tuple<Promise<int>*, string*> *args =
+ reinterpret_cast<const tuple<Promise<int>*, string*>*>(data);
- Promise<int> promise = (*args).get<0>();
+ Promise<int>* promise = (*args).get<0>();
string* result = (*args).get<1>();
if (ret == 0) {
@@ -455,18 +379,19 @@ private:
}
}
- promise.set(ret);
+ promise->set(ret);
+ delete promise;
delete args;
}
static void statCompletion(int ret, const Stat* stat, const void* data)
{
- const tuple<Promise<int>, Stat*>* args =
- reinterpret_cast<const tuple<Promise<int>, Stat*>*>(data);
+ const tuple<Promise<int>*, Stat*>* args =
+ reinterpret_cast<const tuple<Promise<int>*, Stat*>*>(data);
- Promise<int> promise = (*args).get<0>();
+ Promise<int>* promise = (*args).get<0>();
Stat *stat_result = (*args).get<1>();
if (ret == 0) {
@@ -475,8 +400,9 @@ private:
}
}
- promise.set(ret);
+ promise->set(ret);
+ delete promise;
delete args;
}
@@ -484,10 +410,10 @@ private:
static void dataCompletion(int ret, const char* value, int value_len,
const Stat* stat, const void* data)
{
- const tuple<Promise<int>, string*, Stat*>* args =
- reinterpret_cast<const tuple<Promise<int>, string*, Stat*>*>(data);
+ const tuple<Promise<int>*, string*, Stat*>* args =
+ reinterpret_cast<const tuple<Promise<int>*, string*, Stat*>*>(data);
- Promise<int> promise = (*args).get<0>();
+ Promise<int>* promise = (*args).get<0>();
string* result = (*args).get<1>();
Stat* stat_result = (*args).get<2>();
@@ -501,8 +427,9 @@ private:
}
}
- promise.set(ret);
+ promise->set(ret);
+ delete promise;
delete args;
}
@@ -510,10 +437,10 @@ private:
static void stringsCompletion(int ret, const String_vector* values,
const void* data)
{
- const tuple<Promise<int>, vector<string>*>* args =
- reinterpret_cast<const tuple<Promise<int>, vector<string>*>*>(data);
+ const tuple<Promise<int>*, vector<string>*>* args =
+ reinterpret_cast<const tuple<Promise<int>*, vector<string>*>*>(data);
- Promise<int> promise = (*args).get<0>();
+ Promise<int>* promise = (*args).get<0>();
vector<string>* results = (*args).get<1>();
if (ret == 0) {
@@ -524,8 +451,9 @@ private:
}
}
- promise.set(ret);
+ promise->set(ret);
+ delete promise;
delete args;
}
@@ -548,18 +476,11 @@ ZooKeeper::ZooKeeper(const string& serve
Watcher* watcher)
{
impl = new ZooKeeperImpl(this, servers, timeout, watcher);
-#ifndef USE_THREADED_ZOOKEEPER
- process::spawn(impl);
-#endif // USE_THREADED_ZOOKEEPER
}
ZooKeeper::~ZooKeeper()
{
-#ifndef USE_THREADED_ZOOKEEPER
- process::post(impl->self(), process::TERMINATE);
- process::wait(impl->self());
-#endif // USE_THREADED_ZOOKEEPER
delete impl;
}
@@ -578,87 +499,45 @@ int64_t ZooKeeper::getSessionId()
int ZooKeeper::authenticate(const string& scheme, const string& credentials)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::authenticate,
- cref(scheme), cref(credentials));
-#else
- Promise<int> promise = impl->authenticate(scheme, credentials);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->authenticate(scheme, credentials).get();
}
int ZooKeeper::create(const string& path, const string& data,
const ACL_vector& acl, int flags, string* result)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::create,
- cref(path), cref(data), cref(acl), flags, result);
-#else
- Promise<int> promise = impl->create(path, data, acl, flags, result);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->create(path, data, acl, flags, result).get();
}
int ZooKeeper::remove(const string& path, int version)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::remove,
- cref(path), version);
-#else
- Promise<int> promise = impl->remove(path, version);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->remove(path, version).get();
}
int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::exists,
- cref(path), watch, stat);
-#else
- Promise<int> promise = impl->exists(path, watch, stat);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->exists(path, watch, stat).get();
}
int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::get,
- cref(path), watch, result, stat);
-#else
- Promise<int> promise = impl->get(path, watch, result, stat);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->get(path, watch, result, stat).get();
}
int ZooKeeper::getChildren(const string& path, bool watch,
vector<string>* results)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::getChildren,
- cref(path), watch, results);
-#else
- Promise<int> promise = impl->getChildren(path, watch, results);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->getChildren(path, watch, results).get();
}
int ZooKeeper::set(const string& path, const string& data, int version)
{
-#ifndef USE_THREADED_ZOOKEEPER
- return process::call(impl->self(), &ZooKeeperImpl::set,
- cref(path), cref(data), version);
-#else
- Promise<int> promise = impl->set(path, data, version);
- return promise.future().get();
-#endif // USE_THREADED_ZOOKEEPER
+ return impl->set(path, data, version).get();
}
Modified: incubator/mesos/trunk/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/Makefile.in?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/third_party/libprocess/Makefile.in Fri Jan 27 01:25:13 2012
@@ -74,8 +74,7 @@ GMOCK_OBJ = gmock-all.o
GMOCK_LIB = gmock.a
-LIBPROCESS_OBJ = src/process.o src/pid.o src/fatal.o src/tokenize.o \
- src/latch.o src/timer.o
+LIBPROCESS_OBJ = src/process.o src/pid.o src/latch.o
LIBPROCESS_LIB = libprocess.a
Added: incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,25 @@
+#ifndef __PROCESS_CLOCK_HPP__
+#define __PROCESS_CLOCK_HPP__
+
+namespace process {
+
+// Forward declarations.
+class ProcessBase;
+
+class Clock
+{
+public:
+ static double now();
+ static double now(ProcessBase* process);
+ static void pause();
+ static bool paused();
+ static void resume();
+ static void advance(double secs);
+ static void update(double secs);
+ static void update(ProcessBase* process, double secs);
+ static void order(ProcessBase* from, ProcessBase* to);
+};
+
+} // namespace process {
+
+#endif // __PROCESS_CLOCK_HPP__
Added: incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,272 @@
+#ifndef __PROCESS_DEFER_HPP__
+#define __PROCESS_DEFER_HPP__
+
+#include <process/deferred.hpp>
+#include <process/dispatch.hpp>
+
+namespace process {
+
+// The defer mechanism is very similar to the dispatch mechanism (see
+// dispatch.hpp), however, rather than scheduling the method to get
+// invoked, the defer mechanism returns a 'deferred' object that when
+// invoked does the underlying dispatch. Similar to dispatch, we
+// provide the C++11 variadic template definitions first, and then use
+// Boost preprocessor macros to provide the actual definitions.
+
+// First, definitions of defer for methods returning void:
+//
+// template <typename T, typename ...P>
+// deferred<void(void)> defer(const PID<T>& pid,
+// void (T::*method)(P...),
+// P... p)
+// {
+// void (*dispatch)(const PID<T>&, void (T::*)(P...), P...) =
+// &process::template dispatch<T, P...>;
+//
+// return deferred<void(void)>(
+// std::tr1::bind(dispatch, pid, method, std::forward<P>(p)...));
+// }
+
+template <typename T>
+deferred<void(void)> defer(
+ const PID<T>& pid,
+ void (T::*method)(void))
+{
+ void (*dispatch)(const PID<T>&, void (T::*)(void)) =
+ &process::template dispatch<T>;
+
+ return deferred<void(void)>(
+ std::tr1::bind(dispatch, pid, method));
+}
+
+template <typename T>
+deferred<void(void)> defer(
+ const Process<T>& process,
+ void (T::*method)(void))
+{
+ return defer(process.self(), method);
+}
+
+template <typename T>
+deferred<void(void)> defer(
+ const Process<T>* process,
+ void (T::*method)(void))
+{
+ return defer(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<void(void)> defer(const PID<T>& pid, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ void (*dispatch)(const PID<T>&, void (T::*)(ENUM_PARAMS(N, P)), ENUM_PARAMS(N, A)) = \
+ &process::template dispatch<T, ENUM_PARAMS(N, P), ENUM_PARAMS(N, A)>; \
+ \
+ return deferred<void(void)>( \
+ std::tr1::bind(dispatch, pid, method, ENUM_PARAMS(N, a))); \
+ } \
+ \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<void(void)> defer(const Process<T>& process, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return defer(process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<void(void)> defer(const Process<T>* process, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return defer(process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+
+// Next, definitions of defer for methods returning future:
+//
+// template <typename R, typename T, typename ...P>
+// deferred<Future<R>(void)> defer(const PID<T>& pid,
+// Future<R> (T::*method)(P...),
+// P... p)
+// {
+// Future<R> (*dispatch)(const PID<T>&, Future<R> (T::*)(P...), P...) =
+// &process::template dispatch<R, T, P...>;
+//
+// return deferred<Future<R>(void)>(
+// std::tr1::bind(dispatch, pid, method, std::forward<P>(p)...));
+// }
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+ const PID<T>& pid,
+ Future<R> (T::*method)(void))
+{
+ Future<R> (*dispatch)(const PID<T>&, Future<R> (T::*)(void)) =
+ &process::template dispatch<R, T>;
+
+ return deferred<Future<R>(void)>(
+ std::tr1::bind(dispatch, pid, method));
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+ const Process<T>& process,
+ Future<R> (T::*method)(void))
+{
+ return defer(process.self(), method);
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+ const Process<T>* process,
+ Future<R> (T::*method)(void))
+{
+ return defer(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<Future<R>(void)> defer( \
+ const PID<T>& pid, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ Future<R> (*dispatch)(const PID<T>&, Future<R> (T::*)(ENUM_PARAMS(N, P)), ENUM_PARAMS(N, A)) = \
+ &process::template dispatch<R, T, ENUM_PARAMS(N, P), ENUM_PARAMS(N, A)>; \
+ \
+ return deferred<Future<R>(void)>( \
+ std::tr1::bind(dispatch, pid, method, ENUM_PARAMS(N, a))); \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<Future<R>(void)> defer( \
+ const Process<T>& process, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return defer(process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<Future<R>(void)> defer( \
+ const Process<T>* process, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return defer(process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+
+// Next, definitions of defer for methods returning a value:
+//
+// template <typename R, typename T, typename ...P>
+// deferred<Future<R>(void)> defer(const PID<T>& pid,
+// R (T::*method)(P...),
+// P... p)
+// {
+// Future<R> (*dispatch)(const PID<T>&, R (T::*)(P...), P...) =
+// &process::template dispatch<R, T, P...>;
+//
+// return deferred<Future<R>(void)>(
+// std::tr1::bind(dispatch, pid, method, std::forward<P>(p)...));
+// }
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+ const PID<T>& pid,
+ R (T::*method)(void))
+{
+ Future<R> (*dispatch)(const PID<T>&, R (T::*)(void)) =
+ &process::template dispatch<R, T>;
+
+ return deferred<Future<R>(void)>(
+ std::tr1::bind(dispatch, pid, method));
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+ const Process<T>& process,
+ R (T::*method)(void))
+{
+ return defer(process.self(), method);
+}
+
+template <typename R, typename T>
+deferred<Future<R>(void)> defer(
+ const Process<T>* process,
+ R (T::*method)(void))
+{
+ return defer(process->self(), method);
+}
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<Future<R>(void)> defer( \
+ const PID<T>& pid, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ Future<R> (*dispatch)(const PID<T>&, R (T::*)(ENUM_PARAMS(N, P)), ENUM_PARAMS(N, A)) = \
+ &process::template dispatch<R, T, ENUM_PARAMS(N, P), ENUM_PARAMS(N, A)>; \
+ \
+ return deferred<Future<R>(void)>( \
+ std::tr1::bind(dispatch, pid, method, ENUM_PARAMS(N, a))); \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<Future<R>(void)> defer( \
+ const Process<T>& process, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return defer(process.self(), method, ENUM_PARAMS(N, a)); \
+ } \
+ \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ deferred<Future<R>(void)> defer( \
+ const Process<T>* process, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ return defer(process->self(), method, ENUM_PARAMS(N, a)); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+} // namespace process {
+
+#endif // __PROCESS_DEFER_HPP__
Added: incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,76 @@
+#ifndef __PROCESS_DEFERRED_HPP__
+#define __PROCESS_DEFERRED_HPP__
+
+#include <tr1/functional>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/preprocessor.hpp>
+
+namespace process {
+
+// Acts like a function call but runs within an asynchronous execution
+// context such as an Executor or a ProcessBase (since only an
+// executor or the 'defer' routines are allowed to create them).
+template <typename F>
+struct deferred : std::tr1::function<F>
+{
+private:
+ // Only an Executor and the 'defer' routines can create these.
+ friend class Executor;
+
+ template <typename T>
+ friend deferred<void(void)> defer(
+ const PID<T>& pid,
+ void (T::*method)(void));
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ friend deferred<void(void)> defer(const PID<T>& pid, \
+ void (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a));
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ template <typename R, typename T>
+ friend deferred<Future<R>(void)> defer(
+ const PID<T>& pid,
+ Future<R> (T::*method)(void));
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ friend deferred<Future<R>(void)> defer( \
+ const PID<T>& pid, \
+ Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a));
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ template <typename R, typename T>
+ friend deferred<Future<R>(void)> defer(
+ const PID<T>& pid,
+ R (T::*method)(void));
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ typename T, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ friend deferred<Future<R>(void)> defer( \
+ const PID<T>& pid, \
+ R (T::*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a));
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+ deferred(const std::tr1::function<F>& f) : std::tr1::function<F>(f) {}
+};
+
+} // namespace process {
+
+#endif // __PROCESS_DEFERRED_HPP__