You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/02/10 22:41:20 UTC

svn commit: r1242939 [2/2] - in /incubator/mesos/trunk: src/master/ third_party/libprocess/ third_party/libprocess/include/process/ third_party/libprocess/src/

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1242939&r1=1242938&r2=1242939&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Fri Feb 10 21:41:20 2012
@@ -2,6 +2,7 @@
 #include <ev.h>
 #include <fcntl.h>
 #include <limits.h>
+#include <libgen.h>
 #include <netdb.h>
 #include <pthread.h>
 #include <signal.h>
@@ -25,6 +26,7 @@
 #include <sys/socket.h>
 #include <sys/time.h>
 #include <sys/types.h>
+#include <sys/uio.h>
 
 #include <algorithm>
 #include <deque>
@@ -40,14 +42,18 @@
 #include <stdexcept>
 #include <vector>
 
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
+
 #include <process/clock.hpp>
-#include <process/deferred.hpp>
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/executor.hpp>
 #include <process/filter.hpp>
 #include <process/future.hpp>
 #include <process/gc.hpp>
 #include <process/process.hpp>
+#include <process/socket.hpp>
 #include <process/timer.hpp>
 
 #include "config.hpp"
@@ -57,6 +63,7 @@
 #include "gate.hpp"
 #include "synchronized.hpp"
 #include "thread.hpp"
+#include "tokenize.hpp"
 
 
 using std::deque;
@@ -73,6 +80,8 @@ using std::string;
 using std::stringstream;
 using std::vector;
 
+// We wrap some of the functional std::tr1 symbols in the 'lambda'
+// namespace to make code less verbose.
 namespace lambda {
 
 using std::tr1::bind;
@@ -82,12 +91,6 @@ using namespace std::tr1::placeholders;
 } // namespace lambda {
 
 
-#define Byte (1)
-#define Kilobyte (1024*Byte)
-#define Megabyte (1024*Kilobyte)
-#define Gigabyte (1024*Megabyte)
-
-
 template <int i>
 std::ostream& fixedprecision(std::ostream& os)
 {
@@ -95,61 +98,58 @@ std::ostream& fixedprecision(std::ostrea
 }
 
 
-struct Node
+// Represents a remote "node" (encapsulates IP address and port).
+class Node
 {
+public:
   Node(uint32_t _ip = 0, uint16_t _port = 0)
     : ip(_ip), port(_port) {}
 
+  bool operator < (const Node& that) const
+  {
+    if (ip == that.ip) {
+      return port < that.port;
+    } else {
+      return ip < that.ip;
+    }
+  }
+
+  ostream& operator << (ostream& stream) const
+  {
+    stream << ip << ":" << port;
+    return stream;
+  }
+
   uint32_t ip;
   uint16_t port;
 };
 
 
-bool operator < (const Node& left, const Node& right)
-{
-  if (left.ip == right.ip)
-    return left.port < right.port;
-  else
-    return left.ip < right.ip;
-}
-
-
-ostream& operator << (ostream& stream, const Node& node)
-{
-  stream << node.ip << ":" << node.port;
-  return stream;
-}
-
-
 namespace process {
 
+// Provides reference counting semantics for a process pointer.
 class ProcessReference
 {
 public:
-  explicit ProcessReference(ProcessBase* _process) : process(_process)
-  {
-    if (process != NULL) {
-      __sync_fetch_and_add(&(process->refs), 1);
-    }
-  }
+  ProcessReference() : process(NULL) {}
 
   ~ProcessReference()
   {
-    if (process != NULL)
-      __sync_fetch_and_sub(&(process->refs), 1);
+    cleanup();
   }
 
   ProcessReference(const ProcessReference& that)
   {
-    process = that.process;
+    copy(that);
+  }
 
-    if (process != NULL) {
-      // There should be at least one reference to the process, so
-      // we don't need to worry about checking if it's exiting or
-      // not, since we know we can always create another reference.
-      CHECK(process->refs > 0);
-      __sync_fetch_and_add(&(process->refs), 1);
+  ProcessReference& operator = (const ProcessReference& that)
+  {
+    if (this != &that) {
+      cleanup();
+      copy(that);
     }
+    return *this;
   }
 
   ProcessBase* operator -> ()
@@ -168,46 +168,91 @@ public:
   }
 
 private:
-  ProcessReference& operator = (const ProcessReference& that);
-
-  ProcessBase* process;
-};
-
-
-class HttpProxy;
+  friend class ProcessManager; // For ProcessManager::use.
 
+  ProcessReference(ProcessBase* _process)
+    : process(_process)
+  {
+    if (process != NULL) {
+      __sync_fetch_and_add(&(process->refs), 1);
+    }
+  }
 
-class HttpResponseWaiter
-{
-public:
-  HttpResponseWaiter(const PID<HttpProxy>& proxy,
-                     Future<HttpResponse>* future,
-                     bool persist);
+  void copy(const ProcessReference& that)
+  {
+    process = that.process;
 
-  void waited(const Future<HttpResponse>&);
-  void timeout();
+    if (process != NULL) {
+      // There should be at least one reference to the process, so
+      // we don't need to worry about checking if it's exiting or
+      // not, since we know we can always create another reference.
+      CHECK(process->refs > 0);
+      __sync_fetch_and_add(&(process->refs), 1);
+    }
+  }
 
-private:
-  const PID<HttpProxy> proxy;
-  Future<HttpResponse>* future;
-  bool persist;
+  void cleanup()
+  {
+    if (process != NULL) {
+      __sync_fetch_and_sub(&(process->refs), 1);
+    }
+  }
 
-  Executor executor;
+  ProcessBase* process;
 };
 
 
+// Provides a process that manages sending HTTP responses so as to
+// satisfy HTTP/1.1 pipelining. Each request should either enqueue a
+// response, or ask the proxy to handle a future response. The process
+// is responsible for making sure the responses are sent in the same
+// order as the requests. Note that we use a 'Socket' in order to keep
+// the underlying file descriptor from getting closed while there
+// might still be outstanding responses even though the client might
+// have closed the connection (see more discussion in
+// SocketManager::close and SocketManager::proxy).
 class HttpProxy : public Process<HttpProxy>
 {
 public:
-  HttpProxy(int _c);
+  HttpProxy(const Socket& _socket);
+  virtual ~HttpProxy();
 
+  // Enqueues the response to be sent once all previously enqueued
+  // responses have been processed (e.g., waited for and sent).
+  void enqueue(const HttpResponse& response, bool persist);
+
+  // Enqueues a future to a response that will get waited on (up to
+  // some timeout) and then sent once all previously enqueued
+  // responses have been processed (e.g., waited for and sent).
   void handle(Future<HttpResponse>* future, bool persist);
-  void ready(Future<HttpResponse>* future, bool persist);
-  void unavailable(Future<HttpResponse>* future, bool persist);
 
 private:
-  int c;
-  map<Future<HttpResponse>*, HttpResponseWaiter*> waiters;
+  void next();
+  void waited(const Future<HttpResponse>& future);
+  void timedout(const Future<HttpResponse>& future);
+  void process(Future<HttpResponse>* future, bool persist);
+
+  Socket socket; // Wrap the socket to keep it from getting closed.
+
+  // Describes a queue "item" that wraps the future to the response
+  // and whether or not the socket should be persisted (vs closed).
+  struct Item
+  {
+    Item(Future<HttpResponse>* _future, bool _persist)
+      : future(_future), persist(_persist) {}
+
+    ~Item()
+    {
+      delete future;
+    }
+
+    Future<HttpResponse>* future;
+    bool persist;
+  };
+
+  queue<Item*> items;
+
+  Timer timer; // Used to bound the wait on each future.
 };
 
 
@@ -217,16 +262,19 @@ public:
   SocketManager();
   ~SocketManager();
 
+  Socket accepted(int s);
+
   void link(ProcessBase* process, const UPID& to);
 
   PID<HttpProxy> proxy(int s);
 
-  void send(DataEncoder* encoder, int s, bool persist);
+  void send(Encoder* encoder, int s, bool persist);
+  void send(const HttpResponse& response, int s, bool persist);
   void send(Message* message);
 
-  DataEncoder* next(int s);
+  Encoder* next(int s);
 
-  void closed(int s);
+  void close(int s);
 
   void exited(const Node& node);
   void exited(ProcessBase* process);
@@ -235,18 +283,30 @@ private:
   // Map from UPID (local/remote) to process.
   map<UPID, set<ProcessBase*> > links;
 
+  // Collection of all active sockets.
+  map<int, Socket> sockets;
+
+  // Collection of sockets that should be disposed when they are
+  // finished being used (e.g., when there is no more data to send on
+  // them).
+  set<int> dispose;
+
   // Map from socket to node (ip, port).
-  map<int, Node> sockets;
+  map<int, Node> nodes;
 
-  // Maps from node (ip, port) to socket.
+  // Maps from node (ip, port) to temporary sockets (i.e., they will
+  // get closed once there is no more data to send on them).
   map<Node, int> temps;
-  map<Node, int> persists;
 
-  // Set of sockets that should be closed.
-  set<int> disposables;
+  // Maps from node (ip, port) to persistent sockets (i.e., they will
+  // remain open even if there is no more data to send on them).  We
+  // distinguish these from the 'temps' collection so we can tell when
+  // a persistent socket has been lost (and thus generate
+  // ExitedEvents).
+  map<Node, int> persists;
 
   // Map from socket to outgoing queue.
-  map<int, queue<DataEncoder*> > outgoing;
+  map<int, queue<Encoder*> > outgoing;
 
   // HTTP proxies.
   map<int, HttpProxy*> proxies;
@@ -259,18 +319,20 @@ private:
 class ProcessManager
 {
 public:
-  ProcessManager();
+  ProcessManager(const string& delegate);
   ~ProcessManager();
 
   ProcessReference use(const UPID& pid);
 
-  bool deliver(Message* message, ProcessBase* sender = NULL);
-
-  bool deliver(int c, HttpRequest* request, ProcessBase* sender = NULL);
-
-  bool deliver(const UPID& to,
-               lambda::function<void(ProcessBase*)>* f,
-               ProcessBase* sender = NULL);
+  bool handle(
+      const Socket& socket,
+      HttpRequest* request,
+      ProcessBase* sender = NULL);
+
+  bool deliver(
+      const UPID& to,
+      Event* event,
+      ProcessBase* sender = NULL);
 
   UPID spawn(ProcessBase* process, bool manage);
   void resume(ProcessBase* process);
@@ -285,6 +347,9 @@ public:
   void settle();
 
 private:
+  // Delegate process name to receive root HTTP requests.
+  const string delegate;
+
   // Map of all local spawned and running processes.
   map<string, ProcessBase*> processes;
   synchronizable(processes);
@@ -302,16 +367,16 @@ private:
 
 
 // Unique id that can be assigned to each process.
-static uint32_t id = 0;
+static uint32_t __id__ = 0;
 
 // Local server socket.
-static int s = -1;
+static int __s__ = -1;
 
 // Local IP address.
-static uint32_t ip = 0;
+static uint32_t __ip__ = 0;
 
 // Local port.
-static uint16_t port = 0;
+static uint16_t __port__ = 0;
 
 // Active SocketManager (eventually will probably be thread-local).
 static SocketManager* socket_manager = NULL;
@@ -338,8 +403,8 @@ static synchronizable(watchers) = SYNCHR
 // We store the timers in a map of lists indexed by the timeout of the
 // timer so that we can have two timers that have the same timeout. We
 // exploit that the map is SORTED!
-static map<double, list<timer> >* timeouts =
-  new map<double, list<timer> >();
+static map<double, list<Timer> >* timeouts =
+  new map<double, list<Timer> >();
 static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
 // For supporting Clock::settle(), true if timers have been removed
@@ -362,7 +427,7 @@ static ThreadLocal<ProcessBase>* _proces
 #define __process__ (*_process_)
 
 
-// Scheduler gate.
+// Scheduling gate that threads wait at when there is nothing to run.
 static Gate* gate = new Gate();
 
 // Filter. Synchronized support for using the filterer needs to be
@@ -374,15 +439,6 @@ static synchronizable(filterer) = SYNCHR
 // Global garbage collector.
 PID<GarbageCollector> gc;
 
-// Thunks to be invoked via process::invoke.
-static queue<lambda::function<void(void)>*>* thunks =
-  new queue<lambda::function<void(void)>*>();
-static synchronizable(thunks) = SYNCHRONIZED_INITIALIZER;
-
-// Thread to invoke thunks (see above).
-static Gate* invoke_gate = new Gate();
-static pthread_t invoke_thread;
-
 
 // We namespace the clock related variables to keep them well
 // named. In the future we'll probably want to associate a clock with
@@ -429,14 +485,13 @@ double Clock::now(ProcessBase* process)
 
 void Clock::pause()
 {
-  process::initialize(); // For the libev watchers to be setup.
+  process::initialize(); // To make sure the libev watchers are ready.
 
   synchronized (timeouts) {
     if (!clock::paused) {
       clock::initial = clock::current = now();
       clock::paused = true;
-      VLOG(1) << "Clock paused at "
-              << std::fixed << std::setprecision(9) << clock::initial;
+      VLOG(1) << "Clock paused at " << fixedprecision<9> << clock::initial;
     }
   }
 
@@ -454,7 +509,7 @@ bool Clock::paused()
 
 void Clock::resume()
 {
-  process::initialize(); // For the libev watchers to be setup.
+  process::initialize(); // To make sure the libev watchers are ready.
 
   synchronized (timeouts) {
     if (clock::paused) {
@@ -526,6 +581,7 @@ void Clock::order(ProcessBase* from, Pro
   update(to, now(from));
 }
 
+
 void Clock::settle()
 {
   CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
@@ -551,7 +607,10 @@ int set_nbio(int fd)
 }
 
 
-Message* encode(const UPID &from, const UPID &to, const string &name, const string &data = "")
+static Message* encode(const UPID& from,
+                       const UPID& to,
+                       const string& name,
+                       const string& data = "")
 {
   Message* message = new Message();
   message->from = from;
@@ -562,11 +621,11 @@ Message* encode(const UPID &from, const 
 }
 
 
-void transport(Message* message, ProcessBase* sender = NULL)
+static void transport(Message* message, ProcessBase* sender = NULL)
 {
-  if (message->to.ip == ip && message->to.port == port) {
+  if (message->to.ip == __ip__ && message->to.port == __port__) {
     // Local message.
-    process_manager->deliver(message, sender);
+    process_manager->deliver(message->to, new MessageEvent(message), sender);
   } else {
     // Remote message.
     socket_manager->send(message);
@@ -574,36 +633,44 @@ void transport(Message* message, Process
 }
 
 
-Message* parse(HttpRequest* request)
+static bool libprocess(HttpRequest* request)
 {
-  if (request->method == "POST" && request->headers.count("User-Agent") > 0) {
-    const string& temp = request->headers["User-Agent"];
-    const string& libprocess = "libprocess/";
-    size_t index = temp.find(libprocess);
-    if (index != string::npos) {
-      // Okay, now determine 'from'.
-      const UPID from(temp.substr(index + libprocess.size(), temp.size()));
-
-      // Now determine 'to'.
-      index = request->path.find('/', 1);
-      index = index != string::npos ? index - 1 : string::npos;
-      const UPID to(request->path.substr(1, index), ip, port);
-
-      // And now determine 'name'.
-      index = index != string::npos ? index + 2: request->path.size();
-      const string& name = request->path.substr(index);
-
-      VLOG(2) << "Parsed message name '" << name
-              << "' for " << to << " from " << from;
-
-      Message* message = new Message();
-      message->name = name;
-      message->from = from;
-      message->to = to;
-      message->body = request->body;
+  return request->method == "POST" &&
+    request->headers.count("User-Agent") > 0 &&
+    request->headers["User-Agent"].find("libprocess/") == 0;
+}
 
-      return message;
-    }
+
+static Message* parse(HttpRequest* request)
+{
+  // TODO(benh): Do better error handling (to deal with a malformed
+  // libprocess message, malicious or otherwise).
+  const string& agent = request->headers["User-Agent"];
+  const string& identifier = "libprocess/";
+  size_t index = agent.find(identifier);
+  if (index != string::npos) {
+    // Okay, now determine 'from'.
+    const UPID from(agent.substr(index + identifier.size(), agent.size()));
+
+    // Now determine 'to'.
+    index = request->path.find('/', 1);
+    index = index != string::npos ? index - 1 : string::npos;
+    const UPID to(request->path.substr(1, index), __ip__, __port__);
+
+    // And now determine 'name'.
+    index = index != string::npos ? index + 2: request->path.size();
+    const string& name = request->path.substr(index);
+
+    VLOG(2) << "Parsed message name '" << name
+            << "' for " << to << " from " << from;
+
+    Message* message = new Message();
+    message->name = name;
+    message->from = from;
+    message->to = to;
+    message->body = request->body;
+
+    return message;
   }
 
   return NULL;
@@ -652,7 +719,7 @@ void handle_async(struct ev_loop* loop, 
 
 void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 {
-  list<timer> timedout;
+  list<Timer> timedout;
 
   synchronized (timeouts) {
     double now = Clock::now();
@@ -672,7 +739,7 @@ void handle_timeouts(struct ev_loop* loo
       // Clock::settle() operation can wait until we're done.
       pending_timers = true;
 
-      foreach (const timer& timer, (*timeouts)[timeout]) {
+      foreach (const Timer& timer, (*timeouts)[timeout]) {
         timedout.push_back(timer);
       }
     }
@@ -719,17 +786,17 @@ void handle_timeouts(struct ev_loop* loo
   // current time may be greater than the timeout if a local message
   // was received (and happens-before kicks in).
   if (Clock::paused()) {
-    foreach (const timer& timer, timedout) {
-      if (ProcessReference process = process_manager->use(timer.pid)) {
-        Clock::update(process, timer.timeout);
+    foreach (const Timer& timer, timedout) {
+      if (ProcessReference process = process_manager->use(timer.creator())) {
+        Clock::update(process, timer.timeout().value());
       }
     }
   }
 
-  // Execute the thunks of the timeouts that timed out (TODO(benh): Do
-  // this async so that we don't tie up the event thread!).
-  foreach (const timer& timer, timedout) {
-    timer.thunk();
+  // Invoke the timers that timed out (TODO(benh): Do this
+  // asynchronously so that we don't tie up the event thread!).
+  foreach (const Timer& timer, timedout) {
+    timer();
   }
 
   // Mark ourselves as done executing the timers since it's now safe
@@ -741,11 +808,11 @@ void handle_timeouts(struct ev_loop* loo
 }
 
 
-void recv_data(struct ev_loop *loop, ev_io *watcher, int revents)
+void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   DataDecoder* decoder = (DataDecoder*) watcher->data;
-  
-  int c = watcher->fd;
+
+  int s = watcher->fd;
 
   while (true) {
     const ssize_t size = 80 * 1024;
@@ -753,7 +820,7 @@ void recv_data(struct ev_loop *loop, ev_
 
     char data[size];
 
-    length = recv(c, data, size, 0);
+    length = recv(s, data, size, 0);
 
     if (length < 0 && (errno == EINTR)) {
       // Interrupted, try again now.
@@ -769,7 +836,7 @@ void recv_data(struct ev_loop *loop, ev_
       } else {
         VLOG(2) << "Socket closed while receiving";
       }
-      socket_manager->closed(c);
+      socket_manager->close(s);
       delete decoder;
       ev_io_stop(loop, watcher);
       delete watcher;
@@ -782,11 +849,11 @@ void recv_data(struct ev_loop *loop, ev_
 
       if (!requests.empty()) {
         foreach (HttpRequest* request, requests) {
-          process_manager->deliver(c, request);
+          process_manager->handle(decoder->socket(), request);
         }
       } else if (requests.empty() && decoder->failed()) {
         VLOG(2) << "Decoder error while receiving";
-        socket_manager->closed(c);
+        socket_manager->close(s);
         delete decoder;
         ev_io_stop(loop, watcher);
         delete watcher;
@@ -797,11 +864,11 @@ void recv_data(struct ev_loop *loop, ev_
 }
 
 
-void send_data(struct ev_loop *loop, ev_io *watcher, int revents)
+void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   DataEncoder* encoder = (DataEncoder*) watcher->data;
 
-  int c = watcher->fd;
+  int s = watcher->fd;
 
   while (true) {
     const void* data;
@@ -810,7 +877,7 @@ void send_data(struct ev_loop *loop, ev_
     data = encoder->next(&size);
     CHECK(size > 0);
 
-    ssize_t length = send(c, data, size, MSG_NOSIGNAL);
+    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
 
     if (length < 0 && (errno == EINTR)) {
       // Interrupted, try again now.
@@ -828,7 +895,7 @@ void send_data(struct ev_loop *loop, ev_
       } else {
         VLOG(2) << "Socket closed while sending";
       }
-      socket_manager->closed(c);
+      socket_manager->close(s);
       delete encoder;
       ev_io_stop(loop, watcher);
       delete watcher;
@@ -843,34 +910,105 @@ void send_data(struct ev_loop *loop, ev_
       if (encoder->remaining() == 0) {
         delete encoder;
 
+        // Stop this watcher for now.
+        ev_io_stop(loop, watcher);
+
         // Check for more stuff to send on socket.
-        encoder = socket_manager->next(c);
-        if (encoder != NULL) {
-          watcher->data = encoder;
+        Encoder* next = socket_manager->next(s);
+        if (next != NULL) {
+          watcher->data = next;
+          ev_io_init(watcher, next->sender(), s, EV_WRITE);
+          ev_io_start(loop, watcher);
         } else {
           // Nothing more to send right now, clean up.
-          ev_io_stop(loop, watcher);
           delete watcher;
-          break;
         }
+        break;
       }
     }
   }
 }
 
 
-void sending_connect(struct ev_loop *loop, ev_io *watcher, int revents)
+void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
 {
-  int c = watcher->fd;
+  FileEncoder* encoder = (FileEncoder*) watcher->data;
+
+  int s = watcher->fd;
+
+  while (true) {
+    int fd;
+    off_t offset;
+    size_t size;
+
+    fd = encoder->next(&offset, &size);
+    CHECK(size > 0);
+
+    ssize_t length = sendfile(s, fd, offset, size);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      encoder->backup(size);
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      encoder->backup(size);
+      break;
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(2) << "Socket error while sending: " << error;
+      } else {
+        VLOG(2) << "Socket closed while sending";
+      }
+      socket_manager->close(s);
+      delete encoder;
+      ev_io_stop(loop, watcher);
+      delete watcher;
+      break;
+    } else {
+      CHECK(length > 0);
+
+      // Update the encoder with the amount sent.
+      encoder->backup(size - length);
+
+      // See if there is any more of the message to send.
+      if (encoder->remaining() == 0) {
+        delete encoder;
+
+        // Stop this watcher for now.
+        ev_io_stop(loop, watcher);
+
+        // Check for more stuff to send on socket.
+        Encoder* next = socket_manager->next(s);
+        if (next != NULL) {
+          watcher->data = next;
+          ev_io_init(watcher, next->sender(), s, EV_WRITE);
+          ev_io_start(loop, watcher);
+        } else {
+          // Nothing more to send right now, clean up.
+          delete watcher;
+        }
+        break;
+      }
+    }
+  }
+}
+
+
+void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+  int s = watcher->fd;
 
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
 
-  if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
-    socket_manager->closed(c);
+    socket_manager->close(s);
     MessageEncoder* encoder = (MessageEncoder*) watcher->data;
     delete encoder;
     ev_io_stop(loop, watcher);
@@ -878,24 +1016,24 @@ void sending_connect(struct ev_loop *loo
   } else {
     // We're connected! Now let's do some sending.
     ev_io_stop(loop, watcher);
-    ev_io_init(watcher, send_data, c, EV_WRITE);
+    ev_io_init(watcher, send_data, s, EV_WRITE);
     ev_io_start(loop, watcher);
   }
 }
 
 
-void receiving_connect(struct ev_loop *loop, ev_io *watcher, int revents)
+void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
 {
-  int c = watcher->fd;
+  int s = watcher->fd;
 
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
 
-  if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
-    socket_manager->closed(c);
+    socket_manager->close(s);
     DataDecoder* decoder = (DataDecoder*) watcher->data;
     delete decoder;
     ev_io_stop(loop, watcher);
@@ -903,42 +1041,47 @@ void receiving_connect(struct ev_loop *l
   } else {
     // We're connected! Now let's do some receiving.
     ev_io_stop(loop, watcher);
-    ev_io_init(watcher, recv_data, c, EV_READ);
+    ev_io_init(watcher, recv_data, s, EV_READ);
     ev_io_start(loop, watcher);
   }
 }
 
 
-void accept(struct ev_loop *loop, ev_io *watcher, int revents)
+void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 {
-  int s = watcher->fd;
+  CHECK_EQ(__s__, watcher->fd);
 
   sockaddr_in addr;
   socklen_t addrlen = sizeof(addr);
 
-  int c = ::accept(s, (sockaddr *) &addr, &addrlen);
+  int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
 
-  if (c < 0) {
+  if (s < 0) {
     return;
   }
 
-  if (set_nbio(c) < 0) {
-    close(c);
+  if (set_nbio(s) < 0) {
+    close(s);
     return;
   }
 
-  // Turn off Nagle (via TCP_NODELAY) so pipelined requests don't wait.
+  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
   int on = 1;
-  if (setsockopt(c, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    close(c);
+  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const char* error = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+    close(s);
   } else {
+    // Inform the socket manager for proper bookkeeping.
+    const Socket& socket = socket_manager->accepted(s);
+
     // Allocate and initialize the decoder and watcher.
-    DataDecoder* decoder = new DataDecoder();
+    DataDecoder* decoder = new DataDecoder(socket);
 
-    ev_io *watcher = new ev_io();
+    ev_io* watcher = new ev_io();
     watcher->data = decoder;
 
-    ev_io_init(watcher, recv_data, c, EV_READ);
+    ev_io_init(watcher, recv_data, s, EV_READ);
     ev_io_start(loop, watcher);
   }
 }
@@ -990,10 +1133,13 @@ void* schedule(void* arg)
 // }
 
 
-void initialize(bool initialize_google_logging)
+void initialize(const string& delegate, bool initialize_glog)
 {
-//   static pthread_once_t init = PTHREAD_ONCE_INIT;
-//   pthread_once(&init, ...);
+  // TODO(benh): Return an error if attempting to initialize again
+  // with a different delegate than originally specified.
+
+  // static pthread_once_t init = PTHREAD_ONCE_INIT;
+  // pthread_once(&init, ...);
 
   static volatile bool initialized = false;
   static volatile bool initializing = true;
@@ -1011,8 +1157,8 @@ void initialize(bool initialize_google_l
     }
   }
 
-  if (initialize_google_logging) {
-    google::InitGoogleLogging("libprocess");
+  if (initialize_glog) {
+    google::InitGoogleLogging("<<libprocess>>");
     google::LogToStderr();
   }
 
@@ -1044,7 +1190,7 @@ void initialize(bool initialize_google_l
 #endif // __sun__
 
   // Create a new ProcessManager and SocketManager.
-  process_manager = new ProcessManager();
+  process_manager = new ProcessManager(delegate);
   socket_manager = new SocketManager();
 
   // Setup the thread local process pointer.
@@ -1063,15 +1209,15 @@ void initialize(bool initialize_google_l
     }
   }
 
-  ip = 0;
-  port = 0;
+  __ip__ = 0;
+  __port__ = 0;
 
-  char *value;
+  char* value;
 
   // Check environment for ip.
   value = getenv("LIBPROCESS_IP");
   if (value != NULL) {
-    int result = inet_pton(AF_INET, value, &ip);
+    int result = inet_pton(AF_INET, value, &__ip__);
     if (result == 0) {
       LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
     } else if (result < 0) {
@@ -1086,50 +1232,50 @@ void initialize(bool initialize_google_l
     if (result < 0 || result > USHRT_MAX) {
       LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
     }
-    port = result;
+    __port__ = result;
   }
 
   // Create a "server" socket for communicating with other nodes.
-  if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
+  if ((__s__ = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
     PLOG(FATAL) << "Failed to initialize, socket";
   }
 
   // Make socket non-blocking.
-  if (set_nbio(s) < 0) {
+  if (set_nbio(__s__) < 0) {
     PLOG(FATAL) << "Failed to initialize, set_nbio";
   }
 
   // Allow address reuse.
   int on = 1;
-  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+  if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
     PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
   }
 
   // Set up socket.
-  struct sockaddr_in addr;
+  sockaddr_in addr;
   memset(&addr, 0, sizeof(addr));
   addr.sin_family = PF_INET;
   addr.sin_addr.s_addr = INADDR_ANY;
-  addr.sin_port = htons(port);
+  addr.sin_port = htons(__port__);
 
-  if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
+  if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
     PLOG(FATAL) << "Failed to initialize, bind";
   }
 
   // Lookup and store assigned ip and assigned port.
   socklen_t addrlen = sizeof(addr);
-  if (getsockname(s, (struct sockaddr *) &addr, &addrlen) < 0) {
+  if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
     PLOG(FATAL) << "Failed to initialize, getsockname";
   }
 
-  ip = addr.sin_addr.s_addr;
-  port = ntohs(addr.sin_port);
+  __ip__ = addr.sin_addr.s_addr;
+  __port__ = ntohs(addr.sin_port);
 
   // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
   // actually have a valid external ip address. Note that we need only
   // one ip address, so that other processes can send and receive and
   // don't get confused as to whom they are sending to.
-  if (ip == 0 || ip == 2130706433) {
+  if (__ip__ == 0 || __ip__ == 2130706433) {
     char hostname[512];
 
     if (gethostname(hostname, sizeof(hostname)) < 0) {
@@ -1137,16 +1283,16 @@ void initialize(bool initialize_google_l
     }
 
     // Lookup IP address of local hostname.
-    struct hostent* he;
+    hostent* he;
 
     if ((he = gethostbyname2(hostname, AF_INET)) == NULL) {
       PLOG(FATAL) << "Failed to initialize, gethostbyname2";
     }
 
-    ip = *((uint32_t *) he->h_addr_list[0]);
+    __ip__ = *((uint32_t *) he->h_addr_list[0]);
   }
 
-  if (listen(s, 500000) < 0) {
+  if (listen(__s__, 500000) < 0) {
     PLOG(FATAL) << "Failed to initialize, listen";
   }
 
@@ -1163,7 +1309,7 @@ void initialize(bool initialize_google_l
   ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
   ev_timer_again(loop, &timeouts_watcher);
 
-  ev_io_init(&server_watcher, accept, s, EV_READ);
+  ev_io_init(&server_watcher, accept, __s__, EV_READ);
   ev_io_start(loop, &server_watcher);
 
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -1194,110 +1340,169 @@ void initialize(bool initialize_google_l
   gc = spawn(new GarbageCollector());
 
   char temp[INET_ADDRSTRLEN];
-  if (inet_ntop(AF_INET, (in_addr *) &ip, temp, INET_ADDRSTRLEN) == NULL) {
+  if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
     PLOG(FATAL) << "Failed to initialize, inet_ntop";
   }
 
-  VLOG(1) << "libprocess is initialized on " << temp << ":" << port;
+  VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__;
 }
 
 
-HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy,
-                                       Future<HttpResponse>* _future,
-                                       bool _persist)
-  : proxy(_proxy), future(_future), persist(_persist)
-{
-  // Wait for any event on the future.
-  deferred<void(const Future<HttpResponse>&)> waited = executor.defer(
-      lambda::bind(&HttpResponseWaiter::waited, this, lambda::_1));
+HttpProxy::HttpProxy(const Socket& _socket)
+  : socket(_socket) {}
 
-  future->onAny(waited);
 
-  // Also create a timer so we don't wait forever.
-  deferred<void(void)> timeout = executor.defer(
-      lambda::bind(&HttpResponseWaiter::timeout, this));
-
-  timers::create(30, timeout);
+HttpProxy::~HttpProxy()
+{
+  while (!items.empty()) {
+    Item* item = items.front();
+    items.pop();
+    delete item;
+  }
 }
 
 
-void HttpResponseWaiter::waited(const Future<HttpResponse>&)
+void HttpProxy::enqueue(const HttpResponse& response, bool persist)
 {
-  if (future->isReady()) {
-    process::dispatch(proxy, &HttpProxy::ready, future, persist);
-  } else {
-    // TODO(benh): Consider handling other "states" of future
-    // (discarded, failed, etc) with different HTTP statuses.
-    process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
-  }
-
-  executor.stop(); // Ensure we ignore the timeout.
+  handle(new Future<HttpResponse>(response), persist);
 }
 
 
-void HttpResponseWaiter::timeout()
+void HttpProxy::handle(Future<HttpResponse>* future, bool persist)
 {
-  process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
-
-  executor.stop(); // Ensure we ignore the future.
-}
+  items.push(new Item(future, persist));
 
+  if (items.size() == 1) {
+    // Wait for any transition of the future.
+    items.front()->future->onAny(
+        defer(self(), &HttpProxy::waited, *future));
 
-HttpProxy::HttpProxy(int _c) : c(_c) {}
+    // Also create a timer so we don't wait forever.
+    timer =
+      timers::create(30, defer(self(), &HttpProxy::timedout, *future));
+  }
+}
 
 
-void HttpProxy::handle(Future<HttpResponse>* future, bool persist)
+void HttpProxy::next()
 {
-  HttpResponseWaiter* waiter = new HttpResponseWaiter(this, future, persist);
-  waiters[future] = waiter;
+  if (items.size() > 0) {
+    Item* item = items.front();
+    // Wait for any transition of the future.
+    item->future->onAny(
+        defer(self(), &HttpProxy::waited, *item->future));
+
+    // Also create a timer so we don't wait forever.
+    timer =
+      timers::create(30, defer(self(), &HttpProxy::timedout, *item->future));
+  }
 }
 
 
-void HttpProxy::ready(Future<HttpResponse>* future, bool persist)
+void HttpProxy::waited(const Future<HttpResponse>& future)
 {
-  CHECK(waiters.count(future) > 0);
-  HttpResponseWaiter* waiter = waiters[future];
-  waiters.erase(future);
-  delete waiter;
+  if (items.size() > 0) { // The timer might have already fired.
+    Item* item = items.front();
+    if (future == *item->future) { // Another future might already be queued.
+      timers::cancel(timer);
 
-  CHECK(future->isReady());
+      if (item->future->isReady()) {
+        process(item->future, item->persist);
+      } else {
+        // TODO(benh): Consider handling other "states" of future
+        // (discarded, failed, etc) with different HTTP statuses.
+        socket_manager->send(
+            HttpServiceUnavailableResponse(), socket, item->persist);
+      }
 
-  const HttpResponse& response = future->get();
+      items.pop();
+      delete item;
 
-  // Don't persist the connection if the responder doesn't want it to.
-  if (response.headers.count("Connection") > 0) {
-    const string& connection = response.headers.find("Connection")->second;
-    if (connection == "close") {
-      persist = false;
+      next();
     }
   }
+}
 
-  HttpResponseEncoder* encoder =
-    new HttpResponseEncoder(response);
 
-  delete future;
+void HttpProxy::timedout(const Future<HttpResponse>& future)
+{
+  if (items.size() > 0) { // We might not have been able to cancel the timer.
+    Item* item = items.front();
+    if (future == *item->future) {
+      socket_manager->send(
+          HttpServiceUnavailableResponse(), socket, item->persist);
+      items.pop();
+      delete item;
+    }
 
-  // See the semantics of SocketManager::send for details about how
-  // the socket will get closed (it might actually already be closed
-  // before we issue this send).
-  socket_manager->send(encoder, c, persist);
+    next();
+  }
 }
 
 
-void HttpProxy::unavailable(Future<HttpResponse>* future, bool persist)
+void HttpProxy::process(Future<HttpResponse>* future, bool persist)
 {
-  CHECK(waiters.count(future) > 0);
-  HttpResponseWaiter* waiter = waiters[future];
-  waiters.erase(future);
-  delete waiter;
+  CHECK(future->isReady());
 
-  HttpResponseEncoder* encoder =
-    new HttpResponseEncoder(HttpServiceUnavailableResponse());
+  HttpResponse response = future->get();
 
-  delete future;
+  // Don't persist connection if headers include 'Connection: close'.
+  if (response.headers.count("Connection") > 0) {
+    const string& connection = response.headers.find("Connection")->second;
+    if (connection == "close") {
+      persist = false;
+    }
+  }
 
-  // As above, the socket might all ready be closed when we do a send.
-  socket_manager->send(encoder, c, persist);
+  // If the response specifies a path, try and perform a sendfile.
+  if (response.type == HttpResponse::PATH) {
+    // Make sure no body is sent (this is really an error and
+    // should be reported and no response sent).
+    response.body.clear();
+
+    const string& path = response.path;
+    int fd = open(path.c_str(), O_RDONLY);
+    if (fd < 0) {
+      if (errno == ENOENT || errno == ENOTDIR) {
+          VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
+          socket_manager->send(
+              HttpNotFoundResponse(), socket, persist);
+      } else {
+        const char* error = strerror(errno);
+        VLOG(1) << "Failed to send file at '" << path << "': " << error;
+        socket_manager->send(
+                             HttpInternalServerErrorResponse(), socket, persist);
+      }
+    } else {
+      struct stat s; // Need 'struct' because of function named 'stat'.
+      if (fstat(fd, &s) != 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Failed to send file at '" << path << "': " << error;
+        socket_manager->send(
+            HttpInternalServerErrorResponse(), socket, persist);
+      } else if (S_ISDIR(s.st_mode)) {
+        VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
+        socket_manager->send(
+            HttpNotFoundResponse(), socket, persist);
+      } else {
+        // While the user is expected to properly set a 'Content-Type'
+        // header, we fill in the 'Content-Length' header.
+        stringstream out;
+        out << s.st_size;
+        response.headers["Content-Length"] = out.str();
+
+        // TODO(benh): Consider a way to have the socket manager turn
+        // on TCP_CORK for both sends and then turn it off.
+        socket_manager->send(response, socket, true);
+
+        // Note the file descriptor gets closed by FileEncoder.
+        Encoder* encoder = new FileEncoder(fd, s.st_size);
+        socket_manager->send(encoder, socket, persist);
+      }
+    }
+  } else {
+    socket_manager->send(response, socket, persist);
+  }
 }
 
 
@@ -1310,7 +1515,13 @@ SocketManager::SocketManager()
 SocketManager::~SocketManager() {}
 
 
-void SocketManager::link(ProcessBase *process, const UPID &to)
+Socket SocketManager::accepted(int s)
+{
+  return sockets[s] = Socket(s);
+}
+
+
+void SocketManager::link(ProcessBase* process, const UPID& to)
 {
   // TODO(benh): The semantics we want to support for link are such
   // that if there is nobody to link to (local or remote) then an
@@ -1326,7 +1537,8 @@ void SocketManager::link(ProcessBase *pr
 
   synchronized (this) {
     // Check if node is remote and there isn't a persistant link.
-    if ((node.ip != ip || node.port != port) && persists.count(node) == 0) {
+    if ((node.ip != __ip__ || node.port != __port__)
+        && persists.count(node) == 0) {
       // Okay, no link, lets create a socket.
       int s;
 
@@ -1338,7 +1550,10 @@ void SocketManager::link(ProcessBase *pr
         PLOG(FATAL) << "Failed to link, set_nbio";
       }
 
-      sockets[s] = node;
+      Socket socket = Socket(s);
+
+      sockets[s] = socket;
+      nodes[s] = node;
 
       persists[node] = s;
 
@@ -1348,14 +1563,16 @@ void SocketManager::link(ProcessBase *pr
       addr.sin_port = htons(to.port);
       addr.sin_addr.s_addr = to.ip;
 
-      // Allocate and initialize the decoder and watcher.
-      DataDecoder* decoder = new DataDecoder();
+      // Allocate and initialize the decoder and watcher (we really
+      // only "receive" on this socket so that we can react when it
+      // gets closed and generate appropriate lost events).
+      DataDecoder* decoder = new DataDecoder(socket);
 
-      ev_io *watcher = new ev_io();
+      ev_io* watcher = new ev_io();
       watcher->data = decoder;
 
       // Try and connect to the node using this socket.
-      if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
+      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
         if (errno != EINPROGRESS) {
           PLOG(FATAL) << "Failed to link, connect";
         }
@@ -1383,39 +1600,37 @@ void SocketManager::link(ProcessBase *pr
 PID<HttpProxy> SocketManager::proxy(int s)
 {
   synchronized (this) {
+    // This socket might have been asked to get closed (e.g., remote
+    // side hung up) while a process is attempting to handle an HTTP
+    // request. Thus, if there is no more socket, return an empty PID.
     if (sockets.count(s) > 0) {
-      CHECK(proxies.count(s) > 0);
-      return proxies[s]->self();
-    } else {
-      // Register the socket with the manager for sending purposes. The
-      // current design doesn't let us create a valid "node" for this
-      // socket, so we use a "default" one for now.
-      sockets[s] = Node();
-
-      CHECK(proxies.count(s) == 0);
-
-      HttpProxy* proxy = new HttpProxy(s);
-      spawn(proxy, true);
-      proxies[s] = proxy;
-      return proxy->self();
+      if (proxies.count(s) > 0) {
+        return proxies[s]->self();
+      } else {
+        HttpProxy* proxy = new HttpProxy(sockets[s]);
+        spawn(proxy, true);
+        proxies[s] = proxy;
+        return proxy->self();
+      }
     }
   }
+
+  return PID<HttpProxy>();
 }
 
 
-void SocketManager::send(DataEncoder* encoder, int s, bool persist)
+void SocketManager::send(Encoder* encoder, int s, bool persist)
 {
   CHECK(encoder != NULL);
 
-  // TODO(benh): The current mechanism here is insufficient. It could
-  // be the case that an HttpProxy attempts to do a send on a socket
-  // just as that socket has been closed and then re-opened for
-  // another connection. In this case, the data sent on that socket
-  // will be completely bogus ... one easy fix would be to check the
-  // proxy that is associated with the socket to eliminate this race.
-
   synchronized (this) {
     if (sockets.count(s) > 0) {
+      // Update whether or not this socket should get disposed after
+      // there is no more data to send.
+      if (!persist) {
+        dispose.insert(s);
+      }
+
       if (outgoing.count(s) > 0) {
         outgoing[s].push(encoder);
       } else {
@@ -1423,10 +1638,10 @@ void SocketManager::send(DataEncoder* en
         outgoing[s];
 
         // Allocate and initialize the watcher.
-        ev_io *watcher = new ev_io();
+        ev_io* watcher = new ev_io();
         watcher->data = encoder;
 
-        ev_io_init(watcher, send_data, s, EV_WRITE);
+        ev_io_init(watcher, encoder->sender(), s, EV_WRITE);
 
         synchronized (watchers) {
           watchers->push(watcher);
@@ -1434,11 +1649,6 @@ void SocketManager::send(DataEncoder* en
 
         ev_async_send(loop, &async_watcher);
       }
-
-      // Set the socket to get closed if not persistant.
-      if (!persist) {
-        disposables.insert(s);
-      }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";
       delete encoder;
@@ -1447,6 +1657,12 @@ void SocketManager::send(DataEncoder* en
 }
 
 
+void SocketManager::send(const HttpResponse& response, int s, bool persist)
+{
+  send(new HttpResponseEncoder(response), s, persist);
+}
+
+
 void SocketManager::send(Message* message)
 {
   CHECK(message != NULL);
@@ -1457,11 +1673,11 @@ void SocketManager::send(Message* messag
 
   synchronized (this) {
     // Check if there is already a socket.
-    bool persistant = persists.count(node) > 0;
-    bool temporary = temps.count(node) > 0;
-    if (persistant || temporary) {
-      int s = persistant ? persists[node] : temps[node];
-      send(encoder, s, persistant);
+    bool persist = persists.count(node) > 0;
+    bool temp = temps.count(node) > 0;
+    if (persist || temp) {
+      int s = persist ? persists[node] : temps[node];
+      send(encoder, s, persist);
     } else {
       // No peristant or temporary socket to the node currently
       // exists, so we create a temporary one.
@@ -1475,10 +1691,11 @@ void SocketManager::send(Message* messag
         PLOG(FATAL) << "Failed to send, set_nbio";
       }
 
-      sockets[s] = node;
-
+      sockets[s] = Socket(s);
+      nodes[s] = node;
       temps[node] = s;
-      disposables.insert(s);
+
+      dispose.insert(s);
 
       // Initialize the outgoing queue.
       outgoing[s];
@@ -1491,10 +1708,10 @@ void SocketManager::send(Message* messag
       addr.sin_addr.s_addr = message->to.ip;
 
       // Allocate and initialize the watcher.
-      ev_io *watcher = new ev_io();
+      ev_io* watcher = new ev_io();
       watcher->data = encoder;
-    
-      if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
+
+      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
         if (errno != EINPROGRESS) {
           PLOG(FATAL) << "Failed to send, connect";
         }
@@ -1517,9 +1734,9 @@ void SocketManager::send(Message* messag
 }
 
 
-DataEncoder* SocketManager::next(int s)
+Encoder* SocketManager::next(int s)
 {
-  DataEncoder* encoder = NULL;
+  HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
 
   synchronized (this) {
     CHECK(sockets.count(s) > 0);
@@ -1527,56 +1744,89 @@ DataEncoder* SocketManager::next(int s)
 
     if (!outgoing[s].empty()) {
       // More messages!
-      encoder = outgoing[s].front();
+      Encoder* encoder = outgoing[s].front();
       outgoing[s].pop();
+      return encoder;
     } else {
       // No more messages ... erase the outgoing queue.
       outgoing.erase(s);
 
-      // Close the socket if it was set for disposal.
-      if (disposables.count(s) > 0) {
-        // Also try and remove from temps.
-        const Node& node = sockets[s];
-        if (temps.count(node) > 0 && temps[node] == s) {
+      if (dispose.count(s) > 0) {
+        // This is either a temporary socket we created or it's a
+        // socket that we were receiving data from and possibly
+        // sending HTTP responses back on. Clean up either way.
+        if (nodes.count(s) > 0) {
+          const Node& node = nodes[s];
+          CHECK(temps.count(node) > 0 && temps[node] == s);
           temps.erase(node);
-        } else if (proxies.count(s) > 0) {
-          HttpProxy* proxy = proxies[s];
+          nodes.erase(s);
+        }
+
+        if (proxies.count(s) > 0) {
+          proxy = proxies[s];
           proxies.erase(s);
-          terminate(proxy);
         }
 
-        disposables.erase(s);
+        dispose.erase(s);
         sockets.erase(s);
-        close(s);
       }
     }
   }
 
-  return encoder;
+  // We terminate the proxy outside the synchronized block to avoid
+  // possible deadlock between the ProcessManager and SocketManager.
+  if (proxy != NULL) {
+    terminate(proxy);
+  }
+
+  return NULL;
 }
 
 
-void SocketManager::closed(int s)
+void SocketManager::close(int s)
 {
   HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
 
   synchronized (this) {
+    // This socket might not be active if it was already asked to get
+    // closed (e.g., a write on the socket failed so we try and close
+    // it and then later the read side of the socket gets closed so we
+    // try and close it again). Thus, ignore the request if we don't
+    // know about the socket.
     if (sockets.count(s) > 0) {
-      const Node& node = sockets[s];
+      // Clean up any remaining encoders for this socket.
+      if (outgoing.count(s) > 0) {
+        while (!outgoing[s].empty()) {
+          Encoder* encoder = outgoing[s].front();
+          delete encoder;
+          outgoing[s].pop();
+        }
+
+        outgoing.erase(s);
+      }
 
-      // Don't bother invoking exited unless socket was persistant.
-      if (persists.count(node) > 0 && persists[node] == s) {
-	persists.erase(node);
-        exited(node);
-      } else if (temps.count(node) > 0 && temps[node] == s) {
-        temps.erase(node);
-      } else if (proxies.count(s) > 0) {
+      // Clean up after sockets used for node communication.
+      if (nodes.count(s) > 0) {
+        const Node& node = nodes[s];
+
+        // Don't bother invoking exited unless socket was persistent.
+        if (persists.count(node) > 0 && persists[node] == s) {
+          persists.erase(node);
+          exited(node); // Generate ExitedEvent(s)!
+        } else if (temps.count(node) > 0 && temps[node] == s) {
+          temps.erase(node);
+        }
+
+        nodes.erase(s);
+      }
+
+      // Clean up any proxy associated with this socket.
+      if (proxies.count(s) > 0) {
         proxy = proxies[s];
         proxies.erase(s);
       }
 
-      outgoing.erase(s);
-      disposables.erase(s);
+      dispose.erase(s);
       sockets.erase(s);
     }
   }
@@ -1587,11 +1837,20 @@ void SocketManager::closed(int s)
     terminate(proxy);
   }
 
-  // This might have just been a receiving socket (only sending
-  // sockets, with the exception of the receiving side of a persistant
-  // socket, get added to 'sockets'), so we want to make sure to call
-  // close so that the file descriptor can get reused.
-  close(s);
+  // Note that we don't actually:
+  //
+  //   close(s);
+  //
+  // Because, for example, there could be a race between an HttpProxy
+  // trying to do send a response with SocketManager::send() or a
+  // process might be responding to another HttpRequest (e.g., trying
+  // to do a sendfile) since these things may be happening
+  // asynchronously we can't close the socket yet, because it might
+  // get reused before any of the above things have finished, and then
+  // we'll end up sending data on the wrong socket! Instead, we rely
+  // on the last reference of our Socket object to close the
+  // socket. Note, however, that since socket is no longer in
+  // 'sockets' any attempt to send with it will just get ignored.
 }
 
 
@@ -1613,7 +1872,7 @@ void SocketManager::exited(const Node& n
       }
     }
 
-    foreach (const UPID &pid, removed) {
+    foreach (const UPID& pid, removed) {
       links.erase(pid);
     }
   }
@@ -1656,7 +1915,8 @@ void SocketManager::exited(ProcessBase* 
 }
 
 
-ProcessManager::ProcessManager()
+ProcessManager::ProcessManager(const string& _delegate)
+  : delegate(_delegate)
 {
   synchronizer(processes) = SYNCHRONIZED_INITIALIZER;
   synchronizer(runq) = SYNCHRONIZED_INITIALIZER;
@@ -1668,9 +1928,9 @@ ProcessManager::ProcessManager()
 ProcessManager::~ProcessManager() {}
 
 
-ProcessReference ProcessManager::use(const UPID &pid)
+ProcessReference ProcessManager::use(const UPID& pid)
 {
-  if (pid.ip == ip && pid.port == port) {
+  if (pid.ip == __ip__ && pid.port == __port__) {
     synchronized (processes) {
       if (processes.count(pid.id) > 0) {
         // Note that the ProcessReference constructor _must_ get
@@ -1685,54 +1945,68 @@ ProcessReference ProcessManager::use(con
 }
 
 
-bool ProcessManager::deliver(Message* message, ProcessBase* sender)
+bool ProcessManager::handle(
+    const Socket& socket,
+    HttpRequest* request,
+    ProcessBase* sender)
 {
-  CHECK(message != NULL);
+  CHECK(request != NULL);
 
-  if (ProcessReference receiver = use(message->to)) {
-    // If we have a local sender AND we are using a manual clock
-    // then update the current time of the receiver to preserve
-    // the happens-before relationship between the sender and
-    // receiver. Note that the assumption is that the sender
-    // remains valid for at least the duration of this routine (so
-    // that we can look up it's current time).
-    if (sender != NULL) {
-      synchronized (timeouts) {
-        if (Clock::paused()) {
-          Clock::order(sender, receiver);
-        }
-      }
+  // Check if this is a libprocess request (i.e., 'User-Agent:
+  // libprocess/id@ip:port') and if so, parse as a message.
+  if (libprocess(request)) {
+    Message* message = parse(request);
+    if (message != NULL) {
+      delete request;
+      return deliver(message->to, new MessageEvent(message), sender);
     }
 
-    receiver->enqueue(new MessageEvent(message));
-  } else {
-    delete message;
+    VLOG(1) << "Failed to handle libprocess request: "
+            << request->method << " " << request->path
+            << " (User-Agent: " << request->headers["User-Agent"] << ")";
+
+    delete request;
     return false;
   }
 
-  return true;
-}
-
+  // Treat this as an HTTP request. Start by checking that the path
+  // starts with a '/' (since the code below assumes as much).
+  if (request->path.find('/') != 0) {
+    VLOG(1) << "Returning '400 Bad Request' for '" << request->path << "'";
 
-// TODO(benh): Refactor and share code with above!
-bool ProcessManager::deliver(int c, HttpRequest* request, ProcessBase* sender)
-{
-  CHECK(request != NULL);
+    // Get the HttpProxy pid for this socket.
+    PID<HttpProxy> proxy = socket_manager->proxy(socket);
 
-  // Determine whether or not this is a libprocess message.
-  Message* message = parse(request);
+    // Enqueue the response with the HttpProxy so that it respects the
+    // order of requests to account for HTTP/1.1 pipelining.
+    dispatch(proxy, &HttpProxy::enqueue,
+             HttpBadRequestResponse(), request->keepAlive);
 
-  if (message != NULL) {
+    // Cleanup request.
     delete request;
-    return deliver(message, sender);
+    return false;
   }
 
-  // Treat this as an HTTP request and check for a valid receiver.
-  string path = request->path.substr(1, request->path.find('/', 1) - 1);
+  // Split the path by '/'.
+  vector<string> tokens = tokenize(request->path, "/");
 
-  UPID to(path, ip, port);
+  // Try and determine a receiver, otherwise try and delegate.
+  ProcessReference receiver;
 
-  if (ProcessReference receiver = use(to)) {
+  if (tokens.size() == 0 && delegate != "") {
+    request->path = "/" + delegate;
+    receiver = use(UPID(delegate, __ip__, __port__));
+  } else if (tokens.size() > 0) {
+    receiver = use(UPID(tokens[0], __ip__, __port__));
+  }
+
+  if (!receiver && delegate != "") {
+    // Try and delegate the request.
+    request->path = "/" + delegate + request->path;
+    receiver = use(UPID(delegate, __ip__, __port__));
+  }
+
+  if (receiver) {
     // If we have a local sender AND we are using a manual clock
     // then update the current time of the receiver to preserve
     // the happens-before relationship between the sender and
@@ -1748,16 +2022,18 @@ bool ProcessManager::deliver(int c, Http
     }
 
     // Enqueue the event.
-    receiver->enqueue(new HttpEvent(c, request));
+    receiver->enqueue(new HttpEvent(socket, request));
   } else {
     // This has no receiver, send error response.
     VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";
 
-    HttpResponseEncoder* encoder =
-      new HttpResponseEncoder(HttpNotFoundResponse());
+    // Get the HttpProxy pid for this socket.
+    PID<HttpProxy> proxy = socket_manager->proxy(socket);
 
-    // TODO(benh): Socket might be closed and then re-opened!
-    socket_manager->send(encoder, c, request->keepAlive);
+    // Enqueue the response with the HttpProxy so that it respects the
+    // order of requests to account for HTTP/1.1 pipelining.
+    dispatch(proxy, &HttpProxy::enqueue,
+             HttpNotFoundResponse(), request->keepAlive);
 
     // Cleanup request.
     delete request;
@@ -1768,13 +2044,12 @@ bool ProcessManager::deliver(int c, Http
 }
 
 
-// TODO(benh): Refactor and share code with above!
 bool ProcessManager::deliver(
     const UPID& to,
-    lambda::function<void(ProcessBase*)>* f,
+    Event* event,
     ProcessBase* sender)
 {
-  CHECK(f != NULL);
+  CHECK(event != NULL);
 
   if (ProcessReference receiver = use(to)) {
     // If we have a local sender AND we are using a manual clock
@@ -1791,9 +2066,9 @@ bool ProcessManager::deliver(
       }
     }
 
-    receiver->enqueue(new DispatchEvent(f));
+    receiver->enqueue(event);
   } else {
-    delete f;
+    delete event;
     return false;
   }
 
@@ -1981,7 +2256,7 @@ void ProcessManager::cleanup(ProcessBase
 void ProcessManager::link(ProcessBase* process, const UPID& to)
 {
   // Check if the pid is local.
-  if (!(to.ip == ip && to.port == port)) {
+  if (!(to.ip == __ip__ && to.port == __port__)) {
     socket_manager->link(process, to);
   } else {
     // Since the pid is local we want to get a reference to it's
@@ -2172,40 +2447,31 @@ void ProcessManager::settle()
 
 namespace timers {
 
-timer create(double secs, const lambda::function<void(void)>& thunk)
+Timer create(double secs, const lambda::function<void(void)>& thunk)
 {
-  static long id = 0;
+  static uint64_t id = 1; // Start at 1 since Timer() instances start with 0.
 
-  double timeout = Clock::now() + secs;
+  Timeout timeout(secs); // Assumes Clock::now() does Clock::now(__process__).
 
-  if (__process__ != NULL) {
-    synchronized (timeouts) {
-      if (Clock::paused()) {
-        timeout = Clock::now(__process__) + secs;
-      }
-    }
-  }
+  UPID pid = __process__ != NULL ? __process__->self() : UPID();
 
-  timer timer;
-  timer.id = id++;
-  timer.timeout = timeout;
-  timer.pid = __process__ != NULL ? __process__->self() : UPID();
-  timer.thunk = thunk;
+  Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
 
   VLOG(2) << "Created a timer for "
-          << std::fixed << std::setprecision(9) << timeout;
+          << std::fixed << std::setprecision(9) << timeout.value();
 
   // Add the timer.
   synchronized (timeouts) {
-    if (timeouts->size() == 0 || timer.timeout < timeouts->begin()->first) {
+    if (timeouts->size() == 0 ||
+        timer.timeout().value() < timeouts->begin()->first) {
       // Need to interrupt the loop to update/set timer repeat.
-      (*timeouts)[timer.timeout].push_back(timer);
+      (*timeouts)[timer.timeout().value()].push_back(timer);
       update_timer = true;
       ev_async_send(loop, &async_watcher);
     } else {
       // Timer repeat is adequate, just add the timeout.
       CHECK(timeouts->size() >= 1);
-      (*timeouts)[timer.timeout].push_back(timer);
+      (*timeouts)[timer.timeout().value()].push_back(timer);
     }
   }
 
@@ -2213,22 +2479,26 @@ timer create(double secs, const lambda::
 }
 
 
-void cancel(const timer& timer)
+bool cancel(const Timer& timer)
 {
+  bool canceled = false;
   synchronized (timeouts) {
-    // Check if the timeout is still pending, and if so, erase
-    // it. In addition, erase an empty list if we just removed the
-    // last timeout.
-    if (timeouts->count(timer.timeout) > 0) {
-      (*timeouts)[timer.timeout].remove(timer);
-      if ((*timeouts)[timer.timeout].empty()) {
-        timeouts->erase(timer.timeout);
+    // Check if the timeout is still pending, and if so, erase it. In
+    // addition, erase an empty list if we just removed the last
+    // timeout.
+    if (timeouts->count(timer.timeout().value()) > 0) {
+      canceled = true;
+      (*timeouts)[timer.timeout().value()].remove(timer);
+      if ((*timeouts)[timer.timeout().value()].empty()) {
+        timeouts->erase(timer.timeout().value());
       }
     }
   }
+
+  return canceled;
 }
 
-} // namespace timeouts {
+} // namespace timers {
 
 
 ProcessBase::ProcessBase(const std::string& _id)
@@ -2241,17 +2511,17 @@ ProcessBase::ProcessBase(const std::stri
 
   refs = 0;
 
-  // Generate string representation of unique id for process.
   if (_id != "") {
     pid.id = _id;
   } else {
+    // Generate string representation of unique id for process.
     stringstream out;
-    out << __sync_add_and_fetch(&id, 1);
+    out << __sync_add_and_fetch(&__id__, 1);
     pid.id = out.str();
   }
 
-  pid.ip = ip;
-  pid.port = port;
+  pid.ip = __ip__;
+  pid.port = __port__;
 
   // If using a manual clock, try and set current time of process
   // using happens before relationship between creator and createe!
@@ -2336,6 +2606,8 @@ void ProcessBase::enqueue(Event* event, 
       CHECK(state == BOTTOM ||
             state == READY ||
             state == RUNNING);
+    } else {
+      delete event;
     }
   }
   unlock();
@@ -2382,19 +2654,23 @@ void ProcessBase::visit(const MessageEve
 
 void ProcessBase::visit(const DispatchEvent& event)
 {
-  (*event.function)(this);
+  (*event.f)(this);
 }
 
 
 void ProcessBase::visit(const HttpEvent& event)
 {
-  // Determine the request "name" (i.e., path).
-  size_t index = event.request->path.find('/', 1);
+  VLOG(2) << "Handling HTTP event for process '" << pid.id << "'"
+          << " with path: '" << event.request->path << "'";
+
+  CHECK(event.request->path.find('/') == 0); // See ProcessManager::deliver.
 
-  // Only want the last part of the path (e.g., "foo" in /path/to/foo).
-  index = index != string::npos ? index + 1 : event.request->path.size();
+  // Split the path by '/'.
+  vector<string> tokens = tokenize(event.request->path, "/");
+  CHECK(tokens.size() >= 1);
+  CHECK(tokens[0] == pid.id);
 
-  const string& name = event.request->path.substr(index);
+  const string& name = tokens.size() > 1 ? tokens[1] : "";
 
   if (handlers.http.count(name) > 0) {
     // Create the promise to link with whatever gets returned, as well
@@ -2405,21 +2681,40 @@ void ProcessBase::visit(const HttpEvent&
     Future<HttpResponse>* future = new Future<HttpResponse>(promise->future());
 
     // Get the HttpProxy pid for this socket.
-    PID<HttpProxy> proxy = socket_manager->proxy(event.c);
+    PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
 
     // Let the HttpProxy know about this request (via the future).
     dispatch(proxy, &HttpProxy::handle, future, event.request->keepAlive);
 
-    // Finally, call the handler and associate the response with the promise.
+    // Now call the handler and associate the response with the promise.
     internal::associate(handlers.http[name](*event.request), promise);
+  } else if (resources.count(name) > 0) {
+    HttpOKResponse response;
+    response.headers["Content-Type"] = resources[name].type;
+    response.type = HttpResponse::PATH;
+    response.path = resources[name].path;
+
+    // Construct the final path by appending remaining tokens.
+    for (int i = 2; i < tokens.size(); i++) {
+      response.path += "/" + tokens[i];
+    }
+
+    // Get the HttpProxy pid for this socket.
+    PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
+
+    // Enqueue the response with the HttpProxy so that it respects the
+    // order of requests to account for HTTP/1.1 pipelining.
+    dispatch(proxy, &HttpProxy::enqueue, response, event.request->keepAlive);
   } else {
     VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";
 
-    HttpResponseEncoder* encoder =
-      new HttpResponseEncoder(HttpNotFoundResponse());
+    // Get the HttpProxy pid for this socket.
+    PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
 
-    // TODO(benh): Socket might be closed and then re-opened!
-    socket_manager->send(encoder, event.c, event.request->keepAlive);
+    // Enqueue the response with the HttpProxy so that it respects the
+    // order of requests to account for HTTP/1.1 pipelining.
+    dispatch(proxy, &HttpProxy::enqueue,
+             HttpNotFoundResponse(), event.request->keepAlive);
   }
 }
 
@@ -2567,11 +2862,13 @@ void post(const UPID& to, const string& 
 
 namespace internal {
 
-void dispatch(const UPID& pid, lambda::function<void(ProcessBase*)>* f)
+void dispatch(
+    const UPID& pid,
+    const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> >& f)
 {
   process::initialize();
 
-  process_manager->deliver(pid, f, __process__);
+  process_manager->deliver(pid, new DispatchEvent(f), __process__);
 }
 
 } // namespace internal {