You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:09:34 UTC
svn commit: r1132263 - in /incubator/mesos/trunk/third_party/libprocess:
Makefile.in decoder.hpp future.hpp latch.cpp latch.hpp pid.cpp pid.hpp
process.cpp process.hpp promise.hpp
Author: benh
Date: Sun Jun 5 09:09:33 2011
New Revision: 1132263
URL: http://svn.apache.org/viewvc?rev=1132263&view=rev
Log:
Updates to libprocess to make dispatches no longer require a pointer to the process.
Added:
incubator/mesos/trunk/third_party/libprocess/future.hpp
incubator/mesos/trunk/third_party/libprocess/latch.cpp
incubator/mesos/trunk/third_party/libprocess/latch.hpp
incubator/mesos/trunk/third_party/libprocess/promise.hpp
Modified:
incubator/mesos/trunk/third_party/libprocess/Makefile.in
incubator/mesos/trunk/third_party/libprocess/decoder.hpp
incubator/mesos/trunk/third_party/libprocess/pid.cpp
incubator/mesos/trunk/third_party/libprocess/pid.hpp
incubator/mesos/trunk/third_party/libprocess/process.cpp
incubator/mesos/trunk/third_party/libprocess/process.hpp
Modified: incubator/mesos/trunk/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/Makefile.in?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/third_party/libprocess/Makefile.in Sun Jun 5 09:09:33 2011
@@ -24,7 +24,7 @@ CXXFLAGS += -fPIC -D_XOPEN_SOURCE
# Add dependency tracking to CXXFLAGS.
CXXFLAGS += -MMD -MP
-LIB_OBJ = process.o pid.o fatal.o tokenize.o
+LIB_OBJ = process.o pid.o fatal.o tokenize.o latch.o
LIB = libprocess.a
OBJS = $(LIB_OBJ)
Modified: incubator/mesos/trunk/third_party/libprocess/decoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/decoder.hpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/decoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/decoder.hpp Sun Jun 5 09:09:33 2011
@@ -99,7 +99,7 @@ private:
// Okay, the message is complete, do the necessary parsing (it
// would be cooler to do this inline instead).
std::string name;
- PID to, from;
+ UPID to, from;
const std::vector<std::string>& pairs = tokenize(decoder->path, "/");
if (pairs.size() != 2) {
@@ -107,7 +107,7 @@ private:
return -1;
}
- to = PID(pairs[0], Process().self().ip, Process().self().port);
+ to = UPID(pairs[0], Process().self().ip, Process().self().port);
name = pairs[1];
if (name == "") {
Added: incubator/mesos/trunk/third_party/libprocess/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/future.hpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/future.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/future.hpp Sun Jun 5 09:09:33 2011
@@ -0,0 +1,108 @@
+#ifndef __FUTURE_HPP__
+#define __FUTURE_HPP__
+
+#include "latch.hpp"
+
+
+template <typename T>
+class Future
+{
+public:
+ Future();
+ Future(const Future<T>& that);
+ Future<T>& operator = (const Future<T>& that);
+ virtual ~Future();
+ void set(const T& t_);
+ T get() const;
+
+private:
+ int* refs;
+ T** t;
+ Latch* latch;
+};
+
+
+template <typename T>
+Future<T>::Future()
+{
+ refs = new int;
+ *refs = 1;
+ t = new T*;
+ *t = NULL;
+ latch = new Latch();
+}
+
+
+template <typename T>
+Future<T>::Future(const Future<T>& that)
+{
+ assert(that.refs > 0);
+ __sync_fetch_and_add(that.refs, 1);
+ refs = that.refs;
+ t = that.t;
+ latch = that.latch;
+}
+
+
+template <typename T>
+Future<T>& Future<T>::operator = (const Future<T>& that)
+{
+ if (this != &that) {
+ // Destructor ...
+ assert(refs != NULL);
+ if (__sync_sub_and_fetch(refs, 1) == 0) {
+ delete refs;
+ assert(t != NULL);
+ if (*t != NULL)
+ delete *t;
+ assert(latch != NULL);
+ delete latch;
+ }
+
+ // Copy constructor ...
+ assert(that.refs > 0);
+ __sync_fetch_and_add(that.refs, 1);
+ refs = that.refs;
+ t = that.t;
+ latch = that.latch;
+ }
+}
+
+
+template <typename T>
+Future<T>::~Future()
+{
+ assert(refs != NULL);
+ if (__sync_sub_and_fetch(refs, 1) == 0) {
+ delete refs;
+ assert(t != NULL);
+ if (*t != NULL)
+ delete *t;
+ assert(latch != NULL);
+ delete latch;
+ }
+}
+
+
+template <typename T>
+void Future<T>::set(const T& t_)
+{
+ assert(t != NULL && *t == NULL);
+ *t = new T(t_);
+ latch->trigger();
+}
+
+
+template <typename T>
+T Future<T>::get() const
+{
+ assert(t != NULL);
+ if (*t != NULL)
+ return **t;
+ assert(latch != NULL);
+ latch->wait();
+ assert(t != NULL && *t != NULL);
+ return **t;
+}
+
+#endif // __FUTURE_HPP__
Added: incubator/mesos/trunk/third_party/libprocess/latch.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.cpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.cpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/latch.cpp Sun Jun 5 09:09:33 2011
@@ -0,0 +1,39 @@
+#include <process.hpp>
+
+#include "latch.hpp"
+
+
+Latch::Latch()
+{
+ triggered = false;
+ process = new Process();
+ Process::spawn(process);
+}
+
+
+Latch::~Latch()
+{
+ assert(process != NULL);
+ Process::post(process->self(), TERMINATE);
+ Process::wait(process->self());
+ delete process;
+}
+
+
+void Latch::trigger()
+{
+ assert(process != NULL);
+ if (!triggered) {
+ triggered = true;
+ Process::post(process->self(), TERMINATE);
+ }
+}
+
+
+void Latch::wait()
+{
+ assert(process != NULL);
+ if (!triggered) {
+ Process::wait(process->self());
+ }
+}
Added: incubator/mesos/trunk/third_party/libprocess/latch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.hpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/latch.hpp Sun Jun 5 09:09:33 2011
@@ -0,0 +1,28 @@
+#ifndef __LATCH_HPP__
+#define __LATCH_HPP__
+
+#include <process.hpp>
+
+// Need a forward declaration to break the dependency chain between
+// Process, Future, and Latch.
+class Process;
+
+
+class Latch
+{
+public:
+ Latch();
+ virtual ~Latch();
+
+ void trigger();
+ void wait();
+
+private:
+ Latch(const Latch& that);
+ Latch& operator = (const Latch& that);
+
+ bool triggered;
+ Process* process;
+};
+
+#endif // __LATCH_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/pid.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/pid.cpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/pid.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/pid.cpp Sun Jun 5 09:09:33 2011
@@ -11,6 +11,7 @@
#include "config.hpp"
#include "pid.hpp"
+#include "process.hpp"
using std::istream;
using std::ostream;
@@ -18,7 +19,7 @@ using std::size_t;
using std::string;
-ostream & operator << (ostream &stream, const PID &pid)
+ostream& operator << (ostream& stream, const UPID& pid)
{
// Call inet_ntop since inet_ntoa is not thread-safe!
char ip[INET_ADDRSTRLEN];
@@ -30,7 +31,7 @@ ostream & operator << (ostream &stream,
}
-istream & operator >> (istream &stream, PID &pid)
+istream& operator >> (istream& stream, UPID& pid)
{
pid.id = "";
pid.ip = 0;
@@ -92,7 +93,7 @@ istream & operator >> (istream &stream,
}
-size_t hash_value(const PID& pid)
+size_t hash_value(const UPID& pid)
{
size_t seed = 0;
boost::hash_combine(seed, pid.id);
@@ -100,3 +101,11 @@ size_t hash_value(const PID& pid)
boost::hash_combine(seed, pid.port);
return seed;
}
+
+
+UPID::UPID(const Process& process)
+{
+ id = process.self().id;
+ ip = process.self().ip;
+ port = process.self().port;
+}
Modified: incubator/mesos/trunk/third_party/libprocess/pid.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/pid.hpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/pid.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/pid.hpp Sun Jun 5 09:09:33 2011
@@ -1,5 +1,5 @@
-#ifndef PID_HPP
-#define PID_HPP
+#ifndef __PID_HPP__
+#define __PID_HPP__
#include <stdint.h>
@@ -8,40 +8,43 @@
#include <string>
-struct PID;
+class Process;
+struct UPID;
-/* Outputing PIDs and generating PIDs using streamds. */
-std::ostream & operator << (std::ostream &, const PID &);
-std::istream & operator >> (std::istream &, PID &);
+// Outputing UPIDs and generating UPIDs using streams.
+std::ostream& operator << (std::ostream&, const UPID&);
+std::istream& operator >> (std::istream&, UPID&);
-/* PID hash value (for example, to use in Boost's unordered maps). */
-std::size_t hash_value(const PID &);
+// UPID hash value (for example, to use in Boost's unordered maps).
+std::size_t hash_value(const UPID&);
-struct PID
+struct UPID
{
- PID() : ip(0), port(0) {}
+ UPID() : ip(0), port(0) {}
- PID(const char *id_, uint32_t ip_, uint16_t port_)
+ UPID(const char* id_, uint32_t ip_, uint16_t port_)
: id(id_), ip(ip_), port(port_) {}
- PID(const std::string& id_, uint32_t ip_, uint16_t port_)
+ UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
: id(id_), ip(ip_), port(port_) {}
- PID(const char *s)
+ UPID(const char* s)
{
std::istringstream in(s);
in >> *this;
}
- PID(const std::string &s)
+ UPID(const std::string& s)
{
std::istringstream in(s);
in >> *this;
}
+ UPID(const Process& process);
+
operator std::string() const
{
std::ostringstream out;
@@ -54,7 +57,7 @@ struct PID
return id == "" && ip == 0 && port == 0;
}
- bool operator < (const PID &that) const
+ bool operator < (const UPID& that) const
{
if (this != &that) {
if (ip == that.ip && port == that.port)
@@ -68,7 +71,7 @@ struct PID
return false;
}
- bool operator == (const PID &that) const
+ bool operator == (const UPID& that) const
{
if (this != &that) {
return (id == that.id &&
@@ -79,16 +82,23 @@ struct PID
return true;
}
- bool operator != (const PID &that) const
+ bool operator != (const UPID& that) const
{
return !(this->operator == (that));
}
-
std::string id;
uint32_t ip;
uint16_t port;
};
-#endif /* PID_HPP */
+template <typename T = Process>
+struct PID : UPID
+{
+ PID() : UPID() {}
+ PID(const T& t) : UPID(static_cast<const Process&>(t)) {}
+};
+
+
+#endif // __PID_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun 5 09:09:33 2011
@@ -117,7 +117,7 @@ ostream& operator << (ostream& stream, c
struct timeout
{
ev_tstamp tstamp;
- PID pid;
+ UPID pid;
int generation;
};
@@ -191,7 +191,7 @@ public:
LinkManager();
~LinkManager();
- void link(Process* process, const PID& to);
+ void link(Process* process, const UPID& to);
void send(Message* message);
void send(DataEncoder* encoder, int s);
@@ -204,8 +204,8 @@ public:
void exited(Process* process);
private:
- /* Map from PID (local/remote) to process. */
- map<PID, set<Process*> > links;
+ /* Map from UPID (local/remote) to process. */
+ map<UPID, set<Process*> > links;
/* Map from socket to node (ip, port). */
map<int, Node> sockets;
@@ -230,23 +230,23 @@ public:
ProcessManager();
~ProcessManager();
- ProcessReference use(const PID &pid);
+ ProcessReference use(const UPID &pid);
void deliver(Message *message, Process *sender = NULL);
- PID spawn(Process *process);
- void link(Process *process, const PID &to);
+ UPID spawn(Process *process);
+ void link(Process *process, const UPID &to);
void receive(Process *process, double secs);
void pause(Process *process, double secs);
- bool wait(Process *process, const PID &pid);
- bool external_wait(const PID &pid);
+ bool wait(Process *process, const UPID &pid);
+ bool external_wait(const UPID &pid);
bool await(Process *process, int fd, int op, double secs, bool ignore);
void enqueue(Process *process);
Process * dequeue();
- void timedout(const PID &pid, int generation);
- void awaited(const PID &pid, int generation);
+ void timedout(const UPID &pid, int generation);
+ void awaited(const UPID &pid, int generation);
void run(Process *process);
void cleanup(Process *process);
@@ -301,7 +301,7 @@ protected:
}
private:
- map<PID, Proxy*> proxies;
+ map<UPID, Proxy*> proxies;
};
@@ -486,7 +486,7 @@ int set_nbio(int fd)
}
-Message* encode(const PID &from, const PID &to, const string &name, const string &data = "")
+Message* encode(const UPID &from, const UPID &to, const string &name, const string &data = "")
{
Message* message = new Message();
message->from = from;
@@ -617,7 +617,7 @@ void handle_timeout(struct ev_loop *loop
void handle_await(struct ev_loop *loop, ev_io *watcher, int revents)
{
- tuple<PID, int> *t = (tuple<PID, int> *) watcher->data;
+ tuple<UPID, int> *t = (tuple<UPID, int> *) watcher->data;
process_manager->awaited(t->get<0>(), t->get<1>());
ev_io_stop(loop, watcher);
delete watcher;
@@ -1150,7 +1150,7 @@ Proxy::Proxy(int _c) : c(_c)
Process::spawn(proxy_manager);
}
- dispatch(proxy_manager, &ProxyManager::manage, this);
+ dispatch(PID<ProxyManager>(*proxy_manager), &ProxyManager::manage, this);
}
@@ -1188,7 +1188,7 @@ LinkManager::LinkManager()
LinkManager::~LinkManager() {}
-void LinkManager::link(Process *process, const PID &to)
+void LinkManager::link(Process *process, const UPID &to)
{
// TODO(benh): The semantics we want to support for link are such
// that if there is nobody to link to (local or remote) then a
@@ -1448,9 +1448,9 @@ void LinkManager::exited(const Node &nod
// ourselves that the accesses to each Process object will always be
// valid.
synchronized (this) {
- list<PID> removed;
+ list<UPID> removed;
// Look up all linked processes.
- foreachpair (const PID &pid, set<Process *> &processes, links) {
+ foreachpair (const UPID &pid, set<Process *> &processes, links) {
if (pid.ip == node.ip && pid.port == node.port) {
// N.B. If we call exited(pid) we might invalidate iteration.
foreach (Process *process, processes) {
@@ -1461,7 +1461,7 @@ void LinkManager::exited(const Node &nod
}
}
- foreach (const PID &pid, removed) {
+ foreach (const UPID &pid, removed) {
links.erase(pid);
}
}
@@ -1475,10 +1475,10 @@ void LinkManager::exited(Process *proces
foreachpair (_, set<Process *> &processes, links)
processes.erase(process);
- const PID &pid = process->self();
+ const UPID &pid = process->self();
/* Look up all linked processes. */
- map<PID, set<Process *> >::iterator it = links.find(pid);
+ map<UPID, set<Process *> >::iterator it = links.find(pid);
if (it != links.end()) {
set<Process *> &processes = it->second;
@@ -1504,7 +1504,7 @@ ProcessManager::ProcessManager()
ProcessManager::~ProcessManager() {}
-ProcessReference ProcessManager::use(const PID &pid)
+ProcessReference ProcessManager::use(const UPID &pid)
{
if (pid.ip == ip && pid.port == port) {
synchronized (processes) {
@@ -1547,7 +1547,7 @@ void ProcessManager::deliver(Message *me
}
-PID ProcessManager::spawn(Process *process)
+UPID ProcessManager::spawn(Process *process)
{
assert(process != NULL);
@@ -1555,7 +1555,7 @@ PID ProcessManager::spawn(Process *proce
synchronized (processes) {
if (processes.count(process->pid.id) > 0) {
- return PID();
+ return UPID();
} else {
processes[process->pid.id] = process;
}
@@ -1619,7 +1619,7 @@ PID ProcessManager::spawn(Process *proce
-void ProcessManager::link(Process *process, const PID &to)
+void ProcessManager::link(Process *process, const UPID &to)
{
// Check if the pid is local.
if (!(to.ip == ip && to.port == port)) {
@@ -1712,7 +1712,7 @@ void ProcessManager::pause(Process *proc
}
-bool ProcessManager::wait(Process *process, const PID &pid)
+bool ProcessManager::wait(Process *process, const UPID &pid)
{
bool waited = false;
@@ -1748,7 +1748,7 @@ bool ProcessManager::wait(Process *proce
}
-bool ProcessManager::external_wait(const PID &pid)
+bool ProcessManager::external_wait(const UPID &pid)
{
// We use a gate for external waiters. A gate is single use. That
// is, a new gate is created when the first external thread shows
@@ -1818,7 +1818,7 @@ bool ProcessManager::await(Process *proc
// the watcher will always fire, even if we get interrupted and
// return early, so this tuple will get cleaned up when the
// watcher runs).
- watcher->data = new tuple<PID, int>(process->pid, process->generation);
+ watcher->data = new tuple<UPID, int>(process->pid, process->generation);
/* Enqueue the watcher. */
synchronized (watchers) {
@@ -1892,7 +1892,7 @@ Process * ProcessManager::dequeue()
}
-void ProcessManager::timedout(const PID &pid, int generation)
+void ProcessManager::timedout(const UPID &pid, int generation)
{
if (ProcessReference process = use(pid)) {
process->lock();
@@ -1931,7 +1931,7 @@ void ProcessManager::timedout(const PID
}
-void ProcessManager::awaited(const PID &pid, int generation)
+void ProcessManager::awaited(const UPID &pid, int generation)
{
if (ProcessReference process = use(pid)) {
process->lock();
@@ -2300,7 +2300,7 @@ Message * Process::dequeue()
}
-void Process::inject(const PID &from, const string &name, const char *data, size_t length)
+void Process::inject(const UPID& from, const string& name, const char* data, size_t length)
{
if (!from)
return;
@@ -2328,7 +2328,7 @@ void Process::inject(const PID &from, co
}
-void Process::send(const PID &to, const string &name, const char *data, size_t length)
+void Process::send(const UPID& to, const string& name, const char* data, size_t length)
{
if (!to) {
return;
@@ -2387,7 +2387,7 @@ string Process::receive(double secs)
timeout:
assert(current == NULL);
- current = encode(PID(), pid, TIMEOUT);
+ current = encode(UPID(), pid, TIMEOUT);
return name();
}
@@ -2395,12 +2395,12 @@ string Process::receive(double secs)
string Process::serve(double secs, bool forever)
{
do {
- const string &name = receive(secs);
- if (name == DISPATCH) {
- void *pointer = (char *) current->body.data();
- std::tr1::function<void (void)> *delegator =
- *reinterpret_cast<std::tr1::function<void (void)> **>(pointer);
- (*delegator)();
+ const string& name = receive(secs);
+ if (name == "__DISPATCH__") {
+ void* pointer = (char *) current->body.data();
+ std::tr1::function<void(Process*)>* delegator =
+ *reinterpret_cast<std::tr1::function<void(Process*)>**>(pointer);
+ (*delegator)(this);
delete delegator;
} else {
return name;
@@ -2415,17 +2415,17 @@ void Process::operator () ()
}
-PID Process::from() const
+UPID Process::from() const
{
if (current != NULL) {
return current->from;
} else {
- return PID();
+ return UPID();
}
}
-string Process::name() const
+const string& Process::name() const
{
if (current != NULL) {
return current->name;
@@ -2435,7 +2435,7 @@ string Process::name() const
}
-const char * Process::body(size_t *length) const
+const char* Process::body(size_t* length) const
{
if (current != NULL && current->body.size() > 0) {
if (length != NULL) {
@@ -2460,7 +2460,7 @@ void Process::pause(double secs)
}
-PID Process::link(const PID &to)
+UPID Process::link(const UPID& to)
{
if (!to) {
return to;
@@ -2533,7 +2533,7 @@ double Process::elapsed()
}
-PID Process::spawn(Process *process)
+UPID Process::spawn(Process *process)
{
initialize();
@@ -2553,12 +2553,12 @@ PID Process::spawn(Process *process)
return process_manager->spawn(process);
} else {
- return PID();
+ return UPID();
}
}
-bool Process::wait(const PID &pid)
+bool Process::wait(const UPID& pid)
{
initialize();
@@ -2603,7 +2603,7 @@ void Process::filter(Filter *filter)
}
-void Process::post(const PID &to, const string &name, const char *data, size_t length)
+void Process::post(const UPID& to, const string& name, const char* data, size_t length)
{
initialize();
@@ -2612,18 +2612,14 @@ void Process::post(const PID &to, const
}
// Encode and transport outgoing message.
- transport(encode(PID(), to, name, string(data, length)));
+ transport(encode(UPID(), to, name, string(data, length)));
}
-void Process::dispatcher(Process *process, function<void (void)> *delegator)
+void Process::dispatcher(const UPID& pid, function<void(Process*)>* delegator)
{
- if (process == NULL) {
- return;
- }
-
// Encode and deliver outgoing message.
- Message *message = encode(PID(), process->pid, DISPATCH,
+ Message* message = encode(UPID(), pid, "__DISPATCH__",
string((char *) &delegator, sizeof(delegator)));
if (proc_process != NULL) {
Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132263&r1=1132262&r2=1132263&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun 5 09:09:33 2011
@@ -12,23 +12,21 @@
#include <tr1/functional>
+#include "future.hpp"
#include "pid.hpp"
+#include "promise.hpp"
const std::string ERROR = "error";
const std::string TIMEOUT = "timeout";
const std::string EXIT = "exit";
const std::string TERMINATE = "terminate";
-const std::string DISPATCH = "dispatch"; // TODO(benh): Make this internal.
-
-
-class Process;
struct Message {
std::string name;
- PID from;
- PID to;
+ UPID from;
+ UPID to;
std::string body;
};
@@ -43,87 +41,10 @@ public:
class Filter {
public:
- virtual bool filter(Message *) = 0;
-};
-
-
-template <typename T>
-struct Future
-{
- Future();
- Future(const Future<T> &that);
- Future<T> & operator = (const Future<T> &that);
- virtual ~Future();
- void set(const T &t_);
- T get() const;
-
-private:
- int *refs;
- T **t;
- Process *trigger;
-};
-
-
-template <typename T>
-class Promise
-{
-public:
- Promise();
- Promise(const Promise<T> &that);
- virtual ~Promise();
- void set(const T &t_);
- void associate(const Future<T> &future_);
-
-private:
- void operator = (const Promise<T> &);
-
- enum State {
- UNSET_UNASSOCIATED,
- SET_UNASSOCIATED,
- UNSET_ASSOCIATED,
- SET_ASSOCIATED,
- };
-
- int *refs;
- T **t;
- State *state;
- Future<T> **future;
-};
-
-
-template <>
-class Promise<void>;
-
-
-template <typename T>
-class Promise<T&>;
-
-
-template <typename T>
-struct Result
-{
- Result(const T &t_);
- Result(const Promise<T> &promise_);
- Result(const Result<T> &that);
- virtual ~Result();
- bool isPromise() const;
- Promise<T> getPromise() const;
-
- T get() const;
-
-private:
- void operator = (const Result<T> &);
-
- int *refs;
- T *t;
- Promise<T> *promise;
+ virtual bool filter(Message*) = 0;
};
-template <>
-struct Result<void>;
-
-
class Process {
public:
Process(const std::string& id = "");
@@ -131,26 +52,26 @@ public:
virtual ~Process();
/* Returns pid of process; valid even before calling spawn. */
- PID self() const { return pid; }
+ UPID self() const { return pid; }
protected:
/* Function run when process spawned. */
virtual void operator() ();
/* Returns the sender's PID of the last dequeued (current) message. */
- PID from() const;
+ UPID from() const;
/* Returns the name of the last dequeued (current) message. */
- std::string name() const;
+ const std::string& name() const;
/* Returns pointer and length of body of last dequeued (current) message. */
- const char * body(size_t *length) const;
+ const char* body(size_t* length) const;
/* Put a message at front of queue (will not reschedule process). */
- void inject(const PID &from, const std::string &name, const char *data = NULL, size_t length = 0);
+ void inject(const UPID& from, const std::string& name, const char* data = NULL, size_t length = 0);
/* Sends a message with data to PID. */
- void send(const PID &to, const std::string &name, const char *data = NULL, size_t length = 0);
+ void send(const UPID& to, const std::string &name, const char *data = NULL, size_t length = 0);
/* Blocks for message at most specified seconds (0 implies forever). */
std::string receive(double secs = 0);
@@ -162,7 +83,7 @@ protected:
void pause(double secs);
/* Links with the specified PID. */
- PID link(const PID &pid);
+ UPID link(const UPID& pid);
/* IO events for awaiting. */
enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
@@ -182,21 +103,21 @@ public:
*
* @param process process to be spawned
*/
- static PID spawn(Process *process);
+ static UPID spawn(Process* process);
/**
* Wait for process to exit (returns true if actually waited on a process).
*
* @param PID id of the process
*/
- static bool wait(const PID &pid);
+ static bool wait(const UPID& pid);
/**
* Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
*
* @param thunk function to be invoked
*/
- static void invoke(const std::tr1::function<void (void)> &thunk);
+ static void invoke(const std::tr1::function<void(void)>& thunk);
/**
* Use the specified filter on messages that get enqueued (note,
@@ -204,7 +125,7 @@ public:
*
* @param filter message filter
*/
- static void filter(Filter *filter);
+ static void filter(Filter* filter);
/**
* Sends a message with data without a return address.
@@ -214,73 +135,73 @@ public:
* @param data data to send (gets copied)
* @param length length of data
*/
- static void post(const PID &to, const std::string &name, const char *data = NULL, size_t length = 0);
+ static void post(const UPID& to, const std::string& name, const char* data = NULL, size_t length = 0);
/**
* Dispatches a void method on a process.
*
- * @param instance running process to receive dispatch message
- * @param method method to invoke on instance
+ * @param pid receiver of message
+ * @param method method to invoke on receiver
*/
- template <typename C>
- static void dispatch(C *instance, void (C::*method)());
+ template <typename T>
+ static void dispatch(const PID<T>& pid, void (T::*method)());
/**
* Dispatches a void method on a process.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 argument to pass to method
*/
- template <typename C, typename P1, typename A1>
- static void dispatch(C *instance, void (C::*method)(P1), A1 a1);
+ template <typename T, typename P1, typename A1>
+ static void dispatch(const PID<T>& pid, void (T::*method)(P1), A1 a1);
/**
* Dispatches a void method on a process.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
*/
- template <typename C, typename P1, typename P2, typename A1, typename A2>
- static void dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2);
+ template <typename T, typename P1, typename P2, typename A1, typename A2>
+ static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2), A1 a1, A2 a2);
/**
* Dispatches a void method on a process.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
* @param a3 second argument to pass to method
*/
- template <typename C,
+ template <typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
- static void dispatch(C *instance, void (C::*method)(P1, P2, P3),
+ static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3);
/**
* Dispatches a void method on a process.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
* @param a3 third argument to pass to method
* @param a4 fourth argument to pass to method
*/
- template <typename C,
+ template <typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
- static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
+ static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4),
A1 a1, A2 a2, A3 a3, A4 a4);
/**
* Dispatches a void method on a process.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
@@ -288,73 +209,77 @@ public:
* @param a4 fourth argument to pass to method
* @param a5 fifth argument to pass to method
*/
- template <typename C,
+ template <typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
- static void dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+ static void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
/**
* Dispatches a method on a process and returns the future that
* corresponds to the result of executing the method.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @return future corresponding to the result of executing the method
*/
- template <typename T, typename C>
- static Future<T> dispatch(C *instance, Result<T> (C::*method)());
+ template <typename R, typename T>
+ static Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)());
/**
* Dispatches a method on a process and returns the future that
* corresponds to the result of executing the method.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 argument to pass to method
* @return future corresponding to the result of executing the method
*/
- template <typename T, typename C, typename P1, typename A1>
- static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1);
+ template <typename R, typename T, typename P1, typename A1>
+ static Future<R> dispatch(const PID<T>& pid,
+ Promise<R> (T::*method)(P1),
+ A1 a1);
/**
* Dispatches a method on a process and returns the future that
* corresponds to the result of executing the method.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
* @return future corresponding to the result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2,
typename A1, typename A2>
- static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+ static Future<R> dispatch(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2),
A1 a1, A2 a2);
/**
* Dispatches a method on a process and returns the future that
* corresponds to the result of executing the method.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
* @param a3 second argument to pass to method
* @return future corresponding to the result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
- static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+ static Future<R> dispatch(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3);
/**
* Dispatches a method on a process and returns the future that
* corresponds to the result of executing the method.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
@@ -362,17 +287,18 @@ public:
* @param a4 fourth argument to pass to method
* @return future corresponding to the result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
- static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+ static Future<R> dispatch(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2, P3, P4),
A1 a1, A2 a2, A3 a3, A4 a4);
/**
* Dispatches a method on a process and returns the future that
* corresponds to the result of executing the method.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
@@ -381,72 +307,76 @@ public:
* @param a5 fifth argument to pass to method
* @return future corresponding to the result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
- static Future<T> dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+ static Future<R> dispatch(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
/**
* Dispatches a method on a process and waits (on the underlying
* future) for the result.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @return result of executing the method
*/
- template <typename T, typename C>
- static T call(C *instance, Result<T> (C::*method)());
+ template <typename R, typename T>
+ static R call(const PID<T>& pid, Promise<R> (T::*method)());
/**
* Dispatches a method on a process and waits (on the underlying
* future) for the result.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 argument to pass to method
* @return result of executing the method
*/
- template <typename T, typename C, typename P1, typename A1>
- static T call(C *instance, Result<T> (C::*method)(P1), A1 a1);
+ template <typename R, typename T, typename P1, typename A1>
+ static R call(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1);
/**
* Dispatches a method on a process and waits (on the underlying
* future) for the result.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
* @return result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2,
typename A1, typename A2>
- static T call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2);
+ static R call(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2),
+ A1 a1, A2 a2);
/**
* Dispatches a method on a process and waits (on the underlying
* future) for the result.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
* @param a3 second argument to pass to method
* @return result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
- static T call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+ static R call(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3);
/**
* Dispatches a method on a process and waits (on the underlying
* future) for the result.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
@@ -454,17 +384,18 @@ public:
* @param a4 fourth argument to pass to method
* @return result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
- static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+ static R call(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2, P3, P4),
A1 a1, A2 a2, A3 a3, A4 a4);
/**
* Dispatches a method on a process and waits (on the underlying
* future) for the result.
*
- * @param instance running process to receive dispatch message
+ * @param pid receiver of message
* @param method method to invoke on instance
* @param a1 first argument to pass to method
* @param a2 second argument to pass to method
@@ -473,17 +404,18 @@ public:
* @param a5 fifth argument to pass to method
* @return result of executing the method
*/
- template <typename T, typename C,
+ template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
- static T call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+ static R call(const PID<T>& pid,
+ Promise<R> (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5);
private:
friend class LinkManager;
friend class ProcessManager;
friend class ProcessReference;
- friend void * schedule(void *);
+ friend void* schedule(void *);
/* Flag indicating state of process. */
enum { INIT,
@@ -502,16 +434,16 @@ private:
int refs;
/* Queue of received messages. */
- std::deque<Message *> messages;
+ std::deque<Message*> messages;
/* Current message. */
- Message *current;
+ Message* current;
/* Current "blocking" generation. */
int generation;
/* Process PID. */
- PID pid;
+ UPID pid;
/* Continuation/Context of process. */
ucontext_t uctx;
@@ -522,543 +454,322 @@ private:
void unlock() { pthread_mutex_unlock(&m); }
/* Enqueues the specified message. */
- void enqueue(Message *message);
+ void enqueue(Message* message);
/* Dequeues a message or returns NULL. */
- Message * dequeue();
+ Message* dequeue();
+
+ template <typename T>
+ static void vdelegate(Process* process, std::tr1::function<void(T*)>* thunk)
+ {
+ assert(process != NULL);
+ assert(thunk != NULL);
+ (*thunk)(static_cast<T*>(process));
+ delete thunk;
+ }
+
+ template <typename R, typename T>
+ static void delegate(Process* process, std::tr1::function<Promise<R>(T*)>* thunk, Future<R>* future)
+ {
+ assert(process != NULL);
+ assert(thunk != NULL);
+ assert(future != NULL);
+ (*thunk)(static_cast<T*>(process)).associate(future);
+ delete thunk;
+ }
/* Dispatches the delegator to the specified process. */
- static void dispatcher(Process *, std::tr1::function<void (void)> *delegator);
+ static void dispatcher(const UPID& pid, std::tr1::function<void(Process*)>* delegator);
};
template <typename T>
-void delegate(std::tr1::function<Result<T> (void)> *thunk, Future<T> *future)
+void Process::dispatch(const PID<T>& pid, void (T::*method)())
{
- assert(thunk != NULL);
- assert(future != NULL);
+ std::tr1::function<void(T*)>* thunk =
+ new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
- const Result<T> &result = (*thunk)();
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- if (!result.isPromise()) {
- future->set(result.get());
- } else {
- result.getPromise().associate(*future);
- }
-
- delete thunk;
- delete future;
+ dispatcher(pid, delegator);
}
-template <typename C>
-void Process::dispatch(C *instance, void (C::*method)())
+template <typename T, typename P1, typename A1>
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1), A1 a1)
{
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(method, instance));
-
- dispatcher(instance, delegator);
-}
-
+ std::tr1::function<void(T*)>* thunk =
+ new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1));
-template <typename C, typename P1, typename A1>
-void Process::dispatch(C *instance, void (C::*method)(P1), A1 a1)
-{
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
- a1));
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
}
-template <typename C,
+template <typename T,
typename P1, typename P2,
typename A1, typename A2>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2), A1 a1, A2 a2)
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2), A1 a1, A2 a2)
{
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
- a1, a2));
+ std::tr1::function<void(T*)>* thunk =
+ new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2));
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
}
-template <typename C,
+template <typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3),
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3)
{
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
- a1, a2, a3));
+ std::tr1::function<void(T*)>* thunk =
+ new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3));
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
}
-template <typename C,
+template <typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4),
- A1 a1, A2 a2, A3 a3, A4 a4)
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4),
+ A1 a1, A2 a2, A3 a3, A4 a4)
{
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(method, instance, a1, a2, a3,
- a4));
+ std::tr1::function<void(T*)>* thunk =
+ new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4));
- dispatcher(instance, delegator);
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+ std::tr1::placeholders::_1,
+ thunk));
+
+ dispatcher(pid, delegator);
}
-template <typename C,
+template <typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
-void Process::dispatch(C *instance, void (C::*method)(P1, P2, P3, P4, P5),
+void Process::dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
{
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(method, instance,
- a1, a2, a3, a4, a5));
+ std::tr1::function<void(T*)>* thunk =
+ new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4, a5));
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::vdelegate<T>,
+ std::tr1::placeholders::_1,
+ thunk));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
}
-template <typename T, typename C>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)())
+template <typename R, typename T>
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)())
{
- std::tr1::function<Result<T> (void)> *thunk =
- new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance));
+ std::tr1::function<Promise<R> (T*)>* thunk =
+ new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
- Future<T> *future = new Future<T>();
+ Future<R>* future = new Future<R>();
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
- future));
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
return *future;
}
-template <typename T, typename C, typename P1, typename A1>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1), A1 a1)
+template <typename R, typename T, typename P1, typename A1>
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1)
{
- std::tr1::function<Result<T> (void)> *thunk =
- new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
- a1));
+ std::tr1::function<Promise<R> (T*)>* thunk =
+ new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1));
- Future<T> *future = new Future<T>();
+ Future<R>* future = new Future<R>();
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
- future));
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
return *future;
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2,
typename A1, typename A2>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2),
A1 a1, A2 a2)
{
- std::tr1::function<Result<T> (void)> *thunk =
- new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
- a1, a2));
+ std::tr1::function<Promise<R> (T*)>* thunk =
+ new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
- Future<T> *future = new Future<T>();
-
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
- future));
-
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
return *future;
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3)
{
- std::tr1::function<Result<T> (void)> *thunk =
- new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
- a1, a2, a3));
-
- Future<T> *future = new Future<T>();
+ std::tr1::function<Promise<R> (T*)>* thunk =
+ new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
- future));
-
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
return *future;
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4),
A1 a1, A2 a2, A3 a3, A4 a4)
{
- std::tr1::function<Result<T> (void)> *thunk =
- new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
- a1, a2, a3, a4));
-
- Future<T> *future = new Future<T>();
+ std::tr1::function<Promise<R> (T*)>* thunk =
+ new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
- future));
-
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
return *future;
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
-Future<T> Process::dispatch(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+Future<R> Process::dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
{
- std::tr1::function<Result<T> (void)> *thunk =
- new std::tr1::function<Result<T> (void)>(std::tr1::bind(method, instance,
- a1, a2, a3, a4, a5));
-
- Future<T> *future = new Future<T>();
-
- std::tr1::function<void (void)> *delegator =
- new std::tr1::function<void (void)>(std::tr1::bind(&delegate<T>, thunk,
- future));
+ std::tr1::function<Promise<R> (T*)>* thunk =
+ new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4, a5));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(Process*)>* delegator =
+ new std::tr1::function<void(Process*)>(std::tr1::bind(&Process::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
- dispatcher(instance, delegator);
+ dispatcher(pid, delegator);
return *future;
}
-template <typename T, typename C>
-T Process::call(C *instance, Result<T> (C::*method)())
+template <typename R, typename T>
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)())
{
- return dispatch(instance, method).get();
+ return dispatch(pid, method).get();
}
-template <typename T, typename C, typename P1, typename A1>
-T Process::call(C *instance, Result<T> (C::*method)(P1), A1 a1)
+template <typename R, typename T, typename P1, typename A1>
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1)
{
- return dispatch(instance, method, a1).get();
+ return dispatch(pid, method, a1).get();
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2,
typename A1, typename A2>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2), A1 a1, A2 a2)
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2), A1 a1, A2 a2)
{
- return dispatch(instance, method, a1, a2).get();
+ return dispatch(pid, method, a1, a2).get();
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3),
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3),
A1 a1, A2 a2, A3 a3)
{
- return dispatch(instance, method, a1, a2, a3).get();
+ return dispatch(pid, method, a1, a2, a3).get();
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4),
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4),
A1 a1, A2 a2, A3 a3, A4 a4)
{
- return dispatch(instance, method, a1, a2, a3, a4).get();
+ return dispatch(pid, method, a1, a2, a3, a4).get();
}
-template <typename T, typename C,
+template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
-T Process::call(C *instance, Result<T> (C::*method)(P1, P2, P3, P4, P5),
+R Process::call(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
{
- return dispatch(instance, method, a1, a2, a3, a4, a5).get();
-}
-
-
-template <typename T>
-Future<T>::Future()
-{
- refs = new int;
- *refs = 1;
- t = new T *;
- *t = NULL;
- trigger = new Process();
- Process::spawn(trigger);
-}
-
-
-template <typename T>
-Future<T>::Future(const Future<T> &that)
-{
- assert(that.refs > 0);
- __sync_fetch_and_add(that.refs, 1);
- refs = that.refs;
- t = that.t;
- trigger = that.trigger;
-}
-
-
-template <typename T>
-Future<T> & Future<T>::operator = (const Future<T> &that)
-{
- if (this != &that) {
- // Destructor ...
- assert(refs != NULL);
- if (__sync_sub_and_fetch(refs, 1) == 0) {
- delete refs;
- assert(t != NULL);
- if (*t != NULL)
- delete *t;
- assert(trigger != NULL);
- Process::post(trigger->self(), "");
- Process::wait(trigger->self());
- delete trigger;
- }
-
- // Copy constructor ...
- assert(that.refs > 0);
- __sync_fetch_and_add(that.refs, 1);
- refs = that.refs;
- t = that.t;
- trigger = that.trigger;
- }
-}
-
-
-template <typename T>
-Future<T>::~Future()
-{
- assert(refs != NULL);
- if (__sync_sub_and_fetch(refs, 1) == 0) {
- delete refs;
- assert(t != NULL);
- if (*t != NULL)
- delete *t;
- assert(trigger != NULL);
- Process::post(trigger->self(), "");
- Process::wait(trigger->self());
- delete trigger;
- }
-}
-
-
-template <typename T>
-void Future<T>::set(const T &t_)
-{
- assert(t != NULL && *t == NULL);
- *t = new T(t_);
- Process::post(trigger->self(), "");
-}
-
-
-template <typename T>
-T Future<T>::get() const
-{
- assert(t != NULL);
- if (*t != NULL)
- return **t;
- assert(trigger != NULL);
- Process::wait(trigger->self());
- assert(t != NULL && *t != NULL);
- return **t;
-}
-
-
-// TODO(benh): Use synchronized instead of CAS?
-#define CAS __sync_bool_compare_and_swap
-
-
-template <typename T>
-Promise<T>::Promise()
-{
- refs = new int;
- *refs = 1;
- t = new T *;
- *t = NULL;
- state = new State;
- *state = UNSET_UNASSOCIATED;
- future = new Future<T> *;
- *future = NULL;
-}
-
-
-template <typename T>
-Promise<T>::Promise(const Promise<T> &that)
-{
- assert(that.refs > 0);
- __sync_fetch_and_add(that.refs, 1);
- refs = that.refs;
- t = that.t;
- state = that.state;
- future = that.future;
-}
-
-
-template <typename T>
-Promise<T>::~Promise()
-{
- assert(refs != NULL);
- if (__sync_sub_and_fetch(refs, 1) == 0) {
- delete refs;
- assert(t != NULL);
- if (*t != NULL)
- delete *t;
- assert(state != NULL);
- delete state;
- assert(future != NULL);
- if (*future != NULL)
- delete *future;
- }
+ return dispatch(pid, method, a1, a2, a3, a4, a5).get();
}
-
-template <typename T>
-void Promise<T>::set(const T &t_)
-{
- assert(state != NULL);
- assert(*state == UNSET_UNASSOCIATED ||
- *state == UNSET_ASSOCIATED);
- assert(t != NULL && *t == NULL);
- if (*state == UNSET_UNASSOCIATED) {
- *t = new T(t_);
- if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED, SET_UNASSOCIATED)) {
- assert(*state == UNSET_ASSOCIATED);
- __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
- assert(future != NULL && *future != NULL);
- (*future)->set(**t);
- }
- } else {
- assert(*state == UNSET_ASSOCIATED);
- assert(future != NULL && *future != NULL);
- (*future)->set(t_);
- __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
- }
-}
-
-
-template <typename T>
-void Promise<T>::associate(const Future<T> &future_)
-{
- assert(state != NULL);
- assert(*state == UNSET_UNASSOCIATED ||
- *state == SET_UNASSOCIATED);
- assert(future != NULL);
- *future = new Future<T>(future_);
- if (*state == UNSET_UNASSOCIATED) {
- if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
- UNSET_ASSOCIATED)) {
- assert(*state == SET_UNASSOCIATED);
- __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
- assert(*state == SET_ASSOCIATED);
- assert(t != NULL && *t != NULL);
- (*future)->set(**t);
- }
- } else {
- assert(*state == SET_UNASSOCIATED);
- __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
- assert(*state == SET_ASSOCIATED);
- assert(t != NULL && *t != NULL);
- (*future)->set(**t);
- }
-}
-
-
-template <typename T>
-Result<T>::Result(const T &t_)
-{
- refs = new int;
- *refs = 1;
- t = new T(t_);
- promise = NULL;
-}
-
-
-template <typename T>
-Result<T>::Result(const Promise<T> &promise_)
-{
- refs = new int;
- *refs = 1;
- t = NULL;
- promise = new Promise<T>(promise_);
-}
-
-
-template <typename T>
-Result<T>::Result(const Result<T> &that)
-{
- assert(that.refs > 0);
- __sync_fetch_and_add(that.refs, 1);
- refs = that.refs;
- t = that.t;
- promise = that.promise;
-}
-
-
-template <typename T>
-Result<T>::~Result()
-{
- assert(refs != NULL);
- if (__sync_sub_and_fetch(refs, 1) == 0) {
- delete refs;
- if (t != NULL)
- delete t;
- if (promise != NULL)
- delete promise;
- }
-}
-
-
-template <typename T>
-bool Result<T>::isPromise() const
-{
- return promise != NULL;
-}
-
-
-template <typename T>
-Promise<T> Result<T>::getPromise() const
-{
- assert(isPromise());
- return *promise;
-}
-
-
-template <typename T>
-T Result<T>::get() const
-{
- assert(!isPromise());
- return *t;
-}
-
-
#endif // __PROCESS_HPP__
Added: incubator/mesos/trunk/third_party/libprocess/promise.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/promise.hpp?rev=1132263&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/promise.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/promise.hpp Sun Jun 5 09:09:33 2011
@@ -0,0 +1,151 @@
+#ifndef __PROMISE_HPP__
+#define __PROMISE_HPP__
+
+#include "future.hpp"
+
+
+template <typename T>
+class Promise
+{
+public:
+ Promise();
+ Promise(const T &t_);
+ Promise(const Promise<T> &that);
+ virtual ~Promise();
+ void set(const T &t_);
+ void associate(Future<T> *future_);
+
+private:
+ void operator = (const Promise<T> &);
+
+ enum State {
+ UNSET_UNASSOCIATED,
+ SET_UNASSOCIATED,
+ UNSET_ASSOCIATED,
+ SET_ASSOCIATED,
+ };
+
+ int *refs;
+ T **t;
+ Future<T> **future;
+ State *state;
+};
+
+
+template <>
+class Promise<void>;
+
+
+template <typename T>
+class Promise<T&>;
+
+
+template <typename T>
+Promise<T>::Promise()
+{
+ refs = new int;
+ *refs = 1;
+ t = new T *;
+ *t = NULL;
+ future = new Future<T> *;
+ *future = NULL;
+ state = new State;
+ *state = UNSET_UNASSOCIATED;
+}
+
+template <typename T>
+Promise<T>::Promise(const T &_t)
+{
+ refs = new int;
+ *refs = 1;
+ t = new T *;
+ *t = new T(_t);
+ future = new Future<T> *;
+ *future = NULL;
+ state = new State;
+ *state = SET_UNASSOCIATED;
+}
+
+
+template <typename T>
+Promise<T>::Promise(const Promise<T> &that)
+{
+ assert(that.refs != NULL);
+ assert(*that.refs > 0);
+ __sync_fetch_and_add(that.refs, 1);
+ refs = that.refs;
+ t = that.t;
+ state = that.state;
+ future = that.future;
+}
+
+
+template <typename T>
+Promise<T>::~Promise()
+{
+ assert(refs != NULL);
+ if (__sync_sub_and_fetch(refs, 1) == 0) {
+ delete refs;
+ assert(t != NULL);
+ if (*t != NULL)
+ delete *t;
+ assert(state != NULL);
+ delete state;
+ assert(future != NULL);
+ if (*future != NULL)
+ delete *future;
+ }
+}
+
+
+template <typename T>
+void Promise<T>::set(const T &t_)
+{
+ assert(state != NULL);
+ assert(*state == UNSET_UNASSOCIATED ||
+ *state == UNSET_ASSOCIATED);
+ assert(t != NULL && *t == NULL);
+ if (*state == UNSET_UNASSOCIATED) {
+ *t = new T(t_);
+ if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED, SET_UNASSOCIATED)) {
+ assert(*state == UNSET_ASSOCIATED);
+ __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+ assert(future != NULL && *future != NULL);
+ (*future)->set(**t);
+ }
+ } else {
+ assert(*state == UNSET_ASSOCIATED);
+ assert(future != NULL && *future != NULL);
+ (*future)->set(t_);
+ __sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
+ }
+}
+
+
+template <typename T>
+void Promise<T>::associate(Future<T> *future_)
+{
+ assert(state != NULL);
+ assert(*state == UNSET_UNASSOCIATED ||
+ *state == SET_UNASSOCIATED);
+ assert(future != NULL);
+ *future = future_;
+ if (*state == UNSET_UNASSOCIATED) {
+ if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
+ UNSET_ASSOCIATED)) {
+ assert(*state == SET_UNASSOCIATED);
+ __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+ assert(*state == SET_ASSOCIATED);
+ assert(t != NULL && *t != NULL);
+ (*future)->set(**t);
+ }
+ } else {
+ assert(*state == SET_UNASSOCIATED);
+ __sync_bool_compare_and_swap(state, SET_UNASSOCIATED, SET_ASSOCIATED);
+ assert(*state == SET_ASSOCIATED);
+ assert(t != NULL && *t != NULL);
+ (*future)->set(**t);
+ }
+}
+
+#endif // __PROMISE_HPP__