You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC
svn commit: r1236485 [6/7] - in /incubator/mesos/trunk: ./ include/mesos/
src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/
src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/
third_party/libprocess/include/pr...
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Fri Jan 27 01:25:13 2012
@@ -29,6 +29,7 @@
#include <algorithm>
#include <deque>
#include <fstream>
+#include <iomanip>
#include <iostream>
#include <list>
#include <map>
@@ -39,23 +40,24 @@
#include <stdexcept>
#include <vector>
+#include <process/clock.hpp>
+#include <process/deferred.hpp>
#include <process/dispatch.hpp>
+#include <process/executor.hpp>
+#include <process/filter.hpp>
+#include <process/future.hpp>
#include <process/gc.hpp>
#include <process/process.hpp>
-
-#include <boost/tuple/tuple.hpp>
+#include <process/timer.hpp>
#include "config.hpp"
#include "decoder.hpp"
#include "encoder.hpp"
-#include "fatal.hpp"
#include "foreach.hpp"
#include "gate.hpp"
#include "synchronized.hpp"
-#include "tokenize.hpp"
-
+#include "thread.hpp"
-using boost::tuple;
using std::deque;
using std::find;
@@ -71,27 +73,26 @@ using std::string;
using std::stringstream;
using std::vector;
+namespace lambda {
+
+using std::tr1::bind;
using std::tr1::function;
+using namespace std::tr1::placeholders;
+
+} // namespace lambda {
#define Byte (1)
#define Kilobyte (1024*Byte)
#define Megabyte (1024*Kilobyte)
#define Gigabyte (1024*Megabyte)
-#define PROCESS_STACK_SIZE (64*Kilobyte)
-#define malloc(bytes) \
- ({ void *tmp; \
- if ((tmp = malloc(bytes)) == NULL) \
- fatalerror("malloc"); tmp; \
- })
-
-#define realloc(address, bytes) \
- ({ void *tmp; \
- if ((tmp = realloc(address, bytes)) == NULL) \
- fatalerror("realloc"); tmp; \
- })
+template <int i>
+std::ostream& fixedprecision(std::ostream& os)
+{
+ return os << std::fixed << std::setprecision(i);
+}
struct Node
@@ -120,41 +121,15 @@ ostream& operator << (ostream& stream, c
}
-/*
- * Timeout support! Note that we don't store a pointer to the process
- * because we can't dereference it because it might no longer be
- * valid. But we can check if the process is valid using the PID and
- * then use referencing counting to keep the process valid.
-*/
-struct timeout
-{
- ev_tstamp tstamp;
- process::UPID pid;
- int generation;
-};
-
-
-bool operator == (const timeout &left, const timeout &right)
-{
- return left.tstamp == right.tstamp &&
- left.pid == right.pid &&
- left.generation == right.generation;
-}
-
-
namespace process {
class ProcessReference
{
public:
- explicit ProcessReference(ProcessBase *_process) : process(_process)
+ explicit ProcessReference(ProcessBase* _process) : process(_process)
{
if (process != NULL) {
__sync_fetch_and_add(&(process->refs), 1);
- if (process->state == ProcessBase::FINISHING) {
- __sync_fetch_and_sub(&(process->refs), 1);
- process = NULL;
- }
}
}
@@ -164,7 +139,7 @@ public:
__sync_fetch_and_sub(&(process->refs), 1);
}
- ProcessReference(const ProcessReference &that)
+ ProcessReference(const ProcessReference& that)
{
process = that.process;
@@ -177,12 +152,12 @@ public:
}
}
- ProcessBase * operator -> ()
+ ProcessBase* operator -> ()
{
return process;
}
- operator ProcessBase * ()
+ operator ProcessBase* ()
{
return process;
}
@@ -193,88 +168,31 @@ public:
}
private:
- ProcessReference & operator = (const ProcessReference &that);
-
- ProcessBase *process;
-};
-
-
-/* Tick, tock ... manually controlled clock! */
-class InternalClock
-{
-public:
- InternalClock()
- {
- initial = current = elapsed = ev_time();
- }
-
- ~InternalClock() {}
-
- ev_tstamp getCurrent(ProcessBase *process)
- {
- ev_tstamp tstamp;
-
- if (currents.count(process) != 0) {
- tstamp = currents[process];
- } else {
- tstamp = currents[process] = initial;
- }
-
- return tstamp;
- }
-
- void setCurrent(ProcessBase *process, ev_tstamp tstamp)
- {
- currents[process] = tstamp;
- }
-
- ev_tstamp getCurrent()
- {
- return current;
- }
-
- void setCurrent(ev_tstamp tstamp)
- {
- current = tstamp;
- }
-
- ev_tstamp getElapsed()
- {
- return elapsed;
- }
-
- void setElapsed(ev_tstamp tstamp)
- {
- elapsed = tstamp;
- }
-
- void discard(ProcessBase *process)
- {
- CHECK(process != NULL);
- currents.erase(process);
- }
+ ProcessReference& operator = (const ProcessReference& that);
-private:
- map<ProcessBase *, ev_tstamp> currents;
- ev_tstamp initial;
- ev_tstamp current;
- ev_tstamp elapsed;
+ ProcessBase* process;
};
class HttpProxy;
-class HttpResponseWaiter : public Process<HttpResponseWaiter>
+class HttpResponseWaiter
{
public:
- HttpResponseWaiter(const PID<HttpProxy>& _proxy);
- virtual ~HttpResponseWaiter();
+ HttpResponseWaiter(const PID<HttpProxy>& proxy,
+ Future<HttpResponse>* future,
+ bool persist);
- void await(const Future<HttpResponse>& future, bool persist);
+ void waited(const Future<HttpResponse>&);
+ void timeout();
private:
const PID<HttpProxy> proxy;
+ Future<HttpResponse>* future;
+ bool persist;
+
+ Executor executor;
};
@@ -282,15 +200,14 @@ class HttpProxy : public Process<HttpPro
{
public:
HttpProxy(int _c);
- virtual ~HttpProxy();
- void handle(const Future<HttpResponse>& future, bool persist);
- void ready(const Future<HttpResponse>& future, bool persist);
- void unavailable(bool persist);
+ void handle(Future<HttpResponse>* future, bool persist);
+ void ready(Future<HttpResponse>* future, bool persist);
+ void unavailable(Future<HttpResponse>* future, bool persist);
private:
int c;
- HttpResponseWaiter* waiter;
+ map<Future<HttpResponse>*, HttpResponseWaiter*> waiters;
};
@@ -315,26 +232,26 @@ public:
void exited(ProcessBase* process);
private:
- /* Map from UPID (local/remote) to process. */
+ // Map from UPID (local/remote) to process.
map<UPID, set<ProcessBase*> > links;
- /* Map from socket to node (ip, port). */
+ // Map from socket to node (ip, port).
map<int, Node> sockets;
- /* Maps from node (ip, port) to socket. */
+ // Maps from node (ip, port) to socket.
map<Node, int> temps;
map<Node, int> persists;
- /* Set of sockets that should be closed. */
+ // Set of sockets that should be closed.
set<int> disposables;
- /* Map from socket to outgoing queue. */
+ // Map from socket to outgoing queue.
map<int, queue<DataEncoder*> > outgoing;
- /* HTTP proxies. */
+ // HTTP proxies.
map<int, HttpProxy*> proxies;
- /* Protects instance variables. */
+ // Protects instance variables.
synchronizable(this);
};
@@ -345,150 +262,261 @@ public:
ProcessManager();
~ProcessManager();
- ProcessReference use(const UPID &pid);
+ ProcessReference use(const UPID& pid);
- bool deliver(Message* message, ProcessBase *sender = NULL);
- bool deliver(int c, HttpRequest* request, ProcessBase *sender = NULL);
- bool deliver(const UPID& to, function<void(ProcessBase*)>* dispatcher, ProcessBase *sender = NULL);
-
- UPID spawn(ProcessBase *process, bool manage);
- void link(ProcessBase *process, const UPID &to);
- bool receive(ProcessBase *process, double secs);
- bool serve(ProcessBase *process, double secs);
- void pause(ProcessBase *process, double secs);
- void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
- bool wait(ProcessBase *process, const UPID &pid);
- bool external_wait(const UPID &pid);
- bool poll(ProcessBase *process, int fd, int op, double secs, bool ignore);
+ bool deliver(Message* message, ProcessBase* sender = NULL);
- void enqueue(ProcessBase *process);
- ProcessBase * dequeue();
+ bool deliver(int c, HttpRequest* request, ProcessBase* sender = NULL);
- void timedout(const UPID &pid, int generation);
- void polled(const UPID &pid, int generation);
+ bool deliver(const UPID& to,
+ lambda::function<void(ProcessBase*)>* f,
+ ProcessBase* sender = NULL);
- void run(ProcessBase *process);
- void cleanup(ProcessBase *process);
+ UPID spawn(ProcessBase* process, bool manage);
+ void resume(ProcessBase* process);
+ void cleanup(ProcessBase* process);
+ void link(ProcessBase* process, const UPID& to);
+ void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
+ bool wait(const UPID& pid);
-private:
- timeout create_timeout(ProcessBase *process, double secs);
- void start_timeout(const timeout &timeout);
- void cancel_timeout(const timeout &timeout);
+ void enqueue(ProcessBase* process);
+ ProcessBase* dequeue();
- /* Map of all local spawned and running processes. */
- map<string, ProcessBase *> processes;
+private:
+ // Map of all local spawned and running processes.
+ map<string, ProcessBase*> processes;
synchronizable(processes);
- /* Waiting processes (protected by synchronizable(processes)). */
- map<ProcessBase *, set<ProcessBase *> > waiters;
+ // Gates for waiting threads (protected by synchronizable(processes)).
+ map<ProcessBase*, Gate*> gates;
- /* Gates for waiting threads (protected by synchronizable(processes)). */
- map<ProcessBase *, Gate *> gates;
-
- /* Queue of runnable processes (implemented as deque). */
- deque<ProcessBase *> runq;
+ // Queue of runnable processes (implemented using list).
+ list<ProcessBase*> runq;
synchronizable(runq);
};
-/* Using manual clock if non-null. */
-static InternalClock *clk = NULL;
-
-/* Unique id that can be assigned to each process. */
+// Unique id that can be assigned to each process.
static uint32_t id = 0;
-/* Local server socket. */
+// Local server socket.
static int s = -1;
-/* Local IP address. */
+// Local IP address.
static uint32_t ip = 0;
-/* Local port. */
+// Local port.
static uint16_t port = 0;
-/* Active SocketManager (eventually will probably be thread-local). */
-static SocketManager *socket_manager = NULL;
+// Active SocketManager (eventually will probably be thread-local).
+static SocketManager* socket_manager = NULL;
-/* Active ProcessManager (eventually will probably be thread-local). */
-static ProcessManager *process_manager = NULL;
+// Active ProcessManager (eventually will probably be thread-local).
+static ProcessManager* process_manager = NULL;
-/* Event loop. */
-static struct ev_loop *loop = NULL;
+// Event loop.
+static struct ev_loop* loop = NULL;
-/* Asynchronous watcher for interrupting loop. */
+// Asynchronous watcher for interrupting loop.
static ev_async async_watcher;
-/* Timeouts watcher for process timeouts. */
+// Watcher for timeouts.
static ev_timer timeouts_watcher;
-/* Server watcher for accepting connections. */
+// Server watcher for accepting connections.
static ev_io server_watcher;
-/* Queue of I/O watchers. */
-static queue<ev_io *> *watchers = new queue<ev_io *>();
+// Queue of I/O watchers.
+static queue<ev_io*>* watchers = new queue<ev_io*>();
static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
-/**
- * We store the timeouts in a map of lists indexed by the time stamp
- * of the timeout so that we can have two timeouts that have the same
- * time stamp. Note however, that we should never have two identical
- * timeouts because a process should only ever have one outstanding
- * timeout at a time. Also, we exploit that the map is SORTED!
- */
-static map<ev_tstamp, list<timeout> > *timeouts =
- new map<ev_tstamp, list<timeout> >();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER;
+// We store the timers in a map of lists indexed by the timeout of the
+// timer so that we can have two timers that have the same timeout. We
+// exploit that the map is SORTED!
+static map<double, list<timer> >* timeouts =
+ new map<double, list<timer> >();
+static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-/* Flag to indicate whether or to update the timer on async interrupt. */
+// Flag to indicate whether or not to update the timer on async interrupt.
static bool update_timer = false;
-/* I/O thread. */
-static pthread_t io_thread;
+const int NUMBER_OF_PROCESSING_THREADS = 4; // TODO(benh): Do 2x cores.
+
+
+// Thread local process pointer magic (constructed in
+// 'initialize'). We need the extra level of indirection from
+// _process_ to __process__ so that we can take advantage of the
+// operators without needing the extra dereference.
+static ThreadLocal<ProcessBase>* _process_ = NULL;
-/* Processing thread. */
-static pthread_t proc_thread;
+#define __process__ (*_process_)
-/* Scheduling context for processing thread. */
-static ucontext_t proc_uctx_schedule;
-/* Running context for processing thread. */
-static ucontext_t proc_uctx_running;
-
-/* Current process of processing thread. */
-//static __thread ProcessBase *proc_process = NULL;
-static ProcessBase *proc_process = NULL;
-
-/* Scheduler gate. */
-static Gate *gate = new Gate();
-
-/* Stack of recycled stacks. */
-static stack<void *> *stacks = new stack<void *>();
-static synchronizable(stacks) = SYNCHRONIZED_INITIALIZER;
-
-/* Last exited process's stack to be recycled (global variable hack!). */
-static void *recyclable = NULL;
-
-/**
- * Filter. Synchronized support for using the filterer needs to be
- * recursive incase a filterer wants to do anything fancy (which is
- * possible and likely given that filters will get used for testing).
-*/
-static Filter *filterer = NULL;
+// Scheduler gate.
+static Gate* gate = new Gate();
+
+// Filter. Synchronized support for using the filterer needs to be
+// recursive in case a filterer wants to do anything fancy (which is
+// possible and likely given that filters will get used for testing).
+static Filter* filterer = NULL;
static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-/* Global garbage collector. */
+// Global garbage collector.
PID<GarbageCollector> gc;
-/* Thunks to be invoked via process::invoke. */
-static queue<function<void(void)>*>* thunks =
- new queue<function<void(void)>*>();
+// Thunks to be invoked via process::invoke.
+static queue<lambda::function<void(void)>*>* thunks =
+ new queue<lambda::function<void(void)>*>();
static synchronizable(thunks) = SYNCHRONIZED_INITIALIZER;
-/* Thread to invoke thunks (see above). */
+// Thread to invoke thunks (see above).
static Gate* invoke_gate = new Gate();
static pthread_t invoke_thread;
+// We namespace the clock related variables to keep them well
+// named. In the future we'll probably want to associate a clock with
+// a specific ProcessManager/SocketManager instance pair, so this will
+// likely change.
+namespace clock {
+
+map<ProcessBase*, double>* currents = new map<ProcessBase*, double>();
+
+double initial = 0;
+double current = 0;
+
+bool paused = false;
+
+} // namespace clock {
+
+
+double Clock::now()
+{
+ return now(__process__);
+}
+
+
+double Clock::now(ProcessBase* process)
+{
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ if (process != NULL) {
+ if (clock::currents->count(process) != 0) {
+ return (*clock::currents)[process];
+ } else {
+ return (*clock::currents)[process] = clock::initial;
+
+ }
+ } else {
+ return clock::current;
+ }
+ }
+ }
+
+ return ev_time(); // TODO(benh): Versus ev_now()?
+}
+
+
+void Clock::pause()
+{
+ process::initialize(); // For the libev watchers to be setup.
+
+ synchronized (timeouts) {
+ if (!clock::paused) {
+ clock::initial = clock::current = now();
+ clock::paused = true;
+ VLOG(1) << "Clock paused at "
+ << std::fixed << std::setprecision(9) << clock::initial;
+ }
+ }
+
+ // Note that after pausing the clock an existing libev timer might
+ // still fire (invoking handle_timeouts), but since paused == true no
+ // "time" will actually have passed, so no timer will actually fire.
+}
+
+
+bool Clock::paused()
+{
+ return clock::paused;
+}
+
+
+void Clock::resume()
+{
+ process::initialize(); // For the libev watchers to be setup.
+
+ synchronized (timeouts) {
+ if (clock::paused) {
+ VLOG(1) << "Clock resumed at "
+ << std::fixed << std::setprecision(9) << clock::current;
+ clock::paused = false;
+ clock::currents->clear();
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+}
+
+
+void Clock::advance(double secs)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ clock::current += secs;
+ VLOG(1) << "Clock advanced ("
+ << std::fixed << std::setprecision(9) << secs
+ << " seconds) to " << clock::current;
+ if (!update_timer) {
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+ }
+}
+
+
+void Clock::update(double secs)
+{
+ VLOG(2) << "Attempting to update clock to "
+ << std::fixed << std::setprecision(9) << secs;
+ synchronized (timeouts) {
+ if (clock::paused) {
+ if (clock::current < secs) {
+ clock::current = secs;
+ VLOG(1) << "Clock updated to "
+ << std::fixed << std::setprecision(9) << clock::current;
+ if (!update_timer) {
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+ }
+ }
+}
+
+
+void Clock::update(ProcessBase* process, double secs)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ double current = now(process);
+ if (current < secs) {
+ VLOG(2) << "Clock of " << process->self() << " updated to "
+ << std::fixed << std::setprecision(9) << secs;
+ (*clock::currents)[process] = secs;
+ }
+ }
+ }
+}
+
+
+void Clock::order(ProcessBase* from, ProcessBase* to)
+{
+ update(to, now(from));
+}
+
+
int set_nbio(int fd)
{
int flags;
@@ -566,12 +594,12 @@ Message* parse(HttpRequest* request)
}
-void handle_async(struct ev_loop *loop, ev_async *_, int revents)
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
{
synchronized (watchers) {
- /* Start all the new I/O watchers. */
+ // Start all the new I/O watchers.
while (!watchers->empty()) {
- ev_io *watcher = watchers->front();
+ ev_io* watcher = watchers->front();
watchers->pop();
ev_io_start(loop, watcher);
}
@@ -580,28 +608,22 @@ void handle_async(struct ev_loop *loop,
synchronized (timeouts) {
if (update_timer) {
if (!timeouts->empty()) {
- // Determine the current time.
- ev_tstamp current_tstamp;
- if (clk != NULL) {
- current_tstamp = clk->getCurrent();
- } else {
- // TODO(benh): Unclear if want ev_now(...) or ev_time().
- current_tstamp = ev_time();
- }
+ // Determine when the next timer should fire.
+ timeouts_watcher.repeat = timeouts->begin()->first - Clock::now();
- timeouts_watcher.repeat = timeouts->begin()->first - current_tstamp;
-
- // Check when the timer event should fire.
if (timeouts_watcher.repeat <= 0) {
// Feed the event now!
timeouts_watcher.repeat = 0;
ev_timer_again(loop, &timeouts_watcher);
ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
} else {
- // Only repeat the timer if not using a manual clock (a call
- // to Clock::advance() will force a timer event later).
- if (clk != NULL && timeouts_watcher.repeat > 0)
- timeouts_watcher.repeat = 0;
+ // Don't fire the timer if the clock is paused since we
+ // don't want time to advance (instead a call to
+ // Clock::advance() will handle the timer).
+ if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ timeouts_watcher.repeat = 0;
+ }
+
ev_timer_again(loop, &timeouts_watcher);
}
}
@@ -612,73 +634,83 @@ void handle_async(struct ev_loop *loop,
}
-void handle_timeout(struct ev_loop *loop, ev_timer *watcher, int revents)
+void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
{
- list<timeout> timedout;
+ list<timer> timedout;
synchronized (timeouts) {
- ev_tstamp current_tstamp;
+ double now = Clock::now();
- if (clk != NULL) {
- current_tstamp = clk->getCurrent();
- } else {
- // TODO(benh): Unclear if want ev_now(...) or ev_time().
- current_tstamp = ev_time();
- }
+ VLOG(1) << "Handling timeouts up to "
+ << std::fixed << std::setprecision(9) << now;
- foreachpair (ev_tstamp tstamp, const list<timeout> &timedouts, *timeouts) {
- if (tstamp > current_tstamp)
+ foreachkey (double timeout, *timeouts) {
+ if (timeout > now) {
break;
+ }
- foreach (const timeout &timeout, timedouts) {
- if (clk != NULL) {
- // Update current time of process (if it's still
- // valid). Note that current time may be greater than the
- // timeout if a local message was received (and
- // happens-before kicks in), hence we use max.
- if (ProcessReference process = process_manager->use(timeout.pid)) {
- clk->setCurrent(process, max(clk->getCurrent(process),
- timeout.tstamp));
- }
- }
- // TODO(benh): Ensure deterministic order for testing?
- timedout.push_back(timeout);
+ VLOG(2) << "Have timeout(s) at "
+ << std::fixed << std::setprecision(9) << timeout;
+
+ foreach (const timer& timer, (*timeouts)[timeout]) {
+ timedout.push_back(timer);
}
}
- // Now erase the range of time stamps that timed out.
- timeouts->erase(timeouts->begin(), timeouts->upper_bound(current_tstamp));
+ // Now erase the range of timeouts that timed out.
+ timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
- // Okay, so the time stamp for the next timeout should not have fired.
- CHECK(timeouts->empty() || (timeouts->begin()->first > current_tstamp));
+ // Okay, so the timeout for the next timer should not have fired.
+ CHECK(timeouts->empty() || (timeouts->begin()->first > now));
// Update the timer as necessary.
- // TODO(benh): Make this code look like the code in handle_async.
- if (!timeouts->empty() && clk == NULL) {
- timeouts_watcher.repeat = timeouts->begin()->first - current_tstamp;
- CHECK(timeouts_watcher.repeat > 0);
- ev_timer_again(loop, &timeouts_watcher);
- } else {
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
+ if (!timeouts->empty()) {
+ // Determine when the next timer should fire.
+ timeouts_watcher.repeat = timeouts->begin()->first - Clock::now();
+
+ if (timeouts_watcher.repeat <= 0) {
+ // Feed the event now!
+ timeouts_watcher.repeat = 0;
+ ev_timer_again(loop, &timeouts_watcher);
+ ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+ } else {
+ // Don't fire the timer if the clock is paused since we don't
+ // want time to advance (instead a call to Clock::advance()
+ // will handle the timer).
+ if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ timeouts_watcher.repeat = 0;
+ }
+
+ ev_timer_again(loop, &timeouts_watcher);
+ }
}
- update_timer = false;
+ update_timer = false; // Since we might have a queued update_timer.
}
- foreach (const timeout &timeout, timedout) {
- process_manager->timedout(timeout.pid, timeout.generation);
+ // Update current time of process (if it's present/valid). It might
+ // be necessary to actually add some more synchronization around
+ // this so that, for example, pausing and resuming the clock doesn't
+ // cause some processes to get their current times updated and
+ // others not. Since ProcessManager::use acquires the 'processes'
+ // lock we had to move this out of the synchronized (timeouts) above
+ // since there was a deadlock with acquiring 'processes' then
+ // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
+ // current time may be greater than the timeout if a local message
+ // was received (and happens-before kicks in).
+ if (Clock::paused()) {
+ foreach (const timer& timer, timedout) {
+ if (ProcessReference process = process_manager->use(timer.pid)) {
+ Clock::update(process, timer.timeout);
+ }
+ }
}
-}
-
-void handle_poll(struct ev_loop *loop, ev_io *watcher, int revents)
-{
- tuple<UPID, int> *t = (tuple<UPID, int> *) watcher->data;
- process_manager->polled(t->get<0>(), t->get<1>());
- ev_io_stop(loop, watcher);
- delete watcher;
- delete t;
+ // Execute the thunks of the timeouts that timed out (TODO(benh): Do
+ // this async so that we don't tie up the event thread!).
+ foreach (const timer& timer, timedout) {
+ timer.thunk();
+ }
}
@@ -885,177 +917,43 @@ void accept(struct ev_loop *loop, ev_io
}
-void * serve(void *arg)
-{
- ev_loop(((struct ev_loop *) arg), 0);
-
- return NULL;
-}
-
-
-void* invoker(void* arg)
+void* serve(void* arg)
{
- do {
- Gate::state_t old = invoke_gate->approach();
-
- function<void(void)>* thunk = NULL;
- synchronized (thunks) {
- if (!thunks->empty()) {
- thunk = thunks->front();
- thunks->pop();
- }
- }
-
- if (thunk != NULL) {
- (*thunk)();
- continue;
- }
-
- invoke_gate->arrive(old);
- } while (true);
+ ev_loop(((struct ev_loop*) arg), 0);
return NULL;
}
-void trampoline(int stack0, int stack1, int process0, int process1)
-{
- /* Unpackage the arguments. */
-#ifdef __x86_64__
- CHECK (sizeof(unsigned long) == sizeof(ProcessBase *));
- void *stack = (void *)
- (((unsigned long) stack1 << 32) + (unsigned int) stack0);
- ProcessBase *process = (ProcessBase *)
- (((unsigned long) process1 << 32) + (unsigned int) process0);
-#else
- CHECK (sizeof(unsigned int) == sizeof(ProcessBase *));
- void *stack = (void *) (unsigned int) stack0;
- ProcessBase *process = (ProcessBase *) (unsigned int) process0;
-#endif /* __x86_64__ */
-
- /* Run the process. */
- process_manager->run(process);
-
- /* Prepare to recycle this stack (global variable hack!). */
- CHECK(recyclable == NULL);
- recyclable = stack;
-
- proc_process = NULL;
- setcontext(&proc_uctx_schedule);
-}
-
-
-void * schedule(void *arg)
+void* schedule(void* arg)
{
- // Context for the entry into the schedule routine, used when a
- // process exits, so that other processes can get scheduled!
- if (getcontext(&proc_uctx_schedule) < 0)
- fatalerror("getcontext failed (schedule)");
-
- // Recycle the stack from an exited process.
- if (recyclable != NULL) {
- synchronized (stacks) {
- stacks->push(recyclable);
- }
- recyclable = NULL;
- }
+ __process__ = NULL; // Start off not running anything.
do {
- ProcessBase *process = process_manager->dequeue();
-
+ ProcessBase* process = process_manager->dequeue();
if (process == NULL) {
Gate::state_t old = gate->approach();
process = process_manager->dequeue();
if (process == NULL) {
-
- // When using the manual clock, we want to let all the
- // processes "run" up to the current time so that processes
- // receive messages in order. If we let one process have a
- // drastically advanced current time then it may try send
- // messages to another process that, due to the happens-before
- // relationship, will inherit it's drastically advanced
- // current time. If the processing thread gets to this point
- // (i.e., the point where no other processes are runnable)
- // with the manual clock means that all of the processes have
- // been run which could be run up to the current time. The
- // only way another process could become runnable is if (1) it
- // receives a message from another node, (2) a file descriptor
- // it is polling has become ready, or (3) if it has a
- // timeout. We can ignore processes that become runnable due
- // to receiving a message from another node or getting an
- // event on a file descriptor because that should not change
- // the timing happens-before relationship of the local
- // processes (unless of course the file descriptor was created
- // from something like timerfd, in which case, since the
- // programmer is not using the timing source provided in
- // libprocess and all bets are off). Thus, we can check that
- // there are no pending timeouts before the current time and
- // move the current time to the next timeout value, and tell
- // the timer to update itself.
-
- synchronized (timeouts) {
- if (clk != NULL) {
- if (!timeouts->empty()) {
- // Adjust the current time to the next timeout, provided
- // it is not past the elapsed time.
- ev_tstamp tstamp = timeouts->begin()->first;
- if (tstamp <= clk->getElapsed()) {
- clk->setCurrent(tstamp);
- }
-
- update_timer = true;
- ev_async_send(loop, &async_watcher);
- } else {
- // Woah! This comment is the only thing in this else
- // branch because this is a pretty serious state ... the
- // only way to make progress is for another node to send
- // a message or for an event to occur on a file
- // descriptor that a process is polling. We may want to
- // consider doing (or printing) something here.
- }
- }
- }
-
- /* Wait at gate if idle. */
- gate->arrive(old);
+ gate->arrive(old); // Wait at gate if idle.
continue;
} else {
gate->leave();
}
}
-
- VLOG(2) << "Resuming " << process->pid;
-
- process->lock();
- {
- CHECK(process->state == ProcessBase::INIT ||
- process->state == ProcessBase::READY ||
- process->state == ProcessBase::INTERRUPTED ||
- process->state == ProcessBase::TIMEDOUT);
-
- /* Continue process. */
- CHECK(proc_process == NULL);
- proc_process = process;
- swapcontext(&proc_uctx_running, &process->uctx);
- CHECK(proc_process != NULL);
- proc_process = NULL;
- }
- process->unlock();
+ process_manager->resume(process);
} while (true);
}
-/*
- * We might find value in catching terminating signals at some point.
- * However, for now, adding signal handlers freely is not allowed
- * because they will clash with Java and Python virtual machines and
- * causes hard to debug crashes/segfaults.
- */
-
+// We might find value in catching terminating signals at some point.
+// However, for now, adding signal handlers freely is not allowed
+// because they will clash with Java and Python virtual machines and
+// cause hard-to-debug crashes/segfaults.
// void sigbad(int signal, struct sigcontext *ctx)
// {
-// /* Pass on the signal (so that a core file is produced). */
+// // Pass on the signal (so that a core file is produced).
// struct sigaction sa;
// sa.sa_handler = SIG_DFL;
// sigemptyset(&sa.sa_mask);
@@ -1067,6 +965,9 @@ void * schedule(void *arg)
void initialize(bool initialize_google_logging)
{
+// static pthread_once_t init = PTHREAD_ONCE_INIT;
+// pthread_once(&init, ...);
+
static volatile bool initialized = false;
static volatile bool initializing = true;
@@ -1088,7 +989,7 @@ void initialize(bool initialize_google_l
google::LogToStderr();
}
-// /* Install signal handler. */
+// // Install signal handler.
// struct sigaction sa;
// sa.sa_handler = (void (*) (int)) sigbad;
@@ -1119,9 +1020,20 @@ void initialize(bool initialize_google_l
process_manager = new ProcessManager();
socket_manager = new SocketManager();
- // Setup processing thread.
- if (pthread_create (&proc_thread, NULL, schedule, NULL) != 0) {
- PLOG(FATAL) << "Failed to initialize, pthread_create";
+ // Setup the thread local process pointer.
+ pthread_key_t key;
+ if (pthread_key_create(&key, NULL) != 0) {
+ LOG(FATAL) << "Failed to initialize, pthread_key_create";
+ }
+
+ _process_ = new ThreadLocal<ProcessBase>(key);
+
+ // Setup processing threads.
+ for (int i = 0; i < NUMBER_OF_PROCESSING_THREADS; i++) {
+ pthread_t thread; // For now, not saving handles on our threads.
+ if (pthread_create(&thread, NULL, schedule, NULL) != 0) {
+ LOG(FATAL) << "Failed to initialize, pthread_create";
+ }
}
ip = 0;
@@ -1134,9 +1046,9 @@ void initialize(bool initialize_google_l
if (value != NULL) {
int result = inet_pton(AF_INET, value, &ip);
if (result == 0) {
- fatal("LIBPROCESS_IP=%s was unparseable", value);
+ LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
} else if (result < 0) {
- fatalerror("failed to initialize (inet_pton)");
+ PLOG(FATAL) << "Failed to initialize, inet_pton";
}
}
@@ -1145,7 +1057,7 @@ void initialize(bool initialize_google_l
if (value != NULL) {
int result = atoi(value);
if (result < 0 || result > USHRT_MAX) {
- fatal("LIBPROCESS_PORT=%s is not a valid port", value);
+ LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
}
port = result;
}
@@ -1221,7 +1133,7 @@ void initialize(bool initialize_google_l
ev_async_init(&async_watcher, handle_async);
ev_async_start(loop, &async_watcher);
- ev_timer_init(&timeouts_watcher, handle_timeout, 0., 2100000.0);
+ ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
ev_timer_again(loop, &timeouts_watcher);
ev_io_init(&server_watcher, accept, s, EV_READ);
@@ -1242,12 +1154,9 @@ void initialize(bool initialize_google_l
// sigaddset (&sa.sa_mask, w->signum);
// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
- if (pthread_create(&io_thread, NULL, serve, loop) != 0) {
- PLOG(FATAL) << "Failed to initialize, pthread_create";
- }
-
- if (pthread_create(&invoke_thread, NULL, invoker, NULL) != 0) {
- PLOG(FATAL) << "Failed to initialize, pthread_create";
+ pthread_t thread; // For now, not saving handles on our threads.
+ if (pthread_create(&thread, NULL, serve, loop) != 0) {
+ LOG(FATAL) << "Failed to initialize, pthread_create";
}
// Need to set initialzing here so that we can actually invoke
@@ -1266,50 +1175,67 @@ void initialize(bool initialize_google_l
}
-HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy)
- : proxy(_proxy) {}
+HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy,
+ Future<HttpResponse>* _future,
+ bool _persist)
+ : proxy(_proxy), future(_future), persist(_persist)
+{
+ // Wait for any event on the future.
+ deferred<void(const Future<HttpResponse>&)> waited = executor.defer(
+ lambda::bind(&HttpResponseWaiter::waited, this, lambda::_1));
+
+ future->onAny(waited);
+ // Also create a timer so we don't wait forever.
+ deferred<void(void)> timeout = executor.defer(
+ lambda::bind(&HttpResponseWaiter::timeout, this));
-HttpResponseWaiter::~HttpResponseWaiter() {}
+ timers::create(30, timeout);
+}
-void HttpResponseWaiter::await(const Future<HttpResponse>& future, bool persist)
+void HttpResponseWaiter::waited(const Future<HttpResponse>&)
{
- if (future.await(30)) {
- dispatch(proxy, &HttpProxy::ready, future, persist);
+ if (future->isReady()) {
+ process::dispatch(proxy, &HttpProxy::ready, future, persist);
} else {
- dispatch(proxy, &HttpProxy::unavailable, persist);
+ // TODO(benh): Consider handling other "states" of future
+ // (discarded, failed, etc) with different HTTP statuses.
+ process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
}
+
+ executor.stop(); // Ensure we ignore the timeout.
}
-HttpProxy::HttpProxy(int _c) : c(_c)
+void HttpResponseWaiter::timeout()
{
- // Create our waiter.
- waiter = new HttpResponseWaiter(self());
- spawn(waiter);
+ process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
+
+ executor.stop(); // Ensure we ignore the future.
}
-HttpProxy::~HttpProxy()
-{
- send(waiter->self(), TERMINATE);
- wait(waiter->self());
- delete waiter;
-}
+HttpProxy::HttpProxy(int _c) : c(_c) {}
-void HttpProxy::handle(const Future<HttpResponse>& future, bool persist)
+void HttpProxy::handle(Future<HttpResponse>* future, bool persist)
{
- dispatch(waiter, &HttpResponseWaiter::await, future, persist);
+ HttpResponseWaiter* waiter = new HttpResponseWaiter(this, future, persist);
+ waiters[future] = waiter;
}
-void HttpProxy::ready(const Future<HttpResponse>& future, bool persist)
+void HttpProxy::ready(Future<HttpResponse>* future, bool persist)
{
- CHECK(future.isReady());
+ CHECK(waiters.count(future) > 0);
+ HttpResponseWaiter* waiter = waiters[future];
+ waiters.erase(future);
+ delete waiter;
- const HttpResponse& response = future.get();
+ CHECK(future->isReady());
+
+ const HttpResponse& response = future->get();
// Don't persist the connection if the responder doesn't want it to.
if (response.headers.count("Connection") > 0) {
@@ -1319,19 +1245,32 @@ void HttpProxy::ready(const Future<HttpR
}
}
+ HttpResponseEncoder* encoder =
+ new HttpResponseEncoder(response);
+
+ delete future;
+
// See the semantics of SocketManager::send for details about how
// the socket will get closed (it might actually already be closed
// before we issue this send).
- socket_manager->send(new HttpResponseEncoder(response), c, persist);
+ socket_manager->send(encoder, c, persist);
}
-void HttpProxy::unavailable(bool persist)
+void HttpProxy::unavailable(Future<HttpResponse>* future, bool persist)
{
- HttpResponse response = HttpServiceUnavailableResponse();
+ CHECK(waiters.count(future) > 0);
+ HttpResponseWaiter* waiter = waiters[future];
+ waiters.erase(future);
+ delete waiter;
+
+ HttpResponseEncoder* encoder =
+ new HttpResponseEncoder(HttpServiceUnavailableResponse());
+
+ delete future;
// As above, the socket might all ready be closed when we do a send.
- socket_manager->send(new HttpResponseEncoder(response), c, persist);
+ socket_manager->send(encoder, c, persist);
}
@@ -1347,10 +1286,10 @@ SocketManager::~SocketManager() {}
void SocketManager::link(ProcessBase *process, const UPID &to)
{
// TODO(benh): The semantics we want to support for link are such
- // that if there is nobody to link to (local or remote) then a
- // EXITED message gets generated. This functionality has only
- // been implemented when the link is local, not remote. Of course,
- // if there is nobody listening on the remote side, then this should
+ // that if there is nobody to link to (local or remote) then an
+ // ExitedEvent gets generated. This functionality has only been
+ // implemented when the link is local, not remote. Of course, if
+ // there is nobody listening on the remote side, then this should
// work remotely ... but if there is someone listening remotely just
// not at that id, then it will silently continue executing.
@@ -1365,11 +1304,11 @@ void SocketManager::link(ProcessBase *pr
int s;
if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
- fatalerror("failed to link (socket)");
+ PLOG(FATAL) << "Failed to link, socket";
}
if (set_nbio(s) < 0) {
- fatalerror("failed to link (set_nbio)");
+ PLOG(FATAL) << "Failed to link, set_nbio";
}
sockets[s] = node;
@@ -1391,7 +1330,7 @@ void SocketManager::link(ProcessBase *pr
// Try and connect to the node using this socket.
if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
- fatalerror("failed to link (connect)");
+ PLOG(FATAL) << "Failed to link, connect";
}
// Wait for socket to be connected.
@@ -1441,6 +1380,13 @@ void SocketManager::send(DataEncoder* en
{
CHECK(encoder != NULL);
+ // TODO(benh): The current mechanism here is insufficient. It could
+ // be the case that an HttpProxy attempts to do a send on a socket
+ // just as that socket has been closed and then re-opened for
+ // another connection. In this case, the data sent on that socket
+ // will be completely bogus ... one easy fix would be to check the
+ // proxy that is associated with the socket to eliminate this race.
+
synchronized (this) {
if (sockets.count(s) > 0) {
if (outgoing.count(s) > 0) {
@@ -1468,6 +1414,7 @@ void SocketManager::send(DataEncoder* en
}
} else {
VLOG(1) << "Attempting to send on a no longer valid socket!";
+ delete encoder;
}
}
}
@@ -1494,11 +1441,11 @@ void SocketManager::send(Message* messag
int s;
if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
- fatalerror("failed to send (socket)");
+ PLOG(FATAL) << "Failed to send, socket";
}
if (set_nbio(s) < 0) {
- fatalerror("failed to send (set_nbio)");
+ PLOG(FATAL) << "Failed to send, set_nbio";
}
sockets[s] = node;
@@ -1522,7 +1469,7 @@ void SocketManager::send(Message* messag
if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
- fatalerror("failed to send (connect)");
+ PLOG(FATAL) << "Failed to send, connect";
}
// Initialize watcher for connecting.
@@ -1568,7 +1515,7 @@ DataEncoder* SocketManager::next(int s)
} else if (proxies.count(s) > 0) {
HttpProxy* proxy = proxies[s];
proxies.erase(s);
- post(proxy->self(), TERMINATE);
+ terminate(proxy);
}
disposables.erase(s);
@@ -1584,6 +1531,8 @@ DataEncoder* SocketManager::next(int s)
void SocketManager::closed(int s)
{
+ HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
+
synchronized (this) {
if (sockets.count(s) > 0) {
const Node& node = sockets[s];
@@ -1595,9 +1544,8 @@ void SocketManager::closed(int s)
} else if (temps.count(node) > 0 && temps[node] == s) {
temps.erase(node);
} else if (proxies.count(s) > 0) {
- HttpProxy* proxy = proxies[s];
+ proxy = proxies[s];
proxies.erase(s);
- post(proxy->self(), TERMINATE);
}
outgoing.erase(s);
@@ -1606,6 +1554,12 @@ void SocketManager::closed(int s)
}
}
+ // We terminate the proxy outside the synchronized block to avoid
+ // possible deadlock between the ProcessManager and SocketManager.
+ if (proxy != NULL) {
+ terminate(proxy);
+ }
+
// This might have just been a receiving socket (only sending
// sockets, with the exception of the receiving side of a persistant
// socket, get added to 'sockets'), so we want to make sure to call
@@ -1614,7 +1568,7 @@ void SocketManager::closed(int s)
}
-void SocketManager::exited(const Node &node)
+void SocketManager::exited(const Node& node)
{
// TODO(benh): It would be cleaner if this routine could call back
// into ProcessManager ... then we wouldn't have to convince
@@ -1623,14 +1577,12 @@ void SocketManager::exited(const Node &n
synchronized (this) {
list<UPID> removed;
// Look up all linked processes.
- foreachpair (const UPID &pid, set<ProcessBase *> &processes, links) {
- if (pid.ip == node.ip && pid.port == node.port) {
- // N.B. If we call exited(pid) we might invalidate iteration.
- foreach (ProcessBase *process, processes) {
- Message* message = encode(pid, process->pid, EXITED);
- process->enqueue(message);
+ foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
+ if (linkee.ip == node.ip && linkee.port == node.port) {
+ foreach (ProcessBase* linker, processes) {
+ linker->enqueue(new ExitedEvent(linkee));
}
- removed.push_back(pid);
+ removed.push_back(linkee);
}
}
@@ -1641,27 +1593,38 @@ void SocketManager::exited(const Node &n
}
-void SocketManager::exited(ProcessBase *process)
+void SocketManager::exited(ProcessBase* process)
{
+ // An exited event is enough to cause the process to get deleted
+ // (e.g., by the garbage collector), which means we can't
+ // dereference process (or even use the address) after we enqueue at
+ // least one exited event. Thus, we save the process pid.
+ const UPID pid = process->pid;
+
+ // Likewise, we need to save the current time of the process so we
+ // can update the clocks of linked processes as appropriate.
+ const double secs = Clock::now(process);
+
synchronized (this) {
- /* Remove any links this process might have had. */
- foreachpair (_, set<ProcessBase *> &processes, links) {
+ // Iterate through the links, removing any links the process might
+ // have had and creating exited events for any linked processes.
+ foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
processes.erase(process);
- }
-
- /* Look up all linked processes. */
- map<UPID, set<ProcessBase *> >::iterator it = links.find(process->pid);
- if (it != links.end()) {
- set<ProcessBase *> &processes = it->second;
- foreach (ProcessBase *p, processes) {
- CHECK(process != p);
- Message *message = encode(process->pid, p->pid, EXITED);
- // TODO(benh): Preserve happens-before when using clock.
- p->enqueue(message);
+ if (linkee == pid) {
+ foreach (ProcessBase* linker, processes) {
+ CHECK(linker != process) << "Process linked with itself";
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ Clock::update(linker, secs);
+ }
+ }
+ linker->enqueue(new ExitedEvent(linkee));
+ }
}
- links.erase(process->pid);
}
+
+ links.erase(pid);
}
}
@@ -1681,8 +1644,9 @@ ProcessReference ProcessManager::use(con
if (pid.ip == ip && pid.port == port) {
synchronized (processes) {
if (processes.count(pid.id) > 0) {
- // Note that the ProcessReference constructor MUST get called
- // while holding the lock on processes.
+ // Note that the ProcessReference constructor _must_ get
+ // called while holding the lock on processes so that waiting
+ // for references is atomic (i.e., race free).
return ProcessReference(processes[pid.id]);
}
}
@@ -1692,7 +1656,7 @@ ProcessReference ProcessManager::use(con
}
-bool ProcessManager::deliver(Message *message, ProcessBase *sender)
+bool ProcessManager::deliver(Message* message, ProcessBase* sender)
{
CHECK(message != NULL);
@@ -1705,18 +1669,13 @@ bool ProcessManager::deliver(Message *me
// that we can look up it's current time).
if (sender != NULL) {
synchronized (timeouts) {
- if (clk != NULL) {
- ev_tstamp tstamp =
- max(clk->getCurrent(receiver), clk->getCurrent(sender));
- clk->setCurrent(receiver, tstamp);
+ if (Clock::paused()) {
+ Clock::order(sender, receiver);
}
}
}
- VLOG(2) << "Delivering message name '" << message->name
- << "' to " << message->to << " from " << message->from;
-
- receiver->enqueue(message);
+ receiver->enqueue(new MessageEvent(message));
} else {
delete message;
return false;
@@ -1727,7 +1686,7 @@ bool ProcessManager::deliver(Message *me
// TODO(benh): Refactor and share code with above!
-bool ProcessManager::deliver(int c, HttpRequest *request, ProcessBase *sender)
+bool ProcessManager::deliver(int c, HttpRequest* request, ProcessBase* sender)
{
CHECK(request != NULL);
@@ -1740,9 +1699,9 @@ bool ProcessManager::deliver(int c, Http
}
// Treat this as an HTTP request and check for a valid receiver.
- string temp = request->path.substr(1, request->path.find('/', 1) - 1);
+ string path = request->path.substr(1, request->path.find('/', 1) - 1);
- UPID to(temp, ip, port);
+ UPID to(path, ip, port);
if (ProcessReference receiver = use(to)) {
// If we have a local sender AND we are using a manual clock
@@ -1753,41 +1712,23 @@ bool ProcessManager::deliver(int c, Http
// that we can look up it's current time).
if (sender != NULL) {
synchronized (timeouts) {
- if (clk != NULL) {
- ev_tstamp tstamp =
- max(clk->getCurrent(receiver), clk->getCurrent(sender));
- clk->setCurrent(receiver, tstamp);
+ if (Clock::paused()) {
+ Clock::order(sender, receiver);
}
}
}
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(c);
-
- // Create the promise to link with whatever gets returned.
- Promise<HttpResponse>* promise = new Promise<HttpResponse>();
-
- // Let the HttpProxy know about this request.
- dispatch(proxy, &HttpProxy::handle, promise->future(), request->keepAlive);
-
- VLOG(2) << "Delivering HTTP request for '" << request->path
- << "' to " << to;
-
- // Enqueue request and promise for receiver.
- receiver->enqueue(new pair<HttpRequest*, Promise<HttpResponse>*>(request, promise));
+ // Enqueue the event.
+ receiver->enqueue(new HttpEvent(c, request));
} else {
// This has no receiver, send error response.
- VLOG(1) << "Returning '404 Not Found' for HTTP request for '"
- << request->path << "'";
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(c);
+ VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";
- // Create a "future" response.
- Future<HttpResponse> future = HttpNotFoundResponse();
+ HttpResponseEncoder* encoder =
+ new HttpResponseEncoder(HttpNotFoundResponse());
- // Let the HttpProxy know about this request.
- dispatch(proxy, &HttpProxy::handle, future, request->keepAlive);
+ // TODO(benh): Socket might be closed and then re-opened!
+ socket_manager->send(encoder, c, request->keepAlive);
// Cleanup request.
delete request;
@@ -1799,9 +1740,12 @@ bool ProcessManager::deliver(int c, Http
// TODO(benh): Refactor and share code with above!
-bool ProcessManager::deliver(const UPID& to, function<void(ProcessBase*)>* dispatcher, ProcessBase *sender)
+bool ProcessManager::deliver(
+ const UPID& to,
+ lambda::function<void(ProcessBase*)>* f,
+ ProcessBase* sender)
{
- CHECK(dispatcher != NULL);
+ CHECK(f != NULL);
if (ProcessReference receiver = use(to)) {
// If we have a local sender AND we are using a manual clock
@@ -1812,19 +1756,15 @@ bool ProcessManager::deliver(const UPID&
// that we can look up it's current time).
if (sender != NULL) {
synchronized (timeouts) {
- if (clk != NULL) {
- ev_tstamp tstamp =
- max(clk->getCurrent(receiver), clk->getCurrent(sender));
- clk->setCurrent(receiver, tstamp);
+ if (Clock::paused()) {
+ Clock::order(sender, receiver);
}
}
}
- VLOG(2) << "Delivering dispatcher to " << to;
-
- receiver->enqueue(dispatcher);
+ receiver->enqueue(new DispatchEvent(f));
} else {
- delete dispatcher;
+ delete f;
return false;
}
@@ -1832,12 +1772,10 @@ bool ProcessManager::deliver(const UPID&
}
-UPID ProcessManager::spawn(ProcessBase *process, bool manage)
+UPID ProcessManager::spawn(ProcessBase* process, bool manage)
{
CHECK(process != NULL);
- process->state = ProcessBase::INIT;
-
synchronized (processes) {
if (processes.count(process->pid.id) > 0) {
return UPID();
@@ -1846,311 +1784,269 @@ UPID ProcessManager::spawn(ProcessBase *
}
}
- void *stack = NULL;
-
- // Reuse a stack if any are available.
- synchronized (stacks) {
- if (!stacks->empty()) {
- stack = stacks->top();
- stacks->pop();
- }
+ // Use the garbage collector if requested.
+ if (manage) {
+ dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
}
- if (stack == NULL) {
- const int protection = (PROT_READ | PROT_WRITE);
- const int flags = (MAP_PRIVATE | MAP_ANONYMOUS | MAP_32BIT);
+ // Add process to the run queue (so 'initialize' will get invoked).
+ enqueue(process);
- stack = mmap(NULL, PROCESS_STACK_SIZE, protection, flags, -1, 0);
+ VLOG(2) << "Spawned process " << process->self();
- if (stack == MAP_FAILED)
- fatalerror("mmap failed (spawn)");
+ return process->self();
+}
- /* Disallow all memory access to the last page. */
- if (mprotect(stack, getpagesize(), PROT_NONE) != 0)
- fatalerror("mprotect failed (spawn)");
- }
- /* Set up the ucontext. */
- if (getcontext(&process->uctx) < 0)
- fatalerror("getcontext failed (spawn)");
-
- process->uctx.uc_stack.ss_sp = stack;
- process->uctx.uc_stack.ss_size = PROCESS_STACK_SIZE;
- process->uctx.uc_link = 0;
-
- /* Package the arguments. */
-#ifdef __x86_64__
- CHECK(sizeof(unsigned long) == sizeof(ProcessBase *));
- int stack0 = (unsigned int) (unsigned long) stack;
- int stack1 = (unsigned long) stack >> 32;
- int process0 = (unsigned int) (unsigned long) process;
- int process1 = (unsigned long) process >> 32;
-#else
- CHECK(sizeof(unsigned int) == sizeof(ProcessBase *));
- int stack0 = (unsigned int) stack;
- int stack1 = 0;
- int process0 = (unsigned int) process;
- int process1 = 0;
-#endif /* __x86_64__ */
-
- makecontext(&process->uctx, (void (*)()) trampoline,
- 4, stack0, stack1, process0, process1);
+void ProcessManager::resume(ProcessBase* process)
+{
+ __process__ = process;
- /* Add process to the run queue. */
- enqueue(process);
+ VLOG(2) << "Resuming " << process->pid << " at "
+ << std::fixed << std::setprecision(9) << Clock::now();
- /* Use the garbage collector if requested. */
- if (manage) {
- dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
- }
+ bool terminate = false;
+ bool blocked = false;
- return process->self();
-}
+ CHECK(process->state == ProcessBase::BOTTOM ||
+ process->state == ProcessBase::READY);
+ if (process->state == ProcessBase::BOTTOM) {
+ process->state = ProcessBase::RUNNING;
+ try { process->initialize(); }
+ catch (...) { terminate = true; }
+ }
+ while (!terminate && !blocked) {
+ Event* event = NULL;
-void ProcessManager::link(ProcessBase *process, const UPID &to)
-{
- // Check if the pid is local.
- if (!(to.ip == ip && to.port == port)) {
- socket_manager->link(process, to);
- } else {
- // Since the pid is local we want to get a reference to it's
- // underlying process so that while we are invoking the link
- // manager we don't miss sending a possible EXITED.
- if (ProcessReference _ = use(to)) {
- socket_manager->link(process, to);
- } else {
- // Since the pid isn't valid it's process must have already died
- // (or hasn't been spawned yet) so send a process exit message.
- Message *message = encode(to, process->pid, EXITED);
- process->enqueue(message);
+ process->lock();
+ {
+ if (process->events.size() > 0) {
+ event = process->events.front();
+ process->events.pop_front();
+ process->state = ProcessBase::RUNNING;
+ } else {
+ process->state = ProcessBase::BLOCKED;
+ blocked = true;
+ }
}
- }
-}
-
+ process->unlock();
-bool ProcessManager::receive(ProcessBase *process, double secs)
-{
- CHECK(process != NULL);
+ if (!blocked) {
+ CHECK(event != NULL);
- bool timedout = false;
+ // Determine if we should terminate.
+ terminate = event->is<TerminateEvent>();
- process->lock();
- {
- /* Ensure nothing enqueued since check in ProcessBase::receive. */
- if (process->messages.empty()) {
- if (secs > 0) {
- /* Create timeout. */
- const timeout &timeout = create_timeout(process, secs);
-
- /* Start the timeout. */
- start_timeout(timeout);
-
- /* Context switch. */
- process->state = ProcessBase::RECEIVING;
- swapcontext(&process->uctx, &proc_uctx_running);
-
- CHECK(process->state == ProcessBase::READY ||
- process->state == ProcessBase::TIMEDOUT);
-
- /* Attempt to cancel the timer if necessary. */
- if (process->state != ProcessBase::TIMEDOUT) {
- cancel_timeout(timeout);
- } else {
- timedout = true;
- }
+ // Now service the event.
+ try {
+ process->serve(*event);
+ } catch (const std::exception& e) {
+ std::cerr << "libprocess: " << process->pid
+ << " terminating due to "
+ << e.what() << std::endl;
+ terminate = true;
+ } catch (...) {
+ std::cerr << "libprocess: " << process->pid
+ << " terminating due to unknown exception" << std::endl;
+ terminate = true;
+ }
- /* N.B. No cancel means possible unnecessary timeouts. */
+ delete event;
- process->state = ProcessBase::RUNNING;
-
- /* Update the generation (handles racing timeouts). */
- process->generation++;
- } else {
- /* Context switch. */
- process->state = ProcessBase::RECEIVING;
- swapcontext(&process->uctx, &proc_uctx_running);
- CHECK(process->state == ProcessBase::READY);
- process->state = ProcessBase::RUNNING;
+ if (terminate) {
+ cleanup(process);
}
}
}
- process->unlock();
- return !timedout;
+ __process__ = NULL;
}
-bool ProcessManager::serve(ProcessBase *process, double secs)
+void ProcessManager::cleanup(ProcessBase* process)
{
- CHECK(process != NULL);
+ VLOG(2) << "Cleaning up " << process->pid;
- bool timedout = false;
+ // Processes waiting on the exiting process (NOTE(review): unused below).
+ list<ProcessBase*> resumable;
- process->lock();
- {
- /* Ensure nothing enqueued since check in ProcessBase::serve. */
- if (process->messages.empty() &&
- process->requests.empty() &&
- process->dispatchers.empty()) {
- if (secs > 0) {
- /* Create timeout. */
- const timeout &timeout = create_timeout(process, secs);
-
- /* Start the timeout. */
- start_timeout(timeout);
-
- /* Context switch. */
- process->state = ProcessBase::SERVING;
- swapcontext(&process->uctx, &proc_uctx_running);
-
- CHECK(process->state == ProcessBase::READY ||
- process->state == ProcessBase::TIMEDOUT);
-
- /* Attempt to cancel the timer if necessary. */
- if (process->state != ProcessBase::TIMEDOUT) {
- cancel_timeout(timeout);
- } else {
- timedout = true;
- }
+ // Possible gate that non-libprocess threads are waiting at.
+ Gate* gate = NULL;
+
+ // Remove process.
+ synchronized (processes) {
+ // Wait for all process references to get cleaned up.
+ while (process->refs > 0) {
+ asm ("pause");
+ __sync_synchronize();
+ }
- /* N.B. No cancel means possible unnecessary timeouts. */
+ process->lock();
+ {
+ // Free any pending events.
+ while (!process->events.empty()) {
+ Event* event = process->events.front();
+ process->events.pop_front();
+ delete event;
+ }
- process->state = ProcessBase::RUNNING;
-
- /* Update the generation (handles racing timeouts). */
- process->generation++;
- } else {
- /* Context switch. */
- process->state = ProcessBase::SERVING;
- swapcontext(&process->uctx, &proc_uctx_running);
- CHECK(process->state == ProcessBase::READY);
- process->state = ProcessBase::RUNNING;
+ processes.erase(process->pid.id);
+
+ // Lookup gate to wake up waiting threads.
+ map<ProcessBase*, Gate*>::iterator it = gates.find(process);
+ if (it != gates.end()) {
+ gate = it->second;
+ // N.B. The last thread that leaves the gate also frees it.
+ gates.erase(it);
}
+
+ CHECK(process->refs == 0);
+ process->state = ProcessBase::FINISHED;
}
+ process->unlock();
+
+ // Note that we don't remove the process from the clock during
+ // cleanup, but rather the clock is reset for a process when it is
+ // created (see ProcessBase::ProcessBase). We do this so that
+ // SocketManager::exited can access the current time of the
+ // process to "order" exited events. It might make sense to
+ // consider storing the time of the process as a field of the
+ // class instead.
+
+ // Now we tell the socket manager about this process exiting so
+ // that it can create exited events for linked processes. We
+ // _must_ do this while synchronized on processes because
+ // otherwise another process could attempt to link this process
+ // and ProcessManager::link would see that the process doesn't
+ // exist when it attempts to get a ProcessReference (since we
+ // removed the process above) thus causing an exited event, which
+ // could cause the process to get deleted (e.g., the garbage
+ // collector might link _after_ the process has already been
+ // removed, thus getting an exited event but we don't want that
+ // exited event to fire until after we have used the process in
+ // SocketManager::exited).
+ socket_manager->exited(process);
}
- process->unlock();
- return !timedout;
+ // Confirm process not in runq.
+ synchronized (runq) {
+ CHECK(find(runq.begin(), runq.end(), process) == runq.end());
+ }
+
+ // ***************************************************************
+ // At this point we can no longer dereference the process since it
+ // might already be deallocated (e.g., by the garbage collector).
+ // ***************************************************************
+
+ if (gate != NULL) {
+ gate->open();
+ }
}
-void ProcessManager::pause(ProcessBase *process, double secs)
+void ProcessManager::link(ProcessBase* process, const UPID& to)
{
- CHECK(process != NULL);
-
- process->lock();
- {
- if (secs > 0) {
- /* Create/Start the timeout. */
- start_timeout(create_timeout(process, secs));
-
- /* Context switch. */
- process->state = ProcessBase::PAUSED;
- swapcontext(&process->uctx, &proc_uctx_running);
- CHECK(process->state == ProcessBase::TIMEDOUT);
- process->state = ProcessBase::RUNNING;
+ // Check if the pid is local.
+ if (!(to.ip == ip && to.port == port)) {
+ socket_manager->link(process, to);
+ } else {
+ // Since the pid is local we want to get a reference to its
+ // underlying process so that while we are invoking the link
+ // manager we don't miss sending a possible ExitedEvent.
+ if (ProcessReference _ = use(to)) {
+ socket_manager->link(process, to);
} else {
- /* Modified context switch (basically a yield). */
- process->state = ProcessBase::READY;
- enqueue(process);
- swapcontext(&process->uctx, &proc_uctx_running);
- CHECK(process->state == ProcessBase::READY);
- process->state = ProcessBase::RUNNING;
+ // Since the pid isn't valid its process must have already died
+ // (or hasn't been spawned yet) so enqueue an ExitedEvent for it.
+ process->enqueue(new ExitedEvent(to));
}
}
- process->unlock();
}
-void ProcessManager::terminate(const UPID& pid, bool inject, ProcessBase* sender)
+void ProcessManager::terminate(
+ const UPID& pid,
+ bool inject,
+ ProcessBase* sender)
{
if (ProcessReference process = use(pid)) {
if (sender != NULL) {
synchronized (timeouts) {
- if (clk != NULL) {
- ev_tstamp tstamp =
- max(clk->getCurrent(process), clk->getCurrent(sender));
- clk->setCurrent(process, tstamp);
+ if (Clock::paused()) {
+ Clock::order(sender, process);
}
}
- process->enqueue(encode(sender->self(), pid, TERMINATE), inject);
+ process->enqueue(new TerminateEvent(sender->self()), inject);
} else {
- process->enqueue(encode(UPID(), pid, TERMINATE), inject);
- }
- }
-}
-
-
-bool ProcessManager::wait(ProcessBase *process, const UPID &pid)
-{
- bool waited = false;
-
- /* Now we can add the process to the waiters. */
- synchronized (processes) {
- if (processes.count(pid.id) > 0) {
- CHECK(processes[pid.id]->state != ProcessBase::FINISHED);
- waiters[processes[pid.id]].insert(process);
- waited = true;
- }
- }
-
- /* If we waited then we should context switch. */
- if (waited) {
- process->lock();
- {
- if (process->state == ProcessBase::RUNNING) {
- /* Context switch. */
- process->state = ProcessBase::WAITING;
- swapcontext(&process->uctx, &proc_uctx_running);
- CHECK(process->state == ProcessBase::READY);
- process->state = ProcessBase::RUNNING;
- } else {
- /* Process is cleaned up and we have been removed from waiters. */
- CHECK(process->state == ProcessBase::INTERRUPTED);
- process->state = ProcessBase::RUNNING;
- }
+ process->enqueue(new TerminateEvent(UPID()), inject);
}
- process->unlock();
}
-
- return waited;
}
-bool ProcessManager::external_wait(const UPID &pid)
+bool ProcessManager::wait(const UPID& pid)
{
- // We use a gate for external waiters. A gate is single use. That
- // is, a new gate is created when the first external thread shows
- // up and wants to wait for a process that currently has no
- // gate. Once that process exits, the last external thread to
- // leave the gate will also clean it up. Note that a gate will
- // never get more external threads waiting on it after it has been
- // opened, since the process should no longer be valid and
- // therefore will not have an entry in 'processes'.
+ // We use a gate for waiters. A gate is single use. That is, a new
+ // gate is created when the first thread shows up and wants to wait
+ // for a process that currently has no gate. Once that process
+ // exits, the last thread to leave the gate will also clean it
+ // up. Note that a gate will never get more threads waiting on it
+ // after it has been opened, since the process should no longer be
+ // valid and therefore will not have an entry in 'processes'.
- Gate *gate = NULL;
+ Gate* gate = NULL;
Gate::state_t old;
- /* Try and approach the gate if necessary. */
+ ProcessBase* process = NULL; // Set to non-null if we donate thread.
+
+ // Try and approach the gate if necessary.
synchronized (processes) {
if (processes.count(pid.id) > 0) {
- ProcessBase *process = processes[pid.id];
+ process = processes[pid.id];
CHECK(process->state != ProcessBase::FINISHED);
- /* Check and see if a gate already exists. */
+ // Check and see if a gate already exists.
if (gates.find(process) == gates.end()) {
gates[process] = new Gate();
}
gate = gates[process];
old = gate->approach();
+
+ // Check if it is runnable in order to donate this thread.
+ if (process->state == ProcessBase::BOTTOM ||
+ process->state == ProcessBase::READY) {
+ synchronized (runq) {
+ list<ProcessBase*>::iterator it =
+ find(runq.begin(), runq.end(), process);
+ if (it != runq.end()) {
+ runq.erase(it);
+ } else {
+ // Another thread has resumed the process ...
+ process = NULL;
+ }
+ }
+ } else {
+ // Process is not runnable, so no need to donate ...
+ process = NULL;
+ }
}
}
- /* Now arrive at the gate and wait until it opens. */
+ if (process != NULL) {
+ VLOG(1) << "Donating thread to " << process->pid << " while waiting";
+ ProcessBase* donator = __process__;
+ process_manager->resume(process);
+ __process__ = donator;
+ }
+
+ // TODO(benh): Donating only once may not be sufficient, so we might
+ // still deadlock here ... perhaps warn if that's the case?
+
+ // Now arrive at the gate and wait until it opens.
if (gate != NULL) {
gate->arrive(old);
@@ -2165,87 +2061,7 @@ bool ProcessManager::external_wait(const
}
-bool ProcessManager::poll(ProcessBase *process, int fd, int op, double secs, bool ignore)
-{
- CHECK(process != NULL);
-
- bool interrupted = false;
-
- process->lock();
- {
- /* Consider a non-empty message queue as an immediate interrupt. */
- if (!ignore && !process->messages.empty()) {
- process->unlock();
- return false;
- }
-
- // Treat an poll with a bad fd as an interruptible pause!
- if (fd >= 0) {
- /* Allocate/Initialize the watcher. */
- ev_io *watcher = new ev_io();
-
- if ((op & ProcessBase::RDWR) == ProcessBase::RDWR) {
- ev_io_init(watcher, handle_poll, fd, EV_READ | EV_WRITE);
- } else if ((op & ProcessBase::RDONLY) == ProcessBase::RDONLY) {
- ev_io_init(watcher, handle_poll, fd, EV_READ);
- } else if ((op & ProcessBase::WRONLY) == ProcessBase::WRONLY) {
- ev_io_init(watcher, handle_poll, fd, EV_WRITE);
- }
-
- // Tuple describing state (on heap in case we can't "cancel" it,
- // the watcher will always fire, even if we get interrupted and
- // return early, so this tuple will get cleaned up when the
- // watcher runs).
- watcher->data = new tuple<UPID, int>(process->pid, process->generation);
-
- /* Enqueue the watcher. */
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- /* Interrupt the loop. */
- ev_async_send(loop, &async_watcher);
- }
-
- CHECK(secs >= 0);
-
- timeout timeout;
-
- if (secs != 0) {
- timeout = create_timeout(process, secs);
- start_timeout(timeout);
- }
-
- /* Context switch. */
- process->state = ProcessBase::POLLING;
- swapcontext(&process->uctx, &proc_uctx_running);
- CHECK(process->state == ProcessBase::READY ||
- process->state == ProcessBase::TIMEDOUT ||
- process->state == ProcessBase::INTERRUPTED);
-
- /* Attempt to cancel the timer if necessary. */
- if (secs != 0) {
- if (process->state != ProcessBase::TIMEDOUT) {
- cancel_timeout(timeout);
- }
- }
-
- if (process->state == ProcessBase::INTERRUPTED) {
- interrupted = true;
- }
-
- process->state = ProcessBase::RUNNING;
-
- /* Update the generation (handles racing polled). */
- process->generation++;
- }
- process->unlock();
-
- return !interrupted;
-}
-
-
-void ProcessManager::enqueue(ProcessBase *process)
+void ProcessManager::enqueue(ProcessBase* process)
{
CHECK(process != NULL);
@@ -2259,18 +2075,18 @@ void ProcessManager::enqueue(ProcessBase
runq.push_back(process);
}
- /* Wake up the processing thread if necessary. */
+ // Wake up the processing thread if necessary.
gate->open();
}
-ProcessBase * ProcessManager::dequeue()
+ProcessBase* ProcessManager::dequeue()
{
// TODO(benh): Remove a process from this thread's runq. If there
// are no processes to run, and this is not a dedicated thread, then
// steal one from another threads runq.
- ProcessBase *process = NULL;
+ ProcessBase* process = NULL;
synchronized (runq) {
if (!runq.empty()) {
@@ -2283,340 +2099,76 @@ ProcessBase * ProcessManager::dequeue()
}
-void ProcessManager::timedout(const UPID &pid, int generation)
-{
- if (ProcessReference process = use(pid)) {
- process->lock();
- {
- // We know we timed out if the state != READY after a timeout
- // but the generation is still the same.
- if (process->state != ProcessBase::READY &&
- process->generation == generation) {
-
- // The process could be in any of the following states,
- // including RUNNING if a pause, receive, or poll was
- // initiated by an "outside" thread (e.g., in the constructor
- // of the process).
- CHECK(process->state == ProcessBase::RUNNING ||
- process->state == ProcessBase::RECEIVING ||
- process->state == ProcessBase::SERVING ||
- process->state == ProcessBase::POLLING ||
- process->state == ProcessBase::INTERRUPTED ||
- process->state == ProcessBase::PAUSED);
-
- if (process->state != ProcessBase::RUNNING ||
- process->state != ProcessBase::INTERRUPTED ||
- process->state != ProcessBase::FINISHING) {
- process_manager->enqueue(process);
- }
-
- // We always have a timeout override the state (unless we are
- // exiting). This includes overriding INTERRUPTED. This means
- // that a process that was polling when selected from the
- // runq will fall out because of a timeout even though it also
- // received a message.
- if (process->state != ProcessBase::FINISHING) {
- process->state = ProcessBase::TIMEDOUT;
- }
- }
- }
- process->unlock();
- }
-}
-
-
-void ProcessManager::polled(const UPID &pid, int generation)
-{
- if (ProcessReference process = use(pid)) {
- process->lock();
- {
- if (process->state == ProcessBase::POLLING &&
- process->generation == generation) {
- process->state = ProcessBase::READY;
- enqueue(process);
- }
- }
- process->unlock();
- }
-}
-
-
-void ProcessManager::run(ProcessBase *process)
-{
- // Each process gets locked before 'schedule' runs it to enforce
- // atomicity for the blocking routines (receive, poll, pause,
- // etc). So, we only need to unlock the process here.
- {
- process->state = ProcessBase::RUNNING;
- }
- process->unlock();
-
- try {
- VLOG(2) << "Invoking " << process->pid;
- (*process)();
- } catch (const std::exception &e) {
- std::cerr << "libprocess: " << process->pid
- << " exited due to "
- << e.what() << std::endl;
- } catch (...) {
- std::cerr << "libprocess: " << process->pid
- << " exited due to unknown exception" << std::endl;
- }
+namespace timers {
- cleanup(process);
-}
-
-
-void ProcessManager::cleanup(ProcessBase *process)
+timer create(double secs, const lambda::function<void(void)>& thunk)
{
- // Processes that were waiting on exiting process.
- list<ProcessBase *> resumable;
+ static long id = 0;
- // Possible gate non-libprocess threads are waiting at.
- Gate *gate = NULL;
+ double timeout = Clock::now() + secs;
- // Stop new process references from being created.
- process->state = ProcessBase::FINISHING;
-
- /* Remove process. */
- synchronized (processes) {
- // Remove from internal clock (if necessary).
+ if (__process__ != NULL) {
synchronized (timeouts) {
- if (clk != NULL)
- clk->discard(process);
- }
-
- // Wait for all process references to get cleaned up.
- while (process->refs > 0) {
- asm ("pause");
- __sync_synchronize();
- }
-
- process->lock();
- {
- // Free any pending messages.
- while (!process->messages.empty()) {
- Message *message = process->messages.front();
- process->messages.pop_front();
- delete message;
- }
-
- // Free any pending requests.
- while (!process->requests.empty()) {
- pair<HttpRequest*, Promise<HttpResponse>*>* request = process->requests.front();
- process->requests.pop_front();
- delete request;
- }
-
- // Free any pending dispatchers.
- while (!process->dispatchers.empty()) {
- function<void(ProcessBase*)>* dispatcher = process->dispatchers.front();
- process->dispatchers.pop_front();
- delete dispatcher;
- }
-
- // Free current message.
- if (process->current) {
- delete process->current;
- }
-
- processes.erase(process->pid.id);
-
- // Confirm that the process is not in any waiting queue.
- foreachpair (_, set<ProcessBase *> &waiting, waiters) {
- CHECK(waiting.find(process) == waiting.end());
- }
-
- // Confirm process not in runq.
- synchronized (runq) {
- CHECK(find(runq.begin(), runq.end(), process) == runq.end());
- }
-
- // Grab all the waiting processes that are now resumable.
- foreach (ProcessBase *waiter, waiters[process]) {
- resumable.push_back(waiter);
- }
-
- waiters.erase(process);
-
- // Lookup gate to wake up waiting non-libprocess threads.
- map<ProcessBase *, Gate *>::iterator it = gates.find(process);
- if (it != gates.end()) {
- gate = it->second;
- // N.B. The last thread that leaves the gate also free's it.
- gates.erase(it);
- }
-
- CHECK(process->refs == 0);
- process->state = ProcessBase::FINISHED;
- }
- process->unlock();
- }
-
- // Inform socket manager.
- socket_manager->exited(process);
-
- // N.B. After opening the gate we can no longer dereference
- // 'process' since it might already be cleaned up by user code (a
- // waiter might have cleaned up the stack where the process was
- // allocated).
- if (gate != NULL) {
- gate->open();
- }
-
- // And resume all processes waiting too.
- foreach (ProcessBase *p, resumable) {
- p->lock();
- {
- // Process 'p' might be RUNNING because it is racing to become
- // WAITING while we are actually trying to get it to become
- // running again.
- // TODO(benh): Once we actually run multiple processes at a
- // time (using multiple threads) this logic will need to get
- // made thread safe (in particular, a process may be
- // FINISHING).
- CHECK(p->state == ProcessBase::RUNNING ||
- p->state == ProcessBase::WAITING);
- if (p->state == ProcessBase::RUNNING) {
- p->state = ProcessBase::INTERRUPTED;
- } else {
- p->state = ProcessBase::READY;
- enqueue(p);
+ if (Clock::paused()) {
+ timeout = Clock::now(__process__) + secs;
}
}
- p->unlock();
}
-}
+ timer timer;
+ timer.id = id++;
+ timer.timeout = timeout;
+ timer.pid = __process__ != NULL ? __process__->self() : UPID();
+ timer.thunk = thunk;
-timeout ProcessManager::create_timeout(ProcessBase *process, double secs)
-{
- CHECK(process != NULL);
-
- ev_tstamp tstamp;
+ VLOG(2) << "Created a timer for "
+ << std::fixed << std::setprecision(9) << timeout;
+ // Add the timer.
synchronized (timeouts) {
- if (clk != NULL) {
- tstamp = clk->getCurrent(process) + secs;
- } else {
- // TODO(benh): Unclear if want ev_now(...) or ev_time().
- tstamp = ev_time() + secs;
- }
- }
-
- timeout timeout;
- timeout.tstamp = tstamp;
- timeout.pid = process->pid;
- timeout.generation = process->generation;
-
- return timeout;
-}
-
-
-void ProcessManager::start_timeout(const timeout &timeout)
-{
- /* Add the timer. */
- synchronized (timeouts) {
- if (timeouts->size() == 0 || timeout.tstamp < timeouts->begin()->first) {
+ if (timeouts->size() == 0 || timer.timeout < timeouts->begin()->first) {
// Need to interrupt the loop to update/set timer repeat.
- (*timeouts)[timeout.tstamp].push_back(timeout);
+ (*timeouts)[timer.timeout].push_back(timer);
update_timer = true;
ev_async_send(loop, &async_watcher);
} else {
// Timer repeat is adequate, just add the timeout.
CHECK(timeouts->size() >= 1);
- (*timeouts)[timeout.tstamp].push_back(timeout);
+ (*timeouts)[timer.timeout].push_back(timer);
}
}
+
+ return timer;
}
-void ProcessManager::cancel_timeout(const timeout &timeout)
+void cancel(const timer& timer)
{
synchronized (timeouts) {
// Check if the timeout is still pending, and if so, erase
// it. In addition, erase an empty list if we just removed the
// last timeout.
- if (timeouts->count(timeout.tstamp) > 0) {
- (*timeouts)[timeout.tstamp].remove(timeout);
- if ((*timeouts)[timeout.tstamp].empty())
- timeouts->erase(timeout.tstamp);
- }
- }
-}
-
-
-double Clock::now()
-{
- synchronized (timeouts) {
- if (clk != NULL) {
- return clk->getElapsed();
- } else {
- return ev_time();
- }
- }
-}
-
-
-void Clock::pause()
-{
- initialize();
-
- synchronized (timeouts) {
- // For now, only one global clock (rather than clock per
- // process). This Means that we have to take special care to
- // ensure happens-before timing (currently done for local message
- // sends and spawning new processes, not currently done for
- // EXITED messages).
- if (clk == NULL) {
- clk = new InternalClock();
-
- // The existing libev timer might actually timeout, but now that
- // clk != NULL, no "time" will actually have passed, so no
- // timeouts will actually occur.
- }
- }
-}
-
-
-void Clock::resume()
-{
- initialize();
-
- synchronized (timeouts) {
- if (clk != NULL) {
- delete clk;
- clk = NULL;
+ if (timeouts->count(timer.timeout) > 0) {
+ (*timeouts)[timer.timeout].remove(timer);
+ if ((*timeouts)[timer.timeout].empty()) {
+ timeouts->erase(timer.timeout);
+ }
}
-
- update_timer = true;
- ev_async_send(loop, &async_watcher);
}
}
-
-void Clock::advance(double secs)
-{
- synchronized (timeouts) {
- if (clk != NULL) {
- clk->setElapsed(clk->getElapsed() + secs);
-
- // Might need to wakeup the processing thread.
- gate->open();
- }
- }
-}
+} // namespace timeouts {
ProcessBase::ProcessBase(const std::string& _id)
{
- initialize();
+ process::initialize();
+
+ state = ProcessBase::BOTTOM;
pthread_mutex_init(&m, NULL);
refs = 0;
- current = NULL;
- generation = 0;
// Generate string representation of unique id for process.
if (_id != "") {
@@ -2633,12 +2185,10 @@ ProcessBase::ProcessBase(const std::stri
// If using a manual clock, try and set current time of process
// using happens before relationship between creator and createe!
synchronized (timeouts) {
- if (clk != NULL) {
- if (pthread_self() == proc_thread) {
- CHECK(proc_process != NULL);
- clk->setCurrent(this, clk->getCurrent(proc_process));
- } else {
- clk->setCurrent(this, clk->getCurrent());
+ if (Clock::paused()) {
+ clock::currents->erase(this); // In case the address is reused!
+ if (__process__ != NULL) {
+ Clock::order(__process__, this);
}
}
}
@@ -2648,189 +2198,76 @@ ProcessBase::ProcessBase(const std::stri
ProcessBase::~ProcessBase() {}
-void ProcessBase::enqueue(Message* message, bool inject)
+void ProcessBase::enqueue(Event* event, bool inject)
{
- CHECK(message != NULL);
+ CHECK(event != NULL);
// TODO(benh): Put filter inside lock statement below so that we can
// guarantee the order of the messages seen by a filter are the same
- // as the order of messages seen by the process.
+ // as the order of messages seen by the process. Right now two
+ // different threads might execute the filter code and then enqueue
+ // the messages in non-deterministic orderings (i.e., there are two
+ // "atomic" blocks, the filter code here and the enqueue code
+ // below).
synchronized (filterer) {
if (filterer != NULL) {
- if (filterer->filter(message)) {
- delete message;
+ bool filter = false;
+ struct FilterVisitor : EventVisitor
+ {
+ FilterVisitor(bool* _filter) : filter(_filter) {}
+
+ virtual void visit(const MessageEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ virtual void visit(const DispatchEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ virtual void visit(const HttpEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ virtual void visit(const ExitedEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ bool* filter;
+ } visitor(&filter);
+
+ event->visit(&visitor);
+
+ if (filter) {
+ delete event;
return;
}
}
}
- UPID delegate;
-
lock();
{
- CHECK(state != FINISHED);
-
- // Check and see if we should delegate this message.
- if (delegates.count(message->name) > 0) {
- delegate = delegates[message->name];
- } else {
+ if (state != FINISHED) {
if (!inject) {
- messages.push_back(message);
+ events.push_back(event);
} else {
- messages.push_front(message);
+ events.push_front(event);
}
- if (state == RECEIVING || state == SERVING) {
+ if (state == BLOCKED) {
state = READY;
process_manager->enqueue(this);
- } else if (state == POLLING) {
- state = INTERRUPTED;
- process_manager->enqueue(this);
}
- CHECK(state == INIT ||
+ CHECK(state == BOTTOM ||
state == READY ||
- state == RUNNING ||
- state == PAUSED ||
- state == WAITING ||
- state == INTERRUPTED ||
- state == TIMEDOUT ||
- state == FINISHING);
- }
- }
- unlock();
-
- // Delegate this message if necessary.
- if (delegate != UPID()) {
- VLOG(1) << "Delegating message '" << message->name << "' to " << delegate;
- message->to = delegate;
- transport(message, this);
- }
-}
-
-
-void ProcessBase::enqueue(pair<HttpRequest*, Promise<HttpResponse>*>* request)
-{
- CHECK(request != NULL);
-
- // TODO(benh): Support filtering HTTP requests.
-
- lock();
- {
- CHECK(state != FINISHED);
-
- requests.push_back(request);
-
- if (state == SERVING) {
- state = READY;
- process_manager->enqueue(this);
- } else if (state == POLLING) {
- state = INTERRUPTED;
- process_manager->enqueue(this);
- }
-
- CHECK(state == INIT ||
- state == READY ||
- state == RUNNING ||
- state == RECEIVING ||
- state == PAUSED ||
- state == WAITING ||
- state == INTERRUPTED ||
- state == TIMEDOUT ||
- state == FINISHING);
- }
- unlock();
-}
-
-
-void ProcessBase::enqueue(function<void(ProcessBase*)>* dispatcher)
-{
- CHECK(dispatcher != NULL);
-
- // TODO(benh): Support filtering dispatches.
-
- lock();
- {
- CHECK(state != FINISHED);
-
- dispatchers.push_back(dispatcher);
-
- if (state == SERVING) {
- state = READY;
- process_manager->enqueue(this);
- } else if (state == POLLING) {
- state = INTERRUPTED;
- process_manager->enqueue(this);
- }
-
- CHECK(state == INIT ||
- state == READY ||
- state == RUNNING ||
- state == RECEIVING ||
- state == PAUSED ||
- state == WAITING ||
- state == INTERRUPTED ||
- state == TIMEDOUT ||
- state == FINISHING);
- }
- unlock();
-}
-
-
-template <>
-Message * ProcessBase::dequeue()
-{
- Message *message = NULL;
-
- lock();
- {
- CHECK(state == RUNNING);
- if (!messages.empty()) {
- message = messages.front();
- messages.pop_front();
+ state == RUNNING);
}
}
unlock();
-
- return message;
-}
-
-
-template <>
-pair<HttpRequest*, Promise<HttpResponse>*>* ProcessBase::dequeue()
-{
- pair<HttpRequest*, Promise<HttpResponse>*>* request = NULL;
-
- lock();
- {
- CHECK(state == RUNNING);
- if (!requests.empty()) {
- request = requests.front();
- requests.pop_front();
- }
- }
- unlock();
-
- return request;
-}
-
-
-template <>
-function<void(ProcessBase*)> * ProcessBase::dequeue()
-{
- function<void(ProcessBase*)> *dispatcher = NULL;
-
- lock();
- {
- CHECK(state == RUNNING);
- if (!dispatchers.empty()) {
- dispatcher = dispatchers.front();
- dispatchers.pop_front();
- }
- }
- unlock();
-
- return dispatcher;
}
@@ -2839,7 +2276,9 @@ void ProcessBase::inject(const UPID& fro
if (!from)
return;
- enqueue(encode(from, pid, name, string(data, length)), true);
+ Message* message = encode(from, pid, name, string(data, length));
+
+ enqueue(new MessageEvent(message), true);
}
@@ -2854,212 +2293,75 @@ void ProcessBase::send(const UPID& to, c
}
-string ProcessBase::receive(double secs)
+void ProcessBase::visit(const MessageEvent& event)
{
- // Free current message.
- if (current != NULL) {
- delete current;
- current = NULL;
- }
-
- // Check if there is a message.
- check:
- if ((current = dequeue<Message>()) != NULL) {
- return name();
- }
-
- if (pthread_self() == proc_thread) {
- // Avoid blocking if negative seconds.
- if (secs >= 0) {
- if (!process_manager->receive(this, secs)) {
- goto timeout;
- } else {
- lock();
- {
- CHECK(!messages.empty());
- }
- unlock();
- goto check;
- }
- } else {
- goto timeout;
- }
- } else {
- // TODO(benh): Handle calling receive from an outside thread.
- fatal("unimplemented");
[... 465 lines stripped ...]