You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:56:39 UTC
svn commit: r1132190 - in /incubator/mesos/trunk: src/detector/zookeeper.cpp
third_party/libprocess/process.cpp third_party/libprocess/process.hpp
third_party/libprocess/reliable.cpp third_party/libprocess/reliable.hpp
Author: benh
Date: Sun Jun 5 08:56:39 2011
New Revision: 1132190
URL: http://svn.apache.org/viewvc?rev=1132190&view=rev
Log:
Added asynchronous function dispatch/call support via futures and promises to libprocess.
Modified:
incubator/mesos/trunk/src/detector/zookeeper.cpp
incubator/mesos/trunk/third_party/libprocess/process.cpp
incubator/mesos/trunk/third_party/libprocess/process.hpp
incubator/mesos/trunk/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/third_party/libprocess/reliable.hpp
Modified: incubator/mesos/trunk/src/detector/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/zookeeper.cpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/detector/zookeeper.cpp Sun Jun 5 08:56:39 2011
@@ -164,8 +164,9 @@ protected:
void operator () ()
{
WatcherProcess *process = this;
- if (call(manager, REGISTER,
- reinterpret_cast<char *>(&process), sizeof(process)) != OK)
+ send(manager, REGISTER,
+ reinterpret_cast<char *>(&process), sizeof(process));
+ if (receive() != OK)
fatal("failed to setup underlying watcher mechanism");
while (true) {
switch (receive()) {
@@ -177,8 +178,9 @@ protected:
break;
}
case TERMINATE:
- if (call(manager, UNREGISTER,
- reinterpret_cast<char *>(&process), sizeof(process)) != OK)
+ send(manager, UNREGISTER,
+ reinterpret_cast<char *>(&process), sizeof(process));
+ if (receive() != OK)
fatal("failed to cleanup underlying watcher mechanism");
return;
}
@@ -277,8 +279,9 @@ Watcher::~Watcher()
protected:
void operator () ()
{
- if (call(manager, LOOKUP_PROCESS,
- reinterpret_cast<char *>(&watcher), sizeof(watcher)) != OK)
+ send(manager, LOOKUP_PROCESS,
+ reinterpret_cast<char *>(&watcher), sizeof(watcher));
+ if (receive() != OK)
fatal("failed to deallocate resources associated with Watcher");
WatcherProcess *process =
*reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
@@ -445,8 +448,9 @@ protected:
// Lookup and cache the WatcherProcess PID associated with our
// Watcher _before_ we yield control via calling zookeeper_process
// so that Watcher callbacks can occur.
- if (call(manager, LOOKUP_PID,
- reinterpret_cast<char *>(&watcher), sizeof(watcher)) != OK)
+ send(manager, LOOKUP_PID,
+ reinterpret_cast<char *>(&watcher), sizeof(watcher));
+ if (receive() != OK)
fatal("failed to setup underlying ZooKeeper mechanisms");
// TODO(benh): Link with WatcherProcess?
@@ -468,7 +472,9 @@ protected:
tv.tv_usec = 0;
}
- if (await(fd, ops, tv, false)) {
+ double secs = tv.tv_sec + (tv.tv_usec * 1e-6);
+
+ if (await(fd, ops, secs, false)) {
// Either timer expired (might be 0) or data became available on fd.
process(fd, ops);
} else {
@@ -639,10 +645,11 @@ int ZooKeeper::create(const string &path
protected:
void operator () ()
{
- if (call(zooKeeperProcess->self(),
- CREATE,
- reinterpret_cast<char *>(&createCall),
- sizeof(CreateCall *)) != COMPLETED)
+ send(zooKeeperProcess->self(),
+ CREATE,
+ reinterpret_cast<char *>(&createCall),
+ sizeof(CreateCall *));
+ if (receive() != COMPLETED)
createCall->ret = ZSYSTEMERROR;
}
@@ -677,10 +684,11 @@ int ZooKeeper::remove(const string &path
protected:
void operator () ()
{
- if (call(zooKeeperProcess->self(),
- REMOVE,
- reinterpret_cast<char *>(&removeCall),
- sizeof(RemoveCall *)) != COMPLETED)
+ send(zooKeeperProcess->self(),
+ REMOVE,
+ reinterpret_cast<char *>(&removeCall),
+ sizeof(RemoveCall *));
+ if (receive() != COMPLETED)
removeCall->ret = ZSYSTEMERROR;
}
@@ -717,10 +725,11 @@ int ZooKeeper::exists(const string &path
protected:
void operator () ()
{
- if (call(zooKeeperProcess->self(),
- EXISTS,
- reinterpret_cast<char *>(&existsCall),
- sizeof(ExistsCall *)) != COMPLETED)
+ send(zooKeeperProcess->self(),
+ EXISTS,
+ reinterpret_cast<char *>(&existsCall),
+ sizeof(ExistsCall *));
+ if (receive() != COMPLETED)
existsCall->ret = ZSYSTEMERROR;
}
@@ -759,10 +768,11 @@ int ZooKeeper::get(const string &path,
protected:
void operator () ()
{
- if (call(zooKeeperProcess->self(),
- GET,
- reinterpret_cast<char *>(&getCall),
- sizeof(GetCall *)) != COMPLETED)
+ send(zooKeeperProcess->self(),
+ GET,
+ reinterpret_cast<char *>(&getCall),
+ sizeof(GetCall *));
+ if (receive() != COMPLETED)
getCall->ret = ZSYSTEMERROR;
}
@@ -799,10 +809,11 @@ int ZooKeeper::getChildren(const string
protected:
void operator () ()
{
- if (call(zooKeeperProcess->self(),
- GET_CHILDREN,
- reinterpret_cast<char *>(&getChildrenCall),
- sizeof(GetChildrenCall *)) != COMPLETED)
+ send(zooKeeperProcess->self(),
+ GET_CHILDREN,
+ reinterpret_cast<char *>(&getChildrenCall),
+ sizeof(GetChildrenCall *));
+ if (receive() != COMPLETED)
getChildrenCall->ret = ZSYSTEMERROR;
}
@@ -839,10 +850,11 @@ int ZooKeeper::set(const string &path,
protected:
void operator () ()
{
- if (call(zooKeeperProcess->self(),
- SET,
- reinterpret_cast<char *>(&setCall),
- sizeof(SetCall *)) != COMPLETED)
+ send(zooKeeperProcess->self(),
+ SET,
+ reinterpret_cast<char *>(&setCall),
+ sizeof(SetCall *));
+ if (receive() != COMPLETED)
setCall->ret = ZSYSTEMERROR;
}
Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun 5 08:56:39 2011
@@ -60,6 +60,8 @@ using std::queue;
using std::set;
using std::stack;
+using std::tr1::function;
+
#define Byte (1)
#define Kilobyte (1024*Byte)
@@ -249,6 +251,8 @@ public:
void run(Process *process);
void cleanup(Process *process);
+ static void invoke(const function<void (void)> &thunk);
+
private:
timeout create_timeout(Process *process, double secs);
void start_timeout(const timeout &timeout);
@@ -271,15 +275,15 @@ private:
/* Tick, tock ... manually controlled clock! */
-class InternalProcessClock
+class InternalClock
{
public:
- InternalProcessClock()
+ InternalClock()
{
initial = current = elapsed = ev_time();
}
- ~InternalProcessClock() {}
+ ~InternalClock() {}
ev_tstamp getCurrent(Process *process)
{
@@ -334,7 +338,7 @@ private:
/* Using manual clock if non-null. */
-static InternalProcessClock *clk = NULL;
+static InternalClock *clk = NULL;
/* Global 'pipe' id uniquely assigned to each process. */
static uint32_t global_pipe = 0;
@@ -405,8 +409,8 @@ static Process *proc_process = NULL;
static bool legacy = false;
/* Thunk to safely call into legacy. */
-// static __thread std::tr1::function<void (void)> *legacy_thunk;
-static const std::tr1::function<void (void)> *legacy_thunk;
+// static __thread function<void (void)> *legacy_thunk;
+static const function<void (void)> *legacy_thunk;
/* Scheduler gate. */
static Gate *gate = new Gate();
@@ -441,7 +445,7 @@ static map<uint32_t, deque<uint32_t> > *
* recursive incase a filterer wants to do anything fancy (which is
* possible and likely given that filters will get used for testing).
*/
-static MessageFilter *filterer = NULL;
+static Filter *filterer = NULL;
static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -497,7 +501,7 @@ void handle_async(struct ev_loop *loop,
ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
} else {
// Only repeat the timer if not using a manual clock (a call
- // to ProcessClock::advance() will force a timer event later).
+ // to Clock::advance() will force a timer event later).
if (clk != NULL && timeouts_watcher.repeat > 0)
timeouts_watcher.repeat = 0;
ev_timer_again(loop, &timeouts_watcher);
@@ -2051,14 +2055,6 @@ bool ProcessManager::await(Process *proc
return false;
}
- assert(secs > 0);
-
- /* Create timeout. */
- const timeout &timeout = create_timeout(process, secs);
-
- /* Start the timeout. */
- start_timeout(timeout);
-
// Treat an await with a bad fd as an interruptible pause!
if (fd >= 0) {
/* Allocate/Initialize the watcher. */
@@ -2086,6 +2082,15 @@ bool ProcessManager::await(Process *proc
ev_async_send(loop, &async_watcher);
}
+ assert(secs >= 0);
+
+ timeout timeout;
+
+ if (secs != 0) {
+ timeout = create_timeout(process, secs);
+ start_timeout(timeout);
+ }
+
/* Context switch. */
process->state = Process::AWAITING;
swapcontext(&process->uctx, &proc_uctx_running);
@@ -2094,8 +2099,9 @@ bool ProcessManager::await(Process *proc
process->state == Process::INTERRUPTED);
/* Attempt to cancel the timer if necessary. */
- if (process->state != Process::TIMEDOUT)
- cancel_timeout(timeout);
+ if (secs != 0)
+ if (process->state != Process::TIMEDOUT)
+ cancel_timeout(timeout);
if (process->state == Process::INTERRUPTED)
interrupted = true;
@@ -2323,6 +2329,16 @@ void ProcessManager::cleanup(Process *pr
}
+void ProcessManager::invoke(const function<void (void)> &thunk)
+{
+ legacy_thunk = &thunk;
+ legacy = true;
+ assert(proc_process != NULL);
+ swapcontext(&proc_process->uctx, &proc_uctx_running);
+ legacy = false;
+}
+
+
timeout ProcessManager::create_timeout(Process *process, double secs)
{
assert(process != NULL);
@@ -2380,7 +2396,7 @@ void ProcessManager::cancel_timeout(cons
}
-void ProcessClock::pause()
+void Clock::pause()
{
initialize();
@@ -2391,7 +2407,7 @@ void ProcessClock::pause()
// sends and spawning new processes, not currently done for
// PROCESS_EXIT messages).
if (clk == NULL) {
- clk = new InternalProcessClock();
+ clk = new InternalClock();
// The existing libev timer might actually timeout, but now that
// clk != NULL, no "time" will actually have passed, so no
@@ -2401,7 +2417,7 @@ void ProcessClock::pause()
}
-void ProcessClock::resume()
+void Clock::resume()
{
initialize();
@@ -2417,7 +2433,7 @@ void ProcessClock::resume()
}
-void ProcessClock::advance(double secs)
+void Clock::advance(double secs)
{
synchronized(timeouts) {
if (clk != NULL) {
@@ -2696,11 +2712,30 @@ MSGID Process::receive(double secs)
}
-MSGID Process::call(const PID &to, MSGID id,
- const char *data, size_t length, double secs)
+MSGID Process::serve(bool forever)
+{
+ do {
+ switch (receive()) {
+ case PROCESS_DISPATCH: {
+ void *pointer = (char *) current + sizeof(struct msg);
+ std::tr1::function<void (void)> *delegator =
+ *reinterpret_cast<std::tr1::function<void (void)> **>(pointer);
+ (*delegator)();
+ delete delegator;
+ break;
+ }
+
+ default: {
+ return msgid();
+ }
+ }
+ } while (forever);
+};
+
+
+void Process::operator () ()
{
- send(to, id, data, length);
- return receive(secs);
+ serve();
}
@@ -2742,17 +2777,9 @@ PID Process::link(const PID &to)
}
-bool Process::await(int fd, int op, const timeval& tv)
-{
- return await(fd, op, tv, true);
-}
-
-
-bool Process::await(int fd, int op, const timeval& tv, bool ignore)
+bool Process::await(int fd, int op, double secs, bool ignore)
{
- double secs = tv.tv_sec + (tv.tv_usec * 1e-6);
-
- if (secs <= 0)
+ if (secs < 0)
return true;
/* TODO(benh): Handle invoking await from "outside" thread. */
@@ -2809,44 +2836,6 @@ double Process::elapsed()
}
-void Process::post(const PID &to, MSGID id, const char *data, size_t length)
-{
- initialize();
-
- if (replaying)
- return;
-
- if (!to)
- return;
-
- /* Disallow sending messages using an internal id. */
- if (id < PROCESS_MSGID)
- return;
-
- /* Allocate/Initialize outgoing message. */
- struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
-
- msg->from.pipe = 0;
- msg->from.ip = 0;
- msg->from.port = 0;
- msg->to.pipe = to.pipe;
- msg->to.ip = to.ip;
- msg->to.port = to.port;
- msg->id = id;
- msg->len = length;
-
- if (length > 0)
- memcpy((char *) msg + sizeof(struct msg), data, length);
-
- if (to.ip == ip && to.port == port)
- /* Local message. */
- process_manager->deliver(msg);
- else
- /* Remote message. */
- link_manager->send(msg);
-}
-
-
PID Process::spawn(Process *process)
{
initialize();
@@ -2866,7 +2855,8 @@ PID Process::spawn(Process *process)
}
process_manager->spawn(process);
- return process->pid;
+
+ return process->self();
} else {
return PID();
}
@@ -2899,18 +2889,14 @@ bool Process::wait(const PID &pid)
}
-void Process::invoke(const std::tr1::function<void (void)> &thunk)
+void Process::invoke(const function<void (void)> &thunk)
{
initialize();
- legacy_thunk = &thunk;
- legacy = true;
- assert(proc_process != NULL);
- swapcontext(&proc_process->uctx, &proc_uctx_running);
- legacy = false;
+ ProcessManager::invoke(thunk);
}
-void Process::filter(MessageFilter *filter)
+void Process::filter(Filter *filter)
{
initialize();
@@ -2918,3 +2904,64 @@ void Process::filter(MessageFilter *filt
filterer = filter;
}
}
+
+
+void Process::post(const PID &to, MSGID id, const char *data, size_t length)
+{
+ initialize();
+
+ if (replaying)
+ return;
+
+ if (!to)
+ return;
+
+ /* Disallow sending messages using an internal id. */
+ if (id < PROCESS_MSGID)
+ return;
+
+ /* Allocate/Initialize outgoing message. */
+ struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
+
+ msg->from.pipe = 0;
+ msg->from.ip = 0;
+ msg->from.port = 0;
+ msg->to.pipe = to.pipe;
+ msg->to.ip = to.ip;
+ msg->to.port = to.port;
+ msg->id = id;
+ msg->len = length;
+
+ if (length > 0)
+ memcpy((char *) msg + sizeof(struct msg), data, length);
+
+ if (to.ip == ip && to.port == port)
+ /* Local message. */
+ process_manager->deliver(msg);
+ else
+ /* Remote message. */
+ link_manager->send(msg);
+}
+
+
+void Process::dispatcher(Process *process, function<void (void)> *delegator)
+{
+ if (replaying)
+ return;
+
+ /* Allocate/Initialize outgoing message. */
+ struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + sizeof(delegator));
+
+ msg->from.pipe = 0;
+ msg->from.ip = 0;
+ msg->from.port = 0;
+ msg->to.pipe = process->pid.pipe;
+ msg->to.ip = process->pid.ip;
+ msg->to.port = process->pid.port;
+ msg->id = PROCESS_DISPATCH;
+ msg->len = sizeof(delegator);
+
+ memcpy((char *) msg + sizeof(struct msg), &delegator, sizeof(delegator));
+
+ process_manager->deliver(msg);
+}
Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun 5 08:56:39 2011
@@ -1,6 +1,7 @@
#ifndef PROCESS_HPP
#define PROCESS_HPP
+#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <ucontext.h>
@@ -17,10 +18,15 @@
typedef uint16_t MSGID;
-const MSGID PROCESS_ERROR = 0;
-const MSGID PROCESS_TIMEOUT = 1;
-const MSGID PROCESS_EXIT = 2;
-const MSGID PROCESS_MSGID = PROCESS_EXIT+1;
+const MSGID PROCESS_ERROR = 1;
+const MSGID PROCESS_TIMEOUT = 2;
+const MSGID PROCESS_EXIT = 3;
+const MSGID PROCESS_TERMINATE = 4;
+const MSGID PROCESS_DISPATCH = 5;
+const MSGID PROCESS_MSGID = PROCESS_DISPATCH + 1;
+
+
+class Process;
struct msg
@@ -32,7 +38,7 @@ struct msg
};
-class ProcessClock {
+class Clock {
public:
static void pause();
static void resume();
@@ -40,74 +46,121 @@ public:
};
-class MessageFilter {
+class Filter {
public:
- virtual bool filter(struct msg *) = 0;
+ virtual bool filter(msg *) = 0;
};
-class Process {
+template <typename T>
+struct Future
+{
+ Future();
+ Future(const Future<T> &that);
+ Future<T> & operator = (const Future<T> &that);
+ virtual ~Future();
+ void set(const T &t_);
+ T get() const;
+
+private:
+ int *refs;
+ T **t;
+ Process *trigger;
+};
+
+
+template <typename T>
+class Promise
+{
public:
- /* Returns pid of process; valid even before calling spawn. */
- PID self() const;
+ Promise();
+ Promise(const Promise<T> &that);
+ virtual ~Promise();
+ void set(const T &t_);
+ void associate(const Future<T> &future_);
- /* Sends a message to PID without a return address. */
- static void post(const PID &to, MSGID id);
+private:
+ void operator = (const Promise<T> &);
- /* Sends a message with data to PID without a return address. */
- static void post(const PID &to, MSGID id, const char *data, size_t length);
+ enum State {
+ UNSET_UNASSOCIATED,
+ SET_UNASSOCIATED,
+ UNSET_ASSOCIATED,
+ SET_ASSOCIATED,
+ };
+
+ int *refs;
+ T **t;
+ State *state;
+ Future<T> **future;
+};
- /* Spawn a new process. */
- static PID spawn(Process *process);
- /* Wait for PID to exit (returns true if actually waited on a process). */
- static bool wait(const PID &pid);
+template <>
+class Promise<void>;
- /* Invoke the thunk in a legacy safe way. */
- static void invoke(const std::tr1::function<void (void)> &thunk);
- /* Filter messages to be enqueued (except for timeout messages). */
- static void filter(MessageFilter *);
+template <typename T>
+class Promise<T&>;
-protected:
+
+template <typename T>
+struct Result
+{
+ Result(const T &t_);
+ Result(const Promise<T> &promise_);
+ Result(const Result<T> &that);
+ virtual ~Result();
+ bool isPromise() const;
+ Promise<T> getPromise() const;
+
+ T get() const;
+
+private:
+ void operator = (const Result<T> &);
+
+ int *refs;
+ T *t;
+ Promise<T> *promise;
+};
+
+
+template <>
+struct Result<void>;
+
+
+class Process {
+public:
Process();
virtual ~Process();
+ /* Returns pid of process; valid even before calling spawn. */
+ PID self() const { return pid; }
+
+protected:
/* Function run when process spawned. */
- virtual void operator() () = 0;
+ virtual void operator() ();
/* Returns the sender's PID of the last dequeued (current) message. */
- PID from() const;
+ PID from() const { return current != NULL ? current->from : PID(); }
/* Returns the id of the last dequeued (current) message. */
- MSGID msgid() const;
+ MSGID msgid() const { return current != NULL ? current->id : PROCESS_ERROR; }
/* Returns pointer and length of body of last dequeued (current) message. */
- const char * body(size_t *length) const;
+ virtual const char * body(size_t *length) const;
/* Put a message at front of queue (will not reschedule process). */
- virtual void inject(const PID &from, MSGID id, const char *data, size_t length);
-
- /* Sends a message to PID. */
- virtual void send(const PID &to , MSGID);
+ virtual void inject(const PID &from, MSGID id, const char *data = NULL, size_t length = 0);
/* Sends a message with data to PID. */
- virtual void send(const PID &to, MSGID id, const char *data, size_t length);
-
- /* Blocks for message indefinitely. */
- virtual MSGID receive();
+ virtual void send(const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
/* Blocks for message at most specified seconds. */
- virtual MSGID receive(double secs);
+ virtual MSGID receive(double secs = 0);
- /* Sends a message to PID and then blocks for a message indefinitely. */
- virtual MSGID call(const PID &to , MSGID id);
-
- /* Sends a message with data to PID and then blocks for a message. */
- virtual MSGID call(const PID &to, MSGID id, const char *data, size_t length);
-
- /* Sends, and then blocks for a message at most specified seconds. */
- virtual MSGID call(const PID &to, MSGID id, const char *data, size_t length, double secs);
+ /* Processes dispatch messages. */
+ virtual MSGID serve(bool forever = true);
/* Blocks at least specified seconds (may block longer). */
virtual void pause(double secs);
@@ -118,11 +171,8 @@ protected:
/* IO events for awaiting. */
enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
- /* Wait until operation is ready for file descriptor (or message received). */
- virtual bool await(int fd, int op, const timeval& tv);
-
/* Wait until operation is ready for file descriptor (or message received if not ignored). */
- virtual bool await(int fd, int op, const timeval& tv, bool ignore);
+ virtual bool await(int fd, int op, double secs = 0, bool ignore = true);
/* Returns true if operation on file descriptor is ready. */
virtual bool ready(int fd, int op);
@@ -130,11 +180,314 @@ protected:
/* Returns sub-second elapsed time (according to this process). */
double elapsed();
+public:
+ /**
+ * Spawn a new process.
+ *
+ * @param process process to be spawned
+ */
+ static PID spawn(Process *process);
+
+ /**
+ * Wait for process to exit (returns true if actually waited on a process).
+ *
+ * @param pid id of the process
+ */
+ static bool wait(const PID &pid);
+
+ /**
+ * Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
+ *
+ * @param thunk function to be invoked
+ */
+ static void invoke(const std::tr1::function<void (void)> &thunk);
+
+ /**
+ * Use the specified filter on messages that get enqueued (note,
+ * however, that for now you cannot filter timeout messages).
+ *
+ * @param filter message filter
+ */
+ static void filter(Filter *filter);
+
+ /**
+ * Sends a message with data without a return address.
+ *
+ * @param to receiver
+ * @param id message id
+ * @param data data to send (gets copied)
+ * @param length length of data
+ */
+ static void post(const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
+
+ /**
+ * Dispatches a void method on a process.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ */
+ template <typename C>
+ static void dispatch(C *instance, void (C::*method)());
+
+ /**
+ * Dispatches a void method on a process.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 argument to pass to method
+ */
+ template <typename C, typename P1, typename A1>
+ static void dispatch(C *instance, void (C::*method)(P1), A1 a1);
+
+ /**
+ * Dispatches a void method on a process.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ */
+ template <typename C, typename P1, typename P2, typename A1, typename A2>
+ static void dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2);
+
+ /**
+ * Dispatches a void method on a process.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ */
+ template <typename C,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+ static void dispatch(C *instance, void (C::*method)(P1, P2, P3),
+ A1 a1, A2 a2, A3 a3);
+
+ /**
+ * Dispatches a void method on a process.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ */
+ template <typename C,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+ static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
+ A1 a1, A2 a2, A3 a3, A4 a4);
+
+ /**
+ * Dispatches a void method on a process.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ */
+ template <typename C,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+ static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
+ /**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @return future corresponding to the result of executing the method
+ */
+ template <typename T, typename C>
+ static Future<T> dispatch(C *instance, Result<T> (C::*method)());
+
+ /**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+ template <typename T, typename C, typename P1, typename A1>
+ static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1);
+
+ /**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2,
+ typename A1, typename A2>
+ static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+ A1 a1, A2 a2);
+
+ /**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+ static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+ A1 a1, A2 a2, A3 a3);
+
+ /**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+ static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+ A1 a1, A2 a2, A3 a3, A4 a4);
+
+ /**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+ static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
+ /**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @return result of executing the method
+ */
+ template <typename T, typename C>
+ static T call(C *instance, Result<T> (C::*method)());
+
+ /**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 argument to pass to method
+ * @return result of executing the method
+ */
+ template <typename T, typename C, typename P1, typename A1>
+ static T call(C *instance, Result<T> (C::*method)(P1), A1 a1);
+
+ /**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @return result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2,
+ typename A1, typename A2>
+ static T call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2);
+
+ /**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @return result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+ static T call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+ A1 a1, A2 a2, A3 a3);
+
+ /**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @return result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+ static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+ A1 a1, A2 a2, A3 a3, A4 a4);
+
+ /**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param instance running process to receive dispatch message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ * @return result of executing the method
+ */
+ template <typename T, typename C,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+ static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
+
private:
friend class LinkManager;
friend class ProcessManager;
friend class ProcessReference;
- friend void * schedule(void *arg);
+ friend void * schedule(void *);
/* Flag indicating state of process. */
enum { INIT,
@@ -153,10 +506,10 @@ private:
int refs;
/* Queue of received messages. */
- std::deque<struct msg *> msgs;
+ std::deque<msg *> msgs;
/* Current message. */
- struct msg *current;
+ msg *current;
/* Current "blocking" generation. */
int generation;
@@ -173,59 +526,542 @@ private:
void unlock() { pthread_mutex_unlock(&m); }
/* Enqueues the specified message. */
- void enqueue(struct msg *msg);
+ void enqueue(msg *msg);
/* Dequeues a message or returns NULL. */
- struct msg * dequeue();
+ msg * dequeue();
+
+ /*
+ * Dispatches the delegator to the specified process.
+ *
+ * The dispatch() templates pass a delegator allocated with new and never
+ * delete it themselves. NOTE(review): ownership presumably transfers to
+ * dispatcher (or the receiving process) -- confirm in process.cpp.
+ */
+ static void dispatcher(Process *, std::tr1::function<void (void)> *delegator);
};
-inline PID Process::self() const
+// Runs a dispatched thunk and routes its outcome to the given future.
+//
+// A Result holding a promise has that promise associated with the future;
+// any other Result has its value stored into the future directly. Both
+// heap-allocated arguments are deleted before returning.
+template <typename T>
+void delegate(std::tr1::function<Result<T> (void)> *thunk, Future<T> *future)
+{
+  assert(thunk != NULL);
+  assert(future != NULL);
+
+  const Result<T> &outcome = (*thunk)();
+
+  if (outcome.isPromise()) {
+    outcome.getPromise().associate(*future);
+  } else {
+    future->set(outcome.get());
+  }
+
+  delete thunk;
+  delete future;
+}
+
+
+// Dispatches a void, zero-argument method: the receiver is bound into a
+// heap-allocated nullary callable which is handed to dispatcher().
+template <typename C>
+void Process::dispatch(C *instance, void (C::*method)())
+{
+  typedef std::tr1::function<void (void)> Delegator;
+
+  Delegator *bound = new Delegator(std::tr1::bind(method, instance));
+
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches a void, one-argument method: the receiver and argument are
+// bound into a heap-allocated nullary callable handed to dispatcher().
+template <typename C, typename P1, typename A1>
+void Process::dispatch(C *instance, void (C::*method)(P1), A1 a1)
+{
+  typedef std::tr1::function<void (void)> Delegator;
+
+  Delegator *bound = new Delegator(std::tr1::bind(method, instance, a1));
+
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches a void, two-argument method: the receiver and arguments are
+// bound into a heap-allocated nullary callable handed to dispatcher().
+template <typename C,
+          typename P1, typename P2,
+          typename A1, typename A2>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2)
+{
+  typedef std::tr1::function<void (void)> Delegator;
+
+  Delegator *bound = new Delegator(std::tr1::bind(method, instance, a1, a2));
+
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches a void, three-argument method: the receiver and arguments are
+// bound into a heap-allocated nullary callable handed to dispatcher().
+template <typename C,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3),
+                       A1 a1, A2 a2, A3 a3)
+{
+  typedef std::tr1::function<void (void)> Delegator;
+
+  Delegator *bound =
+    new Delegator(std::tr1::bind(method, instance, a1, a2, a3));
+
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches a void, four-argument method: the receiver and arguments are
+// bound into a heap-allocated nullary callable handed to dispatcher().
+template <typename C,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
+                       A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  typedef std::tr1::function<void (void)> Delegator;
+
+  Delegator *bound =
+    new Delegator(std::tr1::bind(method, instance, a1, a2, a3, a4));
+
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches a void, five-argument method: the receiver and arguments are
+// bound into a heap-allocated nullary callable handed to dispatcher().
+template <typename C,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+                       A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  typedef std::tr1::function<void (void)> Delegator;
+
+  Delegator *bound =
+    new Delegator(std::tr1::bind(method, instance, a1, a2, a3, a4, a5));
+
+  dispatcher(instance, bound);
+}
+
+
+// Dispatches a Result<T>-returning, zero-argument method on a process and
+// returns a future for its value.
+//
+// The thunk and the heap-allocated future are handed to delegate<T>, which
+// deletes both once it has run the thunk.
+template <typename T, typename C>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)())
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance));
+
+  Future<T> *future = new Future<T>();
+
+  // Copy the caller's handle BEFORE dispatching: delegate<T> deletes
+  // 'future', and that may already have happened by the time dispatcher()
+  // returns, so dereferencing it afterwards could read freed memory.
+  Future<T> result(*future);
+
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       future));
+
+  dispatcher(instance, delegator);
+
+  return result;
+}
+
+
+// Dispatches a Result<T>-returning, one-argument method on a process and
+// returns a future for its value.
+//
+// The thunk and the heap-allocated future are handed to delegate<T>, which
+// deletes both once it has run the thunk.
+template <typename T, typename C, typename P1, typename A1>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1));
+
+  Future<T> *future = new Future<T>();
+
+  // Copy the caller's handle BEFORE dispatching: delegate<T> deletes
+  // 'future', and that may already have happened by the time dispatcher()
+  // returns, so dereferencing it afterwards could read freed memory.
+  Future<T> result(*future);
+
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       future));
+
+  dispatcher(instance, delegator);
+
+  return result;
+}
+
+
+// Dispatches a Result<T>-returning, two-argument method on a process and
+// returns a future for its value.
+//
+// The thunk and the heap-allocated future are handed to delegate<T>, which
+// deletes both once it has run the thunk.
+template <typename T, typename C,
+          typename P1, typename P2,
+          typename A1, typename A2>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+                            A1 a1, A2 a2)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2));
+
+  Future<T> *future = new Future<T>();
+
+  // Copy the caller's handle BEFORE dispatching: delegate<T> deletes
+  // 'future', and that may already have happened by the time dispatcher()
+  // returns, so dereferencing it afterwards could read freed memory.
+  Future<T> result(*future);
+
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       future));
+
+  dispatcher(instance, delegator);
+
+  return result;
+}
+
+
+// Dispatches a Result<T>-returning, three-argument method on a process and
+// returns a future for its value.
+//
+// The thunk and the heap-allocated future are handed to delegate<T>, which
+// deletes both once it has run the thunk.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+                            A1 a1, A2 a2, A3 a3)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2, a3));
+
+  Future<T> *future = new Future<T>();
+
+  // Copy the caller's handle BEFORE dispatching: delegate<T> deletes
+  // 'future', and that may already have happened by the time dispatcher()
+  // returns, so dereferencing it afterwards could read freed memory.
+  Future<T> result(*future);
+
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       future));
+
+  dispatcher(instance, delegator);
+
+  return result;
+}
+
+
+// Dispatches a Result<T>-returning, four-argument method on a process and
+// returns a future for its value.
+//
+// The thunk and the heap-allocated future are handed to delegate<T>, which
+// deletes both once it has run the thunk.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+                            A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2, a3, a4));
+
+  Future<T> *future = new Future<T>();
+
+  // Copy the caller's handle BEFORE dispatching: delegate<T> deletes
+  // 'future', and that may already have happened by the time dispatcher()
+  // returns, so dereferencing it afterwards could read freed memory.
+  Future<T> result(*future);
+
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       future));
+
+  dispatcher(instance, delegator);
+
+  return result;
+}
+
+
+// Dispatches a Result<T>-returning, five-argument method on a process and
+// returns a future for its value.
+//
+// The thunk and the heap-allocated future are handed to delegate<T>, which
+// deletes both once it has run the thunk.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+                            A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  std::tr1::function<Result<T> (void)> *thunk =
+    new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
+                                                            a1, a2, a3, a4, a5));
+
+  Future<T> *future = new Future<T>();
+
+  // Copy the caller's handle BEFORE dispatching: delegate<T> deletes
+  // 'future', and that may already have happened by the time dispatcher()
+  // returns, so dereferencing it afterwards could read freed memory.
+  Future<T> result(*future);
+
+  std::tr1::function<void (void)> *delegator =
+    new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
+                                                       future));
+
+  dispatcher(instance, delegator);
+
+  return result;
+}
+
+
+// Blocking form of the zero-argument dispatch(): issues the dispatch and
+// returns whatever the resulting future's get() yields.
+template <typename T, typename C>
+T Process::call(C *instance, Result<T> (C::*method)())
+{
+  const Future<T> &pending = dispatch(instance, method);
+  return pending.get();
+}
+
+
+// Blocking form of the one-argument dispatch(): issues the dispatch and
+// returns whatever the resulting future's get() yields.
+template <typename T, typename C, typename P1, typename A1>
+T Process::call(C *instance, Result<T> (C::*method)(P1), A1 a1)
+{
+  const Future<T> &pending = dispatch(instance, method, a1);
+  return pending.get();
+}
+
+
+// Blocking form of the two-argument dispatch(): issues the dispatch and
+// returns whatever the resulting future's get() yields.
+template <typename T, typename C,
+          typename P1, typename P2,
+          typename A1, typename A2>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2)
+{
+  const Future<T> &pending = dispatch(instance, method, a1, a2);
+  return pending.get();
+}
+
+
+// Blocking form of the three-argument dispatch(): issues the dispatch and
+// returns whatever the resulting future's get() yields.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3,
+          typename A1, typename A2, typename A3>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+                A1 a1, A2 a2, A3 a3)
+{
+  const Future<T> &pending = dispatch(instance, method, a1, a2, a3);
+  return pending.get();
+}
+
+
+// Blocking form of the four-argument dispatch(): issues the dispatch and
+// returns whatever the resulting future's get() yields.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4,
+          typename A1, typename A2, typename A3, typename A4>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+                A1 a1, A2 a2, A3 a3, A4 a4)
+{
+  const Future<T> &pending = dispatch(instance, method, a1, a2, a3, a4);
+  return pending.get();
+}
+
+
+// Blocking form of the five-argument dispatch(): issues the dispatch and
+// returns whatever the resulting future's get() yields.
+template <typename T, typename C,
+          typename P1, typename P2, typename P3, typename P4, typename P5,
+          typename A1, typename A2, typename A3, typename A4, typename A5>
+T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+                A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+  const Future<T> &pending = dispatch(instance, method, a1, a2, a3, a4, a5);
+  return pending.get();
+}
+
+
+// Creates an unset future: a shared reference count of one, an empty
+// shared value slot, and a newly spawned trigger process.
+template <typename T>
+Future<T>::Future()
+{
+  // Shared reference count, starting at one for this handle.
+  refs = new int(1);
+
+  // Shared slot for the eventual value; NULL means "not set yet".
+  t = new T *;
+  *t = NULL;
+
+  // Process that get() waits on and set() posts to.
+  trigger = new Process();
+  Process::spawn(trigger);
+}
+
+
+// Copy constructor: shares the reference count, value slot and trigger of
+// 'that' and atomically bumps the shared count.
+template <typename T>
+Future<T>::Future(const Future<T> &that)
+{
+  // 'refs' is a pointer to the shared count, so check the count itself
+  // rather than comparing the pointer against zero.
+  assert(that.refs != NULL && *that.refs > 0);
+  __sync_fetch_and_add(that.refs, 1);
+  refs = that.refs;
+  t = that.t;
+  trigger = that.trigger;
+}
+
+
+// Copy assignment: drops this handle's share of its current state (freeing
+// it if this was the last handle) and then shares the state of 'that'.
+//
+// @return reference to this future
+template <typename T>
+Future<T> & Future<T>::operator = (const Future<T> &that)
+{
+  if (this != &that) {
+    // Acquire the new state first so that assigning between two handles
+    // of the same shared state can never drop the count to zero.
+    // 'refs' is a pointer to the shared count; check the count itself.
+    assert(that.refs != NULL && *that.refs > 0);
+    __sync_fetch_and_add(that.refs, 1);
+
+    // Release the state currently held by this handle.
+    assert(refs != NULL);
+    if (__sync_sub_and_fetch(refs, 1) == 0) {
+      delete refs;
+      assert(t != NULL);
+      delete *t;  // Deleting NULL is a no-op.
+      delete t;   // The slot itself was allocated with new T *.
+      assert(trigger != NULL);
+      // Post to the trigger and wait on it before deleting it.
+      Process::post(trigger->self(), PROCESS_MSGID);
+      Process::wait(trigger->self());
+      delete trigger;
+    }
+
+    refs = that.refs;
+    t = that.t;
+    trigger = that.trigger;
+  }
+
+  // The declared return type is Future<T> &; falling off the end of the
+  // function without returning would be undefined behavior.
+  return *this;
+}
+
+
+// Destructor: atomically drops one reference; the last handle frees the
+// shared count, the stored value (if any), the value slot, and the
+// trigger process.
+template <typename T>
+Future<T>::~Future()
+{
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {
+    delete refs;
+    assert(t != NULL);
+    delete *t;  // Deleting NULL is a no-op.
+    delete t;   // The slot itself was allocated with new T *.
+    assert(trigger != NULL);
+    // Post to the trigger and wait on it before deleting it.
+    Process::post(trigger->self(), PROCESS_MSGID);
+    Process::wait(trigger->self());
+    delete trigger;
+  }
+}
+
+
+// Stores a heap copy of the value in the shared slot and posts
+// PROCESS_MSGID to the trigger. The future must still be unset (asserted).
+template <typename T>
+void Future<T>::set(const T &t_)
+{
+  assert(t != NULL && *t == NULL);
+
+  T *value = new T(t_);
+  *t = value;
+
+  Process::post(trigger->self(), PROCESS_MSGID);
+}
+
+
+// Returns a copy of the future's value. If no value is stored yet, waits
+// on the trigger process first and then asserts that a value is present.
+template <typename T>
+T Future<T>::get() const
+{
+  assert(t != NULL);
+
+  if (*t == NULL) {
+    assert(trigger != NULL);
+    Process::wait(trigger->self());
+    assert(t != NULL && *t != NULL);
+  }
+
+  return **t;
+}
+
+
+// TODO(benh): Use synchronized instead of CAS?
+// Shorthand for GCC's atomic compare-and-swap builtin. NOTE(review): the
+// Promise code that follows spells out __sync_bool_compare_and_swap
+// directly rather than using this macro.
+#define CAS __sync_bool_compare_and_swap
+
+
+// Creates an unset, unassociated promise: a shared reference count of one,
+// an empty value slot, state UNSET_UNASSOCIATED, and an empty future slot.
+template <typename T>
+Promise<T>::Promise()
+{
+  // Shared reference count, starting at one for this handle.
+  refs = new int(1);
+
+  // Shared slot for a value set before any future is associated.
+  t = new T *;
+  *t = NULL;
+
+  // Shared set/associated state.
+  state = new State;
+  *state = UNSET_UNASSOCIATED;
+
+  // Shared slot for the associated future; NULL until associate().
+  future = new Future<T> *;
+  *future = NULL;
+}
+
+
+// Copy constructor: shares the reference count, value slot, state and
+// future slot of 'that' and atomically bumps the shared count.
+template <typename T>
+Promise<T>::Promise(const Promise<T> &that)
+{
+  // 'refs' is a pointer to the shared count, so check the count itself
+  // rather than comparing the pointer against zero.
+  assert(that.refs != NULL && *that.refs > 0);
+  __sync_fetch_and_add(that.refs, 1);
+  refs = that.refs;
+  t = that.t;
+  state = that.state;
+  future = that.future;
+}
+
+
+// Destructor: atomically drops one reference; the last handle frees the
+// shared count, the stored value and its slot, the state, and the
+// associated future and its slot.
+template <typename T>
+Promise<T>::~Promise()
+{
+  assert(refs != NULL);
+  if (__sync_sub_and_fetch(refs, 1) == 0) {
+    delete refs;
+    assert(t != NULL);
+    delete *t;  // Deleting NULL is a no-op.
+    delete t;   // The slot itself was allocated with new T *.
+    assert(state != NULL);
+    delete state;
+    assert(future != NULL);
+    delete *future;  // Deleting NULL is a no-op.
+    delete future;   // The slot itself was allocated with new Future<T> *.
+  }
+}
+
+
+// Supplies the promise's value. The promise must not have been set yet
+// (asserted). If a future is already associated the value is forwarded to
+// it; otherwise a copy is stored in the promise's own value slot.
+template <typename T>
+void Promise<T>::set(const T &t_)
+{
+ assert(state != NULL);
+ assert(*state == UNSET_UNASSOCIATED ||
+ *state == UNSET_ASSOCIATED);
+ assert(t != NULL && *t == NULL);
+ if (*state == UNSET_UNASSOCIATED) {
+ // Store the value before publishing the state change.
+ *t = new T(t_);
+ if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED, SET_UNASSOCIATED)) {
+ // The swap failed, so the state changed after the check above; it is
+ // asserted to be UNSET_ASSOCIATED (presumably a concurrent
+ // associate()), so forward the stored value to the future.
+ assert(*state == UNSET_ASSOCIATED);
+ __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+ assert(future != NULL && *future != NULL);
+ (*future)->set(**t);
+ }
+ } else {
+ // Already associated: hand the value straight to the future (no copy
+ // is kept in this promise's slot), then mark the promise as set.
+ assert(*state == UNSET_ASSOCIATED);
+ assert(future != NULL && *future != NULL);
+ (*future)->set(t_);
+ __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+ }
+}
+
+
+// Associates a future with this promise. The promise must not already be
+// associated (asserted). A heap copy of the future is stored; if a value
+// has already been set it is forwarded to that copy immediately.
+template <typename T>
+void Promise<T>::associate(const Future<T> &future_)
{
- return pid;
+ assert(state != NULL);
+ assert(*state == UNSET_UNASSOCIATED ||
+ *state == SET_UNASSOCIATED);
+ assert(future != NULL);
+ // Store the future before publishing the state change.
+ *future = new Future<T>(future_);
+ if (*state == UNSET_UNASSOCIATED) {
+ if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
+ UNSET_ASSOCIATED)) {
+ // The swap failed, so the state changed after the check above; it is
+ // asserted to be SET_UNASSOCIATED (presumably a concurrent set()), so
+ // forward the stored value to the future.
+ assert(*state == SET_UNASSOCIATED);
+ __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+ assert(*state == SET_ASSOCIATED);
+ assert(t != NULL && *t != NULL);
+ (*future)->set(**t);
+ }
+ } else {
+ // A value was set before association: forward it to the future now.
+ assert(*state == SET_UNASSOCIATED);
+ __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+ assert(*state == SET_ASSOCIATED);
+ assert(t != NULL && *t != NULL);
+ (*future)->set(**t);
+ }
}
-inline PID Process::from() const
+// Builds a result holding an immediate value: a new reference count of
+// one, a heap copy of the value, and no promise.
+template <typename T>
+Result<T>::Result(const T &t_)
{
- return current != NULL ? current->from : PID();
+ refs = new int;
+ *refs = 1;
+ t = new T(t_);
+ promise = NULL;
}
-inline MSGID Process::msgid() const
+// Builds a result holding a promise: a new reference count of one, no
+// immediate value, and a heap copy of the promise.
+template <typename T>
+Result<T>::Result(const Promise<T> &promise_)
{
- return current != NULL ? current->id : PROCESS_ERROR;
+ refs = new int;
+ *refs = 1;
+ t = NULL;
+ promise = new Promise<T>(promise_);
}
-inline void Process::send(const PID &to, MSGID id)
+// Copy constructor: shares the reference count, value and promise of
+// 'that' and atomically bumps the shared count.
+template <typename T>
+Result<T>::Result(const Result<T> &that)
{
- send(to, id, NULL, 0);
+  // 'refs' is a pointer to the shared count, so check the count itself
+  // rather than comparing the pointer against zero.
+  assert(that.refs != NULL && *that.refs > 0);
+  __sync_fetch_and_add(that.refs, 1);
+  refs = that.refs;
+  t = that.t;
+  promise = that.promise;
}
-inline MSGID Process::call(const PID &to, MSGID id)
+// Destructor: atomically drops one reference; the last handle frees the
+// shared count and whichever of the value or promise is present.
+template <typename T>
+Result<T>::~Result()
{
- return call(to, id, NULL, 0);
+ assert(refs != NULL);
+ if (__sync_sub_and_fetch(refs, 1) == 0) {
+ delete refs;
+ if (t != NULL)
+ delete t;
+ if (promise != NULL)
+ delete promise;
+ }
}
-inline MSGID Process::call(const PID &to, MSGID id,
- const char *data, size_t length)
+// Returns true when this result holds a promise rather than an immediate
+// value, i.e. when the promise pointer is non-NULL.
+template <typename T>
+bool Result<T>::isPromise() const
{
- return call(to, id, data, length, 0);
+ return promise != NULL;
}
-inline MSGID Process::receive()
+// Returns a copy of the held promise. Only valid when isPromise() is true
+// (asserted).
+template <typename T>
+Promise<T> Result<T>::getPromise() const
{
- return receive(0);
+ assert(isPromise());
+ return *promise;
}
-inline void Process::post(const PID &to, MSGID id)
+// Returns a copy of the held value. Only valid when isPromise() is false
+// (asserted).
+template <typename T>
+T Result<T>::get() const
{
- post(to, id, NULL, 0);
+ assert(!isPromise());
+ return *t;
}
Modified: incubator/mesos/trunk/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.cpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.cpp Sun Jun 5 08:56:39 2011
@@ -8,6 +8,7 @@ using std::make_pair;
using std::map;
using std::pair;
+
#define malloc(bytes) \
({ void *tmp; \
if ((tmp = malloc(bytes)) == NULL) \
Modified: incubator/mesos/trunk/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.hpp?rev=1132190&r1=1132189&r2=1132190&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.hpp Sun Jun 5 08:56:39 2011
@@ -154,4 +154,5 @@ inline MSGID ReliableProcess::receive()
return receive(0);
}
+
#endif /* __RELIABLE_HPP__ */