You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC

svn commit: r1236485 [6/7] - in /incubator/mesos/trunk: ./ include/mesos/ src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/ src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/ third_party/libprocess/include/pr...

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Fri Jan 27 01:25:13 2012
@@ -29,6 +29,7 @@
 #include <algorithm>
 #include <deque>
 #include <fstream>
+#include <iomanip>
 #include <iostream>
 #include <list>
 #include <map>
@@ -39,23 +40,24 @@
 #include <stdexcept>
 #include <vector>
 
+#include <process/clock.hpp>
+#include <process/deferred.hpp>
 #include <process/dispatch.hpp>
+#include <process/executor.hpp>
+#include <process/filter.hpp>
+#include <process/future.hpp>
 #include <process/gc.hpp>
 #include <process/process.hpp>
-
-#include <boost/tuple/tuple.hpp>
+#include <process/timer.hpp>
 
 #include "config.hpp"
 #include "decoder.hpp"
 #include "encoder.hpp"
-#include "fatal.hpp"
 #include "foreach.hpp"
 #include "gate.hpp"
 #include "synchronized.hpp"
-#include "tokenize.hpp"
-
+#include "thread.hpp"
 
-using boost::tuple;
 
 using std::deque;
 using std::find;
@@ -71,27 +73,26 @@ using std::string;
 using std::stringstream;
 using std::vector;
 
+namespace lambda {
+
+using std::tr1::bind;
 using std::tr1::function;
+using namespace std::tr1::placeholders;
+
+} // namespace lambda {
 
 
 #define Byte (1)
 #define Kilobyte (1024*Byte)
 #define Megabyte (1024*Kilobyte)
 #define Gigabyte (1024*Megabyte)
-#define PROCESS_STACK_SIZE (64*Kilobyte)
 
 
-#define malloc(bytes)                                               \
-  ({ void *tmp;                                                     \
-     if ((tmp = malloc(bytes)) == NULL)                             \
-       fatalerror("malloc"); tmp;                                   \
-   })
-
-#define realloc(address, bytes)                                     \
-  ({ void *tmp;                                                     \
-     if ((tmp = realloc(address, bytes)) == NULL)                   \
-       fatalerror("realloc"); tmp;                                  \
-   })
+template <int i>
+std::ostream& fixedprecision(std::ostream& os)
+{
+  return os << std::fixed << std::setprecision(i);
+}
 
 
 struct Node
@@ -120,41 +121,15 @@ ostream& operator << (ostream& stream, c
 }
 
 
-/*
- * Timeout support! Note that we don't store a pointer to the process
- * because we can't dereference it because it might no longer be
- * valid. But we can check if the process is valid using the PID and
- * then use referencing counting to keep the process valid.
-*/
-struct timeout
-{
-  ev_tstamp tstamp;
-  process::UPID pid;
-  int generation;
-};
-
-
-bool operator == (const timeout &left, const timeout &right)
-{
-  return left.tstamp == right.tstamp &&
-    left.pid == right.pid &&
-    left.generation == right.generation;
-}
-
-
 namespace process {
 
 class ProcessReference
 {
 public:
-  explicit ProcessReference(ProcessBase *_process) : process(_process)
+  explicit ProcessReference(ProcessBase* _process) : process(_process)
   {
     if (process != NULL) {
       __sync_fetch_and_add(&(process->refs), 1);
-      if (process->state == ProcessBase::FINISHING) {
-        __sync_fetch_and_sub(&(process->refs), 1);
-        process = NULL;
-      }
     }
   }
 
@@ -164,7 +139,7 @@ public:
       __sync_fetch_and_sub(&(process->refs), 1);
   }
 
-  ProcessReference(const ProcessReference &that)
+  ProcessReference(const ProcessReference& that)
   {
     process = that.process;
 
@@ -177,12 +152,12 @@ public:
     }
   }
 
-  ProcessBase * operator -> ()
+  ProcessBase* operator -> ()
   {
     return process;
   }
 
-  operator ProcessBase * ()
+  operator ProcessBase* ()
   {
     return process;
   }
@@ -193,88 +168,31 @@ public:
   }
 
 private:
-  ProcessReference & operator = (const ProcessReference &that);
-
-  ProcessBase *process;
-};
-
-
-/* Tick, tock ... manually controlled clock! */
-class InternalClock
-{
-public:
-  InternalClock()
-  {
-    initial = current = elapsed = ev_time();
-  }
-
-  ~InternalClock() {}
-
-  ev_tstamp getCurrent(ProcessBase *process)
-  {
-    ev_tstamp tstamp;
-
-    if (currents.count(process) != 0) {
-      tstamp = currents[process];
-    } else {
-      tstamp = currents[process] = initial;
-    }
-
-    return tstamp;
-  }
-
-  void setCurrent(ProcessBase *process, ev_tstamp tstamp)
-  {
-    currents[process] = tstamp;
-  }
-
-  ev_tstamp getCurrent()
-  {
-    return current;
-  }
-
-  void setCurrent(ev_tstamp tstamp)
-  {
-    current = tstamp;
-  }
-
-  ev_tstamp getElapsed()
-  {
-    return elapsed;
-  }
-
-  void setElapsed(ev_tstamp tstamp)
-  {
-    elapsed = tstamp;
-  }
-
-  void discard(ProcessBase *process)
-  {
-    CHECK(process != NULL);
-    currents.erase(process);
-  }
+  ProcessReference& operator = (const ProcessReference& that);
 
-private:
-  map<ProcessBase *, ev_tstamp> currents;
-  ev_tstamp initial;
-  ev_tstamp current;
-  ev_tstamp elapsed;
+  ProcessBase* process;
 };
 
 
 class HttpProxy;
 
 
-class HttpResponseWaiter : public Process<HttpResponseWaiter>
+class HttpResponseWaiter
 {
 public:
-  HttpResponseWaiter(const PID<HttpProxy>& _proxy);
-  virtual ~HttpResponseWaiter();
+  HttpResponseWaiter(const PID<HttpProxy>& proxy,
+                     Future<HttpResponse>* future,
+                     bool persist);
 
-  void await(const Future<HttpResponse>& future, bool persist);
+  void waited(const Future<HttpResponse>&);
+  void timeout();
 
 private:
   const PID<HttpProxy> proxy;
+  Future<HttpResponse>* future;
+  bool persist;
+
+  Executor executor;
 };
 
 
@@ -282,15 +200,14 @@ class HttpProxy : public Process<HttpPro
 {
 public:
   HttpProxy(int _c);
-  virtual ~HttpProxy();
 
-  void handle(const Future<HttpResponse>& future, bool persist);
-  void ready(const Future<HttpResponse>& future, bool persist);
-  void unavailable(bool persist);
+  void handle(Future<HttpResponse>* future, bool persist);
+  void ready(Future<HttpResponse>* future, bool persist);
+  void unavailable(Future<HttpResponse>* future, bool persist);
 
 private:
   int c;
-  HttpResponseWaiter* waiter;
+  map<Future<HttpResponse>*, HttpResponseWaiter*> waiters;
 };
 
 
@@ -315,26 +232,26 @@ public:
   void exited(ProcessBase* process);
 
 private:
-  /* Map from UPID (local/remote) to process. */
+  // Map from UPID (local/remote) to process.
   map<UPID, set<ProcessBase*> > links;
 
-  /* Map from socket to node (ip, port). */
+  // Map from socket to node (ip, port).
   map<int, Node> sockets;
 
-  /* Maps from node (ip, port) to socket. */
+  // Maps from node (ip, port) to socket.
   map<Node, int> temps;
   map<Node, int> persists;
 
-  /* Set of sockets that should be closed. */
+  // Set of sockets that should be closed.
   set<int> disposables;
 
-  /* Map from socket to outgoing queue. */
+  // Map from socket to outgoing queue.
   map<int, queue<DataEncoder*> > outgoing;
 
-  /* HTTP proxies. */
+  // HTTP proxies.
   map<int, HttpProxy*> proxies;
 
-  /* Protects instance variables. */
+  // Protects instance variables.
   synchronizable(this);
 };
 
@@ -345,150 +262,261 @@ public:
   ProcessManager();
   ~ProcessManager();
 
-  ProcessReference use(const UPID &pid);
+  ProcessReference use(const UPID& pid);
 
-  bool deliver(Message* message, ProcessBase *sender = NULL);
-  bool deliver(int c, HttpRequest* request, ProcessBase *sender = NULL);
-  bool deliver(const UPID& to, function<void(ProcessBase*)>* dispatcher, ProcessBase *sender = NULL);
-
-  UPID spawn(ProcessBase *process, bool manage);
-  void link(ProcessBase *process, const UPID &to);
-  bool receive(ProcessBase *process, double secs);
-  bool serve(ProcessBase *process, double secs);
-  void pause(ProcessBase *process, double secs);
-  void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
-  bool wait(ProcessBase *process, const UPID &pid);
-  bool external_wait(const UPID &pid);
-  bool poll(ProcessBase *process, int fd, int op, double secs, bool ignore);
+  bool deliver(Message* message, ProcessBase* sender = NULL);
 
-  void enqueue(ProcessBase *process);
-  ProcessBase * dequeue();
+  bool deliver(int c, HttpRequest* request, ProcessBase* sender = NULL);
 
-  void timedout(const UPID &pid, int generation);
-  void polled(const UPID &pid, int generation);
+  bool deliver(const UPID& to,
+               lambda::function<void(ProcessBase*)>* f,
+               ProcessBase* sender = NULL);
 
-  void run(ProcessBase *process);
-  void cleanup(ProcessBase *process);
+  UPID spawn(ProcessBase* process, bool manage);
+  void resume(ProcessBase* process);
+  void cleanup(ProcessBase* process);
+  void link(ProcessBase* process, const UPID& to);
+  void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
+  bool wait(const UPID& pid);
 
-private:
-  timeout create_timeout(ProcessBase *process, double secs);
-  void start_timeout(const timeout &timeout);
-  void cancel_timeout(const timeout &timeout);
+  void enqueue(ProcessBase* process);
+  ProcessBase* dequeue();
 
-  /* Map of all local spawned and running processes. */
-  map<string, ProcessBase *> processes;
+private:
+  // Map of all local spawned and running processes.
+  map<string, ProcessBase*> processes;
   synchronizable(processes);
 
-  /* Waiting processes (protected by synchronizable(processes)). */
-  map<ProcessBase *, set<ProcessBase *> > waiters;
+  // Gates for waiting threads (protected by synchronizable(processes)).
+  map<ProcessBase*, Gate*> gates;
 
-  /* Gates for waiting threads (protected by synchronizable(processes)). */
-  map<ProcessBase *, Gate *> gates;
-
-  /* Queue of runnable processes (implemented as deque). */
-  deque<ProcessBase *> runq;
+  // Queue of runnable processes (implemented using list).
+  list<ProcessBase*> runq;
   synchronizable(runq);
 };
 
 
-/* Using manual clock if non-null. */
-static InternalClock *clk = NULL;
-
-/* Unique id that can be assigned to each process. */
+// Unique id that can be assigned to each process.
 static uint32_t id = 0;
 
-/* Local server socket. */
+// Local server socket.
 static int s = -1;
 
-/* Local IP address. */
+// Local IP address.
 static uint32_t ip = 0;
 
-/* Local port. */
+// Local port.
 static uint16_t port = 0;
 
-/* Active SocketManager (eventually will probably be thread-local). */
-static SocketManager *socket_manager = NULL;
+// Active SocketManager (eventually will probably be thread-local).
+static SocketManager* socket_manager = NULL;
 
-/* Active ProcessManager (eventually will probably be thread-local). */
-static ProcessManager *process_manager = NULL;
+// Active ProcessManager (eventually will probably be thread-local).
+static ProcessManager* process_manager = NULL;
 
-/* Event loop. */
-static struct ev_loop *loop = NULL;
+// Event loop.
+static struct ev_loop* loop = NULL;
 
-/* Asynchronous watcher for interrupting loop. */
+// Asynchronous watcher for interrupting loop.
 static ev_async async_watcher;
 
-/* Timeouts watcher for process timeouts. */
+// Watcher for timeouts.
 static ev_timer timeouts_watcher;
 
-/* Server watcher for accepting connections. */
+// Server watcher for accepting connections.
 static ev_io server_watcher;
 
-/* Queue of I/O watchers. */
-static queue<ev_io *> *watchers = new queue<ev_io *>();
+// Queue of I/O watchers.
+static queue<ev_io*>* watchers = new queue<ev_io*>();
 static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
 
-/**
- * We store the timeouts in a map of lists indexed by the time stamp
- * of the timeout so that we can have two timeouts that have the same
- * time stamp. Note however, that we should never have two identical
- * timeouts because a process should only ever have one outstanding
- * timeout at a time. Also, we exploit that the map is SORTED!
- */
-static map<ev_tstamp, list<timeout> > *timeouts =
-  new map<ev_tstamp, list<timeout> >();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER;
+// We store the timers in a map of lists indexed by the timeout of the
+// timer so that we can have two timers that have the same timeout. We
+// exploit that the map is SORTED!
+static map<double, list<timer> >* timeouts =
+  new map<double, list<timer> >();
+static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
-/* Flag to indicate whether or to update the timer on async interrupt. */
+// Flag to indicate whether or to update the timer on async interrupt.
 static bool update_timer = false;
 
-/* I/O thread. */
-static pthread_t io_thread;
+const int NUMBER_OF_PROCESSING_THREADS = 4; // TODO(benh): Do 2x cores.
+
+
+// Thread local process pointer magic (constructed in
+// 'initialize'). We need the extra level of indirection from
+// _process_ to __process__ so that we can take advantage of the
+// operators without needing the extra dereference.
+static ThreadLocal<ProcessBase>* _process_ = NULL;
 
-/* Processing thread. */
-static pthread_t proc_thread;
+#define __process__ (*_process_)
 
-/* Scheduling context for processing thread. */
-static ucontext_t proc_uctx_schedule;
 
-/* Running context for processing thread. */
-static ucontext_t proc_uctx_running;
-
-/* Current process of processing thread. */
-//static __thread ProcessBase *proc_process = NULL;
-static ProcessBase *proc_process = NULL;
-
-/* Scheduler gate. */
-static Gate *gate = new Gate();
-
-/* Stack of recycled stacks. */
-static stack<void *> *stacks = new stack<void *>();
-static synchronizable(stacks) = SYNCHRONIZED_INITIALIZER;
-
-/* Last exited process's stack to be recycled (global variable hack!). */
-static void *recyclable = NULL;
-
-/**
- * Filter. Synchronized support for using the filterer needs to be
- * recursive incase a filterer wants to do anything fancy (which is
- * possible and likely given that filters will get used for testing).
-*/
-static Filter *filterer = NULL;
+// Scheduler gate.
+static Gate* gate = new Gate();
+
+// Filter. Synchronized support for using the filterer needs to be
+// recursive incase a filterer wants to do anything fancy (which is
+// possible and likely given that filters will get used for testing).
+static Filter* filterer = NULL;
 static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
-/* Global garbage collector. */
+// Global garbage collector.
 PID<GarbageCollector> gc;
 
-/* Thunks to be invoked via process::invoke. */
-static queue<function<void(void)>*>* thunks =
-  new queue<function<void(void)>*>();
+// Thunks to be invoked via process::invoke.
+static queue<lambda::function<void(void)>*>* thunks =
+  new queue<lambda::function<void(void)>*>();
 static synchronizable(thunks) = SYNCHRONIZED_INITIALIZER;
 
-/* Thread to invoke thunks (see above). */
+// Thread to invoke thunks (see above).
 static Gate* invoke_gate = new Gate();
 static pthread_t invoke_thread;
 
 
+// We namespace the clock related variables to keep them well
+// named. In the future we'll probably want to associate a clock with
+// a specific ProcessManager/SocketManager instance pair, so this will
+// likely change.
+namespace clock {
+
+map<ProcessBase*, double>* currents = new map<ProcessBase*, double>();
+
+double initial = 0;
+double current = 0;
+
+bool paused = false;
+
+} // namespace clock {
+
+
+double Clock::now()
+{
+  return now(__process__);
+}
+
+
+double Clock::now(ProcessBase* process)
+{
+  synchronized (timeouts) {
+    if (Clock::paused()) {
+      if (process != NULL) {
+        if (clock::currents->count(process) != 0) {
+          return (*clock::currents)[process];
+        } else {
+          return (*clock::currents)[process] = clock::initial;
+
+        }
+      } else {
+        return clock::current;
+      }
+    }
+  }
+    
+  return ev_time(); // TODO(benh): Versus ev_now()?
+}
+
+
+void Clock::pause()
+{
+  process::initialize(); // For the libev watchers to be setup.
+
+  synchronized (timeouts) {
+    if (!clock::paused) {
+      clock::initial = clock::current = now();
+      clock::paused = true;
+      VLOG(1) << "Clock paused at "
+              << std::fixed << std::setprecision(9) << clock::initial;
+    }
+  }
+
+  // Note that after pausing the clock an existing libev timer might
+  // still fire (invoking handle_timeout), but since paused == true no
+  // "time" will actually have passed, so no timer will actually fire.
+}
+
+
+bool Clock::paused()
+{
+  return clock::paused;
+}
+
+
+void Clock::resume()
+{
+  process::initialize(); // For the libev watchers to be setup.
+
+  synchronized (timeouts) {
+    if (clock::paused) {
+      VLOG(1) << "Clock resumed at "
+              << std::fixed << std::setprecision(9) << clock::current;
+      clock::paused = false;
+      clock::currents->clear();
+      update_timer = true;
+      ev_async_send(loop, &async_watcher);
+    }
+  }
+}
+
+
+void Clock::advance(double secs)
+{
+  synchronized (timeouts) {
+    if (clock::paused) {
+      clock::current += secs;
+      VLOG(1) << "Clock advanced ("
+              << std::fixed << std::setprecision(9) << secs
+              << " seconds) to " << clock::current;
+      if (!update_timer) {
+        update_timer = true;
+        ev_async_send(loop, &async_watcher);
+      }
+    }
+  }
+}
+
+
+void Clock::update(double secs)
+{
+  VLOG(2) << "Attempting to update clock to "
+          << std::fixed << std::setprecision(9) << secs;
+  synchronized (timeouts) {
+    if (clock::paused) {
+      if (clock::current < secs) {
+        clock::current = secs;
+        VLOG(1) << "Clock updated to "
+                << std::fixed << std::setprecision(9) << clock::current;
+        if (!update_timer) {
+          update_timer = true;
+          ev_async_send(loop, &async_watcher);
+        }
+      }
+    }
+  }
+}
+
+
+void Clock::update(ProcessBase* process, double secs)
+{
+  synchronized (timeouts) {
+    if (clock::paused) {
+      double current = now(process);
+      if (current < secs) {
+        VLOG(2) << "Clock of " << process->self() << " updated to "
+                << std::fixed << std::setprecision(9) << secs;
+        (*clock::currents)[process] = secs;
+      }
+    }
+  }
+}
+
+
+void Clock::order(ProcessBase* from, ProcessBase* to)
+{
+  update(to, now(from));
+}
+
+
 int set_nbio(int fd)
 {
   int flags;
@@ -566,12 +594,12 @@ Message* parse(HttpRequest* request)
 }
 
 
-void handle_async(struct ev_loop *loop, ev_async *_, int revents)
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 {
   synchronized (watchers) {
-    /* Start all the new I/O watchers. */
+    // Start all the new I/O watchers.
     while (!watchers->empty()) {
-      ev_io *watcher = watchers->front();
+      ev_io* watcher = watchers->front();
       watchers->pop();
       ev_io_start(loop, watcher);
     }
@@ -580,28 +608,22 @@ void handle_async(struct ev_loop *loop, 
   synchronized (timeouts) {
     if (update_timer) {
       if (!timeouts->empty()) {
-	// Determine the current time.
-	ev_tstamp current_tstamp;
-	if (clk != NULL) {
-	  current_tstamp = clk->getCurrent();
-	} else {
-	  // TODO(benh): Unclear if want ev_now(...) or ev_time().
-	  current_tstamp = ev_time();
-	}
+	// Determine when the next timer should fire.
+	timeouts_watcher.repeat = timeouts->begin()->first - Clock::now();
 
-	timeouts_watcher.repeat = timeouts->begin()->first - current_tstamp;
-
-	// Check when the timer event should fire.
         if (timeouts_watcher.repeat <= 0) {
 	  // Feed the event now!
 	  timeouts_watcher.repeat = 0;
 	  ev_timer_again(loop, &timeouts_watcher);
           ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
         } else {
-	  // Only repeat the timer if not using a manual clock (a call
-	  // to Clock::advance() will force a timer event later).
-	  if (clk != NULL && timeouts_watcher.repeat > 0)
-	    timeouts_watcher.repeat = 0;
+ 	  // Don't fire the timer if the clock is paused since we
+ 	  // don't want time to advance (instead a call to
+ 	  // clock::advance() will handle the timer).
+ 	  if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ 	    timeouts_watcher.repeat = 0;
+          }
+
 	  ev_timer_again(loop, &timeouts_watcher);
 	}
       }
@@ -612,73 +634,83 @@ void handle_async(struct ev_loop *loop, 
 }
 
 
-void handle_timeout(struct ev_loop *loop, ev_timer *watcher, int revents)
+void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 {
-  list<timeout> timedout;
+  list<timer> timedout;
 
   synchronized (timeouts) {
-    ev_tstamp current_tstamp;
+    double now = Clock::now();
 
-    if (clk != NULL) {
-      current_tstamp = clk->getCurrent();
-    } else {
-      // TODO(benh): Unclear if want ev_now(...) or ev_time().
-      current_tstamp = ev_time();
-    }
+    VLOG(1) << "Handling timeouts up to "
+            << std::fixed << std::setprecision(9) << now;
 
-    foreachpair (ev_tstamp tstamp, const list<timeout> &timedouts, *timeouts) {
-      if (tstamp > current_tstamp)
+    foreachkey (double timeout, *timeouts) {
+      if (timeout > now) {
         break;
+      }
 
-      foreach (const timeout &timeout, timedouts) {
-        if (clk != NULL) {
-          // Update current time of process (if it's still
-          // valid). Note that current time may be greater than the
-          // timeout if a local message was received (and
-          // happens-before kicks in), hence we use max.
-          if (ProcessReference process = process_manager->use(timeout.pid)) {
-            clk->setCurrent(process, max(clk->getCurrent(process),
-                                         timeout.tstamp));
-          }
-        }
-        // TODO(benh): Ensure deterministic order for testing?
-        timedout.push_back(timeout);
+      VLOG(2) << "Have timeout(s) at "
+              << std::fixed << std::setprecision(9) << timeout;
+
+      foreach (const timer& timer, (*timeouts)[timeout]) {
+        timedout.push_back(timer);
       }
     }
 
-    // Now erase the range of time stamps that timed out.
-    timeouts->erase(timeouts->begin(), timeouts->upper_bound(current_tstamp));
+    // Now erase the range of timeouts that timed out.
+    timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
 
-    // Okay, so the time stamp for the next timeout should not have fired.
-    CHECK(timeouts->empty() || (timeouts->begin()->first > current_tstamp));
+    // Okay, so the timeout for the next timer should not have fired.
+    CHECK(timeouts->empty() || (timeouts->begin()->first > now));
 
     // Update the timer as necessary.
-    // TODO(benh): Make this code look like the code in handle_async.
-    if (!timeouts->empty() && clk == NULL) {
-      timeouts_watcher.repeat = timeouts->begin()->first - current_tstamp;
-      CHECK(timeouts_watcher.repeat > 0);
-      ev_timer_again(loop, &timeouts_watcher);
-    } else {
-      timeouts_watcher.repeat = 0;
-      ev_timer_again(loop, &timeouts_watcher);
+    if (!timeouts->empty()) {
+      // Determine when the next timer should fire.
+      timeouts_watcher.repeat = timeouts->begin()->first - Clock::now();
+
+      if (timeouts_watcher.repeat <= 0) {
+        // Feed the event now!
+        timeouts_watcher.repeat = 0;
+        ev_timer_again(loop, &timeouts_watcher);
+        ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+      } else {
+        // Don't fire the timer if the clock is paused since we don't
+        // want time to advance (instead a call to Clock::advance()
+        // will handle the timer).
+        if (Clock::paused() && timeouts_watcher.repeat > 0) {
+          timeouts_watcher.repeat = 0;
+        }
+
+        ev_timer_again(loop, &timeouts_watcher);
+      }
     }
 
-    update_timer = false;
+    update_timer = false; // Since we might have a queued update_timer.
   }
 
-  foreach (const timeout &timeout, timedout) {
-    process_manager->timedout(timeout.pid, timeout.generation);
+  // Update current time of process (if it's present/valid). It might
+  // be necessary to actually add some more synchronization around
+  // this so that, for example, pausing and resuming the clock doesn't
+  // cause some processes to get thier current times updated and
+  // others not. Since ProcessManager::use acquires the 'processes'
+  // lock we had to move this out of the synchronized (timeouts) above
+  // since there was a deadlock with acquring 'processes' then
+  // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
+  // current time may be greater than the timeout if a local message
+  // was received (and happens-before kicks in).
+  if (Clock::paused()) {
+    foreach (const timer& timer, timedout) {
+      if (ProcessReference process = process_manager->use(timer.pid)) {
+        Clock::update(process, timer.timeout);
+      }
+    }
   }
-}
-
 
-void handle_poll(struct ev_loop *loop, ev_io *watcher, int revents)
-{
-  tuple<UPID, int> *t = (tuple<UPID, int> *) watcher->data;
-  process_manager->polled(t->get<0>(), t->get<1>());
-  ev_io_stop(loop, watcher);
-  delete watcher;
-  delete t;
+  // Execute the thunks of the timeouts that timed out (TODO(benh): Do
+  // this async so that we don't tie up the event thread!).
+  foreach (const timer& timer, timedout) {
+    timer.thunk();
+  }
 }
 
 
@@ -885,177 +917,43 @@ void accept(struct ev_loop *loop, ev_io 
 }
 
 
-void * serve(void *arg)
-{
-  ev_loop(((struct ev_loop *) arg), 0);
-
-  return NULL;
-}
-
-
-void* invoker(void* arg)
+void* serve(void* arg)
 {
-  do {
-    Gate::state_t old = invoke_gate->approach();
-
-    function<void(void)>* thunk = NULL;
-    synchronized (thunks) {
-      if (!thunks->empty()) {
-        thunk = thunks->front();
-        thunks->pop();
-      }
-    }
-
-    if (thunk != NULL) {
-      (*thunk)();
-      continue;
-    }
-
-    invoke_gate->arrive(old);
-  } while (true);
+  ev_loop(((struct ev_loop*) arg), 0);
 
   return NULL;
 }
 
 
-void trampoline(int stack0, int stack1, int process0, int process1)
-{
-  /* Unpackage the arguments. */
-#ifdef __x86_64__
-  CHECK (sizeof(unsigned long) == sizeof(ProcessBase *));
-  void *stack = (void *)
-    (((unsigned long) stack1 << 32) + (unsigned int) stack0);
-  ProcessBase *process = (ProcessBase *)
-    (((unsigned long) process1 << 32) + (unsigned int) process0);
-#else
-  CHECK (sizeof(unsigned int) == sizeof(ProcessBase *));
-  void *stack = (void *) (unsigned int) stack0;
-  ProcessBase *process = (ProcessBase *) (unsigned int) process0;
-#endif /* __x86_64__ */
-
-  /* Run the process. */
-  process_manager->run(process);
-
-  /* Prepare to recycle this stack (global variable hack!). */
-  CHECK(recyclable == NULL);
-  recyclable = stack;
-
-  proc_process = NULL;
-  setcontext(&proc_uctx_schedule);
-}
-
-
-void * schedule(void *arg)
+void* schedule(void* arg)
 {
-  // Context for the entry into the schedule routine, used when a
-  // process exits, so that other processes can get scheduled!
-  if (getcontext(&proc_uctx_schedule) < 0)
-    fatalerror("getcontext failed (schedule)");
-
-  // Recycle the stack from an exited process.
-  if (recyclable != NULL) {
-    synchronized (stacks) {
-      stacks->push(recyclable);
-    }
-    recyclable = NULL;
-  }
+  __process__ = NULL; // Start off not running anything.
 
   do {
-    ProcessBase *process = process_manager->dequeue();
-
+    ProcessBase* process = process_manager->dequeue();
     if (process == NULL) {
       Gate::state_t old = gate->approach();
       process = process_manager->dequeue();
       if (process == NULL) {
-
-        // When using the manual clock, we want to let all the
-        // processes "run" up to the current time so that processes
-        // receive messages in order. If we let one process have a
-        // drastically advanced current time then it may try send
-        // messages to another process that, due to the happens-before
-        // relationship, will inherit it's drastically advanced
-        // current time. If the processing thread gets to this point
-        // (i.e., the point where no other processes are runnable)
-        // with the manual clock means that all of the processes have
-        // been run which could be run up to the current time. The
-        // only way another process could become runnable is if (1) it
-        // receives a message from another node, (2) a file descriptor
-        // it is polling has become ready, or (3) if it has a
-        // timeout. We can ignore processes that become runnable due
-        // to receiving a message from another node or getting an
-        // event on a file descriptor because that should not change
-        // the timing happens-before relationship of the local
-        // processes (unless of course the file descriptor was created
-        // from something like timerfd, in which case, since the
-        // programmer is not using the timing source provided in
-        // libprocess and all bets are off). Thus, we can check that
-        // there are no pending timeouts before the current time and
-        // move the current time to the next timeout value, and tell
-        // the timer to update itself.
-
-        synchronized (timeouts) {
-          if (clk != NULL) {
-            if (!timeouts->empty()) {
-              // Adjust the current time to the next timeout, provided
-              // it is not past the elapsed time.
-              ev_tstamp tstamp = timeouts->begin()->first;
-              if (tstamp <= clk->getElapsed()) {
-                clk->setCurrent(tstamp);
-              }
-
-              update_timer = true;
-              ev_async_send(loop, &async_watcher);
-            } else {
-              // Woah! This comment is the only thing in this else
-              // branch because this is a pretty serious state ... the
-              // only way to make progress is for another node to send
-              // a message or for an event to occur on a file
-              // descriptor that a process is polling. We may want to
-              // consider doing (or printing) something here.
-            }
-          }
-        }
-
-	/* Wait at gate if idle. */
-	gate->arrive(old);
+	gate->arrive(old); // Wait at gate if idle.
 	continue;
       } else {
 	gate->leave();
       }
     }
-
-    VLOG(2) << "Resuming " << process->pid;
-
-    process->lock();
-    {
-      CHECK(process->state == ProcessBase::INIT ||
-	    process->state == ProcessBase::READY ||
-	    process->state == ProcessBase::INTERRUPTED ||
-	    process->state == ProcessBase::TIMEDOUT);
-
-      /* Continue process. */
-      CHECK(proc_process == NULL);
-      proc_process = process;
-      swapcontext(&proc_uctx_running, &process->uctx);
-      CHECK(proc_process != NULL);
-      proc_process = NULL;
-    }
-    process->unlock();
+    process_manager->resume(process);
   } while (true);
 }
 
 
-/*
- * We might find value in catching terminating signals at some point.
- * However, for now, adding signal handlers freely is not allowed
- * because they will clash with Java and Python virtual machines and
- * causes hard to debug crashes/segfaults.
- */
-
+// We might find value in catching terminating signals at some point.
+// However, for now, adding signal handlers freely is not allowed
+// because they will clash with Java and Python virtual machines and
+// causes hard to debug crashes/segfaults.
 
 // void sigbad(int signal, struct sigcontext *ctx)
 // {
-//   /* Pass on the signal (so that a core file is produced).  */
+//   // Pass on the signal (so that a core file is produced).
 //   struct sigaction sa;
 //   sa.sa_handler = SIG_DFL;
 //   sigemptyset(&sa.sa_mask);
@@ -1067,6 +965,9 @@ void * schedule(void *arg)
 
 void initialize(bool initialize_google_logging)
 {
+//   static pthread_once_t init = PTHREAD_ONCE_INIT;
+//   pthread_once(&init, ...);
+
   static volatile bool initialized = false;
   static volatile bool initializing = true;
 
@@ -1088,7 +989,7 @@ void initialize(bool initialize_google_l
     google::LogToStderr();
   }
 
-//   /* Install signal handler. */
+//   // Install signal handler.
 //   struct sigaction sa;
 
 //   sa.sa_handler = (void (*) (int)) sigbad;
@@ -1119,9 +1020,20 @@ void initialize(bool initialize_google_l
   process_manager = new ProcessManager();
   socket_manager = new SocketManager();
 
-  // Setup processing thread.
-  if (pthread_create (&proc_thread, NULL, schedule, NULL) != 0) {
-    PLOG(FATAL) << "Failed to initialize, pthread_create";
+  // Setup the thread local process pointer.
+  pthread_key_t key;
+  if (pthread_key_create(&key, NULL) != 0) {
+    LOG(FATAL) << "Failed to initialize, pthread_key_create";
+  }
+
+  _process_ = new ThreadLocal<ProcessBase>(key);
+
+  // Setup processing threads.
+  for (int i = 0; i < NUMBER_OF_PROCESSING_THREADS; i++) {
+    pthread_t thread; // For now, not saving handles on our threads.
+    if (pthread_create(&thread, NULL, schedule, NULL) != 0) {
+      LOG(FATAL) << "Failed to initialize, pthread_create";
+    }
   }
 
   ip = 0;
@@ -1134,9 +1046,9 @@ void initialize(bool initialize_google_l
   if (value != NULL) {
     int result = inet_pton(AF_INET, value, &ip);
     if (result == 0) {
-      fatal("LIBPROCESS_IP=%s was unparseable", value);
+      LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
     } else if (result < 0) {
-      fatalerror("failed to initialize (inet_pton)");
+      PLOG(FATAL) << "Failed to initialize, inet_pton";
     }
   }
 
@@ -1145,7 +1057,7 @@ void initialize(bool initialize_google_l
   if (value != NULL) {
     int result = atoi(value);
     if (result < 0 || result > USHRT_MAX) {
-      fatal("LIBPROCESS_PORT=%s is not a valid port", value);
+      LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
     }
     port = result;
   }
@@ -1221,7 +1133,7 @@ void initialize(bool initialize_google_l
   ev_async_init(&async_watcher, handle_async);
   ev_async_start(loop, &async_watcher);
 
-  ev_timer_init(&timeouts_watcher, handle_timeout, 0., 2100000.0);
+  ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
   ev_timer_again(loop, &timeouts_watcher);
 
   ev_io_init(&server_watcher, accept, s, EV_READ);
@@ -1242,12 +1154,9 @@ void initialize(bool initialize_google_l
 //   sigaddset (&sa.sa_mask, w->signum);
 //   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
 
-  if (pthread_create(&io_thread, NULL, serve, loop) != 0) {
-    PLOG(FATAL) << "Failed to initialize, pthread_create";
-  }
-
-  if (pthread_create(&invoke_thread, NULL, invoker, NULL) != 0) {
-    PLOG(FATAL) << "Failed to initialize, pthread_create";
+  pthread_t thread; // For now, not saving handles on our threads.
+  if (pthread_create(&thread, NULL, serve, loop) != 0) {
+    LOG(FATAL) << "Failed to initialize, pthread_create";
   }
 
   // Need to set initialzing here so that we can actually invoke
@@ -1266,50 +1175,67 @@ void initialize(bool initialize_google_l
 }
 
 
-HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy)
-  : proxy(_proxy) {}
+HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy,
+                                       Future<HttpResponse>* _future,
+                                       bool _persist)
+  : proxy(_proxy), future(_future), persist(_persist)
+{
+  // Wait for any event on the future.
+  deferred<void(const Future<HttpResponse>&)> waited = executor.defer(
+      lambda::bind(&HttpResponseWaiter::waited, this, lambda::_1));
+
+  future->onAny(waited);
 
+  // Also create a timer so we don't wait forever.
+  deferred<void(void)> timeout = executor.defer(
+      lambda::bind(&HttpResponseWaiter::timeout, this));
 
-HttpResponseWaiter::~HttpResponseWaiter() {}
+  timers::create(30, timeout);
+}
 
 
-void HttpResponseWaiter::await(const Future<HttpResponse>& future, bool persist)
+void HttpResponseWaiter::waited(const Future<HttpResponse>&)
 {
-  if (future.await(30)) {
-    dispatch(proxy, &HttpProxy::ready, future, persist);  
+  if (future->isReady()) {
+    process::dispatch(proxy, &HttpProxy::ready, future, persist);
   } else {
-    dispatch(proxy, &HttpProxy::unavailable, persist);  
+    // TODO(benh): Consider handling other "states" of future
+    // (discarded, failed, etc) with different HTTP statuses.
+    process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
   }
+
+  executor.stop(); // Ensure we ignore the timeout.
 }
 
 
-HttpProxy::HttpProxy(int _c) : c(_c)
+void HttpResponseWaiter::timeout()
 {
-  // Create our waiter.
-  waiter = new HttpResponseWaiter(self());
-  spawn(waiter);
+  process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
+
+  executor.stop(); // Ensure we ignore the future.
 }
 
 
-HttpProxy::~HttpProxy()
-{
-  send(waiter->self(), TERMINATE);
-  wait(waiter->self());
-  delete waiter;
-}
+HttpProxy::HttpProxy(int _c) : c(_c) {}
 
 
-void HttpProxy::handle(const Future<HttpResponse>& future, bool persist)
+void HttpProxy::handle(Future<HttpResponse>* future, bool persist)
 {
-  dispatch(waiter, &HttpResponseWaiter::await, future, persist);
+  HttpResponseWaiter* waiter = new HttpResponseWaiter(this, future, persist);
+  waiters[future] = waiter;
 }
 
 
-void HttpProxy::ready(const Future<HttpResponse>& future, bool persist)
+void HttpProxy::ready(Future<HttpResponse>* future, bool persist)
 {
-  CHECK(future.isReady());
+  CHECK(waiters.count(future) > 0);
+  HttpResponseWaiter* waiter = waiters[future];
+  waiters.erase(future);
+  delete waiter;
 
-  const HttpResponse& response = future.get();
+  CHECK(future->isReady());
+
+  const HttpResponse& response = future->get();
 
   // Don't persist the connection if the responder doesn't want it to.
   if (response.headers.count("Connection") > 0) {
@@ -1319,19 +1245,32 @@ void HttpProxy::ready(const Future<HttpR
     }
   }
 
+  HttpResponseEncoder* encoder =
+    new HttpResponseEncoder(response);
+
+  delete future;
+
   // See the semantics of SocketManager::send for details about how
   // the socket will get closed (it might actually already be closed
   // before we issue this send).
-  socket_manager->send(new HttpResponseEncoder(response), c, persist);
+  socket_manager->send(encoder, c, persist);
 }
 
 
-void HttpProxy::unavailable(bool persist)
+void HttpProxy::unavailable(Future<HttpResponse>* future, bool persist)
 {
-  HttpResponse response = HttpServiceUnavailableResponse();
+  CHECK(waiters.count(future) > 0);
+  HttpResponseWaiter* waiter = waiters[future];
+  waiters.erase(future);
+  delete waiter;
+
+  HttpResponseEncoder* encoder =
+    new HttpResponseEncoder(HttpServiceUnavailableResponse());
+
+  delete future;
 
   // As above, the socket might all ready be closed when we do a send.
-  socket_manager->send(new HttpResponseEncoder(response), c, persist);
+  socket_manager->send(encoder, c, persist);
 }
 
 
@@ -1347,10 +1286,10 @@ SocketManager::~SocketManager() {}
 void SocketManager::link(ProcessBase *process, const UPID &to)
 {
   // TODO(benh): The semantics we want to support for link are such
-  // that if there is nobody to link to (local or remote) then a
-  // EXITED message gets generated. This functionality has only
-  // been implemented when the link is local, not remote. Of course,
-  // if there is nobody listening on the remote side, then this should
+  // that if there is nobody to link to (local or remote) then an
+  // ExitedEvent gets generated. This functionality has only been
+  // implemented when the link is local, not remote. Of course, if
+  // there is nobody listening on the remote side, then this should
   // work remotely ... but if there is someone listening remotely just
   // not at that id, then it will silently continue executing.
 
@@ -1365,11 +1304,11 @@ void SocketManager::link(ProcessBase *pr
       int s;
 
       if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
-        fatalerror("failed to link (socket)");
+        PLOG(FATAL) << "Failed to link, socket";
       }
 
       if (set_nbio(s) < 0) {
-        fatalerror("failed to link (set_nbio)");
+        PLOG(FATAL) << "Failed to link, set_nbio";
       }
 
       sockets[s] = node;
@@ -1391,7 +1330,7 @@ void SocketManager::link(ProcessBase *pr
       // Try and connect to the node using this socket.
       if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
         if (errno != EINPROGRESS) {
-          fatalerror("failed to link (connect)");
+          PLOG(FATAL) << "Failed to link, connect";
         }
 
         // Wait for socket to be connected.
@@ -1441,6 +1380,13 @@ void SocketManager::send(DataEncoder* en
 {
   CHECK(encoder != NULL);
 
+  // TODO(benh): The current mechanism here is insufficient. It could
+  // be the case that an HttpProxy attempts to do a send on a socket
+  // just as that socket has been closed and then re-opened for
+  // another connection. In this case, the data sent on that socket
+  // will be completely bogus ... one easy fix would be to check the
+  // proxy that is associated with the socket to eliminate this race.
+
   synchronized (this) {
     if (sockets.count(s) > 0) {
       if (outgoing.count(s) > 0) {
@@ -1468,6 +1414,7 @@ void SocketManager::send(DataEncoder* en
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";
+      delete encoder;
     }
   }
 }
@@ -1494,11 +1441,11 @@ void SocketManager::send(Message* messag
       int s;
 
       if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
-        fatalerror("failed to send (socket)");
+        PLOG(FATAL) << "Failed to send, socket";
       }
 
       if (set_nbio(s) < 0) {
-        fatalerror("failed to send (set_nbio)");
+        PLOG(FATAL) << "Failed to send, set_nbio";
       }
 
       sockets[s] = node;
@@ -1522,7 +1469,7 @@ void SocketManager::send(Message* messag
     
       if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
         if (errno != EINPROGRESS) {
-          fatalerror("failed to send (connect)");
+          PLOG(FATAL) << "Failed to send, connect";
         }
 
         // Initialize watcher for connecting.
@@ -1568,7 +1515,7 @@ DataEncoder* SocketManager::next(int s)
         } else if (proxies.count(s) > 0) {
           HttpProxy* proxy = proxies[s];
           proxies.erase(s);
-          post(proxy->self(), TERMINATE);
+          terminate(proxy);
         }
 
         disposables.erase(s);
@@ -1584,6 +1531,8 @@ DataEncoder* SocketManager::next(int s)
 
 void SocketManager::closed(int s)
 {
+  HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
+
   synchronized (this) {
     if (sockets.count(s) > 0) {
       const Node& node = sockets[s];
@@ -1595,9 +1544,8 @@ void SocketManager::closed(int s)
       } else if (temps.count(node) > 0 && temps[node] == s) {
         temps.erase(node);
       } else if (proxies.count(s) > 0) {
-        HttpProxy* proxy = proxies[s];
+        proxy = proxies[s];
         proxies.erase(s);
-        post(proxy->self(), TERMINATE);
       }
 
       outgoing.erase(s);
@@ -1606,6 +1554,12 @@ void SocketManager::closed(int s)
     }
   }
 
+  // We terminate the proxy outside the synchronized block to avoid
+  // possible deadlock between the ProcessManager and SocketManager.
+  if (proxy != NULL) {
+    terminate(proxy);
+  }
+
   // This might have just been a receiving socket (only sending
   // sockets, with the exception of the receiving side of a persistant
   // socket, get added to 'sockets'), so we want to make sure to call
@@ -1614,7 +1568,7 @@ void SocketManager::closed(int s)
 }
 
 
-void SocketManager::exited(const Node &node)
+void SocketManager::exited(const Node& node)
 {
   // TODO(benh): It would be cleaner if this routine could call back
   // into ProcessManager ... then we wouldn't have to convince
@@ -1623,14 +1577,12 @@ void SocketManager::exited(const Node &n
   synchronized (this) {
     list<UPID> removed;
     // Look up all linked processes.
-    foreachpair (const UPID &pid, set<ProcessBase *> &processes, links) {
-      if (pid.ip == node.ip && pid.port == node.port) {
-        // N.B. If we call exited(pid) we might invalidate iteration.
-        foreach (ProcessBase *process, processes) {
-          Message* message = encode(pid, process->pid, EXITED);
-          process->enqueue(message);
+    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
+      if (linkee.ip == node.ip && linkee.port == node.port) {
+        foreach (ProcessBase* linker, processes) {
+          linker->enqueue(new ExitedEvent(linkee));
         }
-        removed.push_back(pid);
+        removed.push_back(linkee);
       }
     }
 
@@ -1641,27 +1593,38 @@ void SocketManager::exited(const Node &n
 }
 
 
-void SocketManager::exited(ProcessBase *process)
+void SocketManager::exited(ProcessBase* process)
 {
+  // An exited event is enough to cause the process to get deleted
+  // (e.g., by the garbage collector), which means we can't
+  // dereference process (or even use the address) after we enqueue at
+  // least one exited event. Thus, we save the process pid.
+  const UPID pid = process->pid;
+
+  // Likewise, we need to save the current time of the process so we
+  // can update the clocks of linked processes as appropriate.
+  const double secs = Clock::now(process);
+
   synchronized (this) {
-    /* Remove any links this process might have had. */
-    foreachpair (_, set<ProcessBase *> &processes, links) {
+    // Iterate through the links, removing any links the process might
+    // have had and creating exited events for any linked processes.
+    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
       processes.erase(process);
-    }
-
-    /* Look up all linked processes. */
-    map<UPID, set<ProcessBase *> >::iterator it = links.find(process->pid);
 
-    if (it != links.end()) {
-      set<ProcessBase *> &processes = it->second;
-      foreach (ProcessBase *p, processes) {
-        CHECK(process != p);
-        Message *message = encode(process->pid, p->pid, EXITED);
-        // TODO(benh): Preserve happens-before when using clock.
-        p->enqueue(message);
+      if (linkee == pid) {
+        foreach (ProcessBase* linker, processes) {
+          CHECK(linker != process) << "Process linked with itself";
+          synchronized (timeouts) {
+            if (Clock::paused()) {
+              Clock::update(linker, secs);
+            }
+          }
+          linker->enqueue(new ExitedEvent(linkee));
+        }
       }
-      links.erase(process->pid);
     }
+
+    links.erase(pid);
   }
 }
 
@@ -1681,8 +1644,9 @@ ProcessReference ProcessManager::use(con
   if (pid.ip == ip && pid.port == port) {
     synchronized (processes) {
       if (processes.count(pid.id) > 0) {
-        // Note that the ProcessReference constructor MUST get called
-        // while holding the lock on processes.
+        // Note that the ProcessReference constructor _must_ get
+        // called while holding the lock on processes so that waiting
+        // for references is atomic (i.e., race free).
         return ProcessReference(processes[pid.id]);
       }
     }
@@ -1692,7 +1656,7 @@ ProcessReference ProcessManager::use(con
 }
 
 
-bool ProcessManager::deliver(Message *message, ProcessBase *sender)
+bool ProcessManager::deliver(Message* message, ProcessBase* sender)
 {
   CHECK(message != NULL);
 
@@ -1705,18 +1669,13 @@ bool ProcessManager::deliver(Message *me
     // that we can look up it's current time).
     if (sender != NULL) {
       synchronized (timeouts) {
-        if (clk != NULL) {
-          ev_tstamp tstamp =
-            max(clk->getCurrent(receiver), clk->getCurrent(sender));
-          clk->setCurrent(receiver, tstamp);
+        if (Clock::paused()) {
+          Clock::order(sender, receiver);
         }
       }
     }
 
-    VLOG(2) << "Delivering message name '" << message->name
-            << "' to " << message->to << " from " << message->from;
-
-    receiver->enqueue(message);
+    receiver->enqueue(new MessageEvent(message));
   } else {
     delete message;
     return false;
@@ -1727,7 +1686,7 @@ bool ProcessManager::deliver(Message *me
 
 
 // TODO(benh): Refactor and share code with above!
-bool ProcessManager::deliver(int c, HttpRequest *request, ProcessBase *sender)
+bool ProcessManager::deliver(int c, HttpRequest* request, ProcessBase* sender)
 {
   CHECK(request != NULL);
 
@@ -1740,9 +1699,9 @@ bool ProcessManager::deliver(int c, Http
   }
 
   // Treat this as an HTTP request and check for a valid receiver.
-  string temp = request->path.substr(1, request->path.find('/', 1) - 1);
+  string path = request->path.substr(1, request->path.find('/', 1) - 1);
 
-  UPID to(temp, ip, port);
+  UPID to(path, ip, port);
 
   if (ProcessReference receiver = use(to)) {
     // If we have a local sender AND we are using a manual clock
@@ -1753,41 +1712,23 @@ bool ProcessManager::deliver(int c, Http
     // that we can look up it's current time).
     if (sender != NULL) {
       synchronized (timeouts) {
-        if (clk != NULL) {
-          ev_tstamp tstamp =
-            max(clk->getCurrent(receiver), clk->getCurrent(sender));
-          clk->setCurrent(receiver, tstamp);
+        if (Clock::paused()) {
+          Clock::order(sender, receiver);
         }
       }
     }
 
-    // Get the HttpProxy pid for this socket.
-    PID<HttpProxy> proxy = socket_manager->proxy(c);
-    
-    // Create the promise to link with whatever gets returned.
-    Promise<HttpResponse>* promise = new Promise<HttpResponse>();
-
-    // Let the HttpProxy know about this request.
-    dispatch(proxy, &HttpProxy::handle, promise->future(), request->keepAlive);
-
-    VLOG(2) << "Delivering HTTP request for '" << request->path
-            << "' to " << to;
-
-    // Enqueue request and promise for receiver.
-    receiver->enqueue(new pair<HttpRequest*, Promise<HttpResponse>*>(request, promise));
+    // Enqueue the event.
+    receiver->enqueue(new HttpEvent(c, request));
   } else {
     // This has no receiver, send error response.
-    VLOG(1) << "Returning '404 Not Found' for HTTP request for '"
-            << request->path << "'";
-
-    // Get the HttpProxy pid for this socket.
-    PID<HttpProxy> proxy = socket_manager->proxy(c);
+    VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";
 
-    // Create a "future" response.
-    Future<HttpResponse> future = HttpNotFoundResponse();
+    HttpResponseEncoder* encoder =
+      new HttpResponseEncoder(HttpNotFoundResponse());
 
-    // Let the HttpProxy know about this request.
-    dispatch(proxy, &HttpProxy::handle, future, request->keepAlive);
+    // TODO(benh): Socket might be closed and then re-opened!
+    socket_manager->send(encoder, c, request->keepAlive);
 
     // Cleanup request.
     delete request;
@@ -1799,9 +1740,12 @@ bool ProcessManager::deliver(int c, Http
 
 
 // TODO(benh): Refactor and share code with above!
-bool ProcessManager::deliver(const UPID& to, function<void(ProcessBase*)>* dispatcher, ProcessBase *sender)
+bool ProcessManager::deliver(
+    const UPID& to,
+    lambda::function<void(ProcessBase*)>* f,
+    ProcessBase* sender)
 {
-  CHECK(dispatcher != NULL);
+  CHECK(f != NULL);
 
   if (ProcessReference receiver = use(to)) {
     // If we have a local sender AND we are using a manual clock
@@ -1812,19 +1756,15 @@ bool ProcessManager::deliver(const UPID&
     // that we can look up it's current time).
     if (sender != NULL) {
       synchronized (timeouts) {
-        if (clk != NULL) {
-          ev_tstamp tstamp =
-            max(clk->getCurrent(receiver), clk->getCurrent(sender));
-          clk->setCurrent(receiver, tstamp);
+        if (Clock::paused()) {
+          Clock::order(sender, receiver);
         }
       }
     }
 
-    VLOG(2) << "Delivering dispatcher to " << to;
-
-    receiver->enqueue(dispatcher);
+    receiver->enqueue(new DispatchEvent(f));
   } else {
-    delete dispatcher;
+    delete f;
     return false;
   }
 
@@ -1832,12 +1772,10 @@ bool ProcessManager::deliver(const UPID&
 }
 
 
-UPID ProcessManager::spawn(ProcessBase *process, bool manage)
+UPID ProcessManager::spawn(ProcessBase* process, bool manage)
 {
   CHECK(process != NULL);
 
-  process->state = ProcessBase::INIT;
-
   synchronized (processes) {
     if (processes.count(process->pid.id) > 0) {
       return UPID();
@@ -1846,311 +1784,269 @@ UPID ProcessManager::spawn(ProcessBase *
     }
   }
 
-  void *stack = NULL;
-
-  // Reuse a stack if any are available.
-  synchronized (stacks) {
-    if (!stacks->empty()) {
-      stack = stacks->top();
-      stacks->pop();
-    }
+  // Use the garbage collector if requested.
+  if (manage) {
+    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
   }
 
-  if (stack == NULL) {
-    const int protection = (PROT_READ | PROT_WRITE);
-    const int flags = (MAP_PRIVATE | MAP_ANONYMOUS | MAP_32BIT);
+  // Add process to the run queue (so 'initialize' will get invoked).
+  enqueue(process);
 
-    stack = mmap(NULL, PROCESS_STACK_SIZE, protection, flags, -1, 0);
+  VLOG(2) << "Spawned process " << process->self();
 
-    if (stack == MAP_FAILED)
-      fatalerror("mmap failed (spawn)");
+  return process->self();
+}
 
-    /* Disallow all memory access to the last page. */
-    if (mprotect(stack, getpagesize(), PROT_NONE) != 0)
-      fatalerror("mprotect failed (spawn)");
-  }
 
-  /* Set up the ucontext. */
-  if (getcontext(&process->uctx) < 0)
-    fatalerror("getcontext failed (spawn)");
-    
-  process->uctx.uc_stack.ss_sp = stack;
-  process->uctx.uc_stack.ss_size = PROCESS_STACK_SIZE;
-  process->uctx.uc_link = 0;
-
-  /* Package the arguments. */
-#ifdef __x86_64__
-  CHECK(sizeof(unsigned long) == sizeof(ProcessBase *));
-  int stack0 = (unsigned int) (unsigned long) stack;
-  int stack1 = (unsigned long) stack >> 32;
-  int process0 = (unsigned int) (unsigned long) process;
-  int process1 = (unsigned long) process >> 32;
-#else
-  CHECK(sizeof(unsigned int) == sizeof(ProcessBase *));
-  int stack0 = (unsigned int) stack;
-  int stack1 = 0;
-  int process0 = (unsigned int) process;
-  int process1 = 0;
-#endif /* __x86_64__ */
-
-  makecontext(&process->uctx, (void (*)()) trampoline,
-              4, stack0, stack1, process0, process1);
+void ProcessManager::resume(ProcessBase* process)
+{
+  __process__ = process;
 
-  /* Add process to the run queue. */
-  enqueue(process);
+  VLOG(2) << "Resuming " << process->pid << " at "
+          << std::fixed << std::setprecision(9) << Clock::now();
 
-  /* Use the garbage collector if requested. */
-  if (manage) {
-    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
-  }
+  bool terminate = false;
+  bool blocked = false;
 
-  return process->self();
-}
+  CHECK(process->state == ProcessBase::BOTTOM ||
+        process->state == ProcessBase::READY);
 
+  if (process->state == ProcessBase::BOTTOM) {
+    process->state = ProcessBase::RUNNING;
+    try { process->initialize(); }
+    catch (...) { terminate = true; }
+  }
 
+  while (!terminate && !blocked) {
+    Event* event = NULL;
 
-void ProcessManager::link(ProcessBase *process, const UPID &to)
-{
-  // Check if the pid is local.
-  if (!(to.ip == ip && to.port == port)) {
-    socket_manager->link(process, to);
-  } else {
-    // Since the pid is local we want to get a reference to it's
-    // underlying process so that while we are invoking the link
-    // manager we don't miss sending a possible EXITED.
-    if (ProcessReference _ = use(to)) {
-      socket_manager->link(process, to);
-    } else {
-      // Since the pid isn't valid it's process must have already died
-      // (or hasn't been spawned yet) so send a process exit message.
-      Message *message = encode(to, process->pid, EXITED);
-      process->enqueue(message);
+    process->lock();
+    {
+      if (process->events.size() > 0) {
+        event = process->events.front();
+        process->events.pop_front();
+        process->state = ProcessBase::RUNNING;
+      } else {
+        process->state = ProcessBase::BLOCKED;
+        blocked = true;
+      }
     }
-  }
-}
-
+    process->unlock();
 
-bool ProcessManager::receive(ProcessBase *process, double secs)
-{
-  CHECK(process != NULL);
+    if (!blocked) {
+      CHECK(event != NULL);
 
-  bool timedout = false;
+      // Determine if we should terminate.
+      terminate = event->is<TerminateEvent>();
 
-  process->lock();
-  {
-    /* Ensure nothing enqueued since check in ProcessBase::receive. */
-    if (process->messages.empty()) {
-      if (secs > 0) {
-        /* Create timeout. */
-        const timeout &timeout = create_timeout(process, secs);
-
-        /* Start the timeout. */
-        start_timeout(timeout);
-
-        /* Context switch. */
-        process->state = ProcessBase::RECEIVING;
-        swapcontext(&process->uctx, &proc_uctx_running);
-
-        CHECK(process->state == ProcessBase::READY ||
-	      process->state == ProcessBase::TIMEDOUT);
-
-        /* Attempt to cancel the timer if necessary. */
-        if (process->state != ProcessBase::TIMEDOUT) {
-          cancel_timeout(timeout);
-        } else {
-          timedout = true;
-        }
+      // Now service the event.
+      try {
+        process->serve(*event);
+      } catch (const std::exception& e) {
+        std::cerr << "libprocess: " << process->pid
+                  << " terminating due to "
+                  << e.what() << std::endl;
+        terminate = true;
+      } catch (...) {
+        std::cerr << "libprocess: " << process->pid
+                  << " terminating due to unknown exception" << std::endl;
+        terminate = true;
+      }
 
-        /* N.B. No cancel means possible unnecessary timeouts. */
+      delete event;
 
-        process->state = ProcessBase::RUNNING;
-      
-        /* Update the generation (handles racing timeouts). */
-        process->generation++;
-      } else {
-        /* Context switch. */
-        process->state = ProcessBase::RECEIVING;
-        swapcontext(&process->uctx, &proc_uctx_running);
-        CHECK(process->state == ProcessBase::READY);
-        process->state = ProcessBase::RUNNING;
+      if (terminate) {
+        cleanup(process);
       }
     }
   }
-  process->unlock();
 
-  return !timedout;
+  __process__ = NULL;
 }
 
 
-bool ProcessManager::serve(ProcessBase *process, double secs)
+void ProcessManager::cleanup(ProcessBase* process)
 {
-  CHECK(process != NULL);
+  VLOG(2) << "Cleaning up " << process->pid;
 
-  bool timedout = false;
+  // Processes that were waiting on exiting process.
+  list<ProcessBase*> resumable;
 
-  process->lock();
-  {
-    /* Ensure nothing enqueued since check in ProcessBase::serve. */
-    if (process->messages.empty() &&
-        process->requests.empty() &&
-        process->dispatchers.empty()) {
-      if (secs > 0) {
-        /* Create timeout. */
-        const timeout &timeout = create_timeout(process, secs);
-
-        /* Start the timeout. */
-        start_timeout(timeout);
-
-        /* Context switch. */
-        process->state = ProcessBase::SERVING;
-        swapcontext(&process->uctx, &proc_uctx_running);
-
-        CHECK(process->state == ProcessBase::READY ||
-	      process->state == ProcessBase::TIMEDOUT);
-
-        /* Attempt to cancel the timer if necessary. */
-        if (process->state != ProcessBase::TIMEDOUT) {
-          cancel_timeout(timeout);
-        } else {
-          timedout = true;
-        }
+  // Possible gate non-libprocess threads are waiting at.
+  Gate* gate = NULL;
+ 
+  // Remove process.
+  synchronized (processes) {
+    // Wait for all process references to get cleaned up.
+    while (process->refs > 0) {
+      asm ("pause");
+      __sync_synchronize();
+    }
 
-        /* N.B. No cancel means possible unnecessary timeouts. */
+    process->lock();
+    {
+      // Free any pending events.
+      while (!process->events.empty()) {
+        Event* event = process->events.front();
+        process->events.pop_front();
+        delete event;
+      }
 
-        process->state = ProcessBase::RUNNING;
-      
-        /* Update the generation (handles racing timeouts). */
-        process->generation++;
-      } else {
-        /* Context switch. */
-        process->state = ProcessBase::SERVING;
-        swapcontext(&process->uctx, &proc_uctx_running);
-        CHECK(process->state == ProcessBase::READY);
-        process->state = ProcessBase::RUNNING;
+      processes.erase(process->pid.id);
+ 
+      // Lookup gate to wake up waiting threads.
+      map<ProcessBase*, Gate*>::iterator it = gates.find(process);
+      if (it != gates.end()) {
+        gate = it->second;
+        // N.B. The last thread that leaves the gate also free's it.
+        gates.erase(it);
       }
+
+      CHECK(process->refs == 0);
+      process->state = ProcessBase::FINISHED;
     }
+    process->unlock();
+
+    // Note that we don't remove the process from the clock during
+    // cleanup, but rather the clock is reset for a process when it is
+    // created (see ProcessBase::ProcessBase). We do this so that
+    // SocketManager::exited can access the current time of the
+    // process to "order" exited events. It might make sense to
+    // consider storing the time of the process as a field of the
+    // class instead.
+
+    // Now we tell the socket manager about this process exiting so
+    // that it can create exited events for linked processes. We
+    // _must_ do this while synchronized on processes because
+    // otherwise another process could attempt to link this process
+    // and SocketManger::link would see that the processes doesn't
+    // exist when it attempts to get a ProcessReference (since we
+    // removed the process above) thus causing an exited event, which
+    // could cause the process to get deleted (e.g., the garbage
+    // collector might link _after_ the process has already been
+    // removed, thus getting an exited event but we don't want that
+    // exited event to fire until after we have used the process in
+    // SocketManager::exited.
+    socket_manager->exited(process);
   }
-  process->unlock();
 
-  return !timedout;
+  // Confirm process not in runq.
+  synchronized (runq) {
+    CHECK(find(runq.begin(), runq.end(), process) == runq.end());
+  }
+
+  // ***************************************************************
+  // At this point we can no longer dereference the process since it
+  // might already be deallocated (e.g., by the garbage collector).
+  // ***************************************************************
+
+  if (gate != NULL) {
+    gate->open();
+  }
 }
 
 
-void ProcessManager::pause(ProcessBase *process, double secs)
+void ProcessManager::link(ProcessBase* process, const UPID& to)
 {
-  CHECK(process != NULL);
-
-  process->lock();
-  {
-    if (secs > 0) {
-      /* Create/Start the timeout. */
-      start_timeout(create_timeout(process, secs));
-
-      /* Context switch. */
-      process->state = ProcessBase::PAUSED;
-      swapcontext(&process->uctx, &proc_uctx_running);
-      CHECK(process->state == ProcessBase::TIMEDOUT);
-      process->state = ProcessBase::RUNNING;
+  // Check if the pid is local.
+  if (!(to.ip == ip && to.port == port)) {
+    socket_manager->link(process, to);
+  } else {
+    // Since the pid is local we want to get a reference to it's
+    // underlying process so that while we are invoking the link
+    // manager we don't miss sending a possible ExitedEvent.
+    if (ProcessReference _ = use(to)) {
+      socket_manager->link(process, to);
     } else {
-      /* Modified context switch (basically a yield). */
-      process->state = ProcessBase::READY;
-      enqueue(process);
-      swapcontext(&process->uctx, &proc_uctx_running);
-      CHECK(process->state == ProcessBase::READY);
-      process->state = ProcessBase::RUNNING;
+      // Since the pid isn't valid it's process must have already died
+      // (or hasn't been spawned yet) so send a process exit message.
+      process->enqueue(new ExitedEvent(to));
     }
   }
-  process->unlock();
 }
 
 
-void ProcessManager::terminate(const UPID& pid, bool inject, ProcessBase* sender)
+void ProcessManager::terminate(
+    const UPID& pid,
+    bool inject,
+    ProcessBase* sender)
 {
   if (ProcessReference process = use(pid)) {
     if (sender != NULL) {
       synchronized (timeouts) {
-        if (clk != NULL) {
-          ev_tstamp tstamp =
-            max(clk->getCurrent(process), clk->getCurrent(sender));
-          clk->setCurrent(process, tstamp);
+        if (Clock::paused()) {
+          Clock::order(sender, process);
         }
       }
 
-      process->enqueue(encode(sender->self(), pid, TERMINATE), inject);
+      process->enqueue(new TerminateEvent(sender->self()), inject);
     } else {
-      process->enqueue(encode(UPID(), pid, TERMINATE), inject);
-    }
-  }
-}
-
-
-bool ProcessManager::wait(ProcessBase *process, const UPID &pid)
-{
-  bool waited = false;
-
-  /* Now we can add the process to the waiters. */
-  synchronized (processes) {
-    if (processes.count(pid.id) > 0) {
-      CHECK(processes[pid.id]->state != ProcessBase::FINISHED);
-      waiters[processes[pid.id]].insert(process);
-      waited = true;
-    }
-  }
-
-  /* If we waited then we should context switch. */
-  if (waited) {
-    process->lock();
-    {
-      if (process->state == ProcessBase::RUNNING) {
-        /* Context switch. */
-        process->state = ProcessBase::WAITING;
-        swapcontext(&process->uctx, &proc_uctx_running);
-        CHECK(process->state == ProcessBase::READY);
-        process->state = ProcessBase::RUNNING;
-      } else {
-        /* Process is cleaned up and we have been removed from waiters. */
-        CHECK(process->state == ProcessBase::INTERRUPTED);
-        process->state = ProcessBase::RUNNING;
-      }
+      process->enqueue(new TerminateEvent(UPID()), inject);
     }
-    process->unlock();
   }
-
-  return waited;
 }
 
 
-bool ProcessManager::external_wait(const UPID &pid)
+bool ProcessManager::wait(const UPID& pid)
 {
-  // We use a gate for external waiters. A gate is single use. That
-  // is, a new gate is created when the first external thread shows
-  // up and wants to wait for a process that currently has no
-  // gate. Once that process exits, the last external thread to
-  // leave the gate will also clean it up. Note that a gate will
-  // never get more external threads waiting on it after it has been
-  // opened, since the process should no longer be valid and
-  // therefore will not have an entry in 'processes'.
+  // We use a gate for waiters. A gate is single use. That is, a new
+  // gate is created when the first thread shows up and wants to wait
+  // for a process that currently has no gate. Once that process
+  // exits, the last thread to leave the gate will also clean it
+  // up. Note that a gate will never get more threads waiting on it
+  // after it has been opened, since the process should no longer be
+  // valid and therefore will not have an entry in 'processes'.
 
-  Gate *gate = NULL;
+  Gate* gate = NULL;
   Gate::state_t old;
 
-  /* Try and approach the gate if necessary. */
+  ProcessBase* process = NULL; // Set to non-null if we donate thread.
+
+  // Try and approach the gate if necessary.
   synchronized (processes) {
     if (processes.count(pid.id) > 0) {
-      ProcessBase *process = processes[pid.id];
+      process = processes[pid.id];
       CHECK(process->state != ProcessBase::FINISHED);
 
-      /* Check and see if a gate already exists. */
+      // Check and see if a gate already exists.
       if (gates.find(process) == gates.end()) {
         gates[process] = new Gate();
       }
 
       gate = gates[process];
       old = gate->approach();
+
+      // Check if it is runnable in order to donate this thread.
+      if (process->state == ProcessBase::BOTTOM ||
+          process->state == ProcessBase::READY) {
+        synchronized (runq) {
+          list<ProcessBase*>::iterator it =
+            find(runq.begin(), runq.end(), process);
+          if (it != runq.end()) {
+            runq.erase(it);
+          } else {
+            // Another thread has resumed the process ...
+            process = NULL;
+          }
+        }
+      } else {
+        // Process is not runnable, so no need to donate ...
+        process = NULL;
+      }
     }
   }
 
-  /* Now arrive at the gate and wait until it opens. */
+  if (process != NULL) {
+    VLOG(1) << "Donating thread to " << process->pid << " while waiting";
+    ProcessBase* donator = __process__;
+    process_manager->resume(process);
+    __process__ = donator;
+  }
+
+  // TODO(benh): Donating only once may not be sufficient, so we might
+  // still deadlock here ... perhaps warn if that's the case?
+
+  // Now arrive at the gate and wait until it opens.
   if (gate != NULL) {
     gate->arrive(old);
 
@@ -2165,87 +2061,7 @@ bool ProcessManager::external_wait(const
 }
 
 
-bool ProcessManager::poll(ProcessBase *process, int fd, int op, double secs, bool ignore)
-{
-  CHECK(process != NULL);
-
-  bool interrupted = false;
-
-  process->lock();
-  {
-    /* Consider a non-empty message queue as an immediate interrupt. */
-    if (!ignore && !process->messages.empty()) {
-      process->unlock();
-      return false;
-    }
-
-    // Treat an poll with a bad fd as an interruptible pause!
-    if (fd >= 0) {
-      /* Allocate/Initialize the watcher. */
-      ev_io *watcher = new ev_io();
-
-      if ((op & ProcessBase::RDWR) == ProcessBase::RDWR) {
-        ev_io_init(watcher, handle_poll, fd, EV_READ | EV_WRITE);
-      } else if ((op & ProcessBase::RDONLY) == ProcessBase::RDONLY) {
-        ev_io_init(watcher, handle_poll, fd, EV_READ);
-      } else if ((op & ProcessBase::WRONLY) == ProcessBase::WRONLY) {
-        ev_io_init(watcher, handle_poll, fd, EV_WRITE);
-      }
-
-      // Tuple describing state (on heap in case we can't "cancel" it,
-      // the watcher will always fire, even if we get interrupted and
-      // return early, so this tuple will get cleaned up when the
-      // watcher runs).
-      watcher->data = new tuple<UPID, int>(process->pid, process->generation);
-
-      /* Enqueue the watcher. */
-      synchronized (watchers) {
-        watchers->push(watcher);
-      }
-    
-      /* Interrupt the loop. */
-      ev_async_send(loop, &async_watcher);
-    }
-
-    CHECK(secs >= 0);
-
-    timeout timeout;
-
-    if (secs != 0) {
-      timeout = create_timeout(process, secs);
-      start_timeout(timeout);
-    }
-
-    /* Context switch. */
-    process->state = ProcessBase::POLLING;
-    swapcontext(&process->uctx, &proc_uctx_running);
-    CHECK(process->state == ProcessBase::READY ||
-	  process->state == ProcessBase::TIMEDOUT ||
-	  process->state == ProcessBase::INTERRUPTED);
-
-    /* Attempt to cancel the timer if necessary. */
-    if (secs != 0) {
-      if (process->state != ProcessBase::TIMEDOUT) {
-        cancel_timeout(timeout);
-      }
-    }
-
-    if (process->state == ProcessBase::INTERRUPTED) {
-      interrupted = true;
-    }
-
-    process->state = ProcessBase::RUNNING;
-      
-    /* Update the generation (handles racing polled). */
-    process->generation++;
-  }
-  process->unlock();
-
-  return !interrupted;
-}
-
-
-void ProcessManager::enqueue(ProcessBase *process)
+void ProcessManager::enqueue(ProcessBase* process)
 {
   CHECK(process != NULL);
 
@@ -2259,18 +2075,18 @@ void ProcessManager::enqueue(ProcessBase
     runq.push_back(process);
   }
     
-  /* Wake up the processing thread if necessary. */
+  // Wake up the processing thread if necessary.
   gate->open();
 }
 
 
-ProcessBase * ProcessManager::dequeue()
+ProcessBase* ProcessManager::dequeue()
 {
   // TODO(benh): Remove a process from this thread's runq. If there
   // are no processes to run, and this is not a dedicated thread, then
   // steal one from another threads runq.
 
-  ProcessBase *process = NULL;
+  ProcessBase* process = NULL;
 
   synchronized (runq) {
     if (!runq.empty()) {
@@ -2283,340 +2099,76 @@ ProcessBase * ProcessManager::dequeue()
 }
 
 
-void ProcessManager::timedout(const UPID &pid, int generation)
-{
-  if (ProcessReference process = use(pid)) {
-    process->lock();
-    {
-      // We know we timed out if the state != READY after a timeout
-      // but the generation is still the same.
-      if (process->state != ProcessBase::READY &&
-          process->generation == generation) {
-
-        // The process could be in any of the following states,
-        // including RUNNING if a pause, receive, or poll was
-        // initiated by an "outside" thread (e.g., in the constructor
-        // of the process).
-        CHECK(process->state == ProcessBase::RUNNING ||
-	      process->state == ProcessBase::RECEIVING ||
-	      process->state == ProcessBase::SERVING ||
-	      process->state == ProcessBase::POLLING ||
-	      process->state == ProcessBase::INTERRUPTED ||
-	      process->state == ProcessBase::PAUSED);
-
-        if (process->state != ProcessBase::RUNNING ||
-            process->state != ProcessBase::INTERRUPTED ||
-            process->state != ProcessBase::FINISHING) {
-          process_manager->enqueue(process);
-        }
-
-        // We always have a timeout override the state (unless we are
-        // exiting). This includes overriding INTERRUPTED. This means
-        // that a process that was polling when selected from the
-        // runq will fall out because of a timeout even though it also
-        // received a message.
-        if (process->state != ProcessBase::FINISHING) {
-          process->state = ProcessBase::TIMEDOUT;
-        }
-      }
-    }
-    process->unlock();
-  }
-}
-
-
-void ProcessManager::polled(const UPID &pid, int generation)
-{
-  if (ProcessReference process = use(pid)) {
-    process->lock();
-    {
-      if (process->state == ProcessBase::POLLING &&
-          process->generation == generation) {
-        process->state = ProcessBase::READY;
-        enqueue(process);
-      }
-    }
-    process->unlock();
-  }
-}
-
-
-void ProcessManager::run(ProcessBase *process)
-{
-  // Each process gets locked before 'schedule' runs it to enforce
-  // atomicity for the blocking routines (receive, poll, pause,
-  // etc). So, we only need to unlock the process here.
-  {
-    process->state = ProcessBase::RUNNING;
-  }
-  process->unlock();
-
-  try {
-    VLOG(2) << "Invoking " << process->pid;
-    (*process)();
-  } catch (const std::exception &e) {
-    std::cerr << "libprocess: " << process->pid
-              << " exited due to "
-              << e.what() << std::endl;
-  } catch (...) {
-    std::cerr << "libprocess: " << process->pid
-              << " exited due to unknown exception" << std::endl;
-  }
+namespace timers {
 
-  cleanup(process);
-}
-
-
-void ProcessManager::cleanup(ProcessBase *process)
+timer create(double secs, const lambda::function<void(void)>& thunk)
 {
-  // Processes that were waiting on exiting process.
-  list<ProcessBase *> resumable;
+  static long id = 0;
 
-  // Possible gate non-libprocess threads are waiting at.
-  Gate *gate = NULL;
+  double timeout = Clock::now() + secs;
 
-  // Stop new process references from being created.
-  process->state = ProcessBase::FINISHING;
- 
-  /* Remove process. */
-  synchronized (processes) {
-    // Remove from internal clock (if necessary).
+  if (__process__ != NULL) {
     synchronized (timeouts) {
-      if (clk != NULL)
-        clk->discard(process);
-    }
-
-    // Wait for all process references to get cleaned up.
-    while (process->refs > 0) {
-      asm ("pause");
-      __sync_synchronize();
-    }
-
-    process->lock();
-    {
-      // Free any pending messages.
-      while (!process->messages.empty()) {
-        Message *message = process->messages.front();
-        process->messages.pop_front();
-        delete message;
-      }
-
-      // Free any pending requests.
-      while (!process->requests.empty()) {
-        pair<HttpRequest*, Promise<HttpResponse>*>* request = process->requests.front();
-        process->requests.pop_front();
-        delete request;
-      }
-
-      // Free any pending dispatchers.
-      while (!process->dispatchers.empty()) {
-        function<void(ProcessBase*)>* dispatcher = process->dispatchers.front();
-        process->dispatchers.pop_front();
-        delete dispatcher;
-      }
-
-      // Free current message.
-      if (process->current) {
-        delete process->current;
-      }
-
-      processes.erase(process->pid.id);
-
-      // Confirm that the process is not in any waiting queue.
-      foreachpair (_, set<ProcessBase *> &waiting, waiters) {
-        CHECK(waiting.find(process) == waiting.end());
-      }
- 
-      // Confirm process not in runq.
-      synchronized (runq) {
-        CHECK(find(runq.begin(), runq.end(), process) == runq.end());
-      }
- 
-      // Grab all the waiting processes that are now resumable.
-      foreach (ProcessBase *waiter, waiters[process]) {
-        resumable.push_back(waiter);
-      }
-
-      waiters.erase(process);
-
-      // Lookup gate to wake up waiting non-libprocess threads.
-      map<ProcessBase *, Gate *>::iterator it = gates.find(process);
-      if (it != gates.end()) {
-        gate = it->second;
-        // N.B. The last thread that leaves the gate also free's it.
-        gates.erase(it);
-      }
-
-      CHECK(process->refs == 0);
-      process->state = ProcessBase::FINISHED;
-    }
-    process->unlock();
-  }
-
-  // Inform socket manager.
-  socket_manager->exited(process);
-
-  // N.B. After opening the gate we can no longer dereference
-  // 'process' since it might already be cleaned up by user code (a
-  // waiter might have cleaned up the stack where the process was
-  // allocated).
-  if (gate != NULL) {
-    gate->open();
-  }
-
-  // And resume all processes waiting too.
-  foreach (ProcessBase *p, resumable) {
-    p->lock();
-    {
-      // Process 'p' might be RUNNING because it is racing to become
-      // WAITING while we are actually trying to get it to become
-      // running again.
-      // TODO(benh): Once we actually run multiple processes at a
-      // time (using multiple threads) this logic will need to get
-      // made thread safe (in particular, a process may be
-      // FINISHING).
-      CHECK(p->state == ProcessBase::RUNNING ||
-	    p->state == ProcessBase::WAITING);
-      if (p->state == ProcessBase::RUNNING) {
-        p->state = ProcessBase::INTERRUPTED;
-      } else {
-        p->state = ProcessBase::READY;
-        enqueue(p);
+      if (Clock::paused()) {
+        timeout = Clock::now(__process__) + secs;
       }
     }
-    p->unlock();
   }
-}
 
+  timer timer;
+  timer.id = id++;
+  timer.timeout = timeout;
+  timer.pid = __process__ != NULL ? __process__->self() : UPID();
+  timer.thunk = thunk;
 
-timeout ProcessManager::create_timeout(ProcessBase *process, double secs)
-{
-  CHECK(process != NULL);
-
-  ev_tstamp tstamp;
+  VLOG(2) << "Created a timer for "
+          << std::fixed << std::setprecision(9) << timeout;
 
+  // Add the timer.
   synchronized (timeouts) {
-    if (clk != NULL) {
-      tstamp = clk->getCurrent(process) + secs;
-    } else {
-      // TODO(benh): Unclear if want ev_now(...) or ev_time().
-      tstamp = ev_time() + secs;
-    }
-  }
-
-  timeout timeout;
-  timeout.tstamp = tstamp;
-  timeout.pid = process->pid;
-  timeout.generation = process->generation;
-
-  return timeout;
-}
-
-
-void ProcessManager::start_timeout(const timeout &timeout)
-{
-  /* Add the timer. */
-  synchronized (timeouts) {
-    if (timeouts->size() == 0 || timeout.tstamp < timeouts->begin()->first) {
+    if (timeouts->size() == 0 || timer.timeout < timeouts->begin()->first) {
       // Need to interrupt the loop to update/set timer repeat.
-      (*timeouts)[timeout.tstamp].push_back(timeout);
+      (*timeouts)[timer.timeout].push_back(timer);
       update_timer = true;
       ev_async_send(loop, &async_watcher);
     } else {
       // Timer repeat is adequate, just add the timeout.
       CHECK(timeouts->size() >= 1);
-      (*timeouts)[timeout.tstamp].push_back(timeout);
+      (*timeouts)[timer.timeout].push_back(timer);
     }
   }
+
+  return timer;
 }
 
 
-void ProcessManager::cancel_timeout(const timeout &timeout)
+void cancel(const timer& timer)
 {
   synchronized (timeouts) {
     // Check if the timeout is still pending, and if so, erase
     // it. In addition, erase an empty list if we just removed the
     // last timeout.
-    if (timeouts->count(timeout.tstamp) > 0) {
-      (*timeouts)[timeout.tstamp].remove(timeout);
-      if ((*timeouts)[timeout.tstamp].empty())
-        timeouts->erase(timeout.tstamp);
-    }
-  }
-}
-
-
-double Clock::now()
-{
-  synchronized (timeouts) {
-    if (clk != NULL) {
-      return clk->getElapsed();
-    } else {
-      return ev_time();
-    }
-  }
-}
-
-
-void Clock::pause()
-{
-  initialize();
-
-  synchronized (timeouts) {
-    // For now, only one global clock (rather than clock per
-    // process). This Means that we have to take special care to
-    // ensure happens-before timing (currently done for local message
-    // sends and spawning new processes, not currently done for
-    // EXITED messages).
-    if (clk == NULL) {
-      clk = new InternalClock();
-
-      // The existing libev timer might actually timeout, but now that
-      // clk != NULL, no "time" will actually have passed, so no
-      // timeouts will actually occur.
-    }
-  }
-}
-
-
-void Clock::resume()
-{
-  initialize();
-
-  synchronized (timeouts) {
-    if (clk != NULL) {
-      delete clk;
-      clk = NULL;
+    if (timeouts->count(timer.timeout) > 0) {
+      (*timeouts)[timer.timeout].remove(timer);
+      if ((*timeouts)[timer.timeout].empty()) {
+        timeouts->erase(timer.timeout);
+      }
     }
-
-    update_timer = true;
-    ev_async_send(loop, &async_watcher);
   }
 }
 
-
-void Clock::advance(double secs)
-{
-  synchronized (timeouts) {
-    if (clk != NULL) {
-      clk->setElapsed(clk->getElapsed() + secs);
-
-      // Might need to wakeup the processing thread.
-      gate->open();
-    }
-  }
-}
+} // namespace timeouts {
 
 
 ProcessBase::ProcessBase(const std::string& _id)
 {
-  initialize();
+  process::initialize();
+
+  state = ProcessBase::BOTTOM;
 
   pthread_mutex_init(&m, NULL);
 
   refs = 0;
-  current = NULL;
-  generation = 0;
 
   // Generate string representation of unique id for process.
   if (_id != "") {
@@ -2633,12 +2185,10 @@ ProcessBase::ProcessBase(const std::stri
   // If using a manual clock, try and set current time of process
   // using happens before relationship between creator and createe!
   synchronized (timeouts) {
-    if (clk != NULL) {
-      if (pthread_self() == proc_thread) {
-        CHECK(proc_process != NULL);
-        clk->setCurrent(this, clk->getCurrent(proc_process));
-      } else {
-        clk->setCurrent(this, clk->getCurrent());
+    if (Clock::paused()) {
+      clock::currents->erase(this); // In case the address is reused!
+      if (__process__ != NULL) {
+        Clock::order(__process__, this);
       }
     }
   }
@@ -2648,189 +2198,76 @@ ProcessBase::ProcessBase(const std::stri
 ProcessBase::~ProcessBase() {}
 
 
-void ProcessBase::enqueue(Message* message, bool inject)
+void ProcessBase::enqueue(Event* event, bool inject)
 {
-  CHECK(message != NULL);
+  CHECK(event != NULL);
 
   // TODO(benh): Put filter inside lock statement below so that we can
   // guarantee the order of the messages seen by a filter are the same
-  // as the order of messages seen by the process.
+  // as the order of messages seen by the process. Right now two
+  // different threads might execute the filter code and then enqueue
+  // the messages in non-deterministic orderings (i.e., there are two
+  // "atomic" blocks, the filter code here and the enqueue code
+  // below).
   synchronized (filterer) {
     if (filterer != NULL) {
-      if (filterer->filter(message)) {
-        delete message;
+      bool filter = false;
+      struct FilterVisitor : EventVisitor
+      {
+        FilterVisitor(bool* _filter) : filter(_filter) {}
+
+        virtual void visit(const MessageEvent& event)
+        {
+          *filter = filterer->filter(event);
+        }
+
+        virtual void visit(const DispatchEvent& event)
+        {
+          *filter = filterer->filter(event);
+        }
+
+        virtual void visit(const HttpEvent& event)
+        {
+          *filter = filterer->filter(event);
+        }
+
+        virtual void visit(const ExitedEvent& event)
+        {
+          *filter = filterer->filter(event);
+        }
+
+        bool* filter;
+      } visitor(&filter);
+
+      event->visit(&visitor);
+
+      if (filter) {
+        delete event;
         return;
       }
     }
   }
 
-  UPID delegate;
-
   lock();
   {
-    CHECK(state != FINISHED);
-
-    // Check and see if we should delegate this message.
-    if (delegates.count(message->name) > 0) {
-      delegate = delegates[message->name];
-    } else {
+    if (state != FINISHED) {
       if (!inject) {
-        messages.push_back(message);
+        events.push_back(event);
       } else {
-        messages.push_front(message);
+        events.push_front(event);
       }
 
-      if (state == RECEIVING || state == SERVING) {
+      if (state == BLOCKED) {
         state = READY;
         process_manager->enqueue(this);
-      } else if (state == POLLING) {
-        state = INTERRUPTED;
-        process_manager->enqueue(this);
       }
 
-      CHECK(state == INIT ||
+      CHECK(state == BOTTOM ||
             state == READY ||
-            state == RUNNING ||
-            state == PAUSED ||
-            state == WAITING ||
-            state == INTERRUPTED ||
-            state == TIMEDOUT ||
-            state == FINISHING);
-    }
-  }
-  unlock();
-
-  // Delegate this message if necessary.
-  if (delegate != UPID()) {
-    VLOG(1) << "Delegating message '" << message->name << "' to " << delegate;
-    message->to = delegate;
-    transport(message, this);
-  }
-}
-
-
-void ProcessBase::enqueue(pair<HttpRequest*, Promise<HttpResponse>*>* request)
-{
-  CHECK(request != NULL);
-
-  // TODO(benh): Support filtering HTTP requests.
-
-  lock();
-  {
-    CHECK(state != FINISHED);
-
-    requests.push_back(request);
-
-    if (state == SERVING) {
-      state = READY;
-      process_manager->enqueue(this);
-    } else if (state == POLLING) {
-      state = INTERRUPTED;
-      process_manager->enqueue(this);
-    }
-
-    CHECK(state == INIT ||
-	  state == READY ||
-	  state == RUNNING ||
-	  state == RECEIVING ||
-	  state == PAUSED ||
-	  state == WAITING ||
-	  state == INTERRUPTED ||
-	  state == TIMEDOUT ||
-	  state == FINISHING);
-  }
-  unlock();
-}
-
-
-void ProcessBase::enqueue(function<void(ProcessBase*)>* dispatcher)
-{
-  CHECK(dispatcher != NULL);
-
-  // TODO(benh): Support filtering dispatches.
-
-  lock();
-  {
-    CHECK(state != FINISHED);
-
-    dispatchers.push_back(dispatcher);
-
-    if (state == SERVING) {
-      state = READY;
-      process_manager->enqueue(this);
-    } else if (state == POLLING) {
-      state = INTERRUPTED;
-      process_manager->enqueue(this);
-    }
-
-    CHECK(state == INIT ||
-	  state == READY ||
-	  state == RUNNING ||
-	  state == RECEIVING ||
-	  state == PAUSED ||
-	  state == WAITING ||
-	  state == INTERRUPTED ||
-	  state == TIMEDOUT ||
-	  state == FINISHING);
-  }
-  unlock();
-}
-
-
-template <>
-Message * ProcessBase::dequeue()
-{
-  Message *message = NULL;
-
-  lock();
-  {
-    CHECK(state == RUNNING);
-    if (!messages.empty()) {
-      message = messages.front();
-      messages.pop_front();
+            state == RUNNING);
     }
   }
   unlock();
-
-  return message;
-}
-
-
-template <>
-pair<HttpRequest*, Promise<HttpResponse>*>* ProcessBase::dequeue()
-{
-  pair<HttpRequest*, Promise<HttpResponse>*>* request = NULL;
-
-  lock();
-  {
-    CHECK(state == RUNNING);
-    if (!requests.empty()) {
-      request = requests.front();
-      requests.pop_front();
-    }
-  }
-  unlock();
-
-  return request;
-}
-
-
-template <>
-function<void(ProcessBase*)> * ProcessBase::dequeue()
-{
-  function<void(ProcessBase*)> *dispatcher = NULL;
-
-  lock();
-  {
-    CHECK(state == RUNNING);
-    if (!dispatchers.empty()) {
-      dispatcher = dispatchers.front();
-      dispatchers.pop_front();
-    }
-  }
-  unlock();
-
-  return dispatcher;
 }
 
 
@@ -2839,7 +2276,9 @@ void ProcessBase::inject(const UPID& fro
   if (!from)
     return;
 
-  enqueue(encode(from, pid, name, string(data, length)), true);
+  Message* message = encode(from, pid, name, string(data, length));
+
+  enqueue(new MessageEvent(message), true);
 }
 
 
@@ -2854,212 +2293,75 @@ void ProcessBase::send(const UPID& to, c
 }
 
 
-string ProcessBase::receive(double secs)
+void ProcessBase::visit(const MessageEvent& event)
 {
-  // Free current message.
-  if (current != NULL) {
-    delete current;
-    current = NULL;
-  }
-
-  // Check if there is a message.
- check:
-  if ((current = dequeue<Message>()) != NULL) {
-    return name();
-  }
-
-  if (pthread_self() == proc_thread) {
-    // Avoid blocking if negative seconds.
-    if (secs >= 0) {
-      if (!process_manager->receive(this, secs)) {
-        goto timeout;
-      } else {
-        lock();
-        {
-          CHECK(!messages.empty());
-        }
-        unlock();
-        goto check;
-      }
-    } else {
-      goto timeout;
-    }
-  } else {
-    // TODO(benh): Handle calling receive from an outside thread.
-    fatal("unimplemented");

[... 465 lines stripped ...]