Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 08:38:58 UTC

svn commit: r1131940 [2/3] - in /incubator/mesos/trunk/src/third_party/libprocess: process.cpp process.hpp synchronized.cpp synchronized.hpp todo utility.hpp

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131940&r1=1131939&r2=1131940&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun  5 06:38:58 2011
@@ -1,22 +1,3 @@
-/* TODO(benh): Compile with a way to figure out which set of messages you used, and that way when someone with a different set of messages sends you a message you can declare that that message is not in your language of understanding. */
-/* TODO(benh): Fix link functionality (processes need to send process_exit message since a dead process on one node might not know that a process on another node linked with it). */
-/* TODO(benh): What happens when a remote link exits? Do we close the socket correctly? */
-/* TODO(benh): Revisit receive, pause, and await semantics. */
-/* TODO(benh): Handle/Enable forking. */
-/* TODO(benh): Use multiple processing threads (do process affinity). */
-/* TODO(benh): Reclaim/Recycle stack (use Lithe!). */
-/* TODO(benh): Better error handling (i.e., warn if re-spawn process). */
-/* TODO(benh): Better protocol format checking in read_msg. */
-/* TODO(benh): Use different backends for files and sockets. */
-/* TODO(benh): Allow messages to be received out-of-order (i.e., allow
-   someone to do a receive with a message id and let other messages
-   queue until a message with that message id is received).  */
-/* TODO(benh): LinkManager::link and LinkManager::send are pretty big
-   functions, we could probably create some queue that the I/O thread
-   checks for sending messages and creating links instead ... that
-   would probably be faster, and have less contention for the mutex
-   (that might mean we can eliminate contention for the mutex!). */
-
 #include <assert.h>
 #include <errno.h>
 #include <ev.h>
@@ -33,10 +14,6 @@
 
 #include <arpa/inet.h>
 
-#ifdef USE_LITHE
-#include <ht/atomic.h>
-#endif /* USE_LITHE */
-
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 
@@ -53,7 +30,6 @@
 #include <deque>
 #include <fstream>
 #include <iostream>
-#include <iomanip>
 #include <list>
 #include <map>
 #include <queue>
@@ -66,22 +42,18 @@
 #include "foreach.hpp"
 #include "gate.hpp"
 #include "process.hpp"
-#include "singleton.hpp"
-#include "utility.hpp"
+#include "synchronized.hpp"
 
-using boost::make_tuple;
 using boost::tuple;
 
-using std::cout;
 using std::cerr;
 using std::deque;
 using std::endl;
 using std::find;
 using std::list;
-using std::make_pair;
 using std::map;
 using std::max;
-using std::pair;
+using std::ostream;
 using std::queue;
 using std::set;
 using std::stack;
@@ -116,7 +88,6 @@ using std::stack;
 #endif /* __APPLE__ */
 
 
-
 #define Byte (1)
 #define Kilobyte (1024*Byte)
 #define Megabyte (1024*Kilobyte)
@@ -137,148 +108,194 @@ using std::stack;
    })
 
 
-#ifdef USE_LITHE
-#define acquire(l) spinlock_lock(&l ## _lock)
-#define release(l) spinlock_unlock(&l ## _lock)
-class Synchronized
+struct node
 {
-public:
-  int *lock;
-  Synchronized(int *_lock)
-    : lock(_lock) { spinlock_lock(lock); }
-  ~Synchronized() { spinlock_unlock(lock); }
-  operator bool () { return true; }
+  uint32_t ip;
+  uint16_t port;
 };
-#define synchronized(l) if (Synchronized s = Synchronized(&l ## _lock))
-#else
-#define acquire(l) pthread_mutex_lock(&l ## _mutex)
-#define release(l) pthread_mutex_unlock(&l ## _mutex)
-class Synchronized
+
+
+bool operator < (const node& left, const node& right)
 {
-public:
-  pthread_mutex_t *mutex;
-  Synchronized(pthread_mutex_t *_mutex)
-    : mutex(_mutex) { pthread_mutex_lock(mutex); }
-  ~Synchronized() { pthread_mutex_unlock(mutex); }
-  operator bool () { return true; }
+  if (left.ip == right.ip)
+    return left.port < right.port;
+  else
+    return left.ip < right.ip;
+}
+
+
+ostream& operator << (ostream& stream, const node& n)
+{
+  stream << n.ip << ":" << n.port;
+  return stream;
+}
+
+
+/*
+ * Timeout support! Note that we don't store a pointer to the process
+ * because the process might no longer be valid, so we can't safely
+ * dereference it. But we can check if the process is valid using the
+ * PID and then use reference counting to keep the process valid.
+ */
+struct timeout
+{
+  ev_tstamp tstamp;
+  PID pid;
+  int generation;
 };
-#define synchronized(l) if (Synchronized s = Synchronized(&l ## _mutex))
-#endif /* USE_LITHE */
 
 
-/* Local server socket. */
-static int s = -1;
+bool operator == (const timeout &left, const timeout &right)
+{
+  return left.tstamp == right.tstamp &&
+    left.pid == right.pid &&
+    left.generation == right.generation;
+}
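
The generation field above is what makes a timeout safe to fire late or to fail to cancel: a stale timeout simply no longer matches. Below is an illustrative sketch (not part of this commit; names are hypothetical) of how a (pipe, generation) pair can be validated before firing:

    // Hypothetical sketch: validate a (pipe, generation) pair before
    // firing a timeout. A process that has already moved on bumped
    // its generation, so a stale timeout is silently dropped.
    #include <stdint.h>
    #include <map>

    struct ExampleProcess {
      int generation;  // incremented when a receive/pause completes
    };

    static std::map<uint32_t, ExampleProcess *> lookup;  // pipe -> process

    void maybe_fire(uint32_t pipe, int generation)
    {
      std::map<uint32_t, ExampleProcess *>::iterator it = lookup.find(pipe);
      if (it != lookup.end() && it->second->generation == generation) {
        // ... deliver the timeout to it->second ...
      }
      // Otherwise: stale timeout; dropping it is the correct behavior.
    }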
 
-/* Local IP address. */
-static uint32_t ip = 0;
 
-/* Local port. */
-static uint16_t port = 0;
+class ProcessReference
+{
+public:
+  explicit ProcessReference(Process *_process) : process(_process)
+  {
+    if (process != NULL) {
+      __sync_fetch_and_add(&(process->refs), 1);
+      if (process->state == Process::EXITING) {
+        __sync_fetch_and_sub(&(process->refs), 1);
+        process = NULL;
+      }
+    }
+  }
 
-/* Event loop. */
-static struct ev_loop *loop = NULL;
+  ~ProcessReference()
+  {
+    if (process != NULL)
+      __sync_fetch_and_sub(&(process->refs), 1);
+  }
 
-/* Queue of new I/O watchers. */
-static queue<ev_io *> *io_watchersq = new queue<ev_io *>();
+  ProcessReference(const ProcessReference &that)
+  {
+    process = that.process;
 
-/* Watcher queues lock/mutex. */
-#ifdef USE_LITHE
-static int io_watchersq_lock = UNLOCKED;
-#else
-static pthread_mutex_t io_watchersq_mutex = PTHREAD_MUTEX_INITIALIZER;
-#endif /* USE_LITHE */
+    if (process != NULL) {
+      // There should be at least one reference to the process, so
+      // we don't need to worry about checking if it's exiting or
+      // not, since we know we can always create another reference.
+      assert(process->refs > 0);
+      __sync_fetch_and_add(&(process->refs), 1);
+    }
+  }
 
-/* Asynchronous watcher for interrupting loop. */
-static ev_async async_watcher;
+  Process * operator -> ()
+  {
+    return process;
+  }
 
-/* Timer watcher for process timeouts. */
-static ev_timer timer_watcher;
+  operator Process * ()
+  {
+    return process;
+  }
 
-/* Process timers lock/mutex. */
-#ifdef USE_LITHE
-static int timers_lock = UNLOCKED;
-#else
-static pthread_mutex_t timers_mutex = PTHREAD_MUTEX_INITIALIZER;
-#endif /* USE_LITHE */
+  operator bool ()
+  {
+    return process != NULL;
+  }
 
-/* Map of process timers (we exploit that the map is SORTED!). */
-typedef tuple<ev_tstamp, Process *, int> timeout_t;
-typedef list<timeout_t> timeouts_t;
-static map<ev_tstamp, timeouts_t> *timers =
-  new map<ev_tstamp, timeouts_t>();
+private:
+  ProcessReference & operator = (const ProcessReference &that);
 
-/* Flag to indicate whether or not to update the timer on async interrupt. */
-static bool update_timer = false;
+  Process *process;
+};
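
ProcessReference is an RAII guard over Process::refs: constructing one atomically pins the process (unless it is already EXITING), and destruction unpins it. A minimal usage sketch (the function name is hypothetical), mirroring how handle_timeout uses ProcessManager::use() later in this file:

    // Usage sketch: the reference pins the process for the scope of
    // the if-body; operator bool reports whether the pin succeeded.
    void example(const PID &pid)
    {
      if (ProcessReference process = process_manager->use(pid)) {
        // Safe to dereference here: refs > 0, so the process cannot
        // finish exiting while this reference is held.
      }
      // Destructor runs here and atomically decrements refs.
    }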
 
-/* Server watcher for accepting connections. */
-static ev_io server_watcher;
 
-/* I/O thread. */
-static pthread_t io_thread;
+class LinkManager
+{
+public:
+  LinkManager();
+  ~LinkManager();
 
-/* Processing thread. */
-static pthread_t proc_thread;
+  void link(Process *process, const PID &to);
 
-/* Scheduling context for processing thread. */
-static ucontext_t proc_uctx_schedule;
+  void send(struct msg *msg);
 
-/* Running context for processing thread. */
-static ucontext_t proc_uctx_running;
+  struct msg * next(int s);
+  struct msg * next_or_close(int s);
+  struct msg * next_or_sleep(int s);
 
-/* Current process of processing thread. */
-//static __thread Process *proc_process = NULL;
-static Process *proc_process = NULL;
+  void closed(int s);
 
-/* Flag indicating if performing safe call into legacy. */
-// static __thread bool legacy = false;
-static bool legacy = false;
+  void exited(const node &n);
+  void exited(Process *process);
 
-/* Thunk to safely call into legacy. */
-// static __thread std::tr1::function<void (void)> *legacy_thunk;
-static const std::tr1::function<void (void)> *legacy_thunk;
+private:
+  /* Map from PID (local/remote) to processes that have linked to it. */
+  map<PID, set<Process *> > links;
 
-/* Global 'pipe' id uniquely assigned to each process. */
-static uint32_t global_pipe = 0;
+  /* Map from socket to node (ip, port). */
+  map<int, node> sockets;
 
-/* Status of processing thread. */
-static int idle = 0;
+  /* Maps from node (ip, port) to socket. */
+  map<node, int> temps;
+  map<node, int> persists;
 
-/* Scheduler gate. */
-static Gate *gate = new Gate();
+  /* Map from socket to outgoing messages. */
+  map<int, queue<struct msg *> > outgoing;
 
-/* Stack of stacks. */
-static stack<void *> *stacks = new stack<void *>();
+  /* Protects instance variables. */
+  synchronizable(this);
+};
 
-/* Record? */
-static bool recording = false;
 
-/* Record(s) for replay. */
-static std::fstream record_msgs;
-static std::fstream record_pipes;
+class ProcessManager
+{
+public:
+  ProcessManager();
+  ~ProcessManager();
 
-/* Replay? */
-static bool replaying = false;
+  ProcessReference use(const PID &pid);
 
-/* Replay messages (id -> queue of messages). */
-static map<uint32_t, queue<struct msg *> > *replay_msgs =
-  new map<uint32_t, queue<struct msg *> >();
+  void record(struct msg *msg);
+  void replay();
 
-/* Replay pipes (parent id -> stack of remaining child ids). */
-static map<uint32_t, deque<uint32_t> > *replay_pipes =
-  new map<uint32_t, deque<uint32_t> >();
+  void deliver(struct msg *msg, Process *sender = NULL);
 
-/* Filter? */
-static bool filtering = false;
+  void spawn(Process *process);
+  void link(Process *process, const PID &to);
+  void receive(Process *process, double secs);
+  void pause(Process *process, double secs);
+  bool wait(Process *process, const PID &pid);
+  bool external_wait(const PID &pid);
+  bool await(Process *process, int fd, int op, double secs, bool ignore);
 
-/* Filter. */
-static MessageFilter *filterer = NULL;
+  void enqueue(Process *process);
+  Process * dequeue();
+
+  void timedout(const PID &pid, int generation);
+  void awaited(const PID &pid, int generation);
+
+  void run(Process *process);
+  void cleanup(Process *process);
+
+private:
+  timeout create_timeout(Process *process, double secs);
+  void start_timeout(const timeout &timeout);
+  void cancel_timeout(const timeout &timeout);
+
+  /* Map of all locally spawned and running processes. */
+  map<uint32_t, Process *> processes;
+  synchronizable(processes);
+
+  /* Waiting processes (protected by synchronizable(processes)). */
+  map<Process *, set<Process *> > waiters;
+
+  /* Map of gates for waiting threads. */
+  map<Process *, Gate *> gates;
+
+  /* Queue of runnable processes (implemented as deque). */
+  deque<Process *> runq;
+  synchronizable(runq);
+};
 
-/*
- * Filtering mutex (needs to be recursive in case a filterer wants to
- * do anything fancy, which is possible given that filters will get
- * used for testing).
-*/
-static pthread_mutex_t filter_mutex;
 
 /* Tick, tock ... manually controlled clock! */
 class InternalProcessClock
@@ -342,36 +359,117 @@ private:
   ev_tstamp elapsed;
 };
 
+
+/* Using manual clock if non-null. */
 static InternalProcessClock *clk = NULL;
 
+/* Global 'pipe' id uniquely assigned to each process. */
+static uint32_t global_pipe = 0;
 
-struct write_ctx {
-  int len;
-  struct msg *msg;
-  bool close;
-};
+/* Local server socket. */
+static int s = -1;
 
-struct read_ctx {
-  int len;
-  struct msg *msg;
-};
+/* Local IP address. */
+static uint32_t ip = 0;
 
+/* Local port. */
+static uint16_t port = 0;
 
-static void initialize();
+/* Active LinkManager (eventually will probably be thread-local). */
+static LinkManager *link_manager = NULL;
 
-void handle_await(struct ev_loop *loop, ev_io *w, int revents);
+/* Active ProcessManager (eventually will probably be thread-local). */
+static ProcessManager *process_manager = NULL;
 
-void read_msg(struct ev_loop *loop, ev_io *w, int revents);
-void write_msg(struct ev_loop *loop, ev_io *w, int revents);
-void write_connect(struct ev_loop *loop, ev_io *w, int revents);
+/* Event loop. */
+static struct ev_loop *loop = NULL;
 
-void link_connect(struct ev_loop *loop, ev_io *w, int revents);
+/* Asynchronous watcher for interrupting loop. */
+static ev_async async_watcher;
 
-#ifdef USE_LITHE
-void trampoline(void *arg);
-#else
-void trampoline(int process0, int process1);
-#endif /* USE_LITHE */
+/* Timeouts watcher for process timeouts. */
+static ev_timer timeouts_watcher;
+
+/* Server watcher for accepting connections. */
+static ev_io server_watcher;
+
+/* Queue of new I/O watchers. */
+static queue<ev_io *> *io_watchersq = new queue<ev_io *>();
+static synchronizable(io_watchersq) = SYNCHRONIZED_INITIALIZER;
+
+/**
+ * We store the timeouts in a map of lists indexed by the time stamp
+ * of the timeout so that two timeouts can share the same time
+ * stamp. Note, however, that we should never have two identical
+ * timeouts because a process should only ever have one outstanding
+ * timeout at a time. Also, we exploit that the map is SORTED!
+ */
+static map<ev_tstamp, list<timeout> > *timeouts =
+  new map<ev_tstamp, list<timeout> >();
+static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER;
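
Because std::map keeps its keys sorted, firing everything that is due reduces to walking the prefix of the map up to the current time and then erasing that prefix with upper_bound. A condensed sketch of the pattern handle_timeout uses below:

    // Condensed sketch of the sorted-map idiom used in handle_timeout:
    // every entry with time stamp <= now has fired, and a single
    // upper_bound + erase removes exactly that prefix.
    #include <list>
    #include <map>

    typedef std::map<double, std::list<timeout> > timeout_map;

    void fire_up_to(timeout_map *pending, double now)
    {
      timeout_map::iterator it = pending->begin();
      for (; it != pending->end() && it->first <= now; ++it) {
        // ... collect it->second for firing ...
      }
      pending->erase(pending->begin(), pending->upper_bound(now));
    }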
+
+/* Flag to indicate whether or not to update the timer on async interrupt. */
+static bool update_timer = false;
+
+/* I/O thread. */
+static pthread_t io_thread;
+
+/* Processing thread. */
+static pthread_t proc_thread;
+
+/* Scheduling context for processing thread. */
+static ucontext_t proc_uctx_schedule;
+
+/* Running context for processing thread. */
+static ucontext_t proc_uctx_running;
+
+/* Current process of processing thread. */
+//static __thread Process *proc_process = NULL;
+static Process *proc_process = NULL;
+
+/* Flag indicating if performing safe call into legacy. */
+// static __thread bool legacy = false;
+static bool legacy = false;
+
+/* Thunk to safely call into legacy. */
+// static __thread std::tr1::function<void (void)> *legacy_thunk;
+static const std::tr1::function<void (void)> *legacy_thunk;
+
+/* Scheduler gate. */
+static Gate *gate = new Gate();
+
+/* Stack of recycled stacks. */
+static stack<void *> *stacks = new stack<void *>();
+static synchronizable(stacks) = SYNCHRONIZED_INITIALIZER;
+
+/* Last exited process's stack to be recycled (global variable hack!). */
+static void *recyclable = NULL;
+
+/* Record? */
+static bool recording = false;
+
+/* Record(s) for replay. */
+static std::fstream record_msgs;
+static std::fstream record_pipes;
+
+/* Replay? */
+static bool replaying = false;
+
+/* Replay messages (id -> queue of messages). */
+static map<uint32_t, queue<struct msg *> > *replay_msgs =
+  new map<uint32_t, queue<struct msg *> >();
+
+/* Replay pipes (parent id -> stack of remaining child ids). */
+static map<uint32_t, deque<uint32_t> > *replay_pipes =
+  new map<uint32_t, deque<uint32_t> >();
+
+/**
+ * Filter. Synchronization for the filterer needs to be recursive in
+ * case a filterer wants to do anything fancy (which is possible and
+ * likely given that filters will get used for testing).
+ */
+static MessageFilter *filterer = NULL;
+static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
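
This commit replaces the inline locking macros (deleted above) with synchronizable/synchronized from the new synchronized.hpp. Judging from the removed Synchronized class, the macros presumably expand along these lines; this is a sketch of the idiom, not the actual header:

    // Rough sketch of the idiom (assumption -- the real definitions
    // live in synchronized.hpp, added by this commit): an RAII guard
    // plus the if-statement trick yields a block-scoped critical
    // section, with SYNCHRONIZED_INITIALIZER standing in for
    // PTHREAD_MUTEX_INITIALIZER.
    #include <pthread.h>

    class Synchronized {
    public:
      explicit Synchronized(pthread_mutex_t *_mutex) : mutex(_mutex) {
        pthread_mutex_lock(mutex);
      }
      ~Synchronized() { pthread_mutex_unlock(mutex); }
      operator bool () const { return true; }
    private:
      pthread_mutex_t *mutex;
    };

    #define synchronizable(v) pthread_mutex_t v ## _mutex
    #define synchronized(v) \
      if (Synchronized __s = Synchronized(&v ## _mutex))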
 
 
 PID make_pid(const char *str)
@@ -466,2673 +564,1955 @@ bool operator == (const PID& left, const
 }
 
 
-void ProcessClock::pause()
+int set_nbio(int fd)
 {
-  initialize();
-
-  acquire(timers);
-  {
-    // For now, only one global clock (rather than clock per
-    // process). This means that we have to take special care to
-    // ensure happens-before timing (currently done for local message
-    // sends and spawning new processes, not currently done for
-    // PROCESS_EXIT messages).
-    if (clk == NULL) {
-      clk = new InternalProcessClock();
+  int flags;
 
-      // The existing libev timer might actually timeout, but now that
-      // clk != NULL, no "time" will actually have passed, so no
-      // timeouts will actually occur.
-    }
-  }
-  release(timers);
+  /* If they have O_NONBLOCK, use the Posix way to do it. */
+#ifdef O_NONBLOCK
+  /* TODO(*): O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
+  if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
+    flags = 0;
+  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+#else
+  /* Otherwise, use the old way of doing it. */
+  flags = 1;
+  return ioctl(fd, FIONBIO, &flags);
+#endif
 }
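
Every socket in libprocess is made non-blocking before it is handed to libev, so a partial recv/send surfaces as EWOULDBLOCK instead of stalling the I/O thread. A hypothetical usage sketch (mirroring do_accept and the link/send paths):

    // Hypothetical usage of set_nbio: sockets must be non-blocking
    // before being registered with the event loop.
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <unistd.h>

    int open_nonblocking_socket()
    {
      int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
      if (fd >= 0 && set_nbio(fd) < 0) {
        close(fd);
        return -1;  // treated like a failed connection attempt
      }
      return fd;
    }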
 
 
-void ProcessClock::resume()
+void handle_async(struct ev_loop *loop, ev_async *w, int revents)
 {
-  initialize();
-
-  acquire(timers);
-  {
-    if (clk != NULL) {
-      delete clk;
-      clk = NULL;
+  synchronized(io_watchersq) {
+    /* Start all the new I/O watchers. */
+    while (!io_watchersq->empty()) {
+      ev_io *io_watcher = io_watchersq->front();
+      io_watchersq->pop();
+      ev_io_start(loop, io_watcher);
     }
-
-    update_timer = true;
-    ev_async_send(loop, &async_watcher);
   }
-  release(timers);
-}
 
+  synchronized(timeouts) {
+    if (update_timer) {
+      if (!timeouts->empty()) {
+	// Determine the current time.
+	ev_tstamp current_tstamp;
+	if (clk != NULL) {
+	  current_tstamp = clk->getCurrent();
+	} else {
+	  // TODO(benh): Unclear if we want ev_now(...) or ev_time().
+	  current_tstamp = ev_time();
+	}
 
-void ProcessClock::advance(double secs)
-{
-  acquire(timers);
-  {
-    if (clk != NULL) {
-      clk->setElapsed(clk->getElapsed() + secs);
+	timeouts_watcher.repeat = timeouts->begin()->first - current_tstamp;
 
-      // Might need to wakeup the processing thread.
-      gate->open();
+	// Check when the timer event should fire.
+        if (timeouts_watcher.repeat <= 0) {
+	  // Feed the event now!
+	  timeouts_watcher.repeat = 0;
+	  ev_timer_again(loop, &timeouts_watcher);
+          ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+        } else {
+	  // Only repeat the timer if not using a manual clock (a call
+	  // to ProcessClock::advance() will force a timer event later).
+	  if (clk != NULL && timeouts_watcher.repeat > 0)
+	    timeouts_watcher.repeat = 0;
+	  ev_timer_again(loop, &timeouts_watcher);
+	}
+      }
+
+      update_timer = false;
     }
   }
-  release(timers);
 }
 
 
-static inline int set_nbio(int fd)
+void handle_timeout(struct ev_loop *loop, ev_timer *w, int revents)
 {
-  int flags;
-
-  /* If they have O_NONBLOCK, use the Posix way to do it. */
-#ifdef O_NONBLOCK
-  /* TODO(*): O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
-  if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
-    flags = 0;
-  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
-#else
-  /* Otherwise, use the old way of doing it. */
-  flags = 1;
-  return ioctl(fd, FIOBIO, &flags);
-#endif
-}
+  list<timeout> timedout;
 
+  synchronized(timeouts) {
+    ev_tstamp current_tstamp;
 
-struct node { uint32_t ip; uint16_t port; };
+    if (clk != NULL) {
+      current_tstamp = clk->getCurrent();
+    } else {
+      // TODO(benh): Unclear if we want ev_now(...) or ev_time().
+      current_tstamp = ev_time();
+    }
 
-bool operator < (const node& left, const node& right)
-{
-  if (left.ip == right.ip)
-    return left.port < right.port;
-  else
-    return left.ip < right.ip;
-}
+    foreachpair (ev_tstamp tstamp, const list<timeout> &timedouts, *timeouts) {
+      if (tstamp > current_tstamp)
+        break;
+
+      foreach (const timeout &timeout, timedouts) {
+        if (clk != NULL) {
+          // Update current time of process (if it's still
+          // valid). Note that current time may be greater than the
+          // timeout if a local message was received (and
+          // happens-before kicks in), hence we use max.
+          if (ProcessReference process = process_manager->use(timeout.pid)) {
+            clk->setCurrent(process, max(clk->getCurrent(process),
+                                         timeout.tstamp));
+          }
+        }
+        // TODO(benh): Ensure deterministic order for testing?
+        timedout.push_back(timeout);
+      }
+    }
 
-std::ostream& operator << (std::ostream& stream, const node& n)
-{
-  stream << n.ip << ":" << n.port;
-  return stream;
-}
+    // Now erase the range of time stamps that timed out.
+    timeouts->erase(timeouts->begin(), timeouts->upper_bound(current_tstamp));
 
+    // Okay, so the time stamp for the next timeout should not have fired.
+    assert(timeouts->empty() || (timeouts->begin()->first > current_tstamp));
 
-class LinkManager : public Singleton<LinkManager>
-{
-private:
-  /* Map from PID (local/remote) to process. */
-  map<PID, set<Process *> > links;
+    // Update the timer as necessary.
+    // TODO(benh): Make this code look like the code in handle_async.
+    if (!timeouts->empty() && clk == NULL) {
+      timeouts_watcher.repeat = timeouts->begin()->first - current_tstamp;
+      assert(timeouts_watcher.repeat > 0);
+      ev_timer_again(loop, &timeouts_watcher);
+    } else {
+      timeouts_watcher.repeat = 0;
+      ev_timer_again(loop, &timeouts_watcher);
+    }
 
-  /* Map from socket to node (ip, port). */
-  map<int, node> sockets;
+    update_timer = false;
+  }
 
-  /* Maps from node (ip, port) to socket. */
-  map<node, int> temps;
-  map<node, int> persists;
+  foreach (const timeout &timeout, timedout)
+    process_manager->timedout(timeout.pid, timeout.generation);
+}
 
-  /* Map from socket to outgoing messages. */
-  map<int, queue<struct msg *> > outgoing;
 
-  pthread_mutex_t mutex;
+void handle_await(struct ev_loop *loop, ev_io *w, int revents)
+{
+  tuple<PID, int> *t = reinterpret_cast<tuple<PID, int> *>(w->data);
+  process_manager->awaited(t->get<0>(), t->get<1>());
+  ev_io_stop(loop, w);
+  free(w);
+  delete t;
+}
 
-  friend class Singleton<LinkManager>;
 
-  LinkManager()
-  {
-    pthread_mutexattr_t attr;
-    pthread_mutexattr_init(&attr);
-    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-    pthread_mutex_init(&mutex, &attr);
-    pthread_mutexattr_destroy(&attr);
-  }
+/* Socket reading .... */
+void read_data(struct ev_loop *loop, ev_io *w, int revents);
+void read_msg(struct ev_loop *loop, ev_io *w, int revents);
 
-public:
-  /*
-   * TODO(benh): The semantics we want to support for link are such
-   * that if there is nobody to link to (local or remote) then a
-   * PROCESS_EXIT message gets generated. This functionality has only
-   * been implemented when the link is local, not remote. Of course,
-   * if there is nobody listening on the remote side, then this should
-   * work remotely ... but if there is someone listening remotely just
-   * not at that pipe value, then it will silently continue executing.
-  */
-  void link(Process *process, const PID &to)
-  {
-    //cout << "calling link" << endl;
+struct read_ctx {
+  int len;
+  struct msg *msg;
+};
 
-    assert(process != NULL);
 
-    node n = { to.ip, to.port };
+void read_data(struct ev_loop *loop, ev_io *w, int revents)
+{
+  int c = w->fd;
 
-    pthread_mutex_lock(&mutex);
-    {
-      // Check if node is remote and there isn't a persistent link.
-      if ((n.ip != ip || n.port != port) &&
-	  persists.find(n) == persists.end()) {
-	int s;
-
-	/* Create socket for communicating with remote process. */
-	if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
-	  fatalerror("failed to link (socket)");
-    
-	/* Use non-blocking sockets. */
-	if (set_nbio(s) < 0)
-	  fatalerror("failed to link (set_nbio)");
+  struct read_ctx *ctx = (struct read_ctx *) w->data;
 
-	//cout << "created linked socket " << s << endl;
+  /* Read the data starting from the last read. */
+  int len = recv(c,
+		 (char *) ctx->msg + sizeof(struct msg) + ctx->len,
+		 ctx->msg->len - ctx->len,
+		 0);
 
-	/* Record socket. */
-	sockets[s] = n;
+  if (len > 0) {
+    ctx->len += len;
+  } else if (len < 0 && errno == EWOULDBLOCK) {
+    return;
+  } else if (len == 0 || (len < 0 &&
+			  (errno == ECONNRESET ||
+			   errno == EBADF ||
+			   errno == EHOSTUNREACH))) {
+    /* Socket has closed. */
+    link_manager->closed(c);
 
-	/* Record node. */
-	persists[n] = s;
+    /* Stop receiving ... */
+    ev_io_stop (loop, w);
+    close(c);
+    free(ctx->msg);
+    free(ctx);
+    free(w);
+    return;
+  } else {
+    fatalerror("unhandled socket error: please report (read_data)");
+  }
 
-	/* Allocate the watcher. */
-	ev_io *io_watcher = (ev_io *) malloc(sizeof(ev_io));
+  if (ctx->len == ctx->msg->len) {
+    /* Deliver message. */
+    process_manager->deliver(ctx->msg);
 
-	struct sockaddr_in addr;
-      
-	memset(&addr, 0, sizeof(addr));
-      
-	addr.sin_family = PF_INET;
-	addr.sin_port = htons(to.port);
-	addr.sin_addr.s_addr = to.ip;
-
-	if (connect(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
-	  if (errno != EINPROGRESS)
-	    fatalerror("failed to link (connect)");
+    /* Reinitialize read context. */
+    ctx->len = 0;
+    ctx->msg = (struct msg *) malloc(sizeof(struct msg));
 
-	  /* Initialize watcher for connecting. */
-	  ev_io_init(io_watcher, link_connect, s, EV_WRITE);
-	} else {
-	  /* Initialize watcher for reading. */
-	  io_watcher->data = malloc(sizeof(struct read_ctx));
+    /* Continue receiving ... */
+    ev_io_stop (loop, w);
+    ev_io_init (w, read_msg, c, EV_READ);
+    ev_io_start (loop, w);
+  }
+}
 
-	  /* Initialize read context. */
-	  struct read_ctx *ctx = (struct read_ctx *) io_watcher->data;
 
-	  ctx->len = 0;
-	  ctx->msg = (struct msg *) malloc(sizeof(struct msg));
+void read_msg(struct ev_loop *loop, ev_io *w, int revents)
+{
+  int c = w->fd;
 
-	  ev_io_init(io_watcher, read_msg, s, EV_READ);
-	}
+  struct read_ctx *ctx = (struct read_ctx *) w->data;
 
-	/* Enqueue the watcher. */
-	acquire(io_watchersq);
-	{
-	  io_watchersq->push(io_watcher);
-	}
-	release(io_watchersq);
+  /* Read the message starting from the last read. */
+  int len = recv(c,
+		 (char *) ctx->msg + ctx->len,
+		 sizeof (struct msg) - ctx->len,
+		 0);
 
-	/* Interrupt the loop. */
-	ev_async_send(loop, &async_watcher);
-      }
+  if (len > 0) {
+    ctx->len += len;
+  } else if (len < 0 && errno == EWOULDBLOCK) {
+    return;
+  } else if (len == 0 || (len < 0 &&
+			  (errno == ECONNRESET ||
+			   errno == EBADF ||
+			   errno == EHOSTUNREACH))) {
+    /* Socket has closed. */
+    link_manager->closed(c);
 
-      links[to].insert(process);
-    }
-    pthread_mutex_unlock(&mutex);
+    /* Stop receiving ... */
+    ev_io_stop (loop, w);
+    close(c);
+    free(ctx->msg);
+    free(ctx);
+    free(w);
+    return;
+  } else {
+    fatalerror("unhandled socket error: please report (read_msg)");
   }
 
-  void send(struct msg *msg)
-  {
-    assert(msg != NULL);
-
-    //cout << "(1) sending msg to " << msg->to << endl;
-
-    node n = { msg->to.ip, msg->to.port };
-
-    pthread_mutex_lock(&mutex);
-    {
-      // Check if there is already a link.
-      map<node, int>::iterator it;
-      if ((it = persists.find(n)) != persists.end() ||
-	  (it = temps.find(n)) != temps.end()) {
-	int s = it->second;
-	//cout << "(2) found a socket " << s << endl;
-	if (outgoing.find(s) == outgoing.end()) {
-	  assert(persists.find(n) != persists.end());
-	  assert(temps.find(n) == temps.end());
-	  //cout << "(3) reusing (sleeping persistant) socket " << s << endl;
-
-	  /* Initialize the outgoing queue. */
-	  outgoing[s];
-
-	  /* Allocate/Initialize the watcher. */
-	  ev_io *io_watcher = (ev_io *) malloc (sizeof (ev_io));
-
-	  io_watcher->data = malloc(sizeof(struct write_ctx));
-
-	  /* Initialize the write context. */
-	  struct write_ctx *ctx = (struct write_ctx *) io_watcher->data;
-
-	  ctx->len = 0;
-	  ctx->msg = msg;
-	  ctx->close = false;
-
-	  ev_io_init(io_watcher, write_msg, s, EV_WRITE);
-
-	  /* Enqueue the watcher. */
-	  acquire(io_watchersq);
-	  {
-	    io_watchersq->push(io_watcher);
-	  }
-	  release(io_watchersq);
-    
-	  /* Interrupt the loop. */
-	  ev_async_send(loop, &async_watcher);
-	} else {
-	  //cout << "(3) reusing socket " << s << endl;
-	  outgoing[s].push(msg);
-	}
-      } else {
-	int s;
+  if (ctx->len == sizeof(struct msg)) {
+    /* Check and see if we need to receive data. */
+    if (ctx->msg->len > 0) {
+      /* Allocate enough space for data. */
+      ctx->msg = (struct msg *)
+	realloc (ctx->msg, sizeof(struct msg) + ctx->msg->len);
 
-	/* Create socket for communicating with remote process. */
-	if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
-	  fatalerror("failed to send (socket)");
-    
-	/* Use non-blocking sockets. */
-	if (set_nbio(s) < 0)
-	  fatalerror("failed to send (set_nbio)");
+      /* TODO(benh): Optimize ... try doing a read first! */
+      ctx->len = 0;
 
-	//cout << "(2) created temporary socket " << s << endl;
+      /* Start receiving data ... */
+      ev_io_stop (loop, w);
+      ev_io_init (w, read_data, c, EV_READ);
+      ev_io_start (loop, w);
+    } else {
+      /* Deliver message. */
+      process_manager->deliver(ctx->msg);
 
-	/* Record socket. */
-	sockets[s] = n;
+      /* Reinitialize read context. */
+      ctx->len = 0;
+      ctx->msg = (struct msg *) malloc(sizeof(struct msg));
 
-	/* Record node. */
-	temps[n] = s;
+      /* Continue receiving ... */
+      ev_io_stop (loop, w);
+      ev_io_init (w, read_msg, c, EV_READ);
+      ev_io_start (loop, w);
+    }
+  }
+}
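
Together, read_msg and read_data imply a simple framing protocol: a fixed-size struct msg header whose len field gives the number of body bytes that follow. For clarity, here is the same framing read with blocking I/O (a sketch only; the event-driven code above does this incrementally and never blocks):

    // Blocking-I/O sketch of the frame format handled above: a fixed
    // struct msg header followed by msg->len bytes of body. Assumes
    // the same struct msg layout used throughout this file.
    #include <stdlib.h>
    #include <string.h>
    #include <sys/socket.h>

    struct msg * read_frame(int fd)
    {
      struct msg header;
      if (recv(fd, (char *) &header, sizeof(header), MSG_WAITALL)
          != (ssize_t) sizeof(header))
        return NULL;
      struct msg *msg =
        (struct msg *) malloc(sizeof(struct msg) + header.len);
      memcpy(msg, &header, sizeof(header));
      if (header.len > 0 &&
          recv(fd, (char *) msg + sizeof(struct msg), header.len,
               MSG_WAITALL) != (ssize_t) header.len) {
        free(msg);
        return NULL;
      }
      return msg;
    }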
 
-	/* Initialize the outgoing queue. */
-	outgoing[s];
 
-	/* Allocate/Initialize the watcher. */
-	ev_io *io_watcher = (ev_io *) malloc (sizeof (ev_io));
+/* Socket writing .... */
+void write_data(struct ev_loop *loop, ev_io *w, int revents);
+void write_msg(struct ev_loop *loop, ev_io *w, int revents);
 
-	io_watcher->data = malloc(sizeof(struct write_ctx));
+struct write_ctx {
+  int len;
+  struct msg *msg;
+  bool close;
+};
 
-	/* Initialize the write context. */
-	struct write_ctx *ctx = (struct write_ctx *) io_watcher->data;
 
-	ctx->len = 0;
-	ctx->msg = msg;
-	ctx->close = true;
+void write_data(struct ev_loop *loop, ev_io *w, int revents)
+{
+  int c = w->fd;
 
-	struct sockaddr_in addr;
-      
-	memset(&addr, 0, sizeof(addr));
-      
-	addr.sin_family = PF_INET;
-	addr.sin_port = htons(msg->to.port);
-	addr.sin_addr.s_addr = msg->to.ip;
-    
-	if (connect(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
-	  if (errno != EINPROGRESS)
-	    fatalerror("failed to send (connect)");
+  struct write_ctx *ctx = (struct write_ctx *) w->data;
 
-	  /* Initialize watcher for connecting. */
-	  ev_io_init(io_watcher, write_connect, s, EV_WRITE);
-	} else {
-	  /* Initialize watcher for writing. */
-	  ev_io_init(io_watcher, write_msg, s, EV_WRITE);
-	}
+  int len = send(c,
+		 (char *) ctx->msg + sizeof(struct msg) + ctx->len,
+		 ctx->msg->len - ctx->len,
+		 MSG_NOSIGNAL);
 
-	/* Enqueue the watcher. */
-	acquire(io_watchersq);
-	{
-	  io_watchersq->push(io_watcher);
-	}
-	release(io_watchersq);
+  if (len > 0) {
+    ctx->len += len;
+  } else if (len < 0 && errno == EWOULDBLOCK) {
+    return;
+  } else if (len == 0 || (len < 0 &&
+			  (errno == ECONNRESET ||
+			   errno == EBADF ||
+			   errno == EHOSTUNREACH ||
+			   errno == EPIPE))) {
+    /* Socket has closed. */
+    link_manager->closed(c);
 
-	/* Interrupt the loop. */
-	ev_async_send(loop, &async_watcher);
-      }
-    }
-    pthread_mutex_unlock(&mutex);
+    /* Stop receiving ... */
+    ev_io_stop (loop, w);
+    close(c);
+    free(ctx->msg);
+    free(ctx);
+    free(w);
+    return;
+  } else {
+    fatalerror("unhandled socket error: please report (write_data)");
   }
 
-  struct msg * next(int s)
-  {
-    struct msg *msg = NULL;
-    pthread_mutex_lock(&mutex);
-    {
-      assert(outgoing.find(s) != outgoing.end());
-      if (!outgoing[s].empty()) {
-	msg = outgoing[s].front();
-	outgoing[s].pop();
-      }
-    }
-    pthread_mutex_unlock(&mutex);
-    return msg;
-  }
-
-  struct msg * next_or_close(int s)
-  {
-    //cout << "next_or_close socket " << s << endl;
-    struct msg *msg;
-    pthread_mutex_lock(&mutex);
-    {
-      if ((msg = next(s)) == NULL) {
-	assert(outgoing[s].empty());
-	outgoing.erase(s);
-	assert(temps.find(sockets[s]) != temps.end());
-	temps.erase(sockets[s]);
-	sockets.erase(s);
-	::close(s);
-      }
-    }
-    pthread_mutex_unlock(&mutex);
-    return msg;
-  }
-
-  struct msg * next_or_sleep(int s)
-  {
-    //cout << "next_or_sleep socket " << s << endl;
-    struct msg *msg;
-    pthread_mutex_lock(&mutex);
-    {
-      if ((msg = next(s)) == NULL) {
-	assert(outgoing[s].empty());
-	outgoing.erase(s);
-	assert(persists.find(sockets[s]) != persists.end());
-      }
-    }
-    pthread_mutex_unlock(&mutex);
-    return msg;
-  }
-
-  void closed(int s)
-  {
-    //cout << "closed socket " << s << endl;
-    pthread_mutex_lock(&mutex);
-    {
-      map<int, node>::iterator it = sockets.find(s);
-      if (it != sockets.end()) {
-	exited(it->second);
-	persists.erase(sockets[s]);
-	temps.erase(sockets[s]);
-	sockets.erase(s);
-	outgoing.erase(s);
-	::close(s);
-      }
-    }
-    pthread_mutex_unlock(&mutex);
-  }
-
-  /*
-   * TODO(benh): It would be cleaner if these exited routines could
-   * call back into ProcessManager ... then we wouldn't have to
-   * convince ourselves that the accesses to each Process object will
-   * always be valid.
-   */
+  if (ctx->len == ctx->msg->len) {
+    ev_io_stop (loop, w);
+    free(ctx->msg);
 
-  void exited(const node &n)
-  {
-    pthread_mutex_lock(&mutex);
-    {
-      list<PID> removed;
-      /* Look up all linked processes. */
-      foreachpair (const PID &pid, set<Process *> &processes, links) {
-	if (pid.ip == n.ip && pid.port == n.port) {
-	  /* N.B. If we call exited(pid) we might invalidate iteration. */
-	  /* Deliver PROCESS_EXIT messages (if we aren't replaying). */
-	  if (!replaying) {
-	    foreach (Process *process, processes) {
-	      struct msg *msg = (struct msg *) malloc(sizeof(struct msg));
-	      msg->from.pipe = pid.pipe;
-	      msg->from.ip = pid.ip;
-	      msg->from.port = pid.port;
-	      msg->to.pipe = process->pid.pipe;
-	      msg->to.ip = process->pid.ip;
-	      msg->to.port = process->pid.port;
-	      msg->id = PROCESS_EXIT;
-	      msg->len = 0;
-	      process->enqueue(msg);
-	    }
-	  }
-	  removed.push_back(pid);
-	}
-      }
-      foreach (const PID &pid, removed)
-	links.erase(pid);
-    }
-    pthread_mutex_unlock(&mutex);
-  }
+    if (ctx->close)
+      ctx->msg = link_manager->next_or_close(c);
+    else
+      ctx->msg = link_manager->next_or_sleep(c);
 
-  void exited(Process *process)
-  {
-    pthread_mutex_lock(&mutex);
-    {
-      /* Remove any links this process might have had. */
-      foreachpair (_, set<Process *> &processes, links)
-	processes.erase(process);
-
-      const PID &pid = process->getPID();
-
-      /* Look up all linked processes. */
-      map<PID, set<Process *> >::iterator it = links.find(pid);
-
-      if (it != links.end()) {
-	set<Process *> &processes = it->second;
-	/* Deliver PROCESS_EXIT messages (if we aren't replaying). */
-	if (!replaying) {
-	  foreach (Process *p, processes) {
-	    assert(process != p);
-	    struct msg *msg = (struct msg *) malloc(sizeof(struct msg));
-	    msg->from.pipe = pid.pipe;
-	    msg->from.ip = pid.ip;
-	    msg->from.port = pid.port;
-	    msg->to.pipe = p->pid.pipe;
-	    msg->to.ip = p->pid.ip;
-	    msg->to.port = p->pid.port;
-	    msg->id = PROCESS_EXIT;
-	    msg->len = 0;
-            // TODO(benh): Preserve happens-before when using clock.
-	    p->enqueue(msg);
-	  }
-	}
-	links.erase(pid);
-      }
+    if (ctx->msg != NULL) {
+      ctx->len = 0;
+      ev_io_init(w, write_msg, c, EV_WRITE);
+      ev_io_start(loop, w);
+    } else {
+      free(ctx);
+      free(w);
     }
-    pthread_mutex_unlock(&mutex);
   }
-};
-
-/* Singleton LinkManager instance. */
-template<> LinkManager * Singleton<LinkManager>::singleton = NULL;
-template<> bool Singleton<LinkManager>::instantiated = false;
+}
 
 
-class ProcessManager : public Singleton<ProcessManager>
+void write_msg(struct ev_loop *loop, ev_io *w, int revents)
 {
-private:
-  /* Map of all local spawned and running processes. */
-  map<uint32_t, Process *> processes;
-
-  /* Map of all waiting processes. */
-  map<Process *, set<Process *> > waiters;
-
-  /* Map of gates for waiting threads. */
-  map<Process *, Gate *> gates;
-
-  /* Processes lock/mutex. */
-#ifdef USE_LITHE
-  int processes_lock;
-#else
-  pthread_mutex_t processes_mutex;
-#endif /* USE_LITHE */
+  int c = w->fd;
 
-  /* Queue of runnable processes (implemented as deque). */
-  deque<Process *> runq;
+  struct write_ctx *ctx = (struct write_ctx *) w->data;
 
-  /* Run queue lock/mutex. */
-#ifdef USE_LITHE
-  int runq_lock;
-#else
-  pthread_mutex_t runq_mutex;
-#endif /* USE_LITHE */
+  int len = send(c,
+		 (char *) ctx->msg + ctx->len,
+		 sizeof (struct msg) - ctx->len,
+		 MSG_NOSIGNAL);
 
-  friend class Singleton<ProcessManager>;
+  if (len > 0) {
+    ctx->len += len;
+  } else if (len < 0 && errno == EWOULDBLOCK) {
+    return;
+  } else if (len == 0 || (len < 0 &&
+			  (errno == ECONNRESET ||
+			   errno == EBADF ||
+			   errno == EHOSTUNREACH ||
+			   errno == EPIPE))) {
+    /* Socket has closed. */
+    link_manager->closed(c);
 
-  ProcessManager()
-  {
-#ifdef USE_LITHE
-    processes_lock = UNLOCKED;
-    runq_lock = UNLOCKED;
-#else
-    pthread_mutex_init(&processes_mutex, NULL);
-    pthread_mutex_init(&runq_mutex, NULL);
-#endif /* USE_LITHE */
+    /* Stop receiving ... */
+    ev_io_stop (loop, w);
+    close(c);
+    free(ctx->msg);
+    free(ctx);
+    free(w);
+    return;
+  } else {
+    fatalerror("unhandled socket error: please report (write_msg)");
   }
 
-public:
-  Process * lookup(const PID &pid)
-  {
-    if (!(pid.ip == ip && pid.port == port))
-      return NULL;
-
-    Process *process = NULL;
+  if (ctx->len == sizeof(struct msg)) {
+    /* Check and see if we need to write data. */
+    if (ctx->msg->len > 0) {
+      
+      /* TODO(benh): Optimize ... try doing a write first! */
+      ctx->len = 0;
 
-    acquire(processes);
-    {
-      map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
-      if (it != processes.end()) {
-	process = it->second;
-      }
-    }
-    release(processes);
+      /* Start writing data ... */
+      ev_io_stop(loop, w);
+      ev_io_init(w, write_data, c, EV_WRITE);
+      ev_io_start(loop, w);
+    } else {
+      ev_io_stop(loop, w);
+      free(ctx->msg);
 
-    return process;
-  }
+      if (ctx->close)
+	ctx->msg = link_manager->next_or_close(c);
+      else
+	ctx->msg = link_manager->next_or_sleep(c);
 
-  void record(struct msg *msg)
-  {
-    assert(recording && !replaying);
-    acquire(processes);
-    {
-      record_msgs.write((char *) msg, sizeof(struct msg) + msg->len);
-      if (record_msgs.fail())
-	fatalerror("failed to write to messages record");
+      if (ctx->msg != NULL) {
+	ctx->len = 0;
+	ev_io_init(w, write_msg, c, EV_WRITE);
+	ev_io_start(loop, w);
+      } else {
+	free(ctx);
+	free(w);
+      }
     }
-    release(processes);
   }
+}
 
-  void replay()
-  {
-    assert(!recording && replaying);
-    acquire(processes);
-    {
-      if (!record_msgs.eof()) {
-	struct msg *msg = (struct msg *) malloc(sizeof(struct msg));
 
-	/* Read a message worth of data. */
-	record_msgs.read((char *) msg, sizeof(struct msg));
+void write_connect(struct ev_loop *loop, ev_io *w, int revents)
+{
+  int s = w->fd;
 
-	if (record_msgs.eof()) {
-	  free(msg);
-	  release(processes);
-	  return;
-	}
+  struct write_ctx *ctx = (struct write_ctx *) w->data;
 
-	if (record_msgs.fail())
-	  fatalerror("failed to read from messages record");
+  ev_io_stop(loop, w);
 
-	/* Read the body of the message if necessary. */
-	if (msg->len != 0) {
-	  struct msg *temp = msg;
-	  msg = (struct msg *) malloc(sizeof(struct msg) + msg->len);
-	  memcpy(msg, temp, sizeof(struct msg));
-	  free(temp);
-	  record_msgs.read((char *) msg + sizeof(struct msg), msg->len);
-	  if (record_msgs.fail())
-	    fatalerror("failed to read from messages record");
-	}
+  /* Check that the connection was successful. */
+  int opt;
+  socklen_t optlen = sizeof(opt);
 
-	/* Add message to be delivered later. */
-	(*replay_msgs)[msg->to.pipe].push(msg);
-      }
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0) {
+    link_manager->closed(s);
+    free(ctx->msg);
+    free(ctx);
+    free(w);
+    return;
+  }
 
-      /* Deliver any messages to available processes. */
-      foreachpair (uint32_t pipe, Process *process, processes) {
-	queue<struct msg *> &msgs = (*replay_msgs)[pipe];
-	while (!msgs.empty()) {
-	  struct msg *msg = msgs.front();
-	  msgs.pop();
-	  process->enqueue(msg);
-	}
-      }
-    }
-    release(processes);
+  if (opt != 0) {
+    link_manager->closed(s);
+    free(ctx->msg);
+    free(ctx);
+    free(w);
+    return;
   }
 
-#ifdef USE_LITHE
-  static void do_run(lithe_task_t *task, void *arg)
-  {
-    Process *process = (Process *) task->tls;
+  /* TODO(benh): Optimize ... try doing a write first. */
 
-    assert(process->state == Process::RUNNING);
+  ev_io_init(w, write_msg, s, EV_WRITE);
+  ev_io_start(loop, w);
+}
 
-    ProcessManager::instance()->cleanup(process);
 
-    /* Go be productive doing something else ... */
-    lithe_sched_reenter();
-  }
 
+void link_connect(struct ev_loop *loop, ev_io *w, int revents)
+{
+  int s = w->fd;
 
-  void run(Process *process)
-  {
-    assert(process != NULL);
+  ev_io_stop(loop, w);
 
-    process->state = Process::RUNNING;
+  /* Check that the connection was successful. */
+  int opt;
+  socklen_t optlen = sizeof(opt);
 
-    try {
-      (*process)();
-    } catch (const std::exception &e) {
-      cerr << "libprocess: " << process->pid
-	   << " exited due to " << e.what() << endl;
-    } catch (...) {
-      cerr << "libprocess: " << process->pid
-	   << " exited due to unknown exception" << endl;
-    }
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0) {
+    link_manager->closed(s);
+    free(w);
+    return;
+  }
 
-    lithe_task_block(do_run, NULL);
+  if (opt != 0) {
+    link_manager->closed(s);
+    free(w);
+    return;
   }
-#else
-  void run(Process *process)
-  {
-    /*
-     * N.B. The process gets locked before 'schedule' runs it (it gets
-     * enqueued in 'trampoline'). So, after that we can update the state
-     * and then unlock.
-    */
-    {
-      process->state = Process::RUNNING;
-    }
-    process->unlock();
 
-    try {
-      (*process)();
-    } catch (const std::exception &e) {
-      cerr << "libprocess: " << process->pid
-	   << " exited due to "
-	   << e.what() << endl;
-    } catch (...) {
-      cerr << "libprocess: " << process->pid
-	   << " exited due to unknown exception" << endl;
-    }
+  /* Reuse/Initialize the watcher. */
+  w->data = malloc(sizeof(struct read_ctx));
 
-    cleanup(process);
+  /* Initialize read context. */
+  struct read_ctx *ctx = (struct read_ctx *) w->data;
 
-    proc_process = NULL;
-    setcontext(&proc_uctx_schedule);
-  }
-#endif /* USE_LITHE */
+  ctx->len = 0;
+  ctx->msg = (struct msg *) malloc(sizeof(struct msg));
 
+  /* Initialize watcher for reading. */
+  ev_io_init(w, read_msg, s, EV_READ);
 
-#ifdef USE_LITHE
-  static void do_kill(lithe_task_t *task, void *arg)
-  {
-    Process *process = (Process *) task->tls;
+  ev_io_start(loop, w);
+}
 
-    assert(process->state == Process::RUNNING);
 
-    ProcessManager::instance()->cleanup(process);
+void do_accept(struct ev_loop *loop, ev_io *w, int revents)
+{
+  int s = w->fd;
 
-    /* Go be productive doing something else ... */
-    lithe_sched_reenter();
-  }
+  struct sockaddr_in addr;
 
+  socklen_t addrlen = sizeof(addr);
 
-  void kill(Process *process)
-  {
-    lithe_task_block(do_kill, NULL);
-  }
-#else
-  void kill(Process *process)
-  {
-    cleanup(process);
+  /* Do accept. */
+  int c = accept(s, (struct sockaddr *) &addr, &addrlen);
 
-    proc_process = NULL;
-    setcontext(&proc_uctx_schedule);
+  if (c < 0) {
+    return;
   }
-#endif /* USE_LITHE */
-
-  
-  void spawn(Process *process)
-  {
-    assert(process != NULL);
 
-    process->state = Process::INIT;
+  /* Make socket non-blocking. */
+  if (set_nbio(c) < 0) {
+    close(c);
+    return;
+  }
 
-    void *stack = NULL;
+  /* Turn off Nagle (on TCP_NODELAY) so pipelined requests don't wait. */
+  int on = 1;
+  if (setsockopt(c, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    close(c);
+    return;
+  }
 
-    acquire(processes);
-    {
-      /* Record process. */
-      processes[process->pid.pipe] = process;
+  /* Allocate the watcher. */
+  ev_io *io_watcher = (ev_io *) malloc (sizeof (ev_io));
 
-      /* Reuse a stack if possible. */
-      if (!stacks->empty()) {
-	stack = stacks->top();
-	stacks->pop();
-      }
-    }
-    release(processes);
+  io_watcher->data = malloc(sizeof(struct read_ctx));
 
-    if (stack == NULL) {
-      const int protection = (PROT_READ | PROT_WRITE);
-      const int flags = (MAP_PRIVATE | MAP_ANONYMOUS | MAP_32BIT);
+  /* Initialize the read context */
+  struct read_ctx *ctx = (struct read_ctx *) io_watcher->data;
 
-      stack = mmap(NULL, PROCESS_STACK_SIZE, protection, flags, -1, 0);
+  ctx->len = 0;
+  ctx->msg = (struct msg *) malloc(sizeof(struct msg));
 
-      if (stack == MAP_FAILED)
-	fatalerror("mmap failed (spawn)");
+  /* Initialize watcher for reading. */
+  ev_io_init(io_watcher, read_msg, c, EV_READ);
 
-      /* Disallow all memory access to the last page. */
-      if (mprotect(stack, getpagesize(), PROT_NONE) != 0)
-	fatalerror("mprotect failed (spawn)");
-    }
+  ev_io_start(loop, io_watcher);
+}
 
-#ifdef USE_LITHE
-    stack_t s;
-    s.ss_sp = stack;
-    s.ss_size = PROCESS_STACK_SIZE;
 
-    lithe_task_init(&process->task, &s);
+void * serve(void *arg)
+{
+  ev_loop(((struct ev_loop *) arg), 0);
 
-    process->task.tls = process;
+  return NULL;
+}
 
-    /* TODO(benh): Is there a better way to store the stack info? */
-    process->uctx.uc_stack.ss_sp = stack;
-    process->uctx.uc_stack.ss_size = PROCESS_STACK_SIZE;
-#else
-    /* Set up the ucontext. */
-    if (getcontext(&process->uctx) < 0)
-      fatalerror("getcontext failed (spawn)");
-    
-    process->uctx.uc_stack.ss_sp = stack;
-    process->uctx.uc_stack.ss_size = PROCESS_STACK_SIZE;
-    process->uctx.uc_link = 0;
 
-    /* Package the arguments. */
+void trampoline(int stack0, int stack1, int process0, int process1)
+{
+  /* Unpackage the arguments. */
 #ifdef __x86_64__
-    assert(sizeof(unsigned long) == sizeof(Process *));
-    int process0 = (unsigned int) (unsigned long) process;
-    int process1 = (unsigned long) process >> 32;
+  assert (sizeof(unsigned long) == sizeof(Process *));
+  void *stack = (void *)
+    (((unsigned long) stack1 << 32) + (unsigned int) stack0);
+  Process *process = (Process *)
+    (((unsigned long) process1 << 32) + (unsigned int) process0);
 #else
-    assert(sizeof(unsigned int) == sizeof(Process *));
-    int process0 = (unsigned int) process;
-    int process1 = 0;
+  assert (sizeof(unsigned int) == sizeof(Process *));
+  void *stack = (void *) (unsigned int) stack0;
+  Process *process = (Process *) (unsigned int) process0;
 #endif /* __x86_64__ */
 
-    makecontext(&process->uctx, (void (*)()) trampoline, 2, process0, process1);
-#endif /* USE_LITHE */
+  /* Run the process. */
+  process_manager->run(process);
 
-    /* Add process to the run queue. */
-    enqueue(process);
-  }
+  /* Prepare to recycle this stack (global variable hack!). */
+  assert(recyclable == NULL);
+  recyclable = stack;
 
+  proc_process = NULL;
+  setcontext(&proc_uctx_schedule);
+}
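
makecontext only accepts int arguments, so on x86-64 both the stack pointer and the process pointer are split into 32-bit halves and reassembled inside the trampoline. The trick in isolation (x86-64 branch only; on 32-bit targets the high half is simply zero):

    // Minimal illustration of the pointer-splitting used by
    // trampoline above (makecontext passes only ints). x86-64 only;
    // uctx->uc_stack must already be initialized.
    #include <assert.h>
    #include <ucontext.h>

    void make_with_pointer(ucontext_t *uctx, void (*fn)(), void *ptr)
    {
      assert(sizeof(unsigned long) == sizeof(void *));
      int lo = (unsigned int) (unsigned long) ptr;  // low 32 bits
      int hi = (unsigned long) ptr >> 32;           // high 32 bits
      makecontext(uctx, fn, 2, lo, hi);
      // The target reassembles:
      //   void *p = (void *) (((unsigned long) hi << 32) + (unsigned int) lo);
    }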
 
-  void cleanup(Process *process)
-  {
-    //cout << "cleanup for " << process->pid << endl;
-
-#ifdef USE_LITHE
-    /* TODO(benh): Assert that we are on the transition stack. */
-#endif /* USE_LITHE */
 
-    /* Inform link manager. */
-    LinkManager::instance()->exited(process);
+void * schedule(void *arg)
+{
+  // Context for the entry into the schedule routine, used when a
+  // process exits, so that other processes can get scheduled!
+  if (getcontext(&proc_uctx_schedule) < 0)
+    fatalerror("getcontext failed (schedule)");
 
-    /* Processes that were waiting on exiting process. */
-    list<Process *> resumable;
+  // Recycle the stack from an exited process.
+  if (recyclable != NULL) {
+    synchronized(stacks) {
+      stacks->push(recyclable);
+    }
+    recyclable = NULL;
+  }
 
-    /* Possible gate non-libprocess threads are waiting at. */
-    Gate *gate = NULL;
+  do {
+    if (replaying)
+      process_manager->replay();
 
-    /* Remove process. */
-    acquire(processes);
-    {
-      /* Remove from internal clock (if necessary). */
-      acquire(timers);
-      {
-        if (clk != NULL)
-          clk->discard(process);
-      }
-      release(timers);
+    Process *process = process_manager->dequeue();
 
-      process->lock();
-      {
-	/* Free any pending messages. */
-	while (!process->msgs.empty()) {
-	  struct msg *msg = process->msgs.front();
-	  process->msgs.pop_front();
-	  free(msg);
-	}
+    if (process == NULL) {
+      Gate::state_t old = gate->approach();
+      process = process_manager->dequeue();
+      if (process == NULL) {
 
-	/* Free current message. */
-	if (process->current) free(process->current);
+        // When using the manual clock, we want to let all the
+        // processes "run" up to the current time so that processes
+        // receive messages in order. If we let one process have a
+        // drastically advanced current time then it may try to send
+        // messages to another process that, due to the happens-before
+        // relationship, will inherit its drastically advanced
+        // current time. If the processing thread gets to this point
+        // (i.e., the point where no other processes are runnable)
+        // with the manual clock, then all of the processes that
+        // could be run up to the current time have been run. The
+        // only way another process could become runnable is if (1) it
+        // receives a message from another node, (2) a file descriptor
+        // it is awaiting has become ready, or (3) it has a
+        // timeout. We can ignore processes that become runnable due
+        // to receiving a message from another node or getting an
+        // event on a file descriptor because that should not change
+        // the timing happens-before relationship of the local
+        // processes (unless of course the file descriptor was created
+        // from something like timerfd, in which case the programmer
+        // is not using the timing source provided in libprocess and
+        // all bets are off). Thus, we can check that there are no
+        // pending timeouts before the current time, move the current
+        // time to the next timeout value, and tell the timer to
+        // update itself.
 
-	/*
-	 * TODO(benh): Can't recycle stacks unless we get off the
-	 * stack by the time someone actually wants to use the stack.
-	 *
-	 * stacks->push(process->uctx.uc_stack.ss_sp);
-	 */
-
-	processes.erase(process->pid.pipe);
-
-	/* TODO(benh): Confirm process not in timers. */
-
-	/* Confirm process not in runq. */
-	assert(find(runq.begin(), runq.end(), process) == runq.end());
-
-	/* Confirm that the process is not in any waiting queue. */
-	foreachpair (_, set<Process *> &waiting, waiters)
-	  assert(waiting.find(process) == waiting.end());
-
-	/* Grab all the waiting processes that are now resumable. */
-	foreach (Process *waiter, waiters[process])
-	  resumable.push_back(waiter);
-
-	waiters.erase(process);
-
-	/* Lookup gate to wake up waiting non-libprocess threads. */
-	map<Process *, Gate *>::iterator it = gates.find(process);
-	if (it != gates.end()) {
-	  gate = it->second;
-	  /* N.B. The last thread that leaves the gate also free's it. */
-	  gates.erase(it);
-	}
+        synchronized(timeouts) {
+          if (clk != NULL) {
+            if (!timeouts->empty()) {
+              // Adjust the current time to the next timeout, provided
+              // it is not past the elapsed time.
+              ev_tstamp tstamp = timeouts->begin()->first;
+              if (tstamp <= clk->getElapsed())
+                clk->setCurrent(tstamp);
+              
+              update_timer = true;
+              ev_async_send(loop, &async_watcher);
+            } else {
+              // Woah! This comment is the only thing in this else
+              // branch because this is a pretty serious state ... the
+              // only way to make progress is for another node to send
+              // a message or for an event to occur on a file
+              // descriptor that a process is awaiting. We may want to
+              // consider doing (or printing) something here.
+            }
+          }
+        }
 
-	process->state = Process::EXITED;
+	/* Wait at gate if idle. */
+	gate->arrive(old);
+	continue;
+      } else {
+	gate->leave();
       }
-      process->unlock();
     }
-    release(processes);
 
-    /*
-     * N.B. After opening the gate we can no longer dereference
-     * 'process' since it might already be cleaned up by user code (a
-     * waiter might have cleaned up the stack where the process was
-     * allocated).
-     */
-    if (gate != NULL)
-      gate->open();
+    process->lock();
+    {
+      assert(process->state == Process::INIT ||
+	     process->state == Process::READY ||
+	     process->state == Process::INTERRUPTED ||
+	     process->state == Process::TIMEDOUT);
 
-    foreach (Process *p, resumable) {
-      p->lock();
-      {
-	// Process 'p' might be RUNNING because it is racing to become
-	// WAITING while we are actually trying to get it to become
-	// running again..
-	assert(p->state == Process::RUNNING || p->state == Process::WAITING);
-	if (p->state == Process::RUNNING) {
-	  p->state = Process::INTERRUPTED;
-	} else {
-	  p->state = Process::READY;
-	  enqueue(p);
-	}
+      /* Continue process. */
+      assert(proc_process == NULL);
+      proc_process = process;
+      swapcontext(&proc_uctx_running, &process->uctx);
+      while (legacy) {
+	(*legacy_thunk)();
+	swapcontext(&proc_uctx_running, &process->uctx);
       }
-      p->unlock();
+      assert(proc_process != NULL);
+      proc_process = NULL;
     }
-  }
-
+    process->unlock();
+  } while (true);
+}
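
The idle path in schedule above appears to rely on the Gate's approach/arrive/leave protocol to avoid a lost wakeup: approach() snapshots the gate's state, the run queue is re-checked, and only then does the thread arrive (block) against that snapshot, so a gate->open() that raced in between makes arrive return immediately. Condensed, using only the Gate operations that appear in this file:

    // Condensed restatement of the idle loop in schedule: a wakeup
    // via gate->open() between approach() and arrive(old) invalidates
    // 'old', so arrive(old) will not block on a stale snapshot.
    Process * dequeue_or_wait()
    {
      do {
        Process *process = process_manager->dequeue();
        if (process != NULL)
          return process;
        Gate::state_t old = gate->approach();  // snapshot gate state
        process = process_manager->dequeue();  // re-check after snapshot
        if (process != NULL) {
          gate->leave();                       // never blocked; back out
          return process;
        }
        gate->arrive(old);                     // blocks unless state changed
      } while (true);
    }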
 
-  void link(Process *process, const PID &to)
-  {
-    /* Check if link is local. */
-    if (to.ip == ip && to.port == port) {
-      /* Make sure local process is still valid! */
-      bool valid = false;
 
-      acquire(processes);
-      {
-	if (processes.find(to.pipe) != processes.end())
-	  valid = true;
-      }
-      release(processes);
+/*
+ * We might need/want to catch terminating signals to close our log
+ * ... or the underlying filesystem and operating system might be
+ * robust enough to flush our last writes and close the file cleanly,
+ * or we might need to force flushes at appropriate times. However,
+ * for now, adding signal handlers freely is not allowed because they
+ * will clash with Java and Python virtual machines and cause
+ * hard-to-debug crashes/segfaults. This can be revisited when
+ * recording gets turned on by default.
+ */
 
-      if (!valid) {
-	struct msg *msg = (struct msg *) malloc(sizeof(struct msg));
-	msg->from.pipe = to.pipe;
-	msg->from.ip = to.ip;
-	msg->from.port = to.port;
-	msg->to.pipe = process->pid.pipe;
-	msg->to.ip = process->pid.ip;
-	msg->to.port = process->pid.port;
-	msg->id = PROCESS_EXIT;
-	msg->len = 0;
-	process->enqueue(msg);
-	return;
-      }
 
-      /* TODO(benh): Process object for 'to' could become invalid here! */
+// void sigbad(int signal, struct sigcontext *ctx)
+// {
+//   if (recording) {
+//     assert(!replaying);
+//     record_msgs.close();
+//     record_pipes.close();
+//   }
 
-      LinkManager::instance()->link(process, to);
+//   /* Pass on the signal (so that a core file is produced).  */
+//   struct sigaction sa;
+//   sa.sa_handler = SIG_DFL;
+//   sigemptyset(&sa.sa_mask);
+//   sa.sa_flags = 0;
+//   sigaction(signal, &sa, NULL);
+//   raise(signal);
+// }
 
-      /* Make sure local process is still valid! */
-      valid = false;
 
-      acquire(processes);
-      {
-	if (processes.find(to.pipe) != processes.end())
-	  valid = true;
-      }
-      release(processes);
+void initialize()
+{
+  static volatile bool initialized = false;
+  static volatile bool initializing = true;
 
-      /* TODO(benh): Better solution or send possible duplicate PROCESS_EXIT? */
-      assert(valid);
-    } else {
-      LinkManager::instance()->link(process, to);
+  // Try to do the initialization, or wait for it to complete.
+  if (initialized && !initializing) {
+    return;
+  } else if (initialized && initializing) {
+    while (initializing);
+    return;
+  } else {
+    if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
+      while (initializing);
+      return;
     }
   }
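
The guard above is a compare-and-swap once-guard. A stripped-down
sketch of the same idiom, where do_setup() is a hypothetical stand-in
for the body of initialize():

    static volatile bool initialized = false;
    static volatile bool initializing = true;

    void ensure_initialized()
    {
      /* Exactly one caller wins the CAS and performs the setup; all
         other callers spin until the winner clears 'initializing'. */
      if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
        while (initializing);  /* Busy-wait; setup is assumed short. */
        return;
      }
      do_setup();              /* Hypothetical one-time initialization. */
      initializing = false;
    }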
 
+//   /* Install signal handler. */
+//   struct sigaction sa;
 
-#ifdef USE_LITHE
-  static void do_receive(lithe_task_t *task, void *arg)
-  {
-    timeout_t *timeout = (timeout_t *) arg;
+//   sa.sa_handler = (void (*) (int)) sigbad;
+//   sigemptyset (&sa.sa_mask);
+//   sa.sa_flags = SA_RESTART;
 
-    Process *process = (Process *) task->tls;
+//   sigaction (SIGTERM, &sa, NULL);
+//   sigaction (SIGINT, &sa, NULL);
+//   sigaction (SIGQUIT, &sa, NULL);
+//   sigaction (SIGSEGV, &sa, NULL);
+//   sigaction (SIGILL, &sa, NULL);
+// #ifdef SIGBUS
+//   sigaction (SIGBUS, &sa, NULL);
+// #endif
+// #ifdef SIGSTKFLT
+//   sigaction (SIGSTKFLT, &sa, NULL);
+// #endif
+//   sigaction (SIGABRT, &sa, NULL);
 
-    process->lock();
-    {
-      /* Start timeout if necessary. */
-      if (timeout != NULL)
-	ProcessManager::instance()->start_timeout(*timeout);
+//   sigaction (SIGFPE, &sa, NULL);
 
-      /* Context switch. */
-      process->state = Process::RECEIVING;
+#ifdef __sun__
+  /* Need to ignore SIGPIPE since we can't do MSG_NOSIGNAL on Solaris. */
+  signal(SIGPIPE, SIG_IGN);
+#endif /* __sun__ */
 
-      /* Ensure nothing enqueued since check in Process::receive. */
-      if (!process->msgs.empty()) {
-	process->state = Process::READY;
-	ProcessManager::instance()->enqueue(process);
-      }
-    }
-    process->unlock();
+  /* Create a new ProcessManager and LinkManager. */
+  process_manager = new ProcessManager();
+  link_manager = new LinkManager();
 
-    /* N.B. We could resume the task if a message has arrived. */
+  /* Setup processing thread. */
+  if (pthread_create (&proc_thread, NULL, schedule, NULL) != 0)
+    fatalerror("failed to initialize (pthread_create)");
 
-    /* Go be productive doing something else ... */
-    lithe_sched_reenter();
-  }
+  char *value;
 
+  /* Check environment for ip. */
+  value = getenv("LIBPROCESS_IP");
+  ip = value != NULL ? atoi(value) : 0;
 
-  void receive(Process *process, double secs)
-  {
-    assert(process != NULL);
-    if (secs > 0) {
-      timeout_t timeout = create_timeout(process, secs);
-      assert(sizeof(timeout_t *) == sizeof(void *));
-      lithe_task_block(do_receive, &timeout);
-      process->lock();
-      {
-	assert(process->state == Process::READY ||
-	       process->state == Process::TIMEDOUT);
+  /* Check environment for port. */
+  value = getenv("LIBPROCESS_PORT");
+  port = value != NULL ? atoi(value) : 0;
 
-	/*
-	 * Attempt to cancel the timeout if necessary.
-	 * N.B. Failed cancel means unnecessary timeouts (hence generation).
-	 */
-	if (process->state != Process::TIMEDOUT)
-	  cancel_timeout(timeout);
+  /* Check environment for replay. */
+  value = getenv("LIBPROCESS_REPLAY");
+  replaying = value != NULL;
 
-	/* Update the generation (handles racing timeouts). */
-	process->generation++;
+  /* Setup for recording or replaying. */
+  if (recording && !replaying) {
+    /* Setup record. */
+    time_t t;
+    time(&t);
+    std::string record(".record-");
+    std::string date(ctime(&t));
 
-	process->state = Process::RUNNING;
-      }
-      process->unlock();
-    } else {
-      lithe_task_block(do_receive, NULL);
-      process->lock();
-      {
-	assert(process->state == Process::READY);
-	process->state = Process::RUNNING;
-      }
-      process->unlock();
+    replace(date.begin(), date.end(), ' ', '_');
+    replace(date.begin(), date.end(), '\n', '\0');
+
+    /* TODO(benh): Create file if it doesn't exist. */
+    record_msgs.open((record + "msgs-" + date).c_str(),
+		     std::ios::out | std::ios::binary | std::ios::app);
+    record_pipes.open((record + "pipes-" + date).c_str(),
+		      std::ios::out | std::ios::app);
+    if (!record_msgs.is_open() || !record_pipes.is_open())
+      fatal("could not open record(s) for recording");
+  } else if (replaying) {
+    assert(!recording);
+    /* Open record(s) for replay. */
+    record_msgs.open((std::string(".record-msgs-") += value).c_str(),
+		     std::ios::in | std::ios::binary);
+    record_pipes.open((std::string(".record-pipes-") += value).c_str(),
+		      std::ios::in);
+    if (!record_msgs.is_open() || !record_pipes.is_open())
+      fatal("could not open record(s) with prefix %s for replay", value);
+
+    /* Read in all pipes from record. */
+    while (!record_pipes.eof()) {
+      uint32_t parent, child;
+      record_pipes >> parent >> child;
+      if (record_pipes.fail())
+	fatal("could not read from record");
+      (*replay_pipes)[parent].push_back(child);
     }
+
+    record_pipes.close();
   }
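
Note that the eof()-guarded loop above is fragile: if the record ends
with a trailing newline, the final extraction fails and trips the
fatal(). A hedged, more idiomatic alternative tests the extraction
itself:

    uint32_t parent, child;
    while (record_pipes >> parent >> child) {
      (*replay_pipes)[parent].push_back(child);
    }
    if (!record_pipes.eof())  // Stopped for some reason other than EOF.
      fatal("could not read from record");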
-#else
-  void receive(Process *process, double secs)
-  {
-    //cout << "ProcessManager::receive" << endl;
-    assert(process != NULL);
-    process->lock();
-    {
-      /* Ensure nothing enqueued since check in Process::receive. */
-      if (process->msgs.empty()) {
-	if (secs > 0) {
-	  /* Create timeout. */
-	  const timeout_t &timeout = create_timeout(process, secs);
 
-	  /* Start the timeout. */
-	  start_timeout(timeout);
+  /* TODO(benh): Check during replay that the same ip and port are used. */
 
-	  /* Context switch. */
-	  process->state = Process::RECEIVING;
-	  swapcontext(&process->uctx, &proc_uctx_running);
+  // Look up the hostname if the ip is missing (this avoids getting
+  // 127.0.0.1). Note that we need exactly one ip address so that
+  // other processes can send and receive without getting confused
+  // about whom they are sending to.
+  if (ip == 0) {
+    char hostname[512];
 
-	  assert(process->state == Process::READY ||
-		 process->state == Process::TIMEDOUT);
+    if (gethostname(hostname, sizeof(hostname)) < 0)
+      fatalerror("failed to initialize (gethostname)");
 
-	  /* Attempt to cancel the timer if necessary. */
-	  if (process->state != Process::TIMEDOUT)
-	    cancel_timeout(timeout);
+    /* Lookup IP address of local hostname. */
+    struct hostent *he;
 
-	  /* N.B. No cancel means possible unnecessary timeouts. */
+    if ((he = gethostbyname2(hostname, AF_INET)) == NULL)
+      fatalerror("failed to initialize (gethostbyname2)");
 
-	  process->state = Process::RUNNING;
-      
-	  /* Update the generation (handles racing timeouts). */
-	  process->generation++;
-	} else {
-	  /* Context switch. */
-	  process->state = Process::RECEIVING;
-	  swapcontext(&process->uctx, &proc_uctx_running);
-	  assert(process->state == Process::READY);
-	  process->state = Process::RUNNING;
-	}
-      }
-    }
-    process->unlock();
+    ip = *((uint32_t *) he->h_addr_list[0]);
   }
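
As an aside, gethostbyname2() is not thread-safe and has long been
deprecated. A sketch of the equivalent IPv4 lookup with getaddrinfo()
(an alternative, not what this commit uses):

    #include <netdb.h>
    #include <string.h>

    struct addrinfo hints, *result;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;       /* IPv4 only, as above. */
    hints.ai_socktype = SOCK_STREAM;

    if (getaddrinfo(hostname, NULL, &hints, &result) != 0)
      fatalerror("failed to initialize (getaddrinfo)");

    ip = ((struct sockaddr_in *) result->ai_addr)->sin_addr.s_addr;
    freeaddrinfo(result);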
-#endif /* USE_LITHE */
-
 
-#ifdef USE_LITHE
-  static void do_pause(lithe_task_t *task, void *arg)
-  {
-    timeout_t *timeout = (timeout_t *) arg;
+  /* Create a "server" socket for communicating with other nodes. */
+  if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0)
+    fatalerror("failed to initialize (socket)");
 
-    Process *process = (Process *) task->tls;
+  /* Make socket non-blocking. */
+  if (set_nbio(s) < 0)
+    fatalerror("failed to initialize (set_nbio)");
 
-    process->lock();
-    {
-      /* Start timeout. */
-      ProcessManager::instance()->start_timeout(*timeout);
-
-      /* Context switch. */
-      process->state = Process::PAUSED;
-    }
-    process->unlock();
-
-    /* Go be productive doing something else ... */
-    lithe_sched_reenter();
-  }
-
-
-  void pause(Process *process, double secs)
-  {
-    assert(process != NULL);
-
-    if (secs > 0) {
-      timeout_t timeout = create_timeout(process, secs);
-      assert(sizeof(timeout_t *) == sizeof(void *));
-      lithe_task_block(do_pause, &timeout);
-      assert(process->state == Process::TIMEDOUT);
-      process->state = Process::RUNNING;
-    }
-  }
-#else
-  void pause(Process *process, double secs)
-  {
-    assert(process != NULL);
-
-    process->lock();
-    {
-      if (secs > 0) {
-	/* Create/Start the timeout. */
-	start_timeout(create_timeout(process, secs));
-
-	/* Context switch. */
-	process->state = Process::PAUSED;
-	swapcontext(&process->uctx, &proc_uctx_running);
-	assert(process->state == Process::TIMEDOUT);
-	process->state = Process::RUNNING;
-      } else {
-	/* Modified context switch (basically a yield). */
-	process->state = Process::READY;
-	enqueue(process);
-	swapcontext(&process->uctx, &proc_uctx_running);
-	assert(process->state == Process::READY);
-	process->state = Process::RUNNING;
-      }
-    }
-    process->unlock();
-  }
-#endif /* USE_LITHE */
-
-
-#ifdef USE_LITHE
-  static void do_wait(lithe_task_t *task, void *arg)
-  {
-    Process *process = (Process *) task->tls;
-
-    bool resume = false;
-
-    process->lock();
-    {
-      if (process->state == Process::RUNNING) {
-	/* Context switch. */
-	process->state = Process::WAITING;
-      } else {
-	assert(process->state == Process::INTERRUPTED);
-	process->state = Process::READY;
-	resume = true;
-      }
-    }
-    process->unlock();
-
-    if (resume)
-      lithe_task_resume(task);
-
-    /* Go be productive doing something else ... */
-    lithe_sched_reenter();
-  }
-
-
-  bool wait(PID pid)
-  {
-    /* TODO(benh): Account for a non-libprocess task/ctx. */
-    assert(false);
-
-    Process *process;
-
-    if (lithe_task_gettls((void **) &process) < 0)
-      abort();
-
-    bool waited = false;
-
-    /* Now we can add the process to the waiters. */
-    acquire(processes);
-    {
-      map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
-      if (it != processes.end()) {
-	assert(it->second->state != Process::EXITED);
-	waiters[it->second].insert(process);
-	waited = true;
-      }
-    }
-    release(processes);
-
-    /* If we waited then we should context switch. */
-    if (waited) {
-      lithe_task_block(do_wait, NULL);
-      assert(process->state == Process::READY);
-      process->state = Process::RUNNING;
-    }
-
-    return waited;
-  }
-#else
-  bool wait(PID pid)
-  {
-    if (pthread_self() != proc_thread)
-      return external_wait(pid);
-
-    Process *process = proc_process;
-
-    bool waited = false;
-
-    /* Now we can add the process to the waiters. */
-    acquire(processes);
-    {
-      map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
-      if (it != processes.end()) {
-	assert(it->second->state != Process::EXITED);
-	waiters[it->second].insert(process);
-	waited = true;
-      }
-    }
-    release(processes);
-
-    /* If we waited then we should context switch. */
-    if (waited) {
-      process->lock();
-      {
-	if (process->state == Process::RUNNING) {
-	  /* Context switch. */
-	  process->state = Process::WAITING;
-	  swapcontext(&process->uctx, &proc_uctx_running);
-	  assert(process->state == Process::READY);
-	  process->state = Process::RUNNING;
-	} else {
-	  /* Process is cleaned up and we have been removed from waiters. */
-	  assert(process->state == Process::INTERRUPTED);
-	  process->state = Process::RUNNING;
-	}
-      }
-      process->unlock();
-    }
-
-    return waited;
-  }
-#endif /* USE_LITHE */
-
-
-  bool external_wait(PID pid)
-  {
-    // We use a gate for external waiters. A gate is single use. That
-    // is, a new gate is created when the first external thread shows
-    // up and wants to wait for a process that currently has no
-    // gate. Once that process exits, the last external thread to
-    // leave the gate will also clean it up. Note that a gate will
-    // never get more external threads waiting on it after it has been
-    // opened, since the process should no longer be valid and
-    // therefore will not have an entry in 'processes'.
-
-    Gate *gate = NULL;
-    Gate::state_t old;
-
-    /* Try and approach the gate if necessary. */
-    acquire(processes);
-    {
-      map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
-      if (it != processes.end()) {
-	assert(it->second->state != Process::EXITED);
-	Process *process = it->second;
-	/* Check and see if a gate already exists. */
-	if (gates.find(process) == gates.end())
-	  gates[process] = new Gate();
-	gate = gates[process];
-	old = gate->approach();
-      }
-    }
-    release(processes);
-
-    /* Now arrive at the gate and wait until it opens. */
-    if (gate != NULL) {
-      gate->arrive(old);
-      if (gate->empty())
-	delete gate;
-      return true;
-    }
-
-    return false;
-  }
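
The single-use gate protocol described above (approach, then either
arrive-and-wait or leave; open releases everyone; empty says when the
last waiter has gone) can be sketched with a mutex and condition
variable. This illustrates the semantics only and is not the gate.hpp
implementation:

    #include <pthread.h>

    class GateSketch
    {
      int waiters;           // Approached but not yet arrived/left.
      long generation;       // Bumped by open(); observed by arrive().
      pthread_mutex_t mutex;
      pthread_cond_t cond;

    public:
      typedef long state_t;

      GateSketch() : waiters(0), generation(0)
      {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&cond, NULL);
      }

      state_t approach()        // Register interest, note current state.
      {
        pthread_mutex_lock(&mutex);
        waiters++;
        state_t old = generation;
        pthread_mutex_unlock(&mutex);
        return old;
      }

      void arrive(state_t old)  // Block until opened after 'old'.
      {
        pthread_mutex_lock(&mutex);
        while (generation == old)
          pthread_cond_wait(&cond, &mutex);
        waiters--;
        pthread_mutex_unlock(&mutex);
      }

      void leave()              // Withdraw without waiting.
      {
        pthread_mutex_lock(&mutex);
        waiters--;
        pthread_mutex_unlock(&mutex);
      }

      void open()               // Release everyone blocked in arrive().
      {
        pthread_mutex_lock(&mutex);
        generation++;
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&mutex);
      }

      bool empty()              // True once all approachers are gone.
      {
        pthread_mutex_lock(&mutex);
        bool result = (waiters == 0);
        pthread_mutex_unlock(&mutex);
        return result;
      }
    };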
-
-
-#ifdef USE_LITHE
-  static void do_await(lithe_task_t *task, void *arg)
-  {
-    ev_io *io_watcher = (ev_io *) arg;
-
-    Process *process = (Process *) task->tls;
-
-    process->lock();
-    {
-      /* Enqueue the watcher. */
-      acquire(io_watchersq);
-      {
-	io_watchersq->push(io_watcher);
-      }
-      release(io_watchersq);
-
-      /* Interrupt the loop. */
-      ev_async_send(loop, &async_watcher);
-
-      /* Context switch. */
-      process->state = Process::AWAITING;
-
-      /*
-       * N.B. It is difficult to check if a new message has arrived
-       * since the await call was issued because the queue might not
-       * have been empty. One could imagine passing along the
-       * information in a subsequent version.
-       */
-    }
-    process->unlock();
-
-    /* Go be productive doing something else ... */
-    lithe_sched_reenter();
-  }
-
-
-  bool await(Process *process, int fd, int op)
-  {
-    assert(process != NULL);
-
-    bool interrupted = false;
-
-    if (fd < 0)
-      return false;
-
-    /* Allocate/Initialize the watcher. */
-    ev_io *io_watcher = (ev_io *) malloc(sizeof(ev_io));
-
-    if ((op & Process::RDWR) == Process::RDWR)
-      ev_io_init(io_watcher, handle_await, fd, EV_READ | EV_WRITE);
-    else if ((op & Process::RDONLY) == Process::RDONLY)
-      ev_io_init(io_watcher, handle_await, fd, EV_READ);
-    else if ((op & Process::WRONLY) == Process::WRONLY)
-      ev_io_init(io_watcher, handle_await, fd, EV_WRITE);
-
-    /* Create tuple describing state (on heap in case we get interrupted). */
-    io_watcher->data = new tuple<Process *, int>(process, process->generation);
-
-    assert(sizeof(ev_io *) == sizeof(void *));
-    lithe_task_block(do_await, io_watcher);
-
-    process->lock();
-    {
-      assert(process->state == Process::READY ||
-	     process->state == Process::INTERRUPTED);
-
-      /* Update the generation (handles racing awaited). */
-      process->generation++;
-
-      if (process->state == Process::INTERRUPTED)
-	interrupted = true;
-
-      process->state = Process::RUNNING;
-    }
-    process->unlock();
-
-    return !interrupted;
-  }
-#else
-  bool await(Process *process, int fd, int op, double secs, bool ignore)
-  {
-    assert(process != NULL);
-
-    bool interrupted = false;
-
-    process->lock();
-    {
-      /* Consider a non-empty message queue as an immediate interrupt. */
-      if (!ignore && !process->msgs.empty()) {
-	process->unlock();
-	return false;
-      }
-
-      assert(secs > 0);
-
-      /* Create timeout. */
-      const timeout_t &timeout = create_timeout(process, secs);
-
-      /* Start the timeout. */
-      start_timeout(timeout);
-
-      // Treat an await with a bad fd as an interruptible pause!
-      if (fd >= 0) {
-	/* Allocate/Initialize the watcher. */
-	ev_io *io_watcher = (ev_io *) malloc(sizeof(ev_io));
-
-	if ((op & Process::RDWR) == Process::RDWR)
-	  ev_io_init(io_watcher, handle_await, fd, EV_READ | EV_WRITE);
-	else if ((op & Process::RDONLY) == Process::RDONLY)
-	  ev_io_init(io_watcher, handle_await, fd, EV_READ);
-	else if ((op & Process::WRONLY) == Process::WRONLY)
-	  ev_io_init(io_watcher, handle_await, fd, EV_WRITE);
-
-	/* Tuple describing state (on heap in case we get interrupted). */
-	io_watcher->data =
-	  new tuple<Process *, int>(process, process->generation);
-
-	/* Enqueue the watcher. */
-	acquire(io_watchersq);
-	{
-	  io_watchersq->push(io_watcher);
-	}
-	release(io_watchersq);
-    
-	/* Interrupt the loop. */
-	ev_async_send(loop, &async_watcher);
-      }
-
-      /* Context switch. */
-      process->state = Process::AWAITING;
-      swapcontext(&process->uctx, &proc_uctx_running);
-      assert(process->state == Process::READY ||
-	     process->state == Process::TIMEDOUT ||
-	     process->state == Process::INTERRUPTED);
-
-      /* Attempt to cancel the timer if necessary. */
-      if (process->state != Process::TIMEDOUT)
-	cancel_timeout(timeout);
-
-      if (process->state == Process::INTERRUPTED)
-	interrupted = true;
-
-      process->state = Process::RUNNING;
-      
-      /* Update the generation (handles racing awaited). */
-      process->generation++;
-    }
-    process->unlock();
-
-    return !interrupted;
-  }
-#endif /* USE_LITHE */
-
-
-  void awaited(Process *process, int generation)
-  {
-    process->lock();
-    {
-      if (process->state == Process::AWAITING &&
-	  process->generation == generation) {
-	process->state = Process::READY;
-	enqueue(process);
-      }
-    }
-    process->unlock();
-  }
-
-
-  void timedout(Process *process, int generation)
-  {
-    assert(process != NULL);
-
-    /* Make sure the process is still valid! */
-    bool valid = false;
-
-    acquire(processes);
-    {
-      map<uint32_t, Process *>::iterator it = processes.find(process->pid.pipe);
-      if (it != processes.end() && it->second == process)
-	valid = true;
-    }
-    release(processes);
-
-    if (!valid)
-      return;
-
-    /* TODO(benh): Process could become invalid here! Reference counting? */
-
-    process->lock();
-    {
-      /* N.B. State != READY after timeout, but generation still same. */
-      if (process->state != Process::READY &&
-	  process->generation == generation) {
-	/* N.B. Process may be RUNNING due to "outside" thread 'receive'. */
-	assert(process->state == Process::RUNNING ||
-	       process->state == Process::RECEIVING ||
-	       process->state == Process::AWAITING ||
-	       process->state == Process::INTERRUPTED ||
-	       process->state == Process::PAUSED);
-	if (process->state != Process::RUNNING &&
-	    process->state != Process::INTERRUPTED)
-	  ProcessManager::instance()->enqueue(process);
-	process->state = Process::TIMEDOUT;
-      }
-    }
-    process->unlock();
-
-    /* TODO(benh): Check if process became invalid! */
-    valid = false;
-    acquire(processes);
-    {
-      map<uint32_t, Process *>::iterator it = processes.find(process->pid.pipe);
-      if (it != processes.end() && it->second == process)
-	valid = true;
-    }
-    release(processes);
-
-    /* If process is invalid, we probably just wrote over some memory ... */
-    assert(valid);
-  }
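
The generation counter threaded through receive(), await(), and
timedout() above is the race-avoidance device: a timeout captures
process->generation when it is created, and the process bumps its
generation each time it wakes, so a timeout that fires late carries a
stale generation and is dropped. Restated in miniature (a condensation
of timedout(), not a new API):

    void on_timer_fired(Process *process, int generation_at_creation)
    {
      process->lock();
      if (process->generation == generation_at_creation) {
        process->state = Process::TIMEDOUT;  // Same wait: deliver it.
      }                                      // Otherwise stale: drop it.
      process->unlock();
    }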
-
-
-  timeout_t create_timeout(Process *process, double secs)
-  {
-    assert(process != NULL);
-
-    ev_tstamp tstamp;
-
-    acquire(timers);
-    {
-      if (clk != NULL) {
-        tstamp = clk->getCurrent(process) + secs;
-      } else {
-	// TODO(benh): Unclear if want ev_now(...) or ev_time().
-	tstamp = ev_time() + secs;
-      }
-    }
-    release(timers);
-
-    return make_tuple(tstamp, process, process->generation);
-  }
-
-
-  void start_timeout(const timeout_t &timeout)
-  {
-    ev_tstamp tstamp = timeout.get<0>();
-
-    /* Add the timer. */
-    acquire(timers);
-    {
-      if (timers->size() == 0 || tstamp < timers->begin()->first) {
-	// Need to interrupt the loop to update/set timer repeat.
-	(*timers)[tstamp].push_back(timeout);
-	update_timer = true;
-	ev_async_send(loop, &async_watcher);
-      } else {
-	// Timer repeat is adequate, just add the timeout.
-	assert(timers->size() >= 1);
-	(*timers)[tstamp].push_back(timeout);
-      }
-    }
-    release(timers);
-  }
-
-
-  bool cancel_timeout(const timeout_t &timeout)
-  {
-    bool cancelled = false;
-
-    acquire(timers);
-    {
-      /* Check if the timer has fired (this is highly unoptimized). */
-      foreachpair (const ev_tstamp &tstamp, timeouts_t &timeouts, *timers) {
-	list<timeout_t>::iterator it = timeouts.begin();
-	while (it != timeouts.end()) {
-	  if (it->get<0>() == timeout.get<0>() &&
-	      it->get<1>() == timeout.get<1>() &&
-	      it->get<2>() == timeout.get<2>()) {
-	    timeouts.erase(it++);
-	    cancelled = true;
-	    break;
-	  } else {
-	    ++it;
-	  }
-	}
-	if (cancelled) {
-          if (timeouts.size() == 0)
-            timers->erase(tstamp);
-	  break;
-        }
-      }
-    }
-    release(timers);
-
-    return cancelled;
-  }
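
As the comment concedes, the cancel scan is unoptimized: it walks every
bucket even though 'timers' is a map keyed by timestamp. A hedged
O(log n) variant (assuming the same timers/timeouts_t types used in
this file) would jump straight to the right bucket:

    bool cancel_timeout_fast(const timeout_t &timeout)
    {
      bool cancelled = false;
      acquire(timers);
      {
        map<ev_tstamp, timeouts_t>::iterator it =
          timers->find(timeout.get<0>());
        if (it != timers->end()) {
          list<timeout_t>::iterator t = it->second.begin();
          for (; t != it->second.end(); ++t) {
            if (t->get<1>() == timeout.get<1>() &&
                t->get<2>() == timeout.get<2>()) {
              it->second.erase(t);
              cancelled = true;
              break;
            }
          }
          if (it->second.empty())
            timers->erase(it);
        }
      }
      release(timers);
      return cancelled;
    }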
-
-
-  void enqueue(Process *process)
-  {
-    assert(process != NULL);
-    acquire(runq);
-    {
-      assert(find(runq.begin(), runq.end(), process) == runq.end());
-      runq.push_back(process);
-    }
-    release(runq);
-    
-    /* Wake up the processing thread if necessary. */
-    gate->open();
-  }
-
-  Process * dequeue()
-  {
-    Process *process = NULL;
-
-    acquire(runq);
-    {
-      if (!runq.empty()) {
-	process = runq.front();
-	runq.pop_front();
-      }
-    }
-    release(runq);
-
-    return process;
-  }
-
-  void deliver(struct msg *msg, Process *sender = NULL)
-  {
-    assert(msg != NULL);
-    assert(!replaying);
-//     cout << endl;
-//     cout << "msg->from.pipe: " << msg->from.pipe << endl;
-//     cout << "msg->from.ip: " << msg->from.ip << endl;
-//     cout << "msg->from.port: " << msg->from.port << endl;
-//     cout << "msg->to.pipe: " << msg->to.pipe << endl;
-//     cout << "msg->to.ip: " << msg->to.ip << endl;
-//     cout << "msg->to.port: " << msg->to.port << endl;
-//     cout << "msg->id: " << msg->id << endl;
-//     cout << "msg->len: " << msg->len << endl;
-
-    synchronized(processes) {
-      Process *receiver = NULL;
-
-      if (processes.count(msg->to.pipe) != 0)
-        receiver = processes[msg->to.pipe];
-
-      if (receiver != NULL) {
-        // If we have a local sender AND we are using a manual clock
-        // then update the current time of the receiver to preserve
-        // the happens-before relationship between the sender and
-        // receiver. Note that the assumption is that the sender
-        // remains valid for at least the duration of this routine (so
-        // that we can look up its current time).
-        if (sender != NULL) {
-          synchronized(timers) {
-            if (clk != NULL) {
-              clk->setCurrent(receiver, max(clk->getCurrent(receiver),
-                                            clk->getCurrent(sender)));
-            }
-          }
-        }
-
-        receiver->enqueue(msg);
-      } else {
-	free(msg);
-      }
-    }
-  }
-};
-
-/* Singleton ProcessManager instance. */
-template<> ProcessManager * Singleton<ProcessManager>::singleton = NULL;
-template<> bool Singleton<ProcessManager>::instantiated = false;
-
-
-static void handle_async(struct ev_loop *loop, ev_async *w, int revents)
-{
-  acquire(io_watchersq);
-  {
-    /* Start all the new I/O watchers. */
-    while (!io_watchersq->empty()) {
-      ev_io *io_watcher = io_watchersq->front();
-      io_watchersq->pop();
-      ev_io_start(loop, io_watcher);
-    }
-  }
-  release(io_watchersq);
-
-  acquire(timers);
-  {
-    if (update_timer) {
-      if (!timers->empty()) {
-	// Determine the current time.
-	ev_tstamp current_tstamp;
-	if (clk != NULL) {
-	  current_tstamp = clk->getCurrent();
-	} else {
-	  // TODO(benh): Unclear if want ev_now(...) or ev_time().
-	  current_tstamp = ev_time();
-	}
-
-	timer_watcher.repeat = timers->begin()->first - current_tstamp;
-
-	// Check when the timer event should fire.
-        if (timer_watcher.repeat <= 0) {
-	  // Feed the event now!
-	  timer_watcher.repeat = 0;
-	  ev_timer_again(loop, &timer_watcher);
-          ev_feed_event(loop, &timer_watcher, EV_TIMEOUT);
-        } else {
-	  // Only repeat the timer if not using a manual clock (a call
-	  // to ProcessClock::advance() will force a timer event later).
-	  if (clk != NULL && timer_watcher.repeat > 0)
-	    timer_watcher.repeat = 0;
-	  ev_timer_again(loop, &timer_watcher);
-	}
-      }
-
-      update_timer = false;
-    }
-  }
-  release(timers);
-}
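
handle_async() is the loop-thread half of libev's standard cross-thread
wakeup pattern: watchers may only be started from the thread running
the loop, so other threads enqueue work and call ev_async_send(), the
one libev call that is safe to make from any thread. The skeleton of
the pattern, in isolation:

    #include <ev.h>

    static struct ev_loop *loop;
    static ev_async wakeup;

    static void on_wakeup(struct ev_loop *loop, ev_async *w, int revents)
    {
      /* Runs in the loop thread: drain a mutex-protected queue and
         ev_io_start() each pending watcher here. */
    }

    void setup()
    {
      loop = ev_default_loop(0);
      ev_async_init(&wakeup, on_wakeup);
      ev_async_start(loop, &wakeup);
    }

    void wake_loop()  /* Callable from any thread. */
    {
      ev_async_send(loop, &wakeup);
    }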
-
-
-void handle_await(struct ev_loop *loop, ev_io *w, int revents)
-{
-  tuple<Process *, int> *t = (tuple<Process *, int> *) w->data;
-
-  ProcessManager::instance()->awaited(t->get<0>(), t->get<1>());
-
-  ev_io_stop(loop, w);
-
-  delete t;
-
-  free(w);
-}
-
-
-void read_data(struct ev_loop *loop, ev_io *w, int revents)
-{
-  int c = w->fd;
-  //cout << "read_data on " << c << " started" << endl;
-
-  struct read_ctx *ctx = (struct read_ctx *) w->data;
-
-  /* Read the data starting from the last read. */
-  int len = recv(c,
-		 (char *) ctx->msg + sizeof(struct msg) + ctx->len,
-		 ctx->msg->len - ctx->len,
-		 0);
-
-  if (len > 0) {
-    ctx->len += len;
-  } else if (len < 0 && errno == EWOULDBLOCK) {
-    return;
-  } else if (len == 0 || (len < 0 &&
-			  (errno == ECONNRESET ||
-			   errno == EBADF ||
-			   errno == EHOSTUNREACH))) {
-    /* Socket has closed. */
-    //perror("libprocess recv error: ");
-    //cout << "read_data: closing socket " << c << endl;
-    LinkManager::instance()->closed(c);
-
-    /* Stop receiving ... */
-    ev_io_stop (loop, w);
-    close(c);
-    free(ctx->msg);
-    free(ctx);
-    free(w);
-    return;
-  } else {
-    fatalerror("unhandled socket error: please report (read_data)");
-  }
-
-  if (ctx->len == ctx->msg->len) {
-    /* Deliver message. */
-    ProcessManager::instance()->deliver(ctx->msg);
-
-    /* Reinitialize read context. */
-    ctx->len = 0;
-    ctx->msg = (struct msg *) malloc(sizeof(struct msg));
-
-    //cout << "read_data on " << c << " finished" << endl;
-
-    /* Continue receiving ... */
-    ev_io_stop (loop, w);
-    ev_io_init (w, read_msg, c, EV_READ);
-    ev_io_start (loop, w);
-  }
-}
-
-
-void read_msg(struct ev_loop *loop, ev_io *w, int revents)
-{
-  int c = w->fd;
-  //cout << "read_msg on " << c << " started" << endl;
-
-  struct read_ctx *ctx = (struct read_ctx *) w->data;
-
-  /* Read the message starting from the last read. */
-  int len = recv(c,
-		 (char *) ctx->msg + ctx->len,
-		 sizeof (struct msg) - ctx->len,
-		 0);
-
-  if (len > 0) {
-    ctx->len += len;
-  } else if (len < 0 && errno == EWOULDBLOCK) {
-    return;
-  } else if (len == 0 || (len < 0 &&
-			  (errno == ECONNRESET ||
-			   errno == EBADF ||
-			   errno == EHOSTUNREACH))) {
-    /* Socket has closed. */
-    //perror("libprocess recv error: ");
-    //cout << "read_msg: closing socket " << c << endl;
-    LinkManager::instance()->closed(c);
-
-    /* Stop receiving ... */
-    ev_io_stop (loop, w);
-    close(c);
-    free(ctx->msg);
-    free(ctx);
-    free(w);
-    return;
-  } else {
-    fatalerror("unhandled socket error: please report (read_msg)");
-  }
-
-  if (ctx->len == sizeof(struct msg)) {
-    /* Check and see if we need to receive data. */
-    if (ctx->msg->len > 0) {
-      /* Allocate enough space for data. */
-      ctx->msg = (struct msg *)
-	realloc (ctx->msg, sizeof(struct msg) + ctx->msg->len);
-
-      /* TODO(benh): Optimize ... try doing a read first! */
-      ctx->len = 0;
-
-      /* Start receiving data ... */
-      ev_io_stop (loop, w);
-      ev_io_init (w, read_data, c, EV_READ);
-      ev_io_start (loop, w);
-    } else {
-      /* Deliver message. */
-      //cout << "delivering message" << endl;
-      ProcessManager::instance()->deliver(ctx->msg);
-
-      /* Reinitialize read context. */
-      ctx->len = 0;
-      ctx->msg = (struct msg *) malloc(sizeof(struct msg));
-
-      /* Continue receiving ... */
-      ev_io_stop (loop, w);
-      ev_io_init (w, read_msg, c, EV_READ);
-      ev_io_start (loop, w);
-    }
-  }
-}
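
Together, read_msg() and read_data() implement a length-prefixed wire
format: a fixed-size struct msg header whose trailing 'len' field gives
the number of payload bytes that follow. For clarity, a blocking
(non-libev) reader for the same format might look like this sketch,
assuming the struct msg layout used in this file:

    struct msg *read_one_message(int fd)
    {
      struct msg header;
      if (recv(fd, &header, sizeof(header), MSG_WAITALL) !=
          (ssize_t) sizeof(header))
        return NULL;                    /* Short read or closed socket. */

      struct msg *msg =
        (struct msg *) malloc(sizeof(struct msg) + header.len);
      *msg = header;

      if (header.len > 0 &&
          recv(fd, (char *) msg + sizeof(struct msg), header.len,
               MSG_WAITALL) != (ssize_t) header.len) {
        free(msg);
        return NULL;
      }

      return msg;
    }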
-
-
-static void write_data(struct ev_loop *loop, ev_io *w, int revents)
-{
-  int c = w->fd;
-
-  //cout << "write_data on " << c << " started" << endl;
-
-  struct write_ctx *ctx = (struct write_ctx *) w->data;
-
-  int len = send(c,
-		 (char *) ctx->msg + sizeof(struct msg) + ctx->len,
-		 ctx->msg->len - ctx->len,
-		 MSG_NOSIGNAL);
-
-  if (len > 0) {
-    ctx->len += len;
-  } else if (len < 0 && errno == EWOULDBLOCK) {
-    return;
-  } else if (len == 0 || (len < 0 &&
-			  (errno == ECONNRESET ||
-			   errno == EBADF ||
-			   errno == EHOSTUNREACH ||
-			   errno == EPIPE))) {
-    /* Socket has closed. */
-    //perror("libprocess send error: ");
-    //cout << "write_data: closing socket " << c << endl;
-    LinkManager::instance()->closed(c);
+  /* Set up socket. */
+  struct sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = ip;
+  addr.sin_port = htons(port);
 
-    /* Stop receiving ... */
-    ev_io_stop (loop, w);
-    close(c);
-    free(ctx->msg);
-    free(ctx);
-    free(w);
-    return;
-  } else {
-    fatalerror("unhandled socket error: please report (write_data)");
-  }
+  if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0)
+    fatalerror("failed to initialize (bind)");
 
-  if (ctx->len == ctx->msg->len) {
-    ev_io_stop (loop, w);
-    free(ctx->msg);
+  /* Lookup and store assigned ip and assigned port. */
+  socklen_t addrlen = sizeof(addr);
+  if (getsockname(s, (struct sockaddr *) &addr, &addrlen) < 0)
+    fatalerror("failed to initialize (getsockname)");
 
-    if (ctx->close)
-      ctx->msg = LinkManager::instance()->next_or_close(c);
-    else
-      ctx->msg = LinkManager::instance()->next_or_sleep(c);
+  ip = addr.sin_addr.s_addr;
+  port = ntohs(addr.sin_port);
 
-    if (ctx->msg != NULL) {
-      ctx->len = 0;
-      ev_io_init(w, write_msg, c, EV_WRITE);
-      ev_io_start(loop, w);
-    } else {
-      //cout << "write_data on " << c << " finished" << endl;
-      free(ctx);
-      free(w);
-    }
-  }
-}
+  if (listen(s, 500000) < 0)
+    fatalerror("failed to initialize (listen)");
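
One caveat on the backlog argument: the kernel silently caps it at
SOMAXCONN (commonly 128 by default), so the 500000 above is
aspirational. A portable sketch:

    #include <sys/socket.h>

    if (listen(s, SOMAXCONN) < 0)
      fatalerror("failed to initialize (listen)");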
 
+  /* Setup event loop. */
+#ifdef __sun__
+  loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
+#else
+  loop = ev_default_loop(EVFLAG_AUTO);
+#endif /* __sun__ */
 
-void write_msg(struct ev_loop *loop, ev_io *w, int revents)
-{
-  int c = w->fd;
-  //cout << "write_msg on " << c << " started" << endl;
+  ev_async_init(&async_watcher, handle_async);
+  ev_async_start(loop, &async_watcher);
 
-  struct write_ctx *ctx = (struct write_ctx *) w->data;
+  ev_timer_init(&timeouts_watcher, handle_timeout, 0., 2100000.0);
+  ev_timer_again(loop, &timeouts_watcher);
 
-  int len = send(c,
-		 (char *) ctx->msg + ctx->len,
-		 sizeof (struct msg) - ctx->len,
-		 MSG_NOSIGNAL);
+  ev_io_init(&server_watcher, do_accept, s, EV_READ);
+  ev_io_start(loop, &server_watcher);
 
-  if (len > 0) {
-    ctx->len += len;
-  } else if (len < 0 && errno == EWOULDBLOCK) {
-    return;
-  } else if (len == 0 || (len < 0 &&
-			  (errno == ECONNRESET ||
-			   errno == EBADF ||
-			   errno == EHOSTUNREACH ||
-			   errno == EPIPE))) {
-    /* Socket has closed. */
-    //perror("libprocess send error: ");
-    //cout << "write_msg: closing socket " << c << endl;
-    LinkManager::instance()->closed(c);
+//   ev_child_init(&child_watcher, child_exited, pid, 0);
+//   ev_child_start(loop, &cw);
 
-    /* Stop receiving ... */
-    ev_io_stop (loop, w);
-    close(c);
-    free(ctx->msg);
-    free(ctx);
-    free(w);
-    return;
-  } else {
-    fatalerror("unhandled socket error: please report (write_msg)");
-  }
+//   /* Install signal handler. */
+//   struct sigaction sa;
 
-  if (ctx->len == sizeof(struct msg)) {
-    /* Check and see if we need to write data. */
-    if (ctx->msg->len > 0) {
-      
-      /* TODO(benh): Optimize ... try doing a write first! */
-      ctx->len = 0;
+//   sa.sa_handler = ev_sighandler;
+//   sigfillset (&sa.sa_mask);
+//   sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
+//   sigaction (w->signum, &sa, 0);
 
-      /* Start writing data ... */
-      ev_io_stop(loop, w);
-      ev_io_init(w, write_data, c, EV_WRITE);
-      ev_io_start(loop, w);
-    } else {
-      //cout << "write_msg: closing socket" << endl;
-      ev_io_stop(loop, w);
-      free(ctx->msg);
+//   sigemptyset (&sa.sa_mask);
+//   sigaddset (&sa.sa_mask, w->signum);
+//   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
 
-      if (ctx->close)
-	ctx->msg = LinkManager::instance()->next_or_close(c);
-      else
-	ctx->msg = LinkManager::instance()->next_or_sleep(c);
+  if (pthread_create(&io_thread, NULL, serve, loop) != 0)
+    fatalerror("failed to initialize node (pthread_create)");
 
-      if (ctx->msg != NULL) {
-	ctx->len = 0;
-	ev_io_init(w, write_msg, c, EV_WRITE);
-	ev_io_start(loop, w);
-      } else {
-	//cout << "write_msg on " << c << " finished" << endl;
-	free(ctx);
-	free(w);
-      }
-    }
-  }
+  initializing = false;
 }
 
 
-void write_connect(struct ev_loop *loop, ev_io *w, int revents)
+LinkManager::LinkManager()
 {
-  //cout << "write_connect" << endl;
-  int s = w->fd;

[... 1981 lines stripped ...]