You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:19:39 UTC
svn commit: r1132286 - in /incubator/mesos/trunk/third_party/libprocess:
encoder.hpp future.hpp http.hpp latch.cpp latch.hpp process.cpp process.hpp
promise.hpp tests.cpp
Author: benh
Date: Sun Jun 5 09:19:39 2011
New Revision: 1132286
URL: http://svn.apache.org/viewvc?rev=1132286&view=rev
Log:
Improvements and minor bug fixes in libprocess.
Modified:
incubator/mesos/trunk/third_party/libprocess/encoder.hpp
incubator/mesos/trunk/third_party/libprocess/future.hpp
incubator/mesos/trunk/third_party/libprocess/http.hpp
incubator/mesos/trunk/third_party/libprocess/latch.cpp
incubator/mesos/trunk/third_party/libprocess/latch.hpp
incubator/mesos/trunk/third_party/libprocess/process.cpp
incubator/mesos/trunk/third_party/libprocess/process.hpp
incubator/mesos/trunk/third_party/libprocess/promise.hpp
incubator/mesos/trunk/third_party/libprocess/tests.cpp
Modified: incubator/mesos/trunk/third_party/libprocess/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/encoder.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/encoder.hpp Sun Jun 5 09:19:39 2011
@@ -95,46 +95,44 @@ public:
{
std::ostringstream out;
- out << "HTTP/1.0 " << response.status << "\r\n";
+ // TODO(benh): Check version?
- foreachpair (const std::string& key, const std::string& value, response.headers) {
- out << key << ": " << value << "\r\n";
-// << "Content-Type: text/html\r\n"
-// << "Content-Length: " << response.body.size() << "\r\n"
- }
+ out << "HTTP/1.1 " << response.status << "\r\n";
- out << "Connection: close\r\n"
- << "\r\n";
+ std::map<std::string, std::string> headers = response.headers;
- out.write(response.body.data(), response.body.size());
+ // HTTP 1.1 requires the "Date" header. In the future once we
+ // start checking the version (above) then we can conditionally
+ // add this header, but for now, we always do.
+ time_t rawtime;
+ time(&rawtime);
- return out.str();
- }
-};
+ char date[256];
+ // TODO(benh): Check return code of strftime!
+ strftime(date, 256, "%a, %d %b %Y %H:%M:%S GMT", gmtime(&rawtime));
-class HttpGatewayTimeoutEncoder : public DataEncoder
-{
-public:
- HttpGatewayTimeoutEncoder()
- : DataEncoder(encode()) {}
+ headers["Date"] = date;
- static std::string encode()
- {
- std::ostringstream out;
+ foreachpair (const std::string& key, const std::string& value, headers) {
+ out << key << ": " << value << "\r\n";
+ }
+
+ // Make sure at least the "Content-Length" header is present in
+ // order to signal to a client the end of a response.
+ if (headers.count("Content-Length") == 0) {
+ out << "Content-Length: " << response.body.size() << "\r\n";
+ }
- // TODO(benh): We send a content length of 0 so that a browser
- // won't just sit and wait for the socket to get closed (because
- // we can't close it our self right now).
- out << "HTTP/1.0 504 Gateway Timeout\r\n"
- << "Content-Length: 0\r\n"
- << "Connection: close\r\n"
- << "\r\n";
+ out << "\r\n";
+
+ out.write(response.body.data(), response.body.size());
return out.str();
}
};
+
} // namespace process {
#endif // __ENCODER_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/future.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/future.hpp Sun Jun 5 09:19:39 2011
@@ -15,15 +15,19 @@ class Future
{
public:
Future();
+ Future(const T& _t);
Future(const Future<T>& that);
Future<T>& operator = (const Future<T>& that);
virtual ~Future();
+ bool ready() const;
+ bool await(double secs = 0) const;
T get() const;
+ operator T () const;
private:
friend class Promise<T>;
- void set(const T& t_);
+ void set(const T& _t);
int* refs;
T** t;
@@ -43,6 +47,18 @@ Future<T>::Future()
template <typename T>
+Future<T>::Future(const T& _t)
+{
+ refs = new int;
+ *refs = 1;
+ t = new T*;
+ *t = NULL;
+ latch = new Latch();
+ set(_t);
+}
+
+
+template <typename T>
Future<T>::Future(const Future<T>& that)
{
assert(that.refs > 0);
@@ -94,26 +110,51 @@ Future<T>::~Future()
template <typename T>
-void Future<T>::set(const T& t_)
+bool Future<T>::ready() const
{
- assert(t != NULL && *t == NULL);
- *t = new T(t_);
- latch->trigger();
+ assert(t != NULL);
+ if (*t != NULL)
+ return true;
+ return false;
+}
+
+
+template <typename T>
+bool Future<T>::await(double secs) const
+{
+ if (ready())
+ return true;
+ assert(latch != NULL);
+ return latch->await(secs);
}
template <typename T>
T Future<T>::get() const
{
- assert(t != NULL);
- if (*t != NULL)
+ if (ready())
return **t;
- assert(latch != NULL);
- latch->await();
+ await();
assert(t != NULL && *t != NULL);
return **t;
}
+
+template <typename T>
+Future<T>::operator T () const
+{
+ return get();
+}
+
+
+template <typename T>
+void Future<T>::set(const T& _t)
+{
+ assert(t != NULL && *t == NULL);
+ *t = new T(_t);
+ latch->trigger();
+}
+
} // namespace process {
#endif // __FUTURE_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/http.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/http.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/http.hpp Sun Jun 5 09:19:39 2011
@@ -22,6 +22,9 @@ struct HttpResponse
// TODO(benh): Add major/minor version.
std::string status;
std::map<std::string, std::string> headers;
+ // TODO(benh): Make body a stream (channel) instead, and allow a
+ // response to be returned without forcing the stream to be
+ // finished.
std::string body;
};
@@ -35,6 +38,15 @@ struct HttpOKResponse : HttpResponse
};
+struct HttpBadRequestResponse : HttpResponse
+{
+ HttpBadRequestResponse()
+ {
+ status = "400 Bad Request";
+ }
+};
+
+
struct HttpNotFoundResponse : HttpResponse
{
HttpNotFoundResponse()
@@ -43,6 +55,25 @@ struct HttpNotFoundResponse : HttpRespon
}
};
+
+struct HttpInternalServerErrorResponse : HttpResponse
+{
+ HttpInternalServerErrorResponse()
+ {
+ status = "500 Internal Server Error";
+ }
+};
+
+
+struct HttpServiceUnavailableResponse : HttpResponse
+{
+ HttpServiceUnavailableResponse()
+ {
+ status = "503 Service Unavailable";
+ }
+};
+
+
} // namespace process {
#endif // __HTTP_HPP
Modified: incubator/mesos/trunk/third_party/libprocess/latch.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.cpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/latch.cpp Sun Jun 5 09:19:39 2011
@@ -39,12 +39,14 @@ void Latch::trigger()
}
-void Latch::await()
+bool Latch::await(double secs)
{
assert(latch != NULL);
if (!triggered) {
- wait(latch->self());
+ return wait(latch->self(), secs);
}
+
+ return true;
}
} // namespace process {
Modified: incubator/mesos/trunk/third_party/libprocess/latch.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/latch.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/latch.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/latch.hpp Sun Jun 5 09:19:39 2011
@@ -16,7 +16,7 @@ public:
virtual ~Latch();
void trigger();
- void await();
+ bool await(double secs = 0);
private:
Latch(const Latch& that);
Modified: incubator/mesos/trunk/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.cpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.cpp Sun Jun 5 09:19:39 2011
@@ -94,6 +94,9 @@ using std::tr1::function;
struct Node
{
+ Node(uint32_t _ip = 0, uint16_t _port = 0)
+ : ip(_ip), port(_port) {}
+
uint32_t ip;
uint16_t port;
};
@@ -194,16 +197,151 @@ private:
};
-class LinkManager
+/* Tick, tock ... manually controlled clock! */
+class InternalClock
{
public:
- LinkManager();
- ~LinkManager();
+ InternalClock()
+ {
+ initial = current = elapsed = ev_time();
+ }
+
+ ~InternalClock() {}
+
+ ev_tstamp getCurrent(ProcessBase *process)
+ {
+ ev_tstamp tstamp;
+
+ if (currents.count(process) != 0) {
+ tstamp = currents[process];
+ } else {
+ tstamp = currents[process] = initial;
+ }
+
+ return tstamp;
+ }
+
+ void setCurrent(ProcessBase *process, ev_tstamp tstamp)
+ {
+ currents[process] = tstamp;
+ }
+
+ ev_tstamp getCurrent()
+ {
+ return current;
+ }
+
+ void setCurrent(ev_tstamp tstamp)
+ {
+ current = tstamp;
+ }
+
+ ev_tstamp getElapsed()
+ {
+ return elapsed;
+ }
+
+ void setElapsed(ev_tstamp tstamp)
+ {
+ elapsed = tstamp;
+ }
+
+ void discard(ProcessBase *process)
+ {
+ assert(process != NULL);
+ currents.erase(process);
+ }
+
+private:
+ map<ProcessBase *, ev_tstamp> currents;
+ ev_tstamp initial;
+ ev_tstamp current;
+ ev_tstamp elapsed;
+};
+
+
+class GarbageCollector : public Process<GarbageCollector>
+{
+public:
+ GarbageCollector() {}
+ ~GarbageCollector() {}
+
+ template <typename T>
+ void manage(const T* t)
+ {
+ const ProcessBase* process = t;
+ if (process != NULL) {
+ processes[process->self()] = process;
+ link(process->self());
+ }
+ }
+
+protected:
+ virtual void operator () ()
+ {
+ while (true) {
+ serve();
+ if (name() == EXITED && processes.count(from()) > 0) {
+ const ProcessBase* process = processes[from()];
+ processes.erase(from());
+ delete process;
+ }
+ }
+ }
+
+private:
+ map<UPID, const ProcessBase*> processes;
+};
+
+
+/* Global garbage collector (move to own file). */
+static PID<GarbageCollector> gc;
+
+
+class HttpProxy;
+
+
+class HttpResponseWaiter : public Process<HttpResponseWaiter>
+{
+public:
+ HttpResponseWaiter(const PID<HttpProxy>& _proxy);
+ virtual ~HttpResponseWaiter();
+
+ void await(const Future<HttpResponse>& future, bool persist);
+
+private:
+ const PID<HttpProxy> proxy;
+};
+
+
+class HttpProxy : public Process<HttpProxy>
+{
+public:
+ HttpProxy(int _c);
+ virtual ~HttpProxy();
+
+ void handle(const Future<HttpResponse>& future, bool persist);
+ void ready(const Future<HttpResponse>& future, bool persist);
+ void unavailable(bool persist);
+
+private:
+ int c;
+ HttpResponseWaiter* waiter;
+};
+
+
+class SocketManager
+{
+public:
+ SocketManager();
+ ~SocketManager();
void link(ProcessBase* process, const UPID& to);
+ PID<HttpProxy> proxy(int s);
+
+ void send(DataEncoder* encoder, int s, bool persist);
void send(Message* message);
- void send(DataEncoder* encoder, int s);
DataEncoder* next(int s);
@@ -223,11 +361,15 @@ private:
map<Node, int> temps;
map<Node, int> persists;
+ /* Set of sockets that should be closed. */
set<int> disposables;
- /* Map from socket to outgoing messages. */
+ /* Map from socket to outgoing queue. */
map<int, queue<DataEncoder*> > outgoing;
+ /* HTTP proxies. */
+ map<int, HttpProxy*> proxies;
+
/* Protects instance variables. */
synchronizable(this);
};
@@ -277,7 +419,7 @@ private:
/* Waiting processes (protected by synchronizable(processes)). */
map<ProcessBase *, set<ProcessBase *> > waiters;
- /* Map of gates for waiting threads. */
+ /* Gates for waiting threads (protected by synchronizable(processes)). */
map<ProcessBase *, Gate *> gates;
/* Queue of runnable processes (implemented as deque). */
@@ -286,115 +428,6 @@ private:
};
-class HttpProxy : public Process<HttpProxy>
-{
-public:
- HttpProxy(int _c, const Future<HttpResponse>& _future);
- ~HttpProxy();
-
-protected:
- virtual void operator () ();
-
-private:
- int c;
- Future<HttpResponse> future;
-};
-
-
-class HttpProxyManager : public Process<HttpProxyManager>
-{
-public:
- HttpProxyManager() {}
- ~HttpProxyManager() {}
-
- void manage(HttpProxy* proxy)
- {
- assert(proxy != NULL);
- proxies[proxy->self()] = proxy;
- link(proxy->self());
- }
-
-protected:
- virtual void operator () ()
- {
- while (true) {
- serve();
- if (name() == EXITED && proxies.count(from()) > 0) {
- HttpProxy* proxy = proxies[from()];
- proxies.erase(from());
- delete proxy;
- }
- }
- }
-
-private:
- map<UPID, HttpProxy*> proxies;
-};
-
-
-/* Tick, tock ... manually controlled clock! */
-class InternalClock
-{
-public:
- InternalClock()
- {
- initial = current = elapsed = ev_time();
- }
-
- ~InternalClock() {}
-
- ev_tstamp getCurrent(ProcessBase *process)
- {
- ev_tstamp tstamp;
-
- if (currents.count(process) != 0) {
- tstamp = currents[process];
- } else {
- tstamp = currents[process] = initial;
- }
-
- return tstamp;
- }
-
- void setCurrent(ProcessBase *process, ev_tstamp tstamp)
- {
- currents[process] = tstamp;
- }
-
- ev_tstamp getCurrent()
- {
- return current;
- }
-
- void setCurrent(ev_tstamp tstamp)
- {
- current = tstamp;
- }
-
- ev_tstamp getElapsed()
- {
- return elapsed;
- }
-
- void setElapsed(ev_tstamp tstamp)
- {
- elapsed = tstamp;
- }
-
- void discard(ProcessBase *process)
- {
- assert(process != NULL);
- currents.erase(process);
- }
-
-private:
- map<ProcessBase *, ev_tstamp> currents;
- ev_tstamp initial;
- ev_tstamp current;
- ev_tstamp elapsed;
-};
-
-
/* Using manual clock if non-null. */
static InternalClock *clk = NULL;
@@ -410,8 +443,8 @@ static uint32_t ip = 0;
/* Local port. */
static uint16_t port = 0;
-/* Active LinkManager (eventually will probably be thread-local). */
-static LinkManager *link_manager = NULL;
+/* Active SocketManager (eventually will probably be thread-local). */
+static SocketManager *socket_manager = NULL;
/* Active ProcessManager (eventually will probably be thread-local). */
static ProcessManager *process_manager = NULL;
@@ -489,12 +522,6 @@ static Filter *filterer = NULL;
static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-/**
- * Instance of HttpProxyManager.
- */
-static HttpProxyManager* proxy_manager = NULL;
-
-
int set_nbio(int fd)
{
int flags;
@@ -531,11 +558,47 @@ void transport(Message* message, Process
process_manager->deliver(message, sender);
} else {
// Remote message.
- link_manager->send(message);
+ socket_manager->send(message);
}
}
+Message* parse(HttpRequest* request)
+{
+ if (request->method == "POST" && request->headers.count("User-Agent") > 0) {
+ const string& temp = request->headers["User-Agent"];
+ const string& libprocess = "libprocess/";
+ size_t index = temp.find(libprocess);
+ if (index != string::npos) {
+ // Okay, now determine 'from'.
+ const UPID from(temp.substr(index + libprocess.size(), temp.size()));
+
+ // Now determine 'to'.
+ index = request->path.find('/', 1);
+ index = index != string::npos ? index - 1 : string::npos;
+ const UPID to(request->path.substr(1, index), ip, port);
+
+ // And now determine 'name'.
+ index = index != string::npos ? index + 2: request->path.size();
+ const string& name = request->path.substr(index);
+
+ VLOG(2) << "Parsed message name '" << name
+ << "' for " << to << " from " << from;
+
+ Message* message = new Message();
+ message->name = name;
+ message->from = from;
+ message->to = to;
+ message->body = request->body;
+
+ return message;
+ }
+ }
+
+ return NULL;
+}
+
+
void handle_async(struct ev_loop *loop, ev_async *_, int revents)
{
synchronized (watchers) {
@@ -673,10 +736,14 @@ void recv_data(struct ev_loop *loop, ev_
// Might block, try again later.
break;
} else if (length <= 0) {
- // Socket error ... we consider closed.
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while receiving: " << error;
- link_manager->closed(c);
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(2) << "Socket error while receiving: " << error;
+ } else {
+ VLOG(2) << "Socket closed while receiving";
+ }
+ socket_manager->closed(c);
delete decoder;
ev_io_stop(loop, watcher);
delete watcher;
@@ -692,8 +759,8 @@ void recv_data(struct ev_loop *loop, ev_
process_manager->deliver(c, request);
}
} else if (requests.empty() && decoder->failed()) {
- VLOG(1) << "Decoder error while receiving";
- link_manager->closed(c);
+ VLOG(2) << "Decoder error while receiving";
+ socket_manager->closed(c);
delete decoder;
ev_io_stop(loop, watcher);
delete watcher;
@@ -726,10 +793,14 @@ void send_data(struct ev_loop *loop, ev_
// Might block, try again later.
break;
} else if (length <= 0) {
- // Socket closed or error ... we consider closed.
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- link_manager->closed(c);
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(2) << "Socket error while sending: " << error;
+ } else {
+ VLOG(2) << "Socket closed while sending";
+ }
+ socket_manager->closed(c);
delete encoder;
ev_io_stop(loop, watcher);
delete watcher;
@@ -745,7 +816,7 @@ void send_data(struct ev_loop *loop, ev_
delete encoder;
// Check for more stuff to send on socket.
- encoder = link_manager->next(c);
+ encoder = socket_manager->next(c);
if (encoder != NULL) {
watcher->data = encoder;
} else {
@@ -771,7 +842,7 @@ void sending_connect(struct ev_loop *loo
if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
// Connect failure.
VLOG(1) << "Socket error while connecting";
- link_manager->closed(c);
+ socket_manager->closed(c);
MessageEncoder* encoder = (MessageEncoder*) watcher->data;
delete encoder;
ev_io_stop(loop, watcher);
@@ -796,7 +867,7 @@ void receiving_connect(struct ev_loop *l
if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
// Connect failure.
VLOG(1) << "Socket error while connecting";
- link_manager->closed(c);
+ socket_manager->closed(c);
DataDecoder* decoder = (DataDecoder*) watcher->data;
delete decoder;
ev_io_stop(loop, watcher);
@@ -1051,9 +1122,9 @@ void initialize(bool initialize_google_l
signal(SIGPIPE, SIG_IGN);
#endif /* __sun__ */
- /* Create a new ProcessManager and LinkManager. */
+ /* Create a new ProcessManager and SocketManager. */
process_manager = new ProcessManager();
- link_manager = new LinkManager();
+ socket_manager = new SocketManager();
/* Setup processing thread. */
if (pthread_create (&proc_thread, NULL, schedule, NULL) != 0)
@@ -1176,61 +1247,97 @@ void initialize(bool initialize_google_l
initializing = false;
+ // Create global garbage collector.
+ gc = spawn(new GarbageCollector());
+
char temp[INET_ADDRSTRLEN];
CHECK(inet_ntop(AF_INET, (in_addr *) &ip, temp, INET_ADDRSTRLEN) != NULL);
VLOG(1) << "libprocess is initialized on " << temp << ":" << port;
}
-HttpProxy::HttpProxy(int _c, const Future<HttpResponse>& _future)
- : c(_c), future(_future)
+HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy)
+ : proxy(_proxy) {}
+
+
+HttpResponseWaiter::~HttpResponseWaiter() {}
+
+
+void HttpResponseWaiter::await(const Future<HttpResponse>& future, bool persist)
{
- // TODO(benh): Do proper initialization of the proxy manager. Right
- // now we can rely on invocations of the HttpProxy constructor to be
- // serial, so we don't need to do any fancy
- // synchronization. However, ultimately, we'd like to stick the
- // initialization into 'initialize' above, but that seemed to have
- // some issues. :(
- if (proxy_manager == NULL) {
- proxy_manager = new HttpProxyManager();
- spawn(proxy_manager);
+ if (future.await(30)) {
+ dispatch(proxy, &HttpProxy::ready, future, persist);
+ } else {
+ dispatch(proxy, &HttpProxy::unavailable, persist);
}
+}
+
+
+HttpProxy::HttpProxy(int _c) : c(_c)
+{
+ // Get garbage collected!
+ dispatch(gc, &GarbageCollector::manage<HttpProxy>, this);
+
+ // Create our waiter.
+ waiter = new HttpResponseWaiter(self());
+ spawn(waiter);
+}
+
- dispatch(proxy_manager->self(), &HttpProxyManager::manage, this);
+HttpProxy::~HttpProxy()
+{
+ send(waiter->self(), TERMINATE);
+ wait(waiter->self());
+ delete waiter;
}
-HttpProxy::~HttpProxy() {}
+void HttpProxy::handle(const Future<HttpResponse>& future, bool persist)
+{
+ dispatch(waiter->self(), &HttpResponseWaiter::await, future, persist);
+}
-void HttpProxy::operator () ()
+void HttpProxy::ready(const Future<HttpResponse>& future, bool persist)
{
- // Wait for the response!
+ CHECK(future.ready());
+
const HttpResponse& response = future.get();
- link_manager->send(new HttpResponseEncoder(response), c);
+ // Don't persist the connection if the responder doesn't want it to.
+ if (response.headers.count("Connection") > 0) {
+ const string& connection = response.headers.find("Connection")->second;
+ if (connection == "close") {
+ persist = false;
+ }
+ }
+
+ // See the semantics of SocketManager::send for details about how
+ // the socket will get closed (it might actually already be closed
+ // before we issue this send).
+ socket_manager->send(new HttpResponseEncoder(response), c, persist);
+}
+
+
+void HttpProxy::unavailable(bool persist)
+{
+ HttpResponse response = HttpServiceUnavailableResponse();
- // How the socket gets closed is a bit esoteric. :( If the browser
- // closes their socket then we will close it in when
- // LinkManger::closed gets called. Otherwise, after the future gets
- // set and we send the response then LinkManager::next will get
- // called, which will close the socket. Ultimately, the
- // HttpProxyManager will deal with deallocating the HttpProxy, even
- // though it was allocated in ProcessManager::deliver. Ugh, gross,
- // gross, gross. :(
+ // As above, the socket might already be closed when we do a send.
+ socket_manager->send(new HttpResponseEncoder(response), c, persist);
}
-LinkManager::LinkManager()
+SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
}
-LinkManager::~LinkManager() {}
+SocketManager::~SocketManager() {}
-void LinkManager::link(ProcessBase *process, const UPID &to)
+void SocketManager::link(ProcessBase *process, const UPID &to)
{
// TODO(benh): The semantics we want to support for link are such
// that if there is nobody to link to (local or remote) then a
@@ -1242,7 +1349,7 @@ void LinkManager::link(ProcessBase *proc
assert(process != NULL);
- const Node node = { to.ip, to.port };
+ Node node(to.ip, to.port);
synchronized (this) {
// Check if node is remote and there isn't a persistant link.
@@ -1250,13 +1357,16 @@ void LinkManager::link(ProcessBase *proc
// Okay, no link, lets create a socket.
int s;
- if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
+ if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
fatalerror("failed to link (socket)");
+ }
- if (set_nbio(s) < 0)
+ if (set_nbio(s) < 0) {
fatalerror("failed to link (set_nbio)");
+ }
sockets[s] = node;
+
persists[node] = s;
sockaddr_in addr;
@@ -1273,8 +1383,9 @@ void LinkManager::link(ProcessBase *proc
// Try and connect to the node using this socket.
if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
- if (errno != EINPROGRESS)
+ if (errno != EINPROGRESS) {
fatalerror("failed to link (connect)");
+ }
// Wait for socket to be connected.
ev_io_init(watcher, receiving_connect, s, EV_WRITE);
@@ -1296,28 +1407,38 @@ void LinkManager::link(ProcessBase *proc
}
-void LinkManager::send(Message* message)
+PID<HttpProxy> SocketManager::proxy(int s)
{
- assert(message != NULL);
+ synchronized (this) {
+ if (sockets.count(s) > 0) {
+ CHECK(proxies.count(s) > 0);
+ return proxies[s]->self();
+ } else {
+ // Register the socket with the manager for sending purposes. The
+ // current design doesn't let us create a valid "node" for this
+ // socket, so we use a "default" one for now.
+ sockets[s] = Node();
- DataEncoder* encoder = new MessageEncoder(message);
+ CHECK(proxies.count(s) == 0);
+
+ HttpProxy* proxy = new HttpProxy(s);
+ spawn(proxy);
+ proxies[s] = proxy;
+ return proxy->self();
+ }
+ }
+}
- Node node = { message->to.ip, message->to.port };
- synchronized (this) {
- // Check if there is already a link.
- if (persists.count(node) > 0 || temps.count(node) > 0) {
- int s = persists.count(node) > 0 ? persists[node] : temps[node];
+void SocketManager::send(DataEncoder* encoder, int s, bool persist)
+{
+ assert(encoder != NULL);
- // Check whether or not this socket has an outgoing queue.
- if (outgoing.count(s) != 0) {
+ synchronized (this) {
+ if (sockets.count(s) > 0) {
+ if (outgoing.count(s) > 0) {
outgoing[s].push(encoder);
} else {
- // Must be a persistant socket since temporary socket
- // shouldn't outlast it's outgoing queue!
- assert(persists.count(node) != 0);
- assert(temps.count(node) == 0 || temps[node] != s);
-
// Initialize the outgoing queue.
outgoing[s];
@@ -1333,19 +1454,50 @@ void LinkManager::send(Message* message)
ev_async_send(loop, &async_watcher);
}
+
+ // Set the socket to get closed if not persistent.
+ if (!persist) {
+ disposables.insert(s);
+ }
+ } else {
+ VLOG(1) << "Attempting to send on a no longer valid socket!";
+ }
+ }
+}
+
+
+void SocketManager::send(Message* message)
+{
+ assert(message != NULL);
+
+ DataEncoder* encoder = new MessageEncoder(message);
+
+ Node node(message->to.ip, message->to.port);
+
+ synchronized (this) {
+ // Check if there is already a socket.
+ bool persistant = persists.count(node) > 0;
+ bool temporary = temps.count(node) > 0;
+ if (persistant || temporary) {
+ int s = persistant ? persists[node] : temps[node];
+ send(encoder, s, persistant);
} else {
// No peristant or temporary socket to the node currently
// exists, so we create a temporary one.
int s;
- if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
+ if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
fatalerror("failed to send (socket)");
+ }
- if (set_nbio(s) < 0)
+ if (set_nbio(s) < 0) {
fatalerror("failed to send (set_nbio)");
+ }
sockets[s] = node;
+
temps[node] = s;
+ disposables.insert(s);
// Initialize the outgoing queue.
outgoing[s];
@@ -1362,8 +1514,9 @@ void LinkManager::send(Message* message)
watcher->data = encoder;
if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
- if (errno != EINPROGRESS)
+ if (errno != EINPROGRESS) {
fatalerror("failed to send (connect)");
+ }
// Initialize watcher for connecting.
ev_io_init(watcher, sending_connect, s, EV_WRITE);
@@ -1383,71 +1536,37 @@ void LinkManager::send(Message* message)
}
-void LinkManager::send(DataEncoder* encoder, int s)
-{
- assert(encoder != NULL);
-
- // TODO(benh): This is a big hack for sending data on a socket from
- // an encoder where we want one-time (i.e., disposable) use of the
- // socket. This is particularly useful right now for proxies we set
- // up for HTTP GET requests, however, the semantics of what this
- // send means should be less esoteric.
-
- synchronized (this) {
- assert(sockets.count(s) == 0);
- assert(outgoing.count(s) == 0);
-
- sockets[s] = Node(); // TODO(benh): HACK!
- outgoing[s];
- disposables.insert(s);
-
- // Allocate and initialize the watcher.
- ev_io *watcher = new ev_io();
- watcher->data = encoder;
-
- ev_io_init(watcher, send_data, s, EV_WRITE);
-
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- ev_async_send(loop, &async_watcher);
- }
-}
-
-
-DataEncoder* LinkManager::next(int s)
+DataEncoder* SocketManager::next(int s)
{
DataEncoder* encoder = NULL;
- // Sometimes we look for another encoder even though this socket
- // isn't actually maintained by the LinkManager (e.g., for proxy
- // sockets). In the future this kind of esoteric semantics will
- // hopefully get improved. :(
synchronized (this) {
- if (sockets.count(s) > 0 && outgoing.count(s) > 0) {
- if (!outgoing[s].empty()) {
- // More messages!
- encoder = outgoing[s].front();
- outgoing[s].pop();
- } else {
- // No more messages ... erase the outgoing queue.
- outgoing.erase(s);
+ CHECK(sockets.count(s) > 0);
+ CHECK(outgoing.count(s) > 0);
- // Close the socket if it was for one-time use.
- if (disposables.count(s) > 0) {
- disposables.erase(s);
- sockets.erase(s);
- close(s);
- }
+ if (!outgoing[s].empty()) {
+ // More messages!
+ encoder = outgoing[s].front();
+ outgoing[s].pop();
+ } else {
+ // No more messages ... erase the outgoing queue.
+ outgoing.erase(s);
- // Close the socket if it was temporary.
- const Node &node = sockets[s];
+ // Close the socket if it was set for disposal.
+ if (disposables.count(s) > 0) {
+ // Also try and remove from temps.
+ const Node& node = sockets[s];
if (temps.count(node) > 0 && temps[node] == s) {
temps.erase(node);
- sockets.erase(s);
- close(s);
+ } else if (proxies.count(s) > 0) {
+ HttpProxy* proxy = proxies[s];
+ proxies.erase(s);
+ post(proxy->self(), TERMINATE);
}
+
+ disposables.erase(s);
+ sockets.erase(s);
+ close(s);
}
}
}
@@ -1456,7 +1575,7 @@ DataEncoder* LinkManager::next(int s)
}
-void LinkManager::closed(int s)
+void SocketManager::closed(int s)
{
synchronized (this) {
if (sockets.count(s) > 0) {
@@ -1465,25 +1584,30 @@ void LinkManager::closed(int s)
// Don't bother invoking exited unless socket was persistant.
if (persists.count(node) > 0 && persists[node] == s) {
persists.erase(node);
- exited(node);
- } else {
- if (temps.count(node) > 0 && temps[node] == s) {
- temps.erase(node);
- }
+ exited(node);
+ } else if (temps.count(node) > 0 && temps[node] == s) {
+ temps.erase(node);
+ } else if (proxies.count(s) > 0) {
+ HttpProxy* proxy = proxies[s];
+ proxies.erase(s);
+ post(proxy->self(), TERMINATE);
}
- sockets.erase(s);
outgoing.erase(s);
+ disposables.erase(s);
+ sockets.erase(s);
}
}
- // This might have just been a receiving socket, so we want to make
- // sure to call close so that the file descriptor can get reused.
+ // This might have just been a receiving socket (only sending
+ // sockets, with the exception of the receiving side of a persistent
+ // socket, get added to 'sockets'), so we want to make sure to call
+ // close so that the file descriptor can get reused.
close(s);
}
-void LinkManager::exited(const Node &node)
+void SocketManager::exited(const Node &node)
{
// TODO(benh): It would be cleaner if this routine could call back
// into ProcessManager ... then we wouldn't have to convince
@@ -1510,7 +1634,7 @@ void LinkManager::exited(const Node &nod
}
-void LinkManager::exited(ProcessBase *process)
+void SocketManager::exited(ProcessBase *process)
{
synchronized (this) {
/* Remove any links this process might have had. */
@@ -1574,12 +1698,16 @@ bool ProcessManager::deliver(Message *me
if (sender != NULL) {
synchronized (timeouts) {
if (clk != NULL) {
- clk->setCurrent(receiver, max(clk->getCurrent(receiver),
- clk->getCurrent(sender)));
+ ev_tstamp tstamp =
+ max(clk->getCurrent(receiver), clk->getCurrent(sender));
+ clk->setCurrent(receiver, tstamp);
}
}
}
+ VLOG(2) << "Delivering message name '" << message->name
+ << "' for " << message->to << " from " << message->from;
+
receiver->enqueue(message);
} else {
delete message;
@@ -1595,51 +1723,18 @@ bool ProcessManager::deliver(int c, Http
{
assert(request != NULL);
- // Get out the receiver (needed regardless of if this is a standard
- // HTTP request or a libprocess message).
- const vector<string>& pairs = tokenize(request->path, "/");
- if (pairs.size() != 2) {
- // This has no receiver, send a response and cleanup.
- VLOG(1) << "Returning '404 Not Found' for HTTP request '"
- << request->path << "'";
- link_manager->send(new HttpResponseEncoder(HttpNotFoundResponse()), c);
- delete request;
- return false;
- }
-
- UPID to(pairs[0], ip, port);
-
- // Determine whether or not this is just a libprocess message.
- if (request->method == "POST" && request->headers.count("User-Agent") > 0) {
- const string& value = request->headers["User-Agent"];
- string libprocess = "libprocess/";
- size_t index = value.find(libprocess);
- if (index != string::npos) {
- // Okay, this is a libprocess message, now we can try and
- // determine name and from.
- UPID from(value.substr(index + libprocess.size(), value.size()));
-
- string name = pairs[1];
- if (name == "") {
- delete request;
- return false;
- }
-
- Message* message = new Message();
- message->name = name;
- message->from = from;
- message->to = to;
- message->body = request->body;
-
- delete request;
+ // Determine whether or not this is a libprocess message.
+ Message* message = parse(request);
- return deliver(message, sender);
- }
+ if (message != NULL) {
+ delete request;
+ return deliver(message, sender);
}
- // Okay, this isn't a libprocess message, so create a proxy and deliver.
+ // Treat this as an HTTP request and check for a valid receiver.
+ string temp = request->path.substr(1, request->path.find('/', 1) - 1);
- if (ProcessReference receiver = use(to)) {
+ if (ProcessReference receiver = use(UPID(temp, ip, port))) {
// If we have a local sender AND we are using a manual clock
// then update the current time of the receiver to preserve
// the happens-before relationship between the sender and
@@ -1649,24 +1744,39 @@ bool ProcessManager::deliver(int c, Http
if (sender != NULL) {
synchronized (timeouts) {
if (clk != NULL) {
- clk->setCurrent(receiver, max(clk->getCurrent(receiver),
- clk->getCurrent(sender)));
+ ev_tstamp tstamp =
+ max(clk->getCurrent(receiver), clk->getCurrent(sender));
+ clk->setCurrent(receiver, tstamp);
}
}
}
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(c);
// Create the future to associate with whatever gets returned.
Future<HttpResponse>* future = new Future<HttpResponse>();
- // Spawn a proxy that waits on the future.
- spawn(new HttpProxy(c, *future));
+ // Let the HttpProxy know about this request.
+ dispatch(proxy, &HttpProxy::handle, *future, request->keepAlive);
+ // Enqueue request and future for receiver.
receiver->enqueue(new pair<HttpRequest*, Future<HttpResponse>*>(request, future));
} else {
- // This has no receiver, send a response and cleanup.
- VLOG(1) << "Returning '404 Not Found' for HTTP request '"
+ // This has no receiver, send error response.
+ VLOG(1) << "Returning '404 Not Found' for HTTP request for '"
<< request->path << "'";
- link_manager->send(new HttpResponseEncoder(HttpNotFoundResponse()), c);
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(c);
+
+ // Create a "future" response.
+ Future<HttpResponse> future = HttpNotFoundResponse();
+
+ // Let the HttpProxy know about this request.
+ dispatch(proxy, &HttpProxy::handle, future, request->keepAlive);
+
+ // Cleanup request.
delete request;
return false;
}
@@ -1690,8 +1800,9 @@ bool ProcessManager::deliver(const UPID&
if (sender != NULL) {
synchronized (timeouts) {
if (clk != NULL) {
- clk->setCurrent(receiver, max(clk->getCurrent(receiver),
- clk->getCurrent(sender)));
+ ev_tstamp tstamp =
+ max(clk->getCurrent(receiver), clk->getCurrent(sender));
+ clk->setCurrent(receiver, tstamp);
}
}
}
@@ -1782,13 +1893,13 @@ void ProcessManager::link(ProcessBase *p
{
// Check if the pid is local.
if (!(to.ip == ip && to.port == port)) {
- link_manager->link(process, to);
+ socket_manager->link(process, to);
} else {
// Since the pid is local we want to get a reference to it's
// underlying process so that while we are invoking the link
// manager we don't miss sending a possible EXITED.
if (ProcessReference _ = use(to)) {
- link_manager->link(process, to);
+ socket_manager->link(process, to);
} else {
// Since the pid isn't valid it's process must have already died
// (or hasn't been spawned yet) so send a process exit message.
@@ -1990,8 +2101,10 @@ bool ProcessManager::external_wait(const
assert(process->state != ProcessBase::FINISHED);
/* Check and see if a gate already exists. */
- if (gates.find(process) == gates.end())
+ if (gates.find(process) == gates.end()) {
gates[process] = new Gate();
+ }
+
gate = gates[process];
old = gate->approach();
}
@@ -2000,8 +2113,11 @@ bool ProcessManager::external_wait(const
/* Now arrive at the gate and wait until it opens. */
if (gate != NULL) {
gate->arrive(old);
- if (gate->empty())
+
+ if (gate->empty()) {
delete gate;
+ }
+
return true;
}
@@ -2028,12 +2144,13 @@ bool ProcessManager::poll(ProcessBase *p
/* Allocate/Initialize the watcher. */
ev_io *watcher = new ev_io();
- if ((op & ProcessBase::RDWR) == ProcessBase::RDWR)
+ if ((op & ProcessBase::RDWR) == ProcessBase::RDWR) {
ev_io_init(watcher, handle_poll, fd, EV_READ | EV_WRITE);
- else if ((op & ProcessBase::RDONLY) == ProcessBase::RDONLY)
+ } else if ((op & ProcessBase::RDONLY) == ProcessBase::RDONLY) {
ev_io_init(watcher, handle_poll, fd, EV_READ);
- else if ((op & ProcessBase::WRONLY) == ProcessBase::WRONLY)
+ } else if ((op & ProcessBase::WRONLY) == ProcessBase::WRONLY) {
ev_io_init(watcher, handle_poll, fd, EV_WRITE);
+ }
// Tuple describing state (on heap in case we can't "cancel" it,
// the watcher will always fire, even if we get interrupted and
@@ -2067,12 +2184,15 @@ bool ProcessManager::poll(ProcessBase *p
process->state == ProcessBase::INTERRUPTED);
/* Attempt to cancel the timer if necessary. */
- if (secs != 0)
- if (process->state != ProcessBase::TIMEDOUT)
+ if (secs != 0) {
+ if (process->state != ProcessBase::TIMEDOUT) {
cancel_timeout(timeout);
+ }
+ }
- if (process->state == ProcessBase::INTERRUPTED)
+ if (process->state == ProcessBase::INTERRUPTED) {
interrupted = true;
+ }
process->state = ProcessBase::RUNNING;
@@ -2230,6 +2350,20 @@ void ProcessManager::cleanup(ProcessBase
delete message;
}
+ // Free any pending requests.
+ while (!process->requests.empty()) {
+ pair<HttpRequest*, Future<HttpResponse>*>* request = process->requests.front();
+ process->requests.pop_front();
+ delete request;
+ }
+
+ // Free any pending delegators.
+ while (!process->delegators.empty()) {
+ function<void(ProcessBase*)>* delegator = process->delegators.front();
+ process->delegators.pop_front();
+ delete delegator;
+ }
+
// Free current message.
if (process->current) {
delete process->current;
@@ -2263,8 +2397,8 @@ void ProcessManager::cleanup(ProcessBase
process->unlock();
}
- // Inform link manager.
- link_manager->exited(process);
+ // Inform socket manager.
+ socket_manager->exited(process);
// Confirm process not in runq.
synchronized (runq) {
@@ -2724,17 +2858,15 @@ string ProcessBase::serve(double secs, b
pair<HttpRequest*, Future<HttpResponse>*>* request;
function<void(ProcessBase*)>* delegator;
if ((request = dequeue<pair<HttpRequest*, Future<HttpResponse>*> >()) != NULL) {
- const string& id = "/" + pid.id + "/";
- size_t index = request->first->path.find(id);
- CHECK(index != string::npos);
- const string& name =
- request->first->path.substr(index + id.size(), request->first->path.size());
+ size_t index = request->first->path.find('/', 1);
+ index = index != string::npos ? index + 1 : request->first->path.size();
+ const string& name = request->first->path.substr(index);
if (http_handlers.count(name) > 0) {
http_handlers[name](*request->first).associate(*request->second);
} else {
- Promise<HttpResponse>(HttpNotFoundResponse()).associate(*request->second);
- VLOG(1) << "Returning '404 Not Found' for HTTP request '"
+ VLOG(1) << "Returning '404 Not Found' for HTTP request for '"
<< request->first->path << "'";
+ Promise<HttpResponse>(HttpNotFoundResponse()).associate(*request->second);
}
delete request->first;
delete request->second;
@@ -2956,6 +3088,48 @@ bool wait(const UPID& pid)
}
+class WaitWaiter : public Process<WaitWaiter>
+{
+public:
+ WaitWaiter(const UPID& _pid, double _secs, bool* _waited)
+ : pid(_pid), secs(_secs), waited(_waited) {}
+
+protected:
+ virtual void operator () ()
+ {
+ link(pid);
+ receive(secs);
+ if (name() == EXITED) {
+ *waited = true;
+ } else {
+ *waited = false;
+ }
+ }
+
+private:
+ const UPID pid;
+ const double secs;
+ bool* waited;
+};
+
+
+bool wait(const UPID& pid, double secs)
+{
+ initialize();
+
+ if (!pid) {
+ return false;
+ }
+
+ bool waited = false;
+
+ WaitWaiter waiter(pid, secs, &waited);
+ wait(spawn(&waiter));
+
+ return waited;
+}
+
+
void invoke(const function<void (void)> &thunk)
{
initialize();
@@ -2986,6 +3160,8 @@ void post(const UPID& to, const string&
}
+namespace internal {
+
void dispatcher(const UPID& pid, function<void(ProcessBase*)>* delegator)
{
if (proc_process != NULL) {
@@ -2995,4 +3171,4 @@ void dispatcher(const UPID& pid, functio
}
}
-} // namespace process {
+}} // namespace process { namespace internal {
Modified: incubator/mesos/trunk/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/process.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/process.hpp Sun Jun 5 09:19:39 2011
@@ -123,7 +123,7 @@ protected:
}
private:
- friend class LinkManager;
+ friend class SocketManager;
friend class ProcessManager;
friend class ProcessReference;
friend void* schedule(void *);
@@ -199,7 +199,7 @@ public:
protected:
virtual void operator () ()
{
- while (true) serve();
+ do { serve(); if (name() == TERMINATE) break; } while (true);
}
};
@@ -207,9 +207,9 @@ protected:
/**
* Initialize the library.
*
- * @param initGoogleLogging whether or not to initialize the Google
- * Logging library (glog). If the application is also using
- * glog, this should be set to false.
+ * @param initialize_google_logging whether or not to initialize the
+ * Google Logging library (glog). If the application is also
+ * using glog, this should be set to false.
*/
void initialize(bool initialize_google_logging = true);
@@ -239,6 +239,16 @@ bool wait(const UPID& pid);
/**
+ * Wait for a process to exit, for no more than the specified number
+ * of seconds (returns true if actually waited on a process).
+ *
+ * @param pid id of the process
+ * @param secs max time to wait
+ */
+bool wait(const UPID& pid, double secs);
+
+
+/**
* Invoke the thunk in a legacy safe way (i.e., outside of libprocess).
*
* @param thunk function to be invoked
@@ -266,6 +276,8 @@ void filter(Filter* filter);
void post(const UPID& to, const std::string& name, const char* data = NULL, size_t length = 0);
+namespace internal {
+
template <typename T>
void vdelegate(ProcessBase* process,
std::tr1::function<void(T*)>* thunk)
@@ -278,14 +290,28 @@ void vdelegate(ProcessBase* process,
template <typename R, typename T>
+void pdelegate(ProcessBase* process,
+ std::tr1::function<Promise<R>(T*)>* thunk,
+ Future<R>* future)
+{
+ assert(process != NULL);
+ assert(thunk != NULL);
+ assert(future != NULL);
+ (*thunk)(static_cast<T*>(process)).associate(*future);
+ delete thunk;
+ delete future;
+}
+
+
+template <typename R, typename T>
void delegate(ProcessBase* process,
- std::tr1::function<Promise<R>(T*)>* thunk,
+ std::tr1::function<R(T*)>* thunk,
Future<R>* future)
{
assert(process != NULL);
assert(thunk != NULL);
assert(future != NULL);
- (*thunk)(static_cast<T*>(process)).associate(*future);
+ Promise<R>((*thunk)(static_cast<T*>(process))).associate(*future);
delete thunk;
delete future;
}
@@ -294,6 +320,8 @@ void delegate(ProcessBase* process,
/* Dispatches the delegator to the specified process. */
void dispatcher(const UPID& pid, std::tr1::function<void(ProcessBase*)>* delegator);
+} // namespace internal {
+
/**
* Dispatches a void method on a process.
@@ -308,11 +336,11 @@ void dispatch(const PID<T>& pid, void (T
new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
std::tr1::placeholders::_1,
thunk));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
}
@@ -328,14 +356,14 @@ void dispatch(const PID<T>& pid, void (T
{
std::tr1::function<void(T*)>* thunk =
new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1));
+ a1));
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
std::tr1::placeholders::_1,
thunk));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
}
@@ -354,14 +382,14 @@ void dispatch(const PID<T>& pid, void (T
{
std::tr1::function<void(T*)>* thunk =
new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2));
+ a1, a2));
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
std::tr1::placeholders::_1,
thunk));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
}
@@ -378,18 +406,18 @@ template <typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3),
- A1 a1, A2 a2, A3 a3)
+ A1 a1, A2 a2, A3 a3)
{
std::tr1::function<void(T*)>* thunk =
new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2, a3));
+ a1, a2, a3));
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
std::tr1::placeholders::_1,
thunk));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
}
@@ -407,18 +435,18 @@ template <typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4),
- A1 a1, A2 a2, A3 a3, A4 a4)
+ A1 a1, A2 a2, A3 a3, A4 a4)
{
std::tr1::function<void(T*)>* thunk =
new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2, a3, a4));
+ a1, a2, a3, a4));
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
std::tr1::placeholders::_1,
thunk));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
}
@@ -437,18 +465,18 @@ template <typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
void dispatch(const PID<T>& pid, void (T::*method)(P1, P2, P3, P4, P5),
- A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
{
std::tr1::function<void(T*)>* thunk =
new std::tr1::function<void(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2, a3, a4, a5));
+ a1, a2, a3, a4, a5));
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&vdelegate<T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::vdelegate<T>,
std::tr1::placeholders::_1,
thunk));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
}
@@ -463,17 +491,17 @@ void dispatch(const PID<T>& pid, void (T
template <typename R, typename T>
Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)())
{
- std::tr1::function<Promise<R> (T*)>* thunk =
- new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
+ std::tr1::function<Promise<R>(T*)>* thunk =
+ new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
Future<R>* future = new Future<R>();
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
std::tr1::placeholders::_1,
thunk, future));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
return *future;
}
@@ -491,18 +519,18 @@ Future<R> dispatch(const PID<T>& pid, Pr
template <typename R, typename T, typename P1, typename A1>
Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1), A1 a1)
{
- std::tr1::function<Promise<R> (T*)>* thunk =
- new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1));
+ std::tr1::function<Promise<R>(T*)>* thunk =
+ new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1));
Future<R>* future = new Future<R>();
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
std::tr1::placeholders::_1,
thunk, future));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
return *future;
}
@@ -522,20 +550,20 @@ template <typename R, typename T,
typename P1, typename P2,
typename A1, typename A2>
Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2),
- A1 a1, A2 a2)
+ A1 a1, A2 a2)
{
- std::tr1::function<Promise<R> (T*)>* thunk =
- new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2));
+ std::tr1::function<Promise<R>(T*)>* thunk =
+ new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2));
Future<R>* future = new Future<R>();
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
std::tr1::placeholders::_1,
thunk, future));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
return *future;
}
@@ -556,20 +584,20 @@ template <typename R, typename T,
typename P1, typename P2, typename P3,
typename A1, typename A2, typename A3>
Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3),
- A1 a1, A2 a2, A3 a3)
+ A1 a1, A2 a2, A3 a3)
{
- std::tr1::function<Promise<R> (T*)>* thunk =
- new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2, a3));
+ std::tr1::function<Promise<R>(T*)>* thunk =
+ new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3));
Future<R>* future = new Future<R>();
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
std::tr1::placeholders::_1,
thunk, future));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
return *future;
}
@@ -591,20 +619,20 @@ template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4,
typename A1, typename A2, typename A3, typename A4>
Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4),
- A1 a1, A2 a2, A3 a3, A4 a4)
+ A1 a1, A2 a2, A3 a3, A4 a4)
{
- std::tr1::function<Promise<R> (T*)>* thunk =
- new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2, a3, a4));
+ std::tr1::function<Promise<R>(T*)>* thunk =
+ new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4));
Future<R>* future = new Future<R>();
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
std::tr1::placeholders::_1,
thunk, future));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
return *future;
}
@@ -627,20 +655,212 @@ template <typename R, typename T,
typename P1, typename P2, typename P3, typename P4, typename P5,
typename A1, typename A2, typename A3, typename A4, typename A5>
Future<R> dispatch(const PID<T>& pid, Promise<R> (T::*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+ std::tr1::function<Promise<R>(T*)>* thunk =
+ new std::tr1::function<Promise<R>(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4, a5));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(ProcessBase*)>* delegator =
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::pdelegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
+
+ internal::dispatcher(pid, delegator);
+
+ return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on receiver
+ */
+template <typename R, typename T>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)())
+{
+ std::tr1::function<R(T*)>* thunk =
+ new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(ProcessBase*)>* delegator =
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
+
+ internal::dispatcher(pid, delegator);
+
+ return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on receiver
+ * @param a1 first argument to pass to method
+ */
+template <typename R, typename T, typename P1, typename A1>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1), A1 a1)
+{
+ std::tr1::function<R(T*)>* thunk =
+ new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(ProcessBase*)>* delegator =
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
+
+ internal::dispatcher(pid, delegator);
+
+ return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2,
+ typename A1, typename A2>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2),
+ A1 a1, A2 a2)
+{
+ std::tr1::function<R(T*)>* thunk =
+ new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(ProcessBase*)>* delegator =
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
+
+ internal::dispatcher(pid, delegator);
+
+ return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2, P3),
+ A1 a1, A2 a2, A3 a3)
+{
+ std::tr1::function<R(T*)>* thunk =
+ new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(ProcessBase*)>* delegator =
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
+
+ internal::dispatcher(pid, delegator);
+
+ return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4),
+ A1 a1, A2 a2, A3 a3, A4 a4)
+{
+ std::tr1::function<R(T*)>* thunk =
+ new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4));
+
+ Future<R>* future = new Future<R>();
+
+ std::tr1::function<void(ProcessBase*)>* delegator =
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
+ std::tr1::placeholders::_1,
+ thunk, future));
+
+ internal::dispatcher(pid, delegator);
+
+ return *future;
+}
+
+
+/**
+ * Dispatches a method on a process and returns the future that
+ * corresponds to the result of executing the method.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ * @return future corresponding to the result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+Future<R> dispatch(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4, P5),
A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
{
- std::tr1::function<Promise<R> (T*)>* thunk =
- new std::tr1::function<Promise<R> (T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
- a1, a2, a3, a4, a5));
+ std::tr1::function<R(T*)>* thunk =
+ new std::tr1::function<R(T*)>(std::tr1::bind(method, std::tr1::placeholders::_1,
+ a1, a2, a3, a4, a5));
Future<R>* future = new Future<R>();
std::tr1::function<void(ProcessBase*)>* delegator =
- new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&delegate<R, T>,
+ new std::tr1::function<void(ProcessBase*)>(std::tr1::bind(&internal::delegate<R, T>,
std::tr1::placeholders::_1,
thunk, future));
- dispatcher(pid, delegator);
+ internal::dispatcher(pid, delegator);
return *future;
}
@@ -761,6 +981,122 @@ R call(const PID<T>& pid, Promise<R> (T:
return dispatch(pid, method, a1, a2, a3, a4, a5).get();
}
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @return result of executing the method
+ */
+template <typename R, typename T>
+R call(const PID<T>& pid, R (T::*method)())
+{
+ return dispatch(pid, method).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T, typename P1, typename A1>
+R call(const PID<T>& pid, R (T::*method)(P1), A1 a1)
+{
+ return dispatch(pid, method, a1).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2,
+ typename A1, typename A2>
+R call(const PID<T>& pid, R (T::*method)(P1, P2), A1 a1, A2 a2)
+{
+ return dispatch(pid, method, a1, a2).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2, typename P3,
+ typename A1, typename A2, typename A3>
+R call(const PID<T>& pid, R (T::*method)(P1, P2, P3),
+ A1 a1, A2 a2, A3 a3)
+{
+ return dispatch(pid, method, a1, a2, a3).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2, typename P3, typename P4,
+ typename A1, typename A2, typename A3, typename A4>
+R call(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4),
+ A1 a1, A2 a2, A3 a3, A4 a4)
+{
+ return dispatch(pid, method, a1, a2, a3, a4).get();
+}
+
+
+/**
+ * Dispatches a method on a process and waits (on the underlying
+ * future) for the result.
+ *
+ * @param pid receiver of message
+ * @param method method to invoke on instance
+ * @param a1 first argument to pass to method
+ * @param a2 second argument to pass to method
+ * @param a3 third argument to pass to method
+ * @param a4 fourth argument to pass to method
+ * @param a5 fifth argument to pass to method
+ * @return result of executing the method
+ */
+template <typename R, typename T,
+ typename P1, typename P2, typename P3, typename P4, typename P5,
+ typename A1, typename A2, typename A3, typename A4, typename A5>
+R call(const PID<T>& pid, R (T::*method)(P1, P2, P3, P4, P5),
+ A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
+{
+ return dispatch(pid, method, a1, a2, a3, a4, a5).get();
+}
+
} // namespace process {
#endif // __PROCESS_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/promise.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/promise.hpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/promise.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/promise.hpp Sun Jun 5 09:19:39 2011
@@ -11,14 +11,15 @@ class Promise
{
public:
Promise();
- Promise(const T &t_);
- Promise(const Promise<T> &that);
+ Promise(const T& _t);
+ Promise(const Promise<T>& that);
virtual ~Promise();
- void set(const T &t_);
- void associate(const Future<T> &future_);
+ void set(const T& _t);
+ bool ready() const;
+ void associate(const Future<T>& _future);
private:
- void operator = (const Promise<T> &);
+ void operator = (const Promise<T>&);
enum State {
UNSET_UNASSOCIATED,
@@ -27,10 +28,10 @@ private:
SET_ASSOCIATED,
};
- int *refs;
- T **t;
- Future<T> **future;
- State *state;
+ int* refs;
+ T** t;
+ Future<T>** future;
+ State* state;
};
@@ -47,22 +48,22 @@ Promise<T>::Promise()
{
refs = new int;
*refs = 1;
- t = new T *;
+ t = new T*;
*t = NULL;
- future = new Future<T> *;
+ future = new Future<T>*;
*future = NULL;
state = new State;
*state = UNSET_UNASSOCIATED;
}
template <typename T>
-Promise<T>::Promise(const T &_t)
+Promise<T>::Promise(const T& _t)
{
refs = new int;
*refs = 1;
- t = new T *;
+ t = new T*;
*t = new T(_t);
- future = new Future<T> *;
+ future = new Future<T>*;
*future = NULL;
state = new State;
*state = SET_UNASSOCIATED;
@@ -70,7 +71,7 @@ Promise<T>::Promise(const T &_t)
template <typename T>
-Promise<T>::Promise(const Promise<T> &that)
+Promise<T>::Promise(const Promise<T>& that)
{
assert(that.refs != NULL);
assert(*that.refs > 0);
@@ -101,7 +102,7 @@ Promise<T>::~Promise()
template <typename T>
-void Promise<T>::set(const T &t_)
+void Promise<T>::set(const T& _t)
{
assert(state != NULL);
assert(*state == UNSET_UNASSOCIATED ||
@@ -116,12 +117,12 @@ void Promise<T>::set(const T &t_)
(*future)->set(**t);
} else {
// Save the value for association later.
- *t = new T(t_);
+ *t = new T(_t);
}
} else if (*state == UNSET_ASSOCIATED) {
assert(future != NULL && *future != NULL);
// Directly set the value in the future.
- (*future)->set(t_);
+ (*future)->set(_t);
__sync_bool_compare_and_swap(state, UNSET_ASSOCIATED, SET_ASSOCIATED);
} else {
assert(*state == SET_ASSOCIATED);
@@ -133,13 +134,21 @@ void Promise<T>::set(const T &t_)
template <typename T>
-void Promise<T>::associate(const Future<T> &future_)
+bool Promise<T>::ready() const // Reports whether the promise is in one of the two SET_* states.
+{
+ assert(state != NULL); // The state pointer is allocated by the constructors and must still be present.
+ return *state == SET_UNASSOCIATED || *state == SET_ASSOCIATED; // True whether or not a future has been associated.
+}
+
+
+template <typename T>
+void Promise<T>::associate(const Future<T>& _future)
{
assert(state != NULL);
assert(*state == UNSET_UNASSOCIATED ||
*state == SET_UNASSOCIATED);
assert(future != NULL);
- *future = new Future<T>(future_);
+ *future = new Future<T>(_future);
if (*state == UNSET_UNASSOCIATED) {
if (!__sync_bool_compare_and_swap(state, UNSET_UNASSOCIATED,
UNSET_ASSOCIATED)) {
Modified: incubator/mesos/trunk/third_party/libprocess/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/tests.cpp?rev=1132286&r1=1132285&r2=1132286&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/tests.cpp Sun Jun 5 09:19:39 2011
@@ -2,11 +2,14 @@
#include <process.hpp>
+using process::Future;
using process::PID;
using process::Process;
+using process::Promise;
using process::UPID;
-using testing::AtMost;
+using testing::_;
+using testing::ReturnArg;
class SpawnMockProcess : public Process<SpawnMockProcess>
@@ -24,7 +27,7 @@ TEST(libprocess, spawn)
SpawnMockProcess process;
EXPECT_CALL(process, __operator_call__())
- .Times(AtMost(1));
+ .Times(1);
PID<SpawnMockProcess> pid = process::spawn(&process);
@@ -37,8 +40,11 @@ TEST(libprocess, spawn)
class DispatchMockProcess : public Process<DispatchMockProcess>
{
public:
- MOCK_METHOD0(func, void());
- virtual void operator () () { serve(); }
+ MOCK_METHOD0(func0, void());
+ MOCK_METHOD1(func1, bool(bool));
+ MOCK_METHOD1(func2, Promise<bool>(bool));
+ MOCK_METHOD1(func3, int(int));
+ MOCK_METHOD1(func4, Promise<int>(int));
};
@@ -48,14 +54,61 @@ TEST(libprocess, dispatch)
DispatchMockProcess process;
- EXPECT_CALL(process, func())
- .Times(AtMost(1));
+ EXPECT_CALL(process, func0())
+ .Times(1);
+
+ EXPECT_CALL(process, func1(_))
+ .WillOnce(ReturnArg<0>());
+
+ EXPECT_CALL(process, func2(_))
+ .WillOnce(ReturnArg<0>());
+
+ PID<DispatchMockProcess> pid = process::spawn(&process);
+
+ ASSERT_FALSE(!pid);
+
+ process::dispatch(pid, &DispatchMockProcess::func0);
+
+ Future<bool> future;
+
+ future = process::dispatch(pid, &DispatchMockProcess::func1, true);
+
+ EXPECT_TRUE(future.get());
+
+ future = process::dispatch(pid, &DispatchMockProcess::func2, true);
+
+ EXPECT_TRUE(future.get());
+
+ process::post(pid, process::TERMINATE);
+ process::wait(pid);
+}
+
+
+TEST(libprocess, call)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ DispatchMockProcess process;
+
+ EXPECT_CALL(process, func3(_))
+ .WillOnce(ReturnArg<0>());
+
+ EXPECT_CALL(process, func4(_))
+ .WillOnce(ReturnArg<0>());
PID<DispatchMockProcess> pid = process::spawn(&process);
ASSERT_FALSE(!pid);
- process::dispatch(pid, &DispatchMockProcess::func);
+ int result;
+
+ result = process::call(pid, &DispatchMockProcess::func3, 42);
+
+ EXPECT_EQ(42, result);
+
+ result = process::call(pid, &DispatchMockProcess::func4, 43);
+
+ EXPECT_EQ(43, result);
process::post(pid, process::TERMINATE);
process::wait(pid);
@@ -67,7 +120,6 @@ class InstallMockProcess : public Proces
public:
InstallMockProcess() { install("func", &InstallMockProcess::func); }
MOCK_METHOD0(func, void());
- virtual void operator () () { serve(); }
};
@@ -78,7 +130,7 @@ TEST(libprocess, install)
InstallMockProcess process;
EXPECT_CALL(process, func())
- .Times(AtMost(1));
+ .Times(1);
PID<InstallMockProcess> pid = process::spawn(&process);
@@ -104,7 +156,6 @@ class DerivedMockProcess : public BaseMo
public:
DerivedMockProcess() {}
MOCK_METHOD0(func, void());
- virtual void operator () () { serve(); }
};
@@ -115,10 +166,10 @@ TEST(libprocess, inheritance)
DerivedMockProcess process;
EXPECT_CALL(process, func())
- .Times(AtMost(1));
+ .Times(2);
EXPECT_CALL(process, foo())
- .Times(AtMost(1));
+ .Times(1);
PID<DerivedMockProcess> pid1 = process::spawn(&process);
@@ -131,6 +182,7 @@ TEST(libprocess, inheritance)
ASSERT_EQ(pid2, pid3);
+ process::dispatch(pid3, &BaseMockProcess::func);
process::dispatch(pid3, &BaseMockProcess::foo);
process::post(pid1, process::TERMINATE);