You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/02/10 22:41:20 UTC
svn commit: r1242939 [2/2] - in /incubator/mesos/trunk: src/master/
third_party/libprocess/ third_party/libprocess/include/process/
third_party/libprocess/src/
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1242939&r1=1242938&r2=1242939&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Fri Feb 10 21:41:20 2012
@@ -2,6 +2,7 @@
#include <ev.h>
#include <fcntl.h>
#include <limits.h>
+#include <libgen.h>
#include <netdb.h>
#include <pthread.h>
#include <signal.h>
@@ -25,6 +26,7 @@
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
+#include <sys/uio.h>
#include <algorithm>
#include <deque>
@@ -40,14 +42,18 @@
#include <stdexcept>
#include <vector>
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
+
#include <process/clock.hpp>
-#include <process/deferred.hpp>
+#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/executor.hpp>
#include <process/filter.hpp>
#include <process/future.hpp>
#include <process/gc.hpp>
#include <process/process.hpp>
+#include <process/socket.hpp>
#include <process/timer.hpp>
#include "config.hpp"
@@ -57,6 +63,7 @@
#include "gate.hpp"
#include "synchronized.hpp"
#include "thread.hpp"
+#include "tokenize.hpp"
using std::deque;
@@ -73,6 +80,8 @@ using std::string;
using std::stringstream;
using std::vector;
+// We wrap some of the functional std::tr1 symbols in the 'lambda'
+// namespace to make code less verbose.
namespace lambda {
using std::tr1::bind;
@@ -82,12 +91,6 @@ using namespace std::tr1::placeholders;
} // namespace lambda {
-#define Byte (1)
-#define Kilobyte (1024*Byte)
-#define Megabyte (1024*Kilobyte)
-#define Gigabyte (1024*Megabyte)
-
-
template <int i>
std::ostream& fixedprecision(std::ostream& os)
{
@@ -95,61 +98,58 @@ std::ostream& fixedprecision(std::ostrea
}
-struct Node
+// Represents a remote "node" (encapsulates IP address and port).
+class Node
{
+public:
Node(uint32_t _ip = 0, uint16_t _port = 0)
: ip(_ip), port(_port) {}
+ bool operator < (const Node& that) const
+ {
+ if (ip == that.ip) {
+ return port < that.port;
+ } else {
+ return ip < that.ip;
+ }
+ }
+
+ ostream& operator << (ostream& stream) const
+ {
+ stream << ip << ":" << port;
+ return stream;
+ }
+
uint32_t ip;
uint16_t port;
};
-bool operator < (const Node& left, const Node& right)
-{
- if (left.ip == right.ip)
- return left.port < right.port;
- else
- return left.ip < right.ip;
-}
-
-
-ostream& operator << (ostream& stream, const Node& node)
-{
- stream << node.ip << ":" << node.port;
- return stream;
-}
-
-
namespace process {
+// Provides reference counting semantics for a process pointer.
class ProcessReference
{
public:
- explicit ProcessReference(ProcessBase* _process) : process(_process)
- {
- if (process != NULL) {
- __sync_fetch_and_add(&(process->refs), 1);
- }
- }
+ ProcessReference() : process(NULL) {}
~ProcessReference()
{
- if (process != NULL)
- __sync_fetch_and_sub(&(process->refs), 1);
+ cleanup();
}
ProcessReference(const ProcessReference& that)
{
- process = that.process;
+ copy(that);
+ }
- if (process != NULL) {
- // There should be at least one reference to the process, so
- // we don't need to worry about checking if it's exiting or
- // not, since we know we can always create another reference.
- CHECK(process->refs > 0);
- __sync_fetch_and_add(&(process->refs), 1);
+ ProcessReference& operator = (const ProcessReference& that)
+ {
+ if (this != &that) {
+ cleanup();
+ copy(that);
}
+ return *this;
}
ProcessBase* operator -> ()
@@ -168,46 +168,91 @@ public:
}
private:
- ProcessReference& operator = (const ProcessReference& that);
-
- ProcessBase* process;
-};
-
-
-class HttpProxy;
+ friend class ProcessManager; // For ProcessManager::use.
+ ProcessReference(ProcessBase* _process)
+ : process(_process)
+ {
+ if (process != NULL) {
+ __sync_fetch_and_add(&(process->refs), 1);
+ }
+ }
-class HttpResponseWaiter
-{
-public:
- HttpResponseWaiter(const PID<HttpProxy>& proxy,
- Future<HttpResponse>* future,
- bool persist);
+ void copy(const ProcessReference& that)
+ {
+ process = that.process;
- void waited(const Future<HttpResponse>&);
- void timeout();
+ if (process != NULL) {
+ // There should be at least one reference to the process, so
+ // we don't need to worry about checking if it's exiting or
+ // not, since we know we can always create another reference.
+ CHECK(process->refs > 0);
+ __sync_fetch_and_add(&(process->refs), 1);
+ }
+ }
-private:
- const PID<HttpProxy> proxy;
- Future<HttpResponse>* future;
- bool persist;
+ void cleanup()
+ {
+ if (process != NULL) {
+ __sync_fetch_and_sub(&(process->refs), 1);
+ }
+ }
- Executor executor;
+ ProcessBase* process;
};
+// Provides a process that manages sending HTTP responses so as to
+// satisfy HTTP/1.1 pipelining. Each request should either enqueue a
+// response, or ask the proxy to handle a future response. The process
+// is responsible for making sure the responses are sent in the same
+// order as the requests. Note that we use a 'Socket' in order to keep
+ // the underlying file descriptor from getting closed while there
+// might still be outstanding responses even though the client might
+// have closed the connection (see more discussion in
+ // SocketManager::close and SocketManager::proxy).
class HttpProxy : public Process<HttpProxy>
{
public:
- HttpProxy(int _c);
+ HttpProxy(const Socket& _socket);
+ virtual ~HttpProxy();
+ // Enqueues the response to be sent once all previously enqueued
+ // responses have been processed (e.g., waited for and sent).
+ void enqueue(const HttpResponse& response, bool persist);
+
+ // Enqueues a future to a response that will get waited on (up to
+ // some timeout) and then sent once all previously enqueued
+ // responses have been processed (e.g., waited for and sent).
void handle(Future<HttpResponse>* future, bool persist);
- void ready(Future<HttpResponse>* future, bool persist);
- void unavailable(Future<HttpResponse>* future, bool persist);
private:
- int c;
- map<Future<HttpResponse>*, HttpResponseWaiter*> waiters;
+ void next();
+ void waited(const Future<HttpResponse>& future);
+ void timedout(const Future<HttpResponse>& future);
+ void process(Future<HttpResponse>* future, bool persist);
+
+ Socket socket; // Wrap the socket to keep it from getting closed.
+
+ // Describes a queue "item" that wraps the future to the response
+ // and whether or not the socket should be persisted (vs closed).
+ struct Item
+ {
+ Item(Future<HttpResponse>* _future, bool _persist)
+ : future(_future), persist(_persist) {}
+
+ ~Item()
+ {
+ delete future;
+ }
+
+ Future<HttpResponse>* future;
+ bool persist;
+ };
+
+ queue<Item*> items;
+
+ Timer timer; // Used to bound the wait on each future.
};
@@ -217,16 +262,19 @@ public:
SocketManager();
~SocketManager();
+ Socket accepted(int s);
+
void link(ProcessBase* process, const UPID& to);
PID<HttpProxy> proxy(int s);
- void send(DataEncoder* encoder, int s, bool persist);
+ void send(Encoder* encoder, int s, bool persist);
+ void send(const HttpResponse& response, int s, bool persist);
void send(Message* message);
- DataEncoder* next(int s);
+ Encoder* next(int s);
- void closed(int s);
+ void close(int s);
void exited(const Node& node);
void exited(ProcessBase* process);
@@ -235,18 +283,30 @@ private:
// Map from UPID (local/remote) to process.
map<UPID, set<ProcessBase*> > links;
+ // Collection of all active sockets.
+ map<int, Socket> sockets;
+
+ // Collection of sockets that should be disposed when they are
+ // finished being used (e.g., when there is no more data to send on
+ // them).
+ set<int> dispose;
+
// Map from socket to node (ip, port).
- map<int, Node> sockets;
+ map<int, Node> nodes;
- // Maps from node (ip, port) to socket.
+ // Maps from node (ip, port) to temporary sockets (i.e., they will
+ // get closed once there is no more data to send on them).
map<Node, int> temps;
- map<Node, int> persists;
- // Set of sockets that should be closed.
- set<int> disposables;
+ // Maps from node (ip, port) to persistent sockets (i.e., they will
+ // remain open even if there is no more data to send on them). We
+ // distinguish these from the 'temps' collection so we can tell when
+ // a persistent socket has been lost (and thus generate
+ // ExitedEvents).
+ map<Node, int> persists;
// Map from socket to outgoing queue.
- map<int, queue<DataEncoder*> > outgoing;
+ map<int, queue<Encoder*> > outgoing;
// HTTP proxies.
map<int, HttpProxy*> proxies;
@@ -259,18 +319,20 @@ private:
class ProcessManager
{
public:
- ProcessManager();
+ ProcessManager(const string& delegate);
~ProcessManager();
ProcessReference use(const UPID& pid);
- bool deliver(Message* message, ProcessBase* sender = NULL);
-
- bool deliver(int c, HttpRequest* request, ProcessBase* sender = NULL);
-
- bool deliver(const UPID& to,
- lambda::function<void(ProcessBase*)>* f,
- ProcessBase* sender = NULL);
+ bool handle(
+ const Socket& socket,
+ HttpRequest* request,
+ ProcessBase* sender = NULL);
+
+ bool deliver(
+ const UPID& to,
+ Event* event,
+ ProcessBase* sender = NULL);
UPID spawn(ProcessBase* process, bool manage);
void resume(ProcessBase* process);
@@ -285,6 +347,9 @@ public:
void settle();
private:
+ // Delegate process name to receive root HTTP requests.
+ const string delegate;
+
// Map of all local spawned and running processes.
map<string, ProcessBase*> processes;
synchronizable(processes);
@@ -302,16 +367,16 @@ private:
// Unique id that can be assigned to each process.
-static uint32_t id = 0;
+static uint32_t __id__ = 0;
// Local server socket.
-static int s = -1;
+static int __s__ = -1;
// Local IP address.
-static uint32_t ip = 0;
+static uint32_t __ip__ = 0;
// Local port.
-static uint16_t port = 0;
+static uint16_t __port__ = 0;
// Active SocketManager (eventually will probably be thread-local).
static SocketManager* socket_manager = NULL;
@@ -338,8 +403,8 @@ static synchronizable(watchers) = SYNCHR
// We store the timers in a map of lists indexed by the timeout of the
// timer so that we can have two timers that have the same timeout. We
// exploit that the map is SORTED!
-static map<double, list<timer> >* timeouts =
- new map<double, list<timer> >();
+static map<double, list<Timer> >* timeouts =
+ new map<double, list<Timer> >();
static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
// For supporting Clock::settle(), true if timers have been removed
@@ -362,7 +427,7 @@ static ThreadLocal<ProcessBase>* _proces
#define __process__ (*_process_)
-// Scheduler gate.
+// Scheduling gate that threads wait at when there is nothing to run.
static Gate* gate = new Gate();
// Filter. Synchronized support for using the filterer needs to be
@@ -374,15 +439,6 @@ static synchronizable(filterer) = SYNCHR
// Global garbage collector.
PID<GarbageCollector> gc;
-// Thunks to be invoked via process::invoke.
-static queue<lambda::function<void(void)>*>* thunks =
- new queue<lambda::function<void(void)>*>();
-static synchronizable(thunks) = SYNCHRONIZED_INITIALIZER;
-
-// Thread to invoke thunks (see above).
-static Gate* invoke_gate = new Gate();
-static pthread_t invoke_thread;
-
// We namespace the clock related variables to keep them well
// named. In the future we'll probably want to associate a clock with
@@ -429,14 +485,13 @@ double Clock::now(ProcessBase* process)
void Clock::pause()
{
- process::initialize(); // For the libev watchers to be setup.
+ process::initialize(); // To make sure the libev watchers are ready.
synchronized (timeouts) {
if (!clock::paused) {
clock::initial = clock::current = now();
clock::paused = true;
- VLOG(1) << "Clock paused at "
- << std::fixed << std::setprecision(9) << clock::initial;
+ VLOG(1) << "Clock paused at " << fixedprecision<9> << clock::initial;
}
}
@@ -454,7 +509,7 @@ bool Clock::paused()
void Clock::resume()
{
- process::initialize(); // For the libev watchers to be setup.
+ process::initialize(); // To make sure the libev watchers are ready.
synchronized (timeouts) {
if (clock::paused) {
@@ -526,6 +581,7 @@ void Clock::order(ProcessBase* from, Pro
update(to, now(from));
}
+
void Clock::settle()
{
CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
@@ -551,7 +607,10 @@ int set_nbio(int fd)
}
-Message* encode(const UPID &from, const UPID &to, const string &name, const string &data = "")
+static Message* encode(const UPID& from,
+ const UPID& to,
+ const string& name,
+ const string& data = "")
{
Message* message = new Message();
message->from = from;
@@ -562,11 +621,11 @@ Message* encode(const UPID &from, const
}
-void transport(Message* message, ProcessBase* sender = NULL)
+static void transport(Message* message, ProcessBase* sender = NULL)
{
- if (message->to.ip == ip && message->to.port == port) {
+ if (message->to.ip == __ip__ && message->to.port == __port__) {
// Local message.
- process_manager->deliver(message, sender);
+ process_manager->deliver(message->to, new MessageEvent(message), sender);
} else {
// Remote message.
socket_manager->send(message);
@@ -574,36 +633,44 @@ void transport(Message* message, Process
}
-Message* parse(HttpRequest* request)
+static bool libprocess(HttpRequest* request)
{
- if (request->method == "POST" && request->headers.count("User-Agent") > 0) {
- const string& temp = request->headers["User-Agent"];
- const string& libprocess = "libprocess/";
- size_t index = temp.find(libprocess);
- if (index != string::npos) {
- // Okay, now determine 'from'.
- const UPID from(temp.substr(index + libprocess.size(), temp.size()));
-
- // Now determine 'to'.
- index = request->path.find('/', 1);
- index = index != string::npos ? index - 1 : string::npos;
- const UPID to(request->path.substr(1, index), ip, port);
-
- // And now determine 'name'.
- index = index != string::npos ? index + 2: request->path.size();
- const string& name = request->path.substr(index);
-
- VLOG(2) << "Parsed message name '" << name
- << "' for " << to << " from " << from;
-
- Message* message = new Message();
- message->name = name;
- message->from = from;
- message->to = to;
- message->body = request->body;
+ return request->method == "POST" &&
+ request->headers.count("User-Agent") > 0 &&
+ request->headers["User-Agent"].find("libprocess/") == 0;
+}
- return message;
- }
+
+static Message* parse(HttpRequest* request)
+{
+ // TODO(benh): Do better error handling (to deal with a malformed
+ // libprocess message, malicious or otherwise).
+ const string& agent = request->headers["User-Agent"];
+ const string& identifier = "libprocess/";
+ size_t index = agent.find(identifier);
+ if (index != string::npos) {
+ // Okay, now determine 'from'.
+ const UPID from(agent.substr(index + identifier.size(), agent.size()));
+
+ // Now determine 'to'.
+ index = request->path.find('/', 1);
+ index = index != string::npos ? index - 1 : string::npos;
+ const UPID to(request->path.substr(1, index), __ip__, __port__);
+
+ // And now determine 'name'.
+ index = index != string::npos ? index + 2: request->path.size();
+ const string& name = request->path.substr(index);
+
+ VLOG(2) << "Parsed message name '" << name
+ << "' for " << to << " from " << from;
+
+ Message* message = new Message();
+ message->name = name;
+ message->from = from;
+ message->to = to;
+ message->body = request->body;
+
+ return message;
}
return NULL;
@@ -652,7 +719,7 @@ void handle_async(struct ev_loop* loop,
void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
{
- list<timer> timedout;
+ list<Timer> timedout;
synchronized (timeouts) {
double now = Clock::now();
@@ -672,7 +739,7 @@ void handle_timeouts(struct ev_loop* loo
// Clock::settle() operation can wait until we're done.
pending_timers = true;
- foreach (const timer& timer, (*timeouts)[timeout]) {
+ foreach (const Timer& timer, (*timeouts)[timeout]) {
timedout.push_back(timer);
}
}
@@ -719,17 +786,17 @@ void handle_timeouts(struct ev_loop* loo
// current time may be greater than the timeout if a local message
// was received (and happens-before kicks in).
if (Clock::paused()) {
- foreach (const timer& timer, timedout) {
- if (ProcessReference process = process_manager->use(timer.pid)) {
- Clock::update(process, timer.timeout);
+ foreach (const Timer& timer, timedout) {
+ if (ProcessReference process = process_manager->use(timer.creator())) {
+ Clock::update(process, timer.timeout().value());
}
}
}
- // Execute the thunks of the timeouts that timed out (TODO(benh): Do
- // this async so that we don't tie up the event thread!).
- foreach (const timer& timer, timedout) {
- timer.thunk();
+ // Invoke the timers that timed out (TODO(benh): Do this
+ // asynchronously so that we don't tie up the event thread!).
+ foreach (const Timer& timer, timedout) {
+ timer();
}
// Mark ourselves as done executing the timers since it's now safe
@@ -741,11 +808,11 @@ void handle_timeouts(struct ev_loop* loo
}
-void recv_data(struct ev_loop *loop, ev_io *watcher, int revents)
+void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
DataDecoder* decoder = (DataDecoder*) watcher->data;
-
- int c = watcher->fd;
+
+ int s = watcher->fd;
while (true) {
const ssize_t size = 80 * 1024;
@@ -753,7 +820,7 @@ void recv_data(struct ev_loop *loop, ev_
char data[size];
- length = recv(c, data, size, 0);
+ length = recv(s, data, size, 0);
if (length < 0 && (errno == EINTR)) {
// Interrupted, try again now.
@@ -769,7 +836,7 @@ void recv_data(struct ev_loop *loop, ev_
} else {
VLOG(2) << "Socket closed while receiving";
}
- socket_manager->closed(c);
+ socket_manager->close(s);
delete decoder;
ev_io_stop(loop, watcher);
delete watcher;
@@ -782,11 +849,11 @@ void recv_data(struct ev_loop *loop, ev_
if (!requests.empty()) {
foreach (HttpRequest* request, requests) {
- process_manager->deliver(c, request);
+ process_manager->handle(decoder->socket(), request);
}
} else if (requests.empty() && decoder->failed()) {
VLOG(2) << "Decoder error while receiving";
- socket_manager->closed(c);
+ socket_manager->close(s);
delete decoder;
ev_io_stop(loop, watcher);
delete watcher;
@@ -797,11 +864,11 @@ void recv_data(struct ev_loop *loop, ev_
}
-void send_data(struct ev_loop *loop, ev_io *watcher, int revents)
+void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
DataEncoder* encoder = (DataEncoder*) watcher->data;
- int c = watcher->fd;
+ int s = watcher->fd;
while (true) {
const void* data;
@@ -810,7 +877,7 @@ void send_data(struct ev_loop *loop, ev_
data = encoder->next(&size);
CHECK(size > 0);
- ssize_t length = send(c, data, size, MSG_NOSIGNAL);
+ ssize_t length = send(s, data, size, MSG_NOSIGNAL);
if (length < 0 && (errno == EINTR)) {
// Interrupted, try again now.
@@ -828,7 +895,7 @@ void send_data(struct ev_loop *loop, ev_
} else {
VLOG(2) << "Socket closed while sending";
}
- socket_manager->closed(c);
+ socket_manager->close(s);
delete encoder;
ev_io_stop(loop, watcher);
delete watcher;
@@ -843,34 +910,105 @@ void send_data(struct ev_loop *loop, ev_
if (encoder->remaining() == 0) {
delete encoder;
+ // Stop this watcher for now.
+ ev_io_stop(loop, watcher);
+
// Check for more stuff to send on socket.
- encoder = socket_manager->next(c);
- if (encoder != NULL) {
- watcher->data = encoder;
+ Encoder* next = socket_manager->next(s);
+ if (next != NULL) {
+ watcher->data = next;
+ ev_io_init(watcher, next->sender(), s, EV_WRITE);
+ ev_io_start(loop, watcher);
} else {
// Nothing more to send right now, clean up.
- ev_io_stop(loop, watcher);
delete watcher;
- break;
}
+ break;
}
}
}
}
-void sending_connect(struct ev_loop *loop, ev_io *watcher, int revents)
+void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
{
- int c = watcher->fd;
+ FileEncoder* encoder = (FileEncoder*) watcher->data;
+
+ int s = watcher->fd;
+
+ while (true) {
+ int fd;
+ off_t offset;
+ size_t size;
+
+ fd = encoder->next(&offset, &size);
+ CHECK(size > 0);
+
+ ssize_t length = sendfile(s, fd, offset, size);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ encoder->backup(size);
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ encoder->backup(size);
+ break;
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(2) << "Socket error while sending: " << error;
+ } else {
+ VLOG(2) << "Socket closed while sending";
+ }
+ socket_manager->close(s);
+ delete encoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ break;
+ } else {
+ CHECK(length > 0);
+
+ // Update the encoder with the amount sent.
+ encoder->backup(size - length);
+
+ // See if there is any more of the message to send.
+ if (encoder->remaining() == 0) {
+ delete encoder;
+
+ // Stop this watcher for now.
+ ev_io_stop(loop, watcher);
+
+ // Check for more stuff to send on socket.
+ Encoder* next = socket_manager->next(s);
+ if (next != NULL) {
+ watcher->data = next;
+ ev_io_init(watcher, next->sender(), s, EV_WRITE);
+ ev_io_start(loop, watcher);
+ } else {
+ // Nothing more to send right now, clean up.
+ delete watcher;
+ }
+ break;
+ }
+ }
+ }
+}
+
+
+void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ int s = watcher->fd;
// Now check that a successful connection was made.
int opt;
socklen_t optlen = sizeof(opt);
- if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
// Connect failure.
VLOG(1) << "Socket error while connecting";
- socket_manager->closed(c);
+ socket_manager->close(s);
MessageEncoder* encoder = (MessageEncoder*) watcher->data;
delete encoder;
ev_io_stop(loop, watcher);
@@ -878,24 +1016,24 @@ void sending_connect(struct ev_loop *loo
} else {
// We're connected! Now let's do some sending.
ev_io_stop(loop, watcher);
- ev_io_init(watcher, send_data, c, EV_WRITE);
+ ev_io_init(watcher, send_data, s, EV_WRITE);
ev_io_start(loop, watcher);
}
}
-void receiving_connect(struct ev_loop *loop, ev_io *watcher, int revents)
+void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
{
- int c = watcher->fd;
+ int s = watcher->fd;
// Now check that a successful connection was made.
int opt;
socklen_t optlen = sizeof(opt);
- if (getsockopt(c, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
// Connect failure.
VLOG(1) << "Socket error while connecting";
- socket_manager->closed(c);
+ socket_manager->close(s);
DataDecoder* decoder = (DataDecoder*) watcher->data;
delete decoder;
ev_io_stop(loop, watcher);
@@ -903,42 +1041,47 @@ void receiving_connect(struct ev_loop *l
} else {
// We're connected! Now let's do some receiving.
ev_io_stop(loop, watcher);
- ev_io_init(watcher, recv_data, c, EV_READ);
+ ev_io_init(watcher, recv_data, s, EV_READ);
ev_io_start(loop, watcher);
}
}
-void accept(struct ev_loop *loop, ev_io *watcher, int revents)
+void accept(struct ev_loop* loop, ev_io* watcher, int revents)
{
- int s = watcher->fd;
+ CHECK_EQ(__s__, watcher->fd);
sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
- int c = ::accept(s, (sockaddr *) &addr, &addrlen);
+ int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
- if (c < 0) {
+ if (s < 0) {
return;
}
- if (set_nbio(c) < 0) {
- close(c);
+ if (set_nbio(s) < 0) {
+ close(s);
return;
}
- // Turn off Nagle (via TCP_NODELAY) so pipelined requests don't wait.
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
int on = 1;
- if (setsockopt(c, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- close(c);
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ close(s);
} else {
+ // Inform the socket manager for proper bookkeeping.
+ const Socket& socket = socket_manager->accepted(s);
+
// Allocate and initialize the decoder and watcher.
- DataDecoder* decoder = new DataDecoder();
+ DataDecoder* decoder = new DataDecoder(socket);
- ev_io *watcher = new ev_io();
+ ev_io* watcher = new ev_io();
watcher->data = decoder;
- ev_io_init(watcher, recv_data, c, EV_READ);
+ ev_io_init(watcher, recv_data, s, EV_READ);
ev_io_start(loop, watcher);
}
}
@@ -990,10 +1133,13 @@ void* schedule(void* arg)
// }
-void initialize(bool initialize_google_logging)
+void initialize(const string& delegate, bool initialize_glog)
{
-// static pthread_once_t init = PTHREAD_ONCE_INIT;
-// pthread_once(&init, ...);
+ // TODO(benh): Return an error if attempting to initialize again
+ // with a different delegate than originally specified.
+
+ // static pthread_once_t init = PTHREAD_ONCE_INIT;
+ // pthread_once(&init, ...);
static volatile bool initialized = false;
static volatile bool initializing = true;
@@ -1011,8 +1157,8 @@ void initialize(bool initialize_google_l
}
}
- if (initialize_google_logging) {
- google::InitGoogleLogging("libprocess");
+ if (initialize_glog) {
+ google::InitGoogleLogging("<<libprocess>>");
google::LogToStderr();
}
@@ -1044,7 +1190,7 @@ void initialize(bool initialize_google_l
#endif // __sun__
// Create a new ProcessManager and SocketManager.
- process_manager = new ProcessManager();
+ process_manager = new ProcessManager(delegate);
socket_manager = new SocketManager();
// Setup the thread local process pointer.
@@ -1063,15 +1209,15 @@ void initialize(bool initialize_google_l
}
}
- ip = 0;
- port = 0;
+ __ip__ = 0;
+ __port__ = 0;
- char *value;
+ char* value;
// Check environment for ip.
value = getenv("LIBPROCESS_IP");
if (value != NULL) {
- int result = inet_pton(AF_INET, value, &ip);
+ int result = inet_pton(AF_INET, value, &__ip__);
if (result == 0) {
LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
} else if (result < 0) {
@@ -1086,50 +1232,50 @@ void initialize(bool initialize_google_l
if (result < 0 || result > USHRT_MAX) {
LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
}
- port = result;
+ __port__ = result;
}
// Create a "server" socket for communicating with other nodes.
- if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
+ if ((__s__ = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
PLOG(FATAL) << "Failed to initialize, socket";
}
// Make socket non-blocking.
- if (set_nbio(s) < 0) {
+ if (set_nbio(__s__) < 0) {
PLOG(FATAL) << "Failed to initialize, set_nbio";
}
// Allow address reuse.
int on = 1;
- if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+ if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
// Set up socket.
- struct sockaddr_in addr;
+ sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
- addr.sin_port = htons(port);
+ addr.sin_port = htons(__port__);
- if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
+ if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
PLOG(FATAL) << "Failed to initialize, bind";
}
// Lookup and store assigned ip and assigned port.
socklen_t addrlen = sizeof(addr);
- if (getsockname(s, (struct sockaddr *) &addr, &addrlen) < 0) {
+ if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
PLOG(FATAL) << "Failed to initialize, getsockname";
}
- ip = addr.sin_addr.s_addr;
- port = ntohs(addr.sin_port);
+ __ip__ = addr.sin_addr.s_addr;
+ __port__ = ntohs(addr.sin_port);
// Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
// actually have a valid external ip address. Note that we need only
// one ip address, so that other processes can send and receive and
// don't get confused as to whom they are sending to.
- if (ip == 0 || ip == 2130706433) {
+ if (__ip__ == 0 || __ip__ == 2130706433) {
char hostname[512];
if (gethostname(hostname, sizeof(hostname)) < 0) {
@@ -1137,16 +1283,16 @@ void initialize(bool initialize_google_l
}
// Lookup IP address of local hostname.
- struct hostent* he;
+ hostent* he;
if ((he = gethostbyname2(hostname, AF_INET)) == NULL) {
PLOG(FATAL) << "Failed to initialize, gethostbyname2";
}
- ip = *((uint32_t *) he->h_addr_list[0]);
+ __ip__ = *((uint32_t *) he->h_addr_list[0]);
}
- if (listen(s, 500000) < 0) {
+ if (listen(__s__, 500000) < 0) {
PLOG(FATAL) << "Failed to initialize, listen";
}
@@ -1163,7 +1309,7 @@ void initialize(bool initialize_google_l
ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
ev_timer_again(loop, &timeouts_watcher);
- ev_io_init(&server_watcher, accept, s, EV_READ);
+ ev_io_init(&server_watcher, accept, __s__, EV_READ);
ev_io_start(loop, &server_watcher);
// ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -1194,110 +1340,169 @@ void initialize(bool initialize_google_l
gc = spawn(new GarbageCollector());
char temp[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr *) &ip, temp, INET_ADDRSTRLEN) == NULL) {
+ if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
PLOG(FATAL) << "Failed to initialize, inet_ntop";
}
- VLOG(1) << "libprocess is initialized on " << temp << ":" << port;
+ VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__;
}
-HttpResponseWaiter::HttpResponseWaiter(const PID<HttpProxy>& _proxy,
- Future<HttpResponse>* _future,
- bool _persist)
- : proxy(_proxy), future(_future), persist(_persist)
-{
- // Wait for any event on the future.
- deferred<void(const Future<HttpResponse>&)> waited = executor.defer(
- lambda::bind(&HttpResponseWaiter::waited, this, lambda::_1));
+HttpProxy::HttpProxy(const Socket& _socket)
+ : socket(_socket) {}
- future->onAny(waited);
- // Also create a timer so we don't wait forever.
- deferred<void(void)> timeout = executor.defer(
- lambda::bind(&HttpResponseWaiter::timeout, this));
-
- timers::create(30, timeout);
+HttpProxy::~HttpProxy()
+{
+ while (!items.empty()) {
+ Item* item = items.front();
+ items.pop();
+ delete item;
+ }
}
-void HttpResponseWaiter::waited(const Future<HttpResponse>&)
+void HttpProxy::enqueue(const HttpResponse& response, bool persist)
{
- if (future->isReady()) {
- process::dispatch(proxy, &HttpProxy::ready, future, persist);
- } else {
- // TODO(benh): Consider handling other "states" of future
- // (discarded, failed, etc) with different HTTP statuses.
- process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
- }
-
- executor.stop(); // Ensure we ignore the timeout.
+ handle(new Future<HttpResponse>(response), persist);
}
-void HttpResponseWaiter::timeout()
+void HttpProxy::handle(Future<HttpResponse>* future, bool persist)
{
- process::dispatch(proxy, &HttpProxy::unavailable, future, persist);
-
- executor.stop(); // Ensure we ignore the future.
-}
+ items.push(new Item(future, persist));
+ if (items.size() == 1) {
+ // Wait for any transition of the future.
+ items.front()->future->onAny(
+ defer(self(), &HttpProxy::waited, *future));
-HttpProxy::HttpProxy(int _c) : c(_c) {}
+ // Also create a timer so we don't wait forever.
+ timer =
+ timers::create(30, defer(self(), &HttpProxy::timedout, *future));
+ }
+}
-void HttpProxy::handle(Future<HttpResponse>* future, bool persist)
+void HttpProxy::next()
{
- HttpResponseWaiter* waiter = new HttpResponseWaiter(this, future, persist);
- waiters[future] = waiter;
+ if (items.size() > 0) {
+ Item* item = items.front();
+ // Wait for any transition of the future.
+ item->future->onAny(
+ defer(self(), &HttpProxy::waited, *item->future));
+
+ // Also create a timer so we don't wait forever.
+ timer =
+ timers::create(30, defer(self(), &HttpProxy::timedout, *item->future));
+ }
}
-void HttpProxy::ready(Future<HttpResponse>* future, bool persist)
+void HttpProxy::waited(const Future<HttpResponse>& future)
{
- CHECK(waiters.count(future) > 0);
- HttpResponseWaiter* waiter = waiters[future];
- waiters.erase(future);
- delete waiter;
+ if (items.size() > 0) { // The timer might have already fired.
+ Item* item = items.front();
+ if (future == *item->future) { // Another future might already be queued.
+ timers::cancel(timer);
- CHECK(future->isReady());
+ if (item->future->isReady()) {
+ process(item->future, item->persist);
+ } else {
+ // TODO(benh): Consider handling other "states" of future
+ // (discarded, failed, etc) with different HTTP statuses.
+ socket_manager->send(
+ HttpServiceUnavailableResponse(), socket, item->persist);
+ }
- const HttpResponse& response = future->get();
+ items.pop();
+ delete item;
- // Don't persist the connection if the responder doesn't want it to.
- if (response.headers.count("Connection") > 0) {
- const string& connection = response.headers.find("Connection")->second;
- if (connection == "close") {
- persist = false;
+ next();
}
}
+}
- HttpResponseEncoder* encoder =
- new HttpResponseEncoder(response);
- delete future;
+void HttpProxy::timedout(const Future<HttpResponse>& future)
+{
+ if (items.size() > 0) { // We might not have been able to cancel the timer.
+ Item* item = items.front();
+ if (future == *item->future) {
+ socket_manager->send(
+ HttpServiceUnavailableResponse(), socket, item->persist);
+ items.pop();
+ delete item;
+ }
- // See the semantics of SocketManager::send for details about how
- // the socket will get closed (it might actually already be closed
- // before we issue this send).
- socket_manager->send(encoder, c, persist);
+ next();
+ }
}
-void HttpProxy::unavailable(Future<HttpResponse>* future, bool persist)
+void HttpProxy::process(Future<HttpResponse>* future, bool persist)
{
- CHECK(waiters.count(future) > 0);
- HttpResponseWaiter* waiter = waiters[future];
- waiters.erase(future);
- delete waiter;
+ CHECK(future->isReady());
- HttpResponseEncoder* encoder =
- new HttpResponseEncoder(HttpServiceUnavailableResponse());
+ HttpResponse response = future->get();
- delete future;
+ // Don't persist connection if headers include 'Connection: close'.
+ if (response.headers.count("Connection") > 0) {
+ const string& connection = response.headers.find("Connection")->second;
+ if (connection == "close") {
+ persist = false;
+ }
+ }
- // As above, the socket might all ready be closed when we do a send.
- socket_manager->send(encoder, c, persist);
+ // If the response specifies a path, try and perform a sendfile.
+ if (response.type == HttpResponse::PATH) {
+ // Make sure no body is sent (this is really an error and
+ // should be reported and no response sent.
+ response.body.clear();
+
+ const string& path = response.path;
+ int fd = open(path.c_str(), O_RDONLY);
+ if (fd < 0) {
+ if (errno == ENOENT || errno == ENOTDIR) {
+ VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
+ socket_manager->send(
+ HttpNotFoundResponse(), socket, persist);
+ } else {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to send file at '" << path << "': " << error;
+ socket_manager->send(
+ HttpInternalServerErrorResponse(), socket, persist);
+ }
+ } else {
+ struct stat s; // Need 'struct' because of function named 'stat'.
+ if (fstat(fd, &s) != 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to send file at '" << path << "': " << error;
+ socket_manager->send(
+ HttpInternalServerErrorResponse(), socket, persist);
+ } else if (S_ISDIR(s.st_mode)) {
+ VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
+ socket_manager->send(
+ HttpNotFoundResponse(), socket, persist);
+ } else {
+ // While the user is expected to properly set a 'Content-Type'
+ // header, we fill in the 'Content-Length' header.
+ stringstream out;
+ out << s.st_size;
+ response.headers["Content-Length"] = out.str();
+
+ // TODO(benh): Consider a way to have the socket manager turn
+ // on TCP_CORK for both sends and then turn it off.
+ socket_manager->send(response, socket, true);
+
+ // Note the file descriptor gets closed by FileEncoder.
+ Encoder* encoder = new FileEncoder(fd, s.st_size);
+ socket_manager->send(encoder, socket, persist);
+ }
+ }
+ } else {
+ socket_manager->send(response, socket, persist);
+ }
}
@@ -1310,7 +1515,13 @@ SocketManager::SocketManager()
SocketManager::~SocketManager() {}
-void SocketManager::link(ProcessBase *process, const UPID &to)
+Socket SocketManager::accepted(int s)
+{
+ return sockets[s] = Socket(s);
+}
+
+
+void SocketManager::link(ProcessBase* process, const UPID& to)
{
// TODO(benh): The semantics we want to support for link are such
// that if there is nobody to link to (local or remote) then an
@@ -1326,7 +1537,8 @@ void SocketManager::link(ProcessBase *pr
synchronized (this) {
// Check if node is remote and there isn't a persistant link.
- if ((node.ip != ip || node.port != port) && persists.count(node) == 0) {
+ if ((node.ip != __ip__ || node.port != __port__)
+ && persists.count(node) == 0) {
// Okay, no link, lets create a socket.
int s;
@@ -1338,7 +1550,10 @@ void SocketManager::link(ProcessBase *pr
PLOG(FATAL) << "Failed to link, set_nbio";
}
- sockets[s] = node;
+ Socket socket = Socket(s);
+
+ sockets[s] = socket;
+ nodes[s] = node;
persists[node] = s;
@@ -1348,14 +1563,16 @@ void SocketManager::link(ProcessBase *pr
addr.sin_port = htons(to.port);
addr.sin_addr.s_addr = to.ip;
- // Allocate and initialize the decoder and watcher.
- DataDecoder* decoder = new DataDecoder();
+ // Allocate and initialize the decoder and watcher (we really
+ // only "receive" on this socket so that we can react when it
+ // gets closed and generate appropriate lost events).
+ DataDecoder* decoder = new DataDecoder(socket);
- ev_io *watcher = new ev_io();
+ ev_io* watcher = new ev_io();
watcher->data = decoder;
// Try and connect to the node using this socket.
- if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
+ if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
PLOG(FATAL) << "Failed to link, connect";
}
@@ -1383,39 +1600,37 @@ void SocketManager::link(ProcessBase *pr
PID<HttpProxy> SocketManager::proxy(int s)
{
synchronized (this) {
+ // This socket might have been asked to get closed (e.g., remote
+ // side hang up) while a process is attempting to handle an HTTP
+ // request. Thus, if there is no more socket, return an empty PID.
if (sockets.count(s) > 0) {
- CHECK(proxies.count(s) > 0);
- return proxies[s]->self();
- } else {
- // Register the socket with the manager for sending purposes. The
- // current design doesn't let us create a valid "node" for this
- // socket, so we use a "default" one for now.
- sockets[s] = Node();
-
- CHECK(proxies.count(s) == 0);
-
- HttpProxy* proxy = new HttpProxy(s);
- spawn(proxy, true);
- proxies[s] = proxy;
- return proxy->self();
+ if (proxies.count(s) > 0) {
+ return proxies[s]->self();
+ } else {
+ HttpProxy* proxy = new HttpProxy(sockets[s]);
+ spawn(proxy, true);
+ proxies[s] = proxy;
+ return proxy->self();
+ }
}
}
+
+ return PID<HttpProxy>();
}
-void SocketManager::send(DataEncoder* encoder, int s, bool persist)
+void SocketManager::send(Encoder* encoder, int s, bool persist)
{
CHECK(encoder != NULL);
- // TODO(benh): The current mechanism here is insufficient. It could
- // be the case that an HttpProxy attempts to do a send on a socket
- // just as that socket has been closed and then re-opened for
- // another connection. In this case, the data sent on that socket
- // will be completely bogus ... one easy fix would be to check the
- // proxy that is associated with the socket to eliminate this race.
-
synchronized (this) {
if (sockets.count(s) > 0) {
+ // Update whether or not this socket should get disposed after
+ // there is no more data to send.
+ if (!persist) {
+ dispose.insert(s);
+ }
+
if (outgoing.count(s) > 0) {
outgoing[s].push(encoder);
} else {
@@ -1423,10 +1638,10 @@ void SocketManager::send(DataEncoder* en
outgoing[s];
// Allocate and initialize the watcher.
- ev_io *watcher = new ev_io();
+ ev_io* watcher = new ev_io();
watcher->data = encoder;
- ev_io_init(watcher, send_data, s, EV_WRITE);
+ ev_io_init(watcher, encoder->sender(), s, EV_WRITE);
synchronized (watchers) {
watchers->push(watcher);
@@ -1434,11 +1649,6 @@ void SocketManager::send(DataEncoder* en
ev_async_send(loop, &async_watcher);
}
-
- // Set the socket to get closed if not persistant.
- if (!persist) {
- disposables.insert(s);
- }
} else {
VLOG(1) << "Attempting to send on a no longer valid socket!";
delete encoder;
@@ -1447,6 +1657,12 @@ void SocketManager::send(DataEncoder* en
}
+void SocketManager::send(const HttpResponse& response, int s, bool persist)
+{
+ send(new HttpResponseEncoder(response), s, persist);
+}
+
+
void SocketManager::send(Message* message)
{
CHECK(message != NULL);
@@ -1457,11 +1673,11 @@ void SocketManager::send(Message* messag
synchronized (this) {
// Check if there is already a socket.
- bool persistant = persists.count(node) > 0;
- bool temporary = temps.count(node) > 0;
- if (persistant || temporary) {
- int s = persistant ? persists[node] : temps[node];
- send(encoder, s, persistant);
+ bool persist = persists.count(node) > 0;
+ bool temp = temps.count(node) > 0;
+ if (persist || temp) {
+ int s = persist ? persists[node] : temps[node];
+ send(encoder, s, persist);
} else {
// No peristant or temporary socket to the node currently
// exists, so we create a temporary one.
@@ -1475,10 +1691,11 @@ void SocketManager::send(Message* messag
PLOG(FATAL) << "Failed to send, set_nbio";
}
- sockets[s] = node;
-
+ sockets[s] = Socket(s);
+ nodes[s] = node;
temps[node] = s;
- disposables.insert(s);
+
+ dispose.insert(s);
// Initialize the outgoing queue.
outgoing[s];
@@ -1491,10 +1708,10 @@ void SocketManager::send(Message* messag
addr.sin_addr.s_addr = message->to.ip;
// Allocate and initialize the watcher.
- ev_io *watcher = new ev_io();
+ ev_io* watcher = new ev_io();
watcher->data = encoder;
-
- if (connect(s, (sockaddr *) &addr, sizeof(addr)) < 0) {
+
+ if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
PLOG(FATAL) << "Failed to send, connect";
}
@@ -1517,9 +1734,9 @@ void SocketManager::send(Message* messag
}
-DataEncoder* SocketManager::next(int s)
+Encoder* SocketManager::next(int s)
{
- DataEncoder* encoder = NULL;
+ HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
synchronized (this) {
CHECK(sockets.count(s) > 0);
@@ -1527,56 +1744,89 @@ DataEncoder* SocketManager::next(int s)
if (!outgoing[s].empty()) {
// More messages!
- encoder = outgoing[s].front();
+ Encoder* encoder = outgoing[s].front();
outgoing[s].pop();
+ return encoder;
} else {
// No more messages ... erase the outgoing queue.
outgoing.erase(s);
- // Close the socket if it was set for disposal.
- if (disposables.count(s) > 0) {
- // Also try and remove from temps.
- const Node& node = sockets[s];
- if (temps.count(node) > 0 && temps[node] == s) {
+ if (dispose.count(s) > 0) {
+ // This is either a temporary socket we created or it's a
+ // socket that we were receiving data from and possibly
+ // sending HTTP responses back on. Clean up either way.
+ if (nodes.count(s) > 0) {
+ const Node& node = nodes[s];
+ CHECK(temps.count(node) > 0 && temps[node] == s);
temps.erase(node);
- } else if (proxies.count(s) > 0) {
- HttpProxy* proxy = proxies[s];
+ nodes.erase(s);
+ }
+
+ if (proxies.count(s) > 0) {
+ proxy = proxies[s];
proxies.erase(s);
- terminate(proxy);
}
- disposables.erase(s);
+ dispose.erase(s);
sockets.erase(s);
- close(s);
}
}
}
- return encoder;
+ // We terminate the proxy outside the synchronized block to avoid
+ // possible deadlock between the ProcessManager and SocketManager.
+ if (proxy != NULL) {
+ terminate(proxy);
+ }
+
+ return NULL;
}
-void SocketManager::closed(int s)
+void SocketManager::close(int s)
{
HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
synchronized (this) {
+ // This socket might not be active if it was already asked to get
+ // closed (e.g., a write on the socket failed so we try and close
+ // it and then later the read side of the socket gets closed so we
+ // try and close it again). Thus, ignore the request if we don't
+ // know about the socket.
if (sockets.count(s) > 0) {
- const Node& node = sockets[s];
+ // Clean up any remaining encoders for this socket.
+ if (outgoing.count(s) > 0) {
+ while (!outgoing[s].empty()) {
+ Encoder* encoder = outgoing[s].front();
+ delete encoder;
+ outgoing[s].pop();
+ }
+
+ outgoing.erase(s);
+ }
- // Don't bother invoking exited unless socket was persistant.
- if (persists.count(node) > 0 && persists[node] == s) {
- persists.erase(node);
- exited(node);
- } else if (temps.count(node) > 0 && temps[node] == s) {
- temps.erase(node);
- } else if (proxies.count(s) > 0) {
+ // Clean up after sockets used for node communication.
+ if (nodes.count(s) > 0) {
+ const Node& node = nodes[s];
+
+ // Don't bother invoking exited unless socket was persistant.
+ if (persists.count(node) > 0 && persists[node] == s) {
+ persists.erase(node);
+ exited(node); // Generate ExitedEvent(s)!
+ } else if (temps.count(node) > 0 && temps[node] == s) {
+ temps.erase(node);
+ }
+
+ nodes.erase(s);
+ }
+
+ // Clean up any proxy associated with this socket.
+ if (proxies.count(s) > 0) {
proxy = proxies[s];
proxies.erase(s);
}
- outgoing.erase(s);
- disposables.erase(s);
+ dispose.erase(s);
sockets.erase(s);
}
}
@@ -1587,11 +1837,20 @@ void SocketManager::closed(int s)
terminate(proxy);
}
- // This might have just been a receiving socket (only sending
- // sockets, with the exception of the receiving side of a persistant
- // socket, get added to 'sockets'), so we want to make sure to call
- // close so that the file descriptor can get reused.
- close(s);
+ // Note that we don't actually:
+ //
+ // close(s);
+ //
+ // Because, for example, there could be a race between an HttpProxy
+ // trying to do send a response with SocketManager::send() or a
+ // process might be responding to another HttpRequest (e.g., trying
+ // to do a sendfile) since these things may be happening
+ // asynchronously we can't close the socket yet, because it might
+ // get reused before any of the above things have finished, and then
+ // we'll end up sending data on the wrong socket! Instead, we rely
+ // on the last reference of our Socket object to close the
+ // socket. Note, however, that since socket is no longer in
+ // 'sockets' any attempt to send with it will just get ignored.
}
@@ -1613,7 +1872,7 @@ void SocketManager::exited(const Node& n
}
}
- foreach (const UPID &pid, removed) {
+ foreach (const UPID& pid, removed) {
links.erase(pid);
}
}
@@ -1656,7 +1915,8 @@ void SocketManager::exited(ProcessBase*
}
-ProcessManager::ProcessManager()
+ProcessManager::ProcessManager(const string& _delegate)
+ : delegate(_delegate)
{
synchronizer(processes) = SYNCHRONIZED_INITIALIZER;
synchronizer(runq) = SYNCHRONIZED_INITIALIZER;
@@ -1668,9 +1928,9 @@ ProcessManager::ProcessManager()
ProcessManager::~ProcessManager() {}
-ProcessReference ProcessManager::use(const UPID &pid)
+ProcessReference ProcessManager::use(const UPID& pid)
{
- if (pid.ip == ip && pid.port == port) {
+ if (pid.ip == __ip__ && pid.port == __port__) {
synchronized (processes) {
if (processes.count(pid.id) > 0) {
// Note that the ProcessReference constructor _must_ get
@@ -1685,54 +1945,68 @@ ProcessReference ProcessManager::use(con
}
-bool ProcessManager::deliver(Message* message, ProcessBase* sender)
+bool ProcessManager::handle(
+ const Socket& socket,
+ HttpRequest* request,
+ ProcessBase* sender)
{
- CHECK(message != NULL);
+ CHECK(request != NULL);
- if (ProcessReference receiver = use(message->to)) {
- // If we have a local sender AND we are using a manual clock
- // then update the current time of the receiver to preserve
- // the happens-before relationship between the sender and
- // receiver. Note that the assumption is that the sender
- // remains valid for at least the duration of this routine (so
- // that we can look up it's current time).
- if (sender != NULL) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- Clock::order(sender, receiver);
- }
- }
+ // Check if this is a libprocess request (i.e., 'User-Agent:
+ // libprocess/id@ip:port') and if so, parse as a message.
+ if (libprocess(request)) {
+ Message* message = parse(request);
+ if (message != NULL) {
+ delete request;
+ return deliver(message->to, new MessageEvent(message), sender);
}
- receiver->enqueue(new MessageEvent(message));
- } else {
- delete message;
+ VLOG(1) << "Failed to handle libprocess request: "
+ << request->method << " " << request->path
+ << " (User-Agent: " << request->headers["User-Agent"] << ")";
+
+ delete request;
return false;
}
- return true;
-}
-
+ // Treat this as an HTTP request. Start by checking that the path
+ // starts with a '/' (since the code below assumes as much).
+ if (request->path.find('/') != 0) {
+ VLOG(1) << "Returning '400 Bad Request' for '" << request->path << "'";
-// TODO(benh): Refactor and share code with above!
-bool ProcessManager::deliver(int c, HttpRequest* request, ProcessBase* sender)
-{
- CHECK(request != NULL);
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(socket);
- // Determine whether or not this is a libprocess message.
- Message* message = parse(request);
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue,
+ HttpBadRequestResponse(), request->keepAlive);
- if (message != NULL) {
+ // Cleanup request.
delete request;
- return deliver(message, sender);
+ return false;
}
- // Treat this as an HTTP request and check for a valid receiver.
- string path = request->path.substr(1, request->path.find('/', 1) - 1);
+ // Split the path by '/'.
+ vector<string> tokens = tokenize(request->path, "/");
- UPID to(path, ip, port);
+ // Try and determine a receiver, otherwise try and delegate.
+ ProcessReference receiver;
- if (ProcessReference receiver = use(to)) {
+ if (tokens.size() == 0 && delegate != "") {
+ request->path = "/" + delegate;
+ receiver = use(UPID(delegate, __ip__, __port__));
+ } else if (tokens.size() > 0) {
+ receiver = use(UPID(tokens[0], __ip__, __port__));
+ }
+
+ if (!receiver && delegate != "") {
+ // Try and delegate the request.
+ request->path = "/" + delegate + request->path;
+ receiver = use(UPID(delegate, __ip__, __port__));
+ }
+
+ if (receiver) {
// If we have a local sender AND we are using a manual clock
// then update the current time of the receiver to preserve
// the happens-before relationship between the sender and
@@ -1748,16 +2022,18 @@ bool ProcessManager::deliver(int c, Http
}
// Enqueue the event.
- receiver->enqueue(new HttpEvent(c, request));
+ receiver->enqueue(new HttpEvent(socket, request));
} else {
// This has no receiver, send error response.
VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";
- HttpResponseEncoder* encoder =
- new HttpResponseEncoder(HttpNotFoundResponse());
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(socket);
- // TODO(benh): Socket might be closed and then re-opened!
- socket_manager->send(encoder, c, request->keepAlive);
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue,
+ HttpNotFoundResponse(), request->keepAlive);
// Cleanup request.
delete request;
@@ -1768,13 +2044,12 @@ bool ProcessManager::deliver(int c, Http
}
-// TODO(benh): Refactor and share code with above!
bool ProcessManager::deliver(
const UPID& to,
- lambda::function<void(ProcessBase*)>* f,
+ Event* event,
ProcessBase* sender)
{
- CHECK(f != NULL);
+ CHECK(event != NULL);
if (ProcessReference receiver = use(to)) {
// If we have a local sender AND we are using a manual clock
@@ -1791,9 +2066,9 @@ bool ProcessManager::deliver(
}
}
- receiver->enqueue(new DispatchEvent(f));
+ receiver->enqueue(event);
} else {
- delete f;
+ delete event;
return false;
}
@@ -1981,7 +2256,7 @@ void ProcessManager::cleanup(ProcessBase
void ProcessManager::link(ProcessBase* process, const UPID& to)
{
// Check if the pid is local.
- if (!(to.ip == ip && to.port == port)) {
+ if (!(to.ip == __ip__ && to.port == __port__)) {
socket_manager->link(process, to);
} else {
// Since the pid is local we want to get a reference to it's
@@ -2172,40 +2447,31 @@ void ProcessManager::settle()
namespace timers {
-timer create(double secs, const lambda::function<void(void)>& thunk)
+Timer create(double secs, const lambda::function<void(void)>& thunk)
{
- static long id = 0;
+ static uint64_t id = 1; // Start at 1 since Timer() instances start with 0.
- double timeout = Clock::now() + secs;
+ Timeout timeout(secs); // Assumes Clock::now() does Clock::now(__process__).
- if (__process__ != NULL) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- timeout = Clock::now(__process__) + secs;
- }
- }
- }
+ UPID pid = __process__ != NULL ? __process__->self() : UPID();
- timer timer;
- timer.id = id++;
- timer.timeout = timeout;
- timer.pid = __process__ != NULL ? __process__->self() : UPID();
- timer.thunk = thunk;
+ Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
VLOG(2) << "Created a timer for "
- << std::fixed << std::setprecision(9) << timeout;
+ << std::fixed << std::setprecision(9) << timeout.value();
// Add the timer.
synchronized (timeouts) {
- if (timeouts->size() == 0 || timer.timeout < timeouts->begin()->first) {
+ if (timeouts->size() == 0 ||
+ timer.timeout().value() < timeouts->begin()->first) {
// Need to interrupt the loop to update/set timer repeat.
- (*timeouts)[timer.timeout].push_back(timer);
+ (*timeouts)[timer.timeout().value()].push_back(timer);
update_timer = true;
ev_async_send(loop, &async_watcher);
} else {
// Timer repeat is adequate, just add the timeout.
CHECK(timeouts->size() >= 1);
- (*timeouts)[timer.timeout].push_back(timer);
+ (*timeouts)[timer.timeout().value()].push_back(timer);
}
}
@@ -2213,22 +2479,26 @@ timer create(double secs, const lambda::
}
-void cancel(const timer& timer)
+bool cancel(const Timer& timer)
{
+ bool canceled = false;
synchronized (timeouts) {
- // Check if the timeout is still pending, and if so, erase
- // it. In addition, erase an empty list if we just removed the
- // last timeout.
- if (timeouts->count(timer.timeout) > 0) {
- (*timeouts)[timer.timeout].remove(timer);
- if ((*timeouts)[timer.timeout].empty()) {
- timeouts->erase(timer.timeout);
+ // Check if the timeout is still pending, and if so, erase it. In
+ // addition, erase an empty list if we just removed the last
+ // timeout.
+ if (timeouts->count(timer.timeout().value()) > 0) {
+ canceled = true;
+ (*timeouts)[timer.timeout().value()].remove(timer);
+ if ((*timeouts)[timer.timeout().value()].empty()) {
+ timeouts->erase(timer.timeout().value());
}
}
}
+
+ return canceled;
}
-} // namespace timeouts {
+} // namespace timers {
ProcessBase::ProcessBase(const std::string& _id)
@@ -2241,17 +2511,17 @@ ProcessBase::ProcessBase(const std::stri
refs = 0;
- // Generate string representation of unique id for process.
if (_id != "") {
pid.id = _id;
} else {
+ // Generate string representation of unique id for process.
stringstream out;
- out << __sync_add_and_fetch(&id, 1);
+ out << __sync_add_and_fetch(&__id__, 1);
pid.id = out.str();
}
- pid.ip = ip;
- pid.port = port;
+ pid.ip = __ip__;
+ pid.port = __port__;
// If using a manual clock, try and set current time of process
// using happens before relationship between creator and createe!
@@ -2336,6 +2606,8 @@ void ProcessBase::enqueue(Event* event,
CHECK(state == BOTTOM ||
state == READY ||
state == RUNNING);
+ } else {
+ delete event;
}
}
unlock();
@@ -2382,19 +2654,23 @@ void ProcessBase::visit(const MessageEve
void ProcessBase::visit(const DispatchEvent& event)
{
- (*event.function)(this);
+ (*event.f)(this);
}
void ProcessBase::visit(const HttpEvent& event)
{
- // Determine the request "name" (i.e., path).
- size_t index = event.request->path.find('/', 1);
+ VLOG(2) << "Handling HTTP event for process '" << pid.id << "'"
+ << " with path: '" << event.request->path << "'";
+
+ CHECK(event.request->path.find('/') == 0); // See ProcessManager::deliver.
- // Only want the last part of the path (e.g., "foo" in /path/to/foo).
- index = index != string::npos ? index + 1 : event.request->path.size();
+ // Split the path by '/'.
+ vector<string> tokens = tokenize(event.request->path, "/");
+ CHECK(tokens.size() >= 1);
+ CHECK(tokens[0] == pid.id);
- const string& name = event.request->path.substr(index);
+ const string& name = tokens.size() > 1 ? tokens[1] : "";
if (handlers.http.count(name) > 0) {
// Create the promise to link with whatever gets returned, as well
@@ -2405,21 +2681,40 @@ void ProcessBase::visit(const HttpEvent&
Future<HttpResponse>* future = new Future<HttpResponse>(promise->future());
// Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(event.c);
+ PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
// Let the HttpProxy know about this request (via the future).
dispatch(proxy, &HttpProxy::handle, future, event.request->keepAlive);
- // Finally, call the handler and associate the response with the promise.
+ // Now call the handler and associate the response with the promise.
internal::associate(handlers.http[name](*event.request), promise);
+ } else if (resources.count(name) > 0) {
+ HttpOKResponse response;
+ response.headers["Content-Type"] = resources[name].type;
+ response.type = HttpResponse::PATH;
+ response.path = resources[name].path;
+
+ // Construct the final path by appending remaining tokens.
+ for (int i = 2; i < tokens.size(); i++) {
+ response.path += "/" + tokens[i];
+ }
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
+
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue, response, event.request->keepAlive);
} else {
VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";
- HttpResponseEncoder* encoder =
- new HttpResponseEncoder(HttpNotFoundResponse());
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
- // TODO(benh): Socket might be closed and then re-opened!
- socket_manager->send(encoder, event.c, event.request->keepAlive);
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue,
+ HttpNotFoundResponse(), event.request->keepAlive);
}
}
@@ -2567,11 +2862,13 @@ void post(const UPID& to, const string&
namespace internal {
-void dispatch(const UPID& pid, lambda::function<void(ProcessBase*)>* f)
+void dispatch(
+ const UPID& pid,
+ const std::tr1::shared_ptr<std::tr1::function<void(ProcessBase*)> >& f)
{
process::initialize();
- process_manager->deliver(pid, f, __process__);
+ process_manager->deliver(pid, new DispatchEvent(f), __process__);
}
} // namespace internal {