You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/29 19:40:53 UTC
[15/35] Renamed 'third_party' to '3rdparty'.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/third_party/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/third_party/libprocess/src/process.cpp b/third_party/libprocess/src/process.cpp
deleted file mode 100644
index 86414e5..0000000
--- a/third_party/libprocess/src/process.cpp
+++ /dev/null
@@ -1,3446 +0,0 @@
-#include <errno.h>
-#include <ev.h>
-#include <limits.h>
-#include <libgen.h>
-#include <netdb.h>
-#include <pthread.h>
-#include <signal.h>
-#include <stdarg.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include <arpa/inet.h>
-
-#include <glog/logging.h>
-
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-
-#include <sys/ioctl.h>
-#include <sys/mman.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-
-#include <algorithm>
-#include <deque>
-#include <fstream>
-#include <iomanip>
-#include <iostream>
-#include <list>
-#include <map>
-#include <queue>
-#include <set>
-#include <sstream>
-#include <stack>
-#include <stdexcept>
-#include <vector>
-
-#include <tr1/functional>
-#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
-
-#include <boost/shared_array.hpp>
-
-#include <process/clock.hpp>
-#include <process/defer.hpp>
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/executor.hpp>
-#include <process/filter.hpp>
-#include <process/future.hpp>
-#include <process/gc.hpp>
-#include <process/id.hpp>
-#include <process/io.hpp>
-#include <process/logging.hpp>
-#include <process/mime.hpp>
-#include <process/process.hpp>
-#include <process/profiler.hpp>
-#include <process/socket.hpp>
-#include <process/statistics.hpp>
-#include <process/thread.hpp>
-#include <process/time.hpp>
-#include <process/timer.hpp>
-
-#include <stout/duration.hpp>
-#include <stout/foreach.hpp>
-#include <stout/lambda.hpp>
-#include <stout/net.hpp>
-#include <stout/os.hpp>
-#include <stout/strings.hpp>
-
-#include "config.hpp"
-#include "decoder.hpp"
-#include "encoder.hpp"
-#include "gate.hpp"
-#include "synchronized.hpp"
-
-using process::wait; // Necessary on some OS's to disambiguate.
-
-using process::http::BadRequest;
-using process::http::InternalServerError;
-using process::http::NotFound;
-using process::http::OK;
-using process::http::Request;
-using process::http::Response;
-using process::http::ServiceUnavailable;
-
-using std::deque;
-using std::find;
-using std::list;
-using std::map;
-using std::ostream;
-using std::pair;
-using std::queue;
-using std::set;
-using std::stack;
-using std::string;
-using std::stringstream;
-using std::vector;
-
-// Represents a remote "node" (encapsulates IP address and port). The
-// fields are stored exactly as supplied by the caller; presumably 'ip'
-// follows the same convention as __ip__ (sin_addr.s_addr, i.e. network
-// byte order) and 'port' the host-order __port__ -- confirm at callers.
-class Node
-{
-public:
-  Node(uint32_t _ip = 0, uint16_t _port = 0)
-    : ip(_ip), port(_port) {}
-
-  // Strict weak ordering (by ip, then port) so Node can key std::map.
-  bool operator < (const Node& that) const
-  {
-    if (ip != that.ip) {
-      return ip < that.ip;
-    }
-    return port < that.port;
-  }
-
-  // NOTE: as a member, this overload is invoked as 'node << stream',
-  // which is backwards. It is kept only for source compatibility;
-  // prefer the free 'stream << node' operator below.
-  ostream& operator << (ostream& stream) const
-  {
-    stream << ip << ":" << port;
-    return stream;
-  }
-
-  // Writes "ip:port" (raw numeric values) with the conventional
-  // 'stream << node' syntax. Found via argument-dependent lookup.
-  friend ostream& operator << (ostream& stream, const Node& node)
-  {
-    return node.operator << (stream);
-  }
-
-  uint32_t ip;
-  uint16_t port;
-};
-
-
-namespace process {
-
-namespace ID {
-
-// Returns a new identifier of the form "<prefix>(<n>)", where <n> counts
-// (starting at 1) how many identifiers have been generated with this
-// prefix so far in this process.
-//
-// Thread-safe: the counter map is guarded by a mutex. Making only the
-// increment atomic is not enough, because 'prefixes[prefix]' may insert
-// into the std::map, and concurrent insertions are a data race.
-string generate(const string& prefix)
-{
-  static map<string, int> prefixes;
-  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-  // Unlocks on scope exit, even if the map insertion throws.
-  struct Lock
-  {
-    explicit Lock(pthread_mutex_t* _m) : m(_m) { pthread_mutex_lock(m); }
-    ~Lock() { pthread_mutex_unlock(m); }
-    pthread_mutex_t* m;
-  };
-
-  int count;
-  {
-    Lock lock(&mutex);
-    count = ++prefixes[prefix];
-  }
-
-  stringstream out;
-  out << count;
-  return prefix + "(" + out.str() + ")";
-}
-
-} // namespace ID {
-
-
-namespace http {
-
-// Table keyed by HTTP status code. It is only declared here and is
-// populated elsewhere; presumably the values are status-line texts
-// (confirm where it is filled in).
-hashmap<uint16_t, std::string> statuses;
-
-} // namespace http {
-
-
-namespace mime {
-
-// String-to-string table, populated elsewhere. Presumably this maps
-// file extensions to MIME types (confirm where it is filled in).
-std::map<std::string, std::string> types;
-
-} // namespace mime {
-
-
-// Reference-counted handle to a ProcessBase. Every live handle that
-// points at a process holds one count in 'process->refs', adjusted with
-// GCC atomic builtins. A default-constructed handle points at nothing
-// and converts to false.
-class ProcessReference
-{
-public:
-  ProcessReference() : process(NULL) {}
-
-  ~ProcessReference()
-  {
-    cleanup();
-  }
-
-  ProcessReference(const ProcessReference& that)
-  {
-    copy(that);
-  }
-
-  ProcessReference& operator = (const ProcessReference& that)
-  {
-    // Self-assignment must not drop (and then re-take) our own count.
-    if (&that == this) {
-      return *this;
-    }
-    // Release the current count first, then share 'that' process.
-    cleanup();
-    copy(that);
-    return *this;
-  }
-
-  ProcessBase* operator -> ()
-  {
-    return process;
-  }
-
-  operator ProcessBase* ()
-  {
-    return process;
-  }
-
-  operator bool () const
-  {
-    return process != NULL;
-  }
-
-private:
-  friend class ProcessManager; // For ProcessManager::use.
-
-  // Wraps a raw pointer, taking a count on it; only ProcessManager may
-  // create handles this way.
-  ProcessReference(ProcessBase* _process)
-    : process(_process)
-  {
-    if (process == NULL) {
-      return;
-    }
-    __sync_fetch_and_add(&(process->refs), 1);
-  }
-
-  // Points this handle at 'that' process and takes one more count on it.
-  void copy(const ProcessReference& that)
-  {
-    process = that.process;
-    if (process == NULL) {
-      return;
-    }
-    // 'that' already holds a count, so there is no need to check whether
-    // the process is exiting: adding another count is always allowed.
-    CHECK(process->refs > 0);
-    __sync_fetch_and_add(&(process->refs), 1);
-  }
-
-  // Gives back the count held by this handle, if any.
-  void cleanup()
-  {
-    if (process == NULL) {
-      return;
-    }
-    __sync_fetch_and_sub(&(process->refs), 1);
-  }
-
-  ProcessBase* process;
-};
-
-
-// Provides a process that manages sending HTTP responses so as to
-// satisfy HTTP/1.1 pipelining. Each request should either enqueue a
-// response, or ask the proxy to handle a future response. The process
-// is responsible for making sure the responses are sent in the same
-// order as the requests. Note that we use a 'Socket' in order to keep
-// the underlying file descriptor from getting closed while there
-// might still be outstanding responses even though the client might
-// have closed the connection (see more discussion in
-// SocketManager::close and SocketManager::proxy).
-class HttpProxy : public Process<HttpProxy>
-{
-public:
-  HttpProxy(const Socket& _socket);
-  virtual ~HttpProxy();
-
-  // Enqueues the response to be sent once all previously enqueued
-  // responses have been processed (e.g., waited for and sent).
-  void enqueue(const Response& response, const Request& request);
-
-  // Enqueues a future to a response that will get waited on (up to
-  // some timeout) and then sent once all previously enqueued
-  // responses have been processed (e.g., waited for and sent).
-  // NOTE(review): the future is presumably stored in an Item, which
-  // deletes it (see Item::~Item), i.e. ownership transfers -- confirm.
-  void handle(Future<Response>* future, const Request& request);
-
-private:
-  // Starts "waiting" on the next available future response.
-  void next();
-
-  // Invoked once a future response has been satisfied.
-  void waited(const Future<Response>& future);
-
-  // Demuxes and handles a response.
-  bool process(const Future<Response>& future, const Request& request);
-
-  // Handles stream (i.e., pipe) based responses.
-  void stream(const Future<short>& poll, const Request& request);
-
-  Socket socket; // Wrap the socket to keep it from getting closed.
-
-  // Describes a queue "item" that wraps the future to the response
-  // and the original request. The original request contains needed
-  // information such as what encodings are acceptable and whether to
-  // persist the connection.
-  struct Item
-  {
-    Item(const Request& _request, Future<Response>* _future)
-      : request(_request), future(_future) {}
-
-    // An Item owns 'future' and deletes it when destroyed.
-    ~Item()
-    {
-      delete future;
-    }
-
-    const Request request; // Make a copy.
-    Future<Response>* future;
-
-  private:
-    // 'future' is an owning raw pointer, so the implicitly generated
-    // copy constructor would lead to a double delete. Items are only
-    // ever handled by pointer (see 'items'), so forbid copying. These
-    // are declared but intentionally not defined (pre-C++11 idiom).
-    Item(const Item&);
-    Item& operator = (const Item&);
-  };
-
-  queue<Item*> items;
-
-  Option<int> pipe; // Current pipe, if streaming.
-};
-
-
-// Process-wide bookkeeping for libprocess sockets. It tracks the
-// active sockets, the node (ip, port) each one talks to, the outgoing
-// encoder queue per socket, links from processes to the UPIDs they
-// watch, and the HttpProxy attached to each socket. All state is
-// guarded by the object's own lock (see synchronizable(this) below).
-class SocketManager
-{
-public:
- SocketManager();
- ~SocketManager();
-
- // Registers a socket returned by ::accept() and returns the Socket
- // wrapper for it (see accept()).
- Socket accepted(int s);
-
- // Records a link from 'process' to 'to' (see 'links' below).
- void link(ProcessBase* process, const UPID& to);
-
- // Returns the HttpProxy used for responses on 'socket' (presumably
- // creating it on first use -- confirm in the definition).
- PID<HttpProxy> proxy(const Socket& socket);
-
- // Queue outgoing data; the definitions are outside this chunk.
- void send(Encoder* encoder, bool persist);
- void send(const Response& response,
- const Request& request,
- const Socket& socket);
- void send(Message* message);
-
- // Returns the next queued encoder for socket 's', or NULL when there
- // is nothing more to send (see send_data/send_file).
- Encoder* next(int s);
-
- // Closes socket 's'; the I/O callbacks call this on socket errors.
- void close(int s);
-
- // Presumably invoked when a remote node is lost / a local process
- // terminates, to notify linked processes -- confirm in definitions.
- void exited(const Node& node);
- void exited(ProcessBase* process);
-
-private:
- // Map from UPID (local/remote) to process.
- map<UPID, set<ProcessBase*> > links;
-
- // Collection of all active sockets.
- map<int, Socket> sockets;
-
- // Collection of sockets that should be disposed when they are
- // finished being used (e.g., when there is no more data to send on
- // them).
- set<int> dispose;
-
- // Map from socket to node (ip, port).
- map<int, Node> nodes;
-
- // Maps from node (ip, port) to temporary sockets (i.e., they will
- // get closed once there is no more data to send on them).
- map<Node, int> temps;
-
- // Maps from node (ip, port) to persistent sockets (i.e., they will
- // remain open even if there is no more data to send on them). We
- // distinguish these from the 'temps' collection so we can tell when
- // a persistent socket has been lost (and thus generate
- // ExitedEvents).
- map<Node, int> persists;
-
- // Map from socket to outgoing queue.
- map<int, queue<Encoder*> > outgoing;
-
- // HTTP proxies.
- map<int, HttpProxy*> proxies;
-
- // Protects instance variables.
- synchronizable(this);
-};
-
-
-// Owns the table of local processes (by name) and the run queue of
-// processes ready to execute. The worker threads started in
-// initialize() repeatedly dequeue() and resume() processes (see
-// schedule()).
-class ProcessManager
-{
-public:
- ProcessManager(const string& delegate);
- ~ProcessManager();
-
- // Returns a reference to the local process 'pid'. Callers test the
- // result as a bool (see handle_timeouts); presumably it is false
- // when no such process exists -- confirm in the definition.
- ProcessReference use(const UPID& pid);
-
- // Dispatches an HTTP request received on 'socket' (see recv_data).
- bool handle(
- const Socket& socket,
- Request* request);
-
- // Delivers 'event' to a process; 'sender' is optional.
- bool deliver(
- ProcessBase* receiver,
- Event* event,
- ProcessBase* sender = NULL);
-
- bool deliver(
- const UPID& to,
- Event* event,
- ProcessBase* sender = NULL);
-
- UPID spawn(ProcessBase* process, bool manage);
- void resume(ProcessBase* process);
- void cleanup(ProcessBase* process);
- void link(ProcessBase* process, const UPID& to);
- void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
- bool wait(const UPID& pid);
-
- // Run-queue operations; dequeue() returns NULL when nothing is
- // runnable (see schedule()).
- void enqueue(ProcessBase* process);
- ProcessBase* dequeue();
-
- // Backs Clock::settle().
- void settle();
-
-private:
- // Delegate process name to receive root HTTP requests.
- const string delegate;
-
- // Map of all local spawned and running processes.
- map<string, ProcessBase*> processes;
- synchronizable(processes);
-
- // Gates for waiting threads (protected by synchronizable(processes)).
- map<ProcessBase*, Gate*> gates;
-
- // Queue of runnable processes (implemented using list).
- list<ProcessBase*> runq;
- synchronizable(runq);
-
- // Number of running processes, to support Clock::settle operation.
- int running;
-};
-
-
-// File-scope state shared by the event loop thread, the scheduling
-// threads and the Clock. Keep the definition order: objects in this
-// translation unit are initialized in the order they are defined.
-
-// Unique id that can be assigned to each process.
-static uint32_t __id__ = 0;
-
-// Local server socket (created in initialize()).
-static int __s__ = -1;
-
-// Local IP address (as stored in sockaddr_in::sin_addr.s_addr, i.e.
-// network byte order; see initialize()).
-static uint32_t __ip__ = 0;
-
-// Local port (host byte order; see initialize()).
-static uint16_t __port__ = 0;
-
-// Active SocketManager (eventually will probably be thread-local).
-static SocketManager* socket_manager = NULL;
-
-// Active ProcessManager (eventually will probably be thread-local).
-static ProcessManager* process_manager = NULL;
-
-// Event loop.
-static struct ev_loop* loop = NULL;
-
-// Asynchronous watcher for interrupting loop.
-static ev_async async_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
-// Server watcher for accepting connections.
-static ev_io server_watcher;
-
-// Queue of I/O watchers. Other threads push watchers here; they are
-// started on the event loop thread by handle_async().
-static queue<ev_io*>* watchers = new queue<ev_io*>();
-static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
-
-// We store the timers in a map of lists indexed by the timeout of the
-// timer so that we can have two timers that have the same timeout. We
-// exploit that the map is SORTED! The lock is recursive because, e.g.,
-// Clock::now() re-acquires it while it is already held.
-static map<Time, list<Timer> >* timeouts =
- new map<Time, list<Timer> >();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// For supporting Clock::settle(), true if timers have been removed
-// from 'timeouts' but may not have been executed yet. Protected by
-// the timeouts lock. This is only used when the clock is paused.
-static bool pending_timers = false;
-
-// Flag to indicate whether or not to update the timer on async
-// interrupt (protected by the timeouts lock).
-static bool update_timer = false;
-
-// Scheduling gate that threads wait at when there is nothing to run.
-static Gate* gate = new Gate();
-
-// Filter. Synchronized support for using the filterer needs to be
-// recursive in case a filterer wants to do anything fancy (which is
-// possible and likely given that filters will get used for testing).
-static Filter* filterer = NULL;
-static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Global garbage collector.
-PID<GarbageCollector> gc;
-
-// Per thread process pointer.
-ThreadLocal<ProcessBase>* _process_ = new ThreadLocal<ProcessBase>();
-
-// Per thread executor pointer.
-ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
-
-
-// Time bounds. These must be defined BEFORE the clock variables below.
-// Objects in one translation unit are dynamically initialized in
-// definition order, and 'clock::initial' / 'clock::current' copy
-// Time::EPOCH. When EPOCH was defined after them, the copies read it
-// before its initializer had run.
-Time Time::EPOCH = Time(Duration::zero());
-
-
-Time Time::MAX = Time(Duration::max());
-
-
-// We namespace the clock related variables to keep them well
-// named. In the future we'll probably want to associate a clock with
-// a specific ProcessManager/SocketManager instance pair, so this will
-// likely change.
-namespace clock {
-
-// Per-process clocks; only consulted while the clock is paused and
-// cleared by Clock::resume().
-map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
-
-// Time at which the clock was paused (seed for per-process clocks).
-Time initial = Time::EPOCH;
-
-// Global paused time, moved forward by Clock::advance/update.
-Time current = Time::EPOCH;
-
-// Whether the clock is paused (see Clock::pause/resume).
-bool paused = false;
-
-} // namespace clock {
-
-
-// Current time from the point of view of the process executing on
-// the calling thread (__process__); see Clock::now(ProcessBase*).
-Time Clock::now()
-{
-  return Clock::now(__process__);
-}
-
-
-// Returns the current time. While the clock is paused, time is
-// virtual. A non-NULL 'process' sees its own clock, which is lazily
-// seeded with the time at which the clock was paused. A NULL 'process'
-// sees the global paused time. Otherwise the real time from libev is
-// returned.
-Time Clock::now(ProcessBase* process)
-{
-  synchronized (timeouts) {
-    if (Clock::paused()) {
-      if (process == NULL) {
-        return clock::current;
-      }
-      map<ProcessBase*, Time>& clocks = *clock::currents;
-      if (clocks.count(process) == 0) {
-        clocks[process] = clock::initial;
-      }
-      return clocks[process];
-    }
-  }
-
-  // TODO(benh): Versus ev_now()?
-  double seconds = ev_time();
-  Try<Time> time = Time::create(seconds);
-
-  // TODO(xujyan): Move CHECK_SOME to libprocess and use it here.
-  if (time.isError()) {
-    LOG(FATAL) << "Failed to create a Time from " << seconds << ": "
-               << time.error();
-  }
-  return time.get();
-}
-
-
-// Freezes time. The current time becomes both the pause point and the
-// global paused time. Calling this while already paused does nothing.
-void Clock::pause()
-{
-  process::initialize(); // To make sure the libev watchers are ready.
-
-  synchronized (timeouts) {
-    if (!clock::paused) {
-      Time frozen = now();
-      clock::initial = frozen;
-      clock::current = frozen;
-      clock::paused = true;
-      VLOG(2) << "Clock paused at " << clock::initial;
-    }
-  }
-
-  // An already-armed libev timer may still fire (invoking
-  // handle_timeout) after this point. Because the clock is paused, no
-  // "time" passes, so no timer will actually expire.
-}
-
-
-// Whether the clock is currently paused. Note that the flag is read
-// without taking the 'timeouts' lock.
-bool Clock::paused()
-{
-  const bool frozen = clock::paused;
-  return frozen;
-}
-
-
-// Returns to real time. Drops every per-process clock and asks the
-// event loop (through the async watcher) to re-arm the timeouts timer.
-// Calling this while not paused does nothing.
-void Clock::resume()
-{
-  process::initialize(); // To make sure the libev watchers are ready.
-
-  synchronized (timeouts) {
-    if (!clock::paused) {
-      return;
-    }
-    VLOG(2) << "Clock resumed at " << clock::current;
-    clock::paused = false;
-    clock::currents->clear();
-    update_timer = true;
-    ev_async_send(loop, &async_watcher);
-  }
-}
-
-
-// Moves the global paused time forward by 'duration'. Unless an update
-// is already pending, it wakes the event loop so timers that are now
-// due get handled. Ignored while the clock is not paused.
-void Clock::advance(const Duration& duration)
-{
-  synchronized (timeouts) {
-    if (!clock::paused) {
-      return;
-    }
-    clock::current += duration;
-    VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
-    if (update_timer) {
-      return; // An update is already queued for the event loop.
-    }
-    update_timer = true;
-    ev_async_send(loop, &async_watcher);
-  }
-}
-
-
-// Moves only 'process's paused clock forward by 'duration'. Ignored
-// while the clock is not paused.
-void Clock::advance(ProcessBase* process, const Duration& duration)
-{
-  synchronized (timeouts) {
-    if (!clock::paused) {
-      return;
-    }
-    Time advanced = now(process);
-    advanced += duration;
-    (*clock::currents)[process] = advanced;
-    VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
-            << ") to " << advanced;
-  }
-}
-
-
-// Moves the global paused time forward to 'time' (never backwards).
-// Unless an update is already pending, it wakes the event loop. Ignored
-// while the clock is not paused.
-void Clock::update(const Time& time)
-{
-  synchronized (timeouts) {
-    if (!clock::paused || !(clock::current < time)) {
-      return;
-    }
-    clock::current = Time(time);
-    VLOG(2) << "Clock updated to " << clock::current;
-    if (!update_timer) {
-      update_timer = true;
-      ev_async_send(loop, &async_watcher);
-    }
-  }
-}
-
-
-// Moves 'process's paused clock forward to 'time' (never backwards).
-// Ignored while the clock is not paused.
-void Clock::update(ProcessBase* process, const Time& time)
-{
-  synchronized (timeouts) {
-    if (!clock::paused || !(now(process) < time)) {
-      return;
-    }
-    VLOG(2) << "Clock of " << process->self() << " updated to " << time;
-    (*clock::currents)[process] = Time(time);
-  }
-}
-
-
-// Happens-before: moves 'to's clock forward to at least 'from's current
-// time. This only has an effect while the clock is paused.
-void Clock::order(ProcessBase* from, ProcessBase* to)
-{
-  const Time sent = now(from);
-  update(to, sent);
-}
-
-
-// Delegates to ProcessManager::settle(), which presumably waits until no
-// work is pending (see its definition). Requires a paused clock.
-void Clock::settle()
-{
-  // TODO(benh): Consider returning a bool instead.
-  CHECK(clock::paused);
-  process_manager->settle();
-}
-
-
-// Allocates a Message carrying the given routing fields and body. The
-// caller owns the returned pointer.
-static Message* encode(const UPID& from,
-                       const UPID& to,
-                       const string& name,
-                       const string& data = "")
-{
-  Message* result = new Message();
-  result->name = name;
-  result->from = from;
-  result->to = to;
-  result->body = data;
-  return result;
-}
-
-
-// Routes 'message'. If it is addressed to this node's ip:port, it is
-// handed straight to the local ProcessManager; otherwise the socket
-// manager sends it over the network.
-static void transport(Message* message, ProcessBase* sender = NULL)
-{
-  const bool local =
-    message->to.ip == __ip__ && message->to.port == __port__;
-
-  if (!local) {
-    socket_manager->send(message);
-    return;
-  }
-
-  process_manager->deliver(message->to, new MessageEvent(message), sender);
-}
-
-
-// True iff 'request' is a libprocess message, i.e. a POST whose
-// User-Agent header starts with "libprocess/".
-static bool libprocess(Request* request)
-{
-  if (request->method != "POST") {
-    return false;
-  }
-  if (request->headers.count("User-Agent") == 0) {
-    return false;
-  }
-  return request->headers["User-Agent"].find("libprocess/") == 0;
-}
-
-
-// Builds a Message from a libprocess HTTP request, or returns NULL if
-// the request cannot be interpreted as one. The caller owns the result.
-//
-// The sender comes from the User-Agent header ("libprocess/<from>").
-// The request path is split as "/<to>/<name>": the first segment names
-// the receiving process on this node, and everything after the next '/'
-// is the message name ("" if there is no second '/').
-static Message* parse(Request* request)
-{
-  // TODO(benh): Do better error handling (to deal with a malformed
-  // libprocess message, malicious or otherwise).
-  const string& agent = request->headers["User-Agent"];
-  const string identifier = "libprocess/";
-  const size_t start = agent.find(identifier);
-  if (start == string::npos) {
-    return NULL;
-  }
-
-  // The path comes from the (untrusted) peer. Reject anything not of the
-  // form "/..." instead of letting substr() throw std::out_of_range on
-  // an empty path (or silently mis-splitting a path without a leading
-  // slash).
-  const string& path = request->path;
-  if (path.empty() || path[0] != '/') {
-    VLOG(1) << "Failed to parse libprocess message with path '"
-            << path << "'";
-    return NULL;
-  }
-
-  // Okay, now determine 'from'.
-  const UPID from(agent.substr(start + identifier.size()));
-
-  // Now determine 'to' (first path segment) and 'name' (the rest).
-  const size_t slash = path.find('/', 1);
-  string id;
-  string name;
-  if (slash == string::npos) {
-    id = path.substr(1);
-  } else {
-    id = path.substr(1, slash - 1);
-    name = path.substr(slash + 1);
-  }
-  const UPID to(id, __ip__, __port__);
-
-  VLOG(2) << "Parsed message name '" << name
-          << "' for " << to << " from " << from;
-
-  Message* message = new Message();
-  message->name = name;
-  message->from = from;
-  message->to = to;
-  message->body = request->body;
-
-  return message;
-}
-
-
-// libev async callback, run on the event loop thread after another
-// thread calls ev_async_send(loop, &async_watcher) (e.g. from
-// Clock::resume/advance/update). It starts every I/O watcher queued in
-// 'watchers' and, if 'update_timer' is set, re-arms 'timeouts_watcher'
-// for the earliest pending timeout.
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
-{
- synchronized (watchers) {
- // Start all the new I/O watchers.
- while (!watchers->empty()) {
- ev_io* watcher = watchers->front();
- watchers->pop();
- ev_io_start(loop, watcher);
- }
- }
-
- synchronized (timeouts) {
- if (update_timer) {
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- // Clock::now() re-acquires the 'timeouts' lock; this relies on
- // that lock being recursive.
- timeouts_watcher.repeat = (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- // The earliest timeout is already due: arm with repeat 0 and
- // deliver an EV_TIMEOUT to the watcher's callback right away.
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we
- // don't want time to advance (instead a call to
- // clock::advance() will handle the timer).
- // NOTE(review): 'repeat > 0' is always true in this branch.
- // ev_timer_again() with repeat == 0 stops the timer.
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
-
- ev_timer_again(loop, &timeouts_watcher);
- }
- }
-
- update_timer = false;
- }
- }
-}
-
-
-// libev callback for 'timeouts_watcher'. Presumably it is registered as
-// that watcher's callback outside this chunk; handle_async also feeds
-// EV_TIMEOUT to the watcher. It collects every Timer whose timeout is
-// <= Clock::now(), removes them from 'timeouts', and re-arms the
-// watcher for the earliest remaining timeout. Then, outside the
-// 'timeouts' lock, it invokes the collected timers.
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
-{
- // Expired timers, copied out so they can run without the lock held.
- list<Timer> timedout;
-
- synchronized (timeouts) {
- Time now = Clock::now();
-
- VLOG(3) << "Handling timeouts up to " << now;
-
- // 'timeouts' is sorted by time, so stop at the first future key.
- foreachkey (const Time& timeout, *timeouts) {
- if (timeout > now) {
- break;
- }
-
- VLOG(3) << "Have timeout(s) at " << timeout;
-
- // Record that we have pending timers to execute so the
- // Clock::settle() operation can wait until we're done.
- pending_timers = true;
-
- foreach (const Timer& timer, (*timeouts)[timeout]) {
- timedout.push_back(timer);
- }
- }
-
- // Now erase the range of timeouts that timed out.
- timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
-
- // Okay, so the timeout for the next timer should not have fired.
- CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
- // Update the timer as necessary (same re-arming logic as in
- // handle_async).
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we don't
- // want time to advance (instead a call to Clock::advance()
- // will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
-
- ev_timer_again(loop, &timeouts_watcher);
- }
- }
-
- update_timer = false; // Since we might have a queued update_timer.
- }
-
- // Update current time of process (if it's present/valid). It might
- // be necessary to actually add some more synchronization around
- // this so that, for example, pausing and resuming the clock doesn't
- // cause some processes to get their current times updated and
- // others not. Since ProcessManager::use acquires the 'processes'
- // lock we had to move this out of the synchronized (timeouts) above
- // since there was a deadlock with acquiring 'processes' then
- // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
- // current time may be greater than the timeout if a local message
- // was received (and happens-before kicks in).
- if (Clock::paused()) {
- foreach (const Timer& timer, timedout) {
- if (ProcessReference process = process_manager->use(timer.creator())) {
- Clock::update(process, timer.timeout().time());
- }
- }
- }
-
- // Invoke the timers that timed out (TODO(benh): Do this
- // asynchronously so that we don't tie up the event thread!).
- foreach (const Timer& timer, timedout) {
- timer();
- }
-
- // Mark ourselves as done executing the timers since it's now safe
- // for a call to Clock::settle() to check if there will be any
- // future timeouts reached.
- synchronized (timeouts) {
- pending_timers = false;
- }
-}
-
-
-// libev read callback for a connected socket; 'watcher->data' holds the
-// socket's DataDecoder. It reads until the socket would block, passing
-// each decoded HTTP request to the ProcessManager. On a socket error,
-// EOF or a decoding failure, it closes the socket and frees both the
-// decoder and the watcher (neither may be touched afterwards).
-void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- DataDecoder* decoder = (DataDecoder*) watcher->data;
-
- int s = watcher->fd;
-
- while (true) {
- // NOTE(review): 80 KB stack buffer per call.
- const ssize_t size = 80 * 1024;
- ssize_t length = 0;
-
- char data[size];
-
- length = recv(s, data, size, 0);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while receiving: " << error;
- } else {
- VLOG(1) << "Socket closed while receiving";
- }
- socket_manager->close(s);
- delete decoder;
- ev_io_stop(loop, watcher);
- delete watcher;
- break;
- } else {
- CHECK(length > 0);
-
- // Decode as much of the data as possible into HTTP requests.
- const deque<Request*>& requests = decoder->decode(data, length);
-
- if (!requests.empty()) {
- foreach (Request* request, requests) {
- process_manager->handle(decoder->socket(), request);
- }
- } else if (requests.empty() && decoder->failed()) {
- // Only reached when nothing was decoded ('requests.empty()' is
- // redundant here); a failure alongside decoded requests would
- // surface on a later read.
- VLOG(1) << "Decoder error while receiving";
- socket_manager->close(s);
- delete decoder;
- ev_io_stop(loop, watcher);
- delete watcher;
- break;
- }
- }
- }
-}
-
-
-// libev write callback for a socket with queued outgoing data;
-// 'watcher->data' holds the DataEncoder currently being sent. It writes
-// until the socket would block or the encoder is drained. A drained
-// encoder is freed, and the watcher is re-armed with the socket's next
-// queued encoder (using that encoder's sender() callback) or freed if
-// nothing is queued. On a socket error the socket is closed and the
-// encoder and the watcher are freed.
-void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- DataEncoder* encoder = (DataEncoder*) watcher->data;
-
- int s = watcher->fd;
-
- while (true) {
- const void* data;
- size_t size;
-
- // next() hands out the next chunk; backup(n) presumably returns the
- // last n unsent bytes of it to the encoder -- confirm in encoder.hpp.
- data = encoder->next(&size);
- CHECK(size > 0);
-
- ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- encoder->backup(size);
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- encoder->backup(size);
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- socket_manager->close(s);
- delete encoder;
- ev_io_stop(loop, watcher);
- delete watcher;
- break;
- } else {
- CHECK(length > 0);
-
- // Update the encoder with the amount sent.
- encoder->backup(size - length);
-
- // See if there is any more of the message to send.
- if (encoder->remaining() == 0) {
- delete encoder;
-
- // Stop this watcher for now.
- ev_io_stop(loop, watcher);
-
- // Check for more stuff to send on socket.
- Encoder* next = socket_manager->next(s);
- if (next != NULL) {
- watcher->data = next;
- ev_io_init(watcher, next->sender(), s, EV_WRITE);
- ev_io_start(loop, watcher);
- } else {
- // Nothing more to send right now, clean up.
- delete watcher;
- }
- break;
- }
- }
- }
-}
-
-
-// libev write callback that streams a file to a socket;
-// 'watcher->data' holds the FileEncoder. The control flow mirrors
-// send_data: write until the socket would block or the encoder is
-// drained, then either chain to the socket's next queued encoder or
-// free the watcher. On error, close the socket and free the encoder
-// and the watcher.
-void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- FileEncoder* encoder = (FileEncoder*) watcher->data;
-
- int s = watcher->fd;
-
- while (true) {
- int fd;
- off_t offset;
- size_t size;
-
- fd = encoder->next(&offset, &size);
- CHECK(size > 0);
-
- // NOTE(review): 'offset' is passed by value, so this is not the raw
- // Linux sendfile(2) (which takes off_t*); presumably a portability
- // wrapper declared elsewhere -- confirm.
- ssize_t length = sendfile(s, fd, offset, size);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- encoder->backup(size);
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- encoder->backup(size);
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- socket_manager->close(s);
- delete encoder;
- ev_io_stop(loop, watcher);
- delete watcher;
- break;
- } else {
- CHECK(length > 0);
-
- // Update the encoder with the amount sent.
- encoder->backup(size - length);
-
- // See if there is any more of the message to send.
- if (encoder->remaining() == 0) {
- delete encoder;
-
- // Stop this watcher for now.
- ev_io_stop(loop, watcher);
-
- // Check for more stuff to send on socket.
- Encoder* next = socket_manager->next(s);
- if (next != NULL) {
- watcher->data = next;
- ev_io_init(watcher, next->sender(), s, EV_WRITE);
- ev_io_start(loop, watcher);
- } else {
- // Nothing more to send right now, clean up.
- delete watcher;
- }
- break;
- }
- }
- }
-}
-
-
-// libev callback fired when a non-blocking connect() on an outgoing
-// socket completes. On success the same watcher is re-armed with
-// send_data to start writing. On failure the socket is closed and the
-// pending encoder and the watcher are freed.
-void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  const int s = watcher->fd;
-
-  // Ask the kernel how the connect() turned out.
-  int status;
-  socklen_t status_len = sizeof(status);
-  const bool failed =
-    getsockopt(s, SOL_SOCKET, SO_ERROR, &status, &status_len) < 0 ||
-    status != 0;
-
-  if (!failed) {
-    // We're connected! Now let's do some sending.
-    ev_io_stop(loop, watcher);
-    ev_io_init(watcher, send_data, s, EV_WRITE);
-    ev_io_start(loop, watcher);
-    return;
-  }
-
-  VLOG(1) << "Socket error while connecting";
-  socket_manager->close(s);
-  delete (MessageEncoder*) watcher->data;
-  ev_io_stop(loop, watcher);
-  delete watcher;
-}
-
-
-// libev callback fired when a non-blocking connect() on a socket we
-// intend to read from completes. On success the same watcher is
-// re-armed with recv_data. On failure the socket is closed and the
-// decoder and the watcher are freed.
-void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  const int s = watcher->fd;
-
-  // Ask the kernel how the connect() turned out.
-  int status;
-  socklen_t status_len = sizeof(status);
-  const bool failed =
-    getsockopt(s, SOL_SOCKET, SO_ERROR, &status, &status_len) < 0 ||
-    status != 0;
-
-  if (!failed) {
-    // We're connected! Now let's do some receiving.
-    ev_io_stop(loop, watcher);
-    ev_io_init(watcher, recv_data, s, EV_READ);
-    ev_io_start(loop, watcher);
-    return;
-  }
-
-  VLOG(1) << "Socket error while connecting";
-  socket_manager->close(s);
-  delete (DataDecoder*) watcher->data;
-  ev_io_stop(loop, watcher);
-  delete watcher;
-}
-
-
-// libev read callback on the listening socket '__s__'. It accepts one
-// connection and makes it non-blocking and close-on-exec. It then
-// disables Nagle, registers the socket with the socket manager, and
-// starts a recv_data watcher (owning a fresh DataDecoder) on it. A
-// failed ::accept() is ignored; any later setup failure closes the new
-// socket.
-void accept(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  CHECK_EQ(__s__, watcher->fd);
-
-  sockaddr_in addr;
-  socklen_t addrlen = sizeof(addr);
-
-  const int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
-  if (s < 0) {
-    return;
-  }
-
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return;
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return;
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-    return;
-  }
-
-  // Inform the socket manager for proper bookkeeping.
-  const Socket& socket = socket_manager->accepted(s);
-
-  // The watcher owns the decoder; recv_data frees both on teardown.
-  DataDecoder* decoder = new DataDecoder(socket);
-  ev_io* reader = new ev_io();
-  reader->data = decoder;
-
-  ev_io_init(reader, recv_data, s, EV_READ);
-  ev_io_start(loop, reader);
-}
-
-
-// libev callback for a one-shot poll. It fulfils the Promise<short>
-// stored in 'watcher->data' with the received events, frees the
-// promise, and then stops and frees the watcher.
-void polled(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  Promise<short>* const pending =
-    static_cast<Promise<short>*>(watcher->data);
-
-  pending->set(revents);
-  delete pending;
-
-  ev_io_stop(loop, watcher);
-  delete watcher;
-}
-
-
-// Thread entry point: runs the libev loop passed in 'arg' until the loop
-// exits, then returns NULL.
-void* serve(void* arg)
-{
-  struct ev_loop* const events = static_cast<struct ev_loop*>(arg);
-  ev_loop(events, 0);
-  return NULL;
-}
-
-
-// Worker thread entry point; it never returns. It repeatedly takes a
-// runnable process off the run queue and resumes it. When the queue
-// looks empty, the thread approaches the gate and re-checks the queue
-// before waiting. Presumably this is so a process enqueued in between
-// is not missed -- see gate.hpp.
-void* schedule(void* arg)
-{
-  for (;;) {
-    ProcessBase* process = process_manager->dequeue();
-
-    if (process == NULL) {
-      Gate::state_t old = gate->approach();
-      process = process_manager->dequeue();
-      if (process == NULL) {
-        gate->arrive(old); // Wait at gate if idle.
-        continue;
-      }
-      gate->leave();
-    }
-
-    process_manager->resume(process);
-  }
-}
-
-
-// We might find value in catching terminating signals at some point.
-// However, for now, adding signal handlers freely is not allowed
-// because they will clash with Java and Python virtual machines and
-// cause hard-to-debug crashes/segfaults.
-
-// void sigbad(int signal, struct sigcontext *ctx)
-// {
-// // Pass on the signal (so that a core file is produced).
-// struct sigaction sa;
-// sa.sa_handler = SIG_DFL;
-// sigemptyset(&sa.sa_mask);
-// sa.sa_flags = 0;
-// sigaction(signal, &sa, NULL);
-// raise(signal);
-// }
-
-
-void initialize(const string& delegate)
-{
- // TODO(benh): Return an error if attempting to initialize again
- // with a different delegate then originally specified.
-
- // static pthread_once_t init = PTHREAD_ONCE_INIT;
- // pthread_once(&init, ...);
-
- static volatile bool initialized = false;
- static volatile bool initializing = true;
-
- // Try and do the initialization or wait for it to complete.
- if (initialized && !initializing) {
- return;
- } else if (initialized && initializing) {
- while (initializing);
- return;
- } else {
- if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
- while (initializing);
- return;
- }
- }
-
-// // Install signal handler.
-// struct sigaction sa;
-
-// sa.sa_handler = (void (*) (int)) sigbad;
-// sigemptyset (&sa.sa_mask);
-// sa.sa_flags = SA_RESTART;
-
-// sigaction (SIGTERM, &sa, NULL);
-// sigaction (SIGINT, &sa, NULL);
-// sigaction (SIGQUIT, &sa, NULL);
-// sigaction (SIGSEGV, &sa, NULL);
-// sigaction (SIGILL, &sa, NULL);
-// #ifdef SIGBUS
-// sigaction (SIGBUS, &sa, NULL);
-// #endif
-// #ifdef SIGSTKFLT
-// sigaction (SIGSTKFLT, &sa, NULL);
-// #endif
-// sigaction (SIGABRT, &sa, NULL);
-
-// sigaction (SIGFPE, &sa, NULL);
-
-#ifdef __sun__
- /* Need to ignore this since we can't do MSG_NOSIGNAL on Solaris. */
- signal(SIGPIPE, SIG_IGN);
-#endif // __sun__
-
- // Create a new ProcessManager and SocketManager.
- process_manager = new ProcessManager(delegate);
- socket_manager = new SocketManager();
-
- // Setup processing threads.
- long cpus = std::max(4L, sysconf(_SC_NPROCESSORS_ONLN));
-
- for (int i = 0; i < cpus; i++) {
- pthread_t thread; // For now, not saving handles on our threads.
- if (pthread_create(&thread, NULL, schedule, NULL) != 0) {
- LOG(FATAL) << "Failed to initialize, pthread_create";
- }
- }
-
- __ip__ = 0;
- __port__ = 0;
-
- char* value;
-
- // Check environment for ip.
- value = getenv("LIBPROCESS_IP");
- if (value != NULL) {
- int result = inet_pton(AF_INET, value, &__ip__);
- if (result == 0) {
- LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
- } else if (result < 0) {
- PLOG(FATAL) << "Failed to initialize, inet_pton";
- }
- }
-
- // Check environment for port.
- value = getenv("LIBPROCESS_PORT");
- if (value != NULL) {
- int result = atoi(value);
- if (result < 0 || result > USHRT_MAX) {
- LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
- }
- __port__ = result;
- }
-
- // Create a "server" socket for communicating with other nodes.
- if ((__s__ = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- PLOG(FATAL) << "Failed to initialize, socket";
- }
-
- // Make socket non-blocking.
- Try<Nothing> nonblock = os::nonblock(__s__);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
- }
-
- // Set FD_CLOEXEC flag.
- Try<Nothing> cloexec = os::cloexec(__s__);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
- }
-
- // Allow address reuse.
- int on = 1;
- if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
- PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
- }
-
- // Set up socket.
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = __ip__;
- addr.sin_port = htons(__port__);
-
- if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
- PLOG(FATAL) << "Failed to initialize, bind";
- }
-
- // Lookup and store assigned ip and assigned port.
- socklen_t addrlen = sizeof(addr);
- if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
- PLOG(FATAL) << "Failed to initialize, getsockname";
- }
-
- __ip__ = addr.sin_addr.s_addr;
- __port__ = ntohs(addr.sin_port);
-
- // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
- // actually have a valid external ip address. Note that we need only
- // one ip address, so that other processes can send and receive and
- // don't get confused as to whom they are sending to.
- if (__ip__ == 0 || __ip__ == 2130706433) {
- char hostname[512];
-
- if (gethostname(hostname, sizeof(hostname)) < 0) {
- PLOG(FATAL) << "Ffailed to initialize, gethostname";
- }
-
- // Lookup IP address of local hostname.
- hostent* he;
-
- if ((he = gethostbyname2(hostname, AF_INET)) == NULL) {
- PLOG(FATAL) << "Failed to initialize, gethostbyname2";
- }
-
- __ip__ = *((uint32_t *) he->h_addr_list[0]);
- }
-
- if (listen(__s__, 500000) < 0) {
- PLOG(FATAL) << "Failed to initialize, listen";
- }
-
- // Setup event loop.
-#ifdef __sun__
- loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
-#else
- loop = ev_default_loop(EVFLAG_AUTO);
-#endif // __sun__
-
- ev_async_init(&async_watcher, handle_async);
- ev_async_start(loop, &async_watcher);
-
- ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
- ev_timer_again(loop, &timeouts_watcher);
-
- ev_io_init(&server_watcher, accept, __s__, EV_READ);
- ev_io_start(loop, &server_watcher);
-
-// ev_child_init(&child_watcher, child_exited, pid, 0);
-// ev_child_start(loop, &cw);
-
-// /* Install signal handler. */
-// struct sigaction sa;
-
-// sa.sa_handler = ev_sighandler;
-// sigfillset (&sa.sa_mask);
-// sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
-// sigaction (w->signum, &sa, 0);
-
-// sigemptyset (&sa.sa_mask);
-// sigaddset (&sa.sa_mask, w->signum);
-// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
-
- pthread_t thread; // For now, not saving handles on our threads.
- if (pthread_create(&thread, NULL, serve, loop) != 0) {
- LOG(FATAL) << "Failed to initialize, pthread_create";
- }
-
- // Need to set initialzing here so that we can actually invoke
- // 'spawn' below for the garbage collector.
- initializing = false;
-
- // TODO(benh): Make sure creating the garbage collector, logging
- // process, and profiler always succeeds and use supervisors to make
- // sure that none terminate.
-
- // Create global garbage collector process.
- gc = spawn(new GarbageCollector());
-
- // Create the global logging process.
- spawn(new Logging(), true);
-
- // Create the global profiler process.
- spawn(new Profiler(), true);
-
- // Create the global statistics.
- // TODO(bmahler): Investigate memory implications of this window
- // size. We may also want to provide a maximum memory size rather than
- // time window. Or, offload older data to disk, etc.
- process::statistics = new Statistics(Weeks(2));
-
- // Initialize the mime types.
- mime::initialize();
-
- // Initialize the response statuses.
- http::initialize();
-
- char temp[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
- PLOG(FATAL) << "Failed to initialize, inet_ntop";
- }
-
- VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
- << " for " << cpus << " cpus";
-}
-
-
-uint32_t ip()
-{
- process::initialize();
- return __ip__;
-}
-
-
-uint16_t port()
-{
- process::initialize();
- return __port__;
-}
-
-
-HttpProxy::HttpProxy(const Socket& _socket)
- : ProcessBase(ID::generate("__http__")),
- socket(_socket) {}
-
-
-HttpProxy::~HttpProxy()
-{
- // Need to make sure response producers know not to continue to
- // create a response (streaming or otherwise).
- if (pipe.isSome()) {
- os::close(pipe.get());
- }
- pipe = None();
-
- while (!items.empty()) {
- Item* item = items.front();
-
- // Attempt to discard the future.
- item->future->discard();
-
- // But it might have already been ready ...
- if (item->future->isReady()) {
- const Response& response = item->future->get();
- if (response.type == Response::PIPE) {
- os::close(response.pipe);
- }
- }
-
- items.pop();
- delete item;
- }
-}
-
-
-void HttpProxy::enqueue(const Response& response, const Request& request)
-{
- handle(new Future<Response>(response), request);
-}
-
-
-void HttpProxy::handle(Future<Response>* future, const Request& request)
-{
- items.push(new Item(request, future));
-
- if (items.size() == 1) {
- next();
- }
-}
-
-
-void HttpProxy::next()
-{
- if (items.size() > 0) {
- // Wait for any transition of the future.
- items.front()->future->onAny(
- defer(self(), &HttpProxy::waited, lambda::_1));
- }
-}
-
-
-void HttpProxy::waited(const Future<Response>& future)
-{
- CHECK(items.size() > 0);
- Item* item = items.front();
-
- CHECK(future == *item->future);
-
- // Process the item and determine if we're done or not (so we know
- // whether to start waiting on the next responses).
- bool processed = process(*item->future, item->request);
-
- items.pop();
- delete item;
-
- if (processed) {
- next();
- }
-}
-
-
-bool HttpProxy::process(const Future<Response>& future, const Request& request)
-{
- if (!future.isReady()) {
- // TODO(benh): Consider handling other "states" of future
- // (discarded, failed, etc) with different HTTP statuses.
- socket_manager->send(ServiceUnavailable(), request, socket);
- return true; // All done, can process next response.
- }
-
- Response response = future.get();
-
- // If the response specifies a path, try and perform a sendfile.
- if (response.type == Response::PATH) {
- // Make sure no body is sent (this is really an error and
- // should be reported and no response sent.
- response.body.clear();
-
- const string& path = response.path;
- int fd = open(path.c_str(), O_RDONLY);
- if (fd < 0) {
- if (errno == ENOENT || errno == ENOTDIR) {
- VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
- socket_manager->send(NotFound(), request, socket);
- } else {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to send file at '" << path << "': " << error;
- socket_manager->send(InternalServerError(), request, socket);
- }
- } else {
- struct stat s; // Need 'struct' because of function named 'stat'.
- if (fstat(fd, &s) != 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to send file at '" << path << "': " << error;
- socket_manager->send(InternalServerError(), request, socket);
- } else if (S_ISDIR(s.st_mode)) {
- VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
- socket_manager->send(NotFound(), request, socket);
- } else {
- // While the user is expected to properly set a 'Content-Type'
- // header, we fill in (or overwrite) 'Content-Length' header.
- stringstream out;
- out << s.st_size;
- response.headers["Content-Length"] = out.str();
-
- if (s.st_size == 0) {
- socket_manager->send(response, request, socket);
- return true; // All done, can process next request.
- }
-
- VLOG(1) << "Sending file at '" << path << "' with length " << s.st_size;
-
- // TODO(benh): Consider a way to have the socket manager turn
- // on TCP_CORK for both sends and then turn it off.
- socket_manager->send(
- new HttpResponseEncoder(socket, response, request),
- true);
-
- // Note the file descriptor gets closed by FileEncoder.
- socket_manager->send(
- new FileEncoder(socket, fd, s.st_size),
- request.keepAlive);
- }
- }
- } else if (response.type == Response::PIPE) {
- // Make sure no body is sent (this is really an error and
- // should be reported and no response sent.
- response.body.clear();
-
- // Make sure the pipe is nonblocking.
- Try<Nothing> nonblock = os::nonblock(response.pipe);
- if (nonblock.isError()) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed make pipe nonblocking: " << error;
- socket_manager->send(InternalServerError(), request, socket);
- return true; // All done, can process next response.
- }
-
- // While the user is expected to properly set a 'Content-Type'
- // header, we fill in (or overwrite) 'Transfer-Encoding' header.
- response.headers["Transfer-Encoding"] = "chunked";
-
- VLOG(1) << "Starting \"chunked\" streaming";
-
- socket_manager->send(
- new HttpResponseEncoder(socket, response, request),
- true);
-
- pipe = response.pipe;
-
- io::poll(pipe.get(), io::READ).onAny(
- defer(self(), &Self::stream, lambda::_1, request));
-
- return false; // Streaming, don't process next response (yet)!
- } else {
- socket_manager->send(response, request, socket);
- }
-
- return true; // All done, can process next response.
-}
-
-
-void HttpProxy::stream(const Future<short>& poll, const Request& request)
-{
- // TODO(benh): Use 'splice' on Linux.
-
- CHECK(pipe.isSome());
-
- bool finished = false; // Whether we're done streaming.
-
- if (poll.isReady()) {
- // Read and write.
- CHECK(poll.get() == io::READ);
- const size_t size = 4 * 1024; // 4K.
- char data[size];
- while (!finished) {
- ssize_t length = ::read(pipe.get(), data, size);
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- io::poll(pipe.get(), io::READ).onAny(
- defer(self(), &Self::stream, lambda::_1, request));
- break;
- } else {
- std::ostringstream out;
- if (length <= 0) {
- // Error or closed, treat both as closed.
- if (length < 0) {
- // Error.
- const char* error = strerror(errno);
- VLOG(1) << "Read error while streaming: " << error;
- }
- out << "0\r\n" << "\r\n";
- finished = true;
- } else {
- // Data!
- out << std::hex << length << "\r\n";
- out.write(data, length);
- out << "\r\n";
- }
-
- // We always persist the connection when we're not finished
- // streaming.
- socket_manager->send(
- new DataEncoder(socket, out.str()),
- finished ? request.keepAlive : true);
- }
- }
- } else if (poll.isFailed()) {
- VLOG(1) << "Failed to poll: " << poll.failure();
- socket_manager->send(InternalServerError(), request, socket);
- finished = true;
- } else {
- VLOG(1) << "Unexpected discarded future while polling";
- socket_manager->send(InternalServerError(), request, socket);
- finished = true;
- }
-
- if (finished) {
- os::close(pipe.get());
- pipe = None();
- next();
- }
-}
-
-
-SocketManager::SocketManager()
-{
- synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-}
-
-
-SocketManager::~SocketManager() {}
-
-
-Socket SocketManager::accepted(int s)
-{
- synchronized (this) {
- return sockets[s] = Socket(s);
- }
-}
-
-
// Records that 'process' is linked to 'to' (added to 'links[to]').
//
// If 'to' lives on another node and no persistent socket to that node
// exists yet, this also does the following:
//   - Creates a non-blocking, close-on-exec TCP socket.
//   - Registers it in 'sockets', 'nodes' and 'persists'.
//   - Starts connecting to the node.
//   - Hands an ev_io watcher to the event loop through 'watchers' and
//     'async_watcher'.
// The watcher first waits for the connect to complete (EINPROGRESS) or,
// if connect succeeded immediately, reads from the socket.
//
// Failing to create or configure the socket, or a connect error other
// than EINPROGRESS, is fatal.
void SocketManager::link(ProcessBase* process, const UPID& to)
{
  // TODO(benh): The semantics we want to support for link are such
  // that if there is nobody to link to (local or remote) then an
  // ExitedEvent gets generated. This functionality has only been
  // implemented when the link is local, not remote. Of course, if
  // there is nobody listening on the remote side, then this should
  // work remotely ... but if there is someone listening remotely just
  // not at that id, then it will silently continue executing.

  CHECK(process != NULL);

  Node node(to.ip, to.port);

  synchronized (this) {
    // Check if node is remote and there isn't a persistent link.
    if ((node.ip != __ip__ || node.port != __port__)
        && persists.count(node) == 0) {
      // Okay, no link, lets create a socket.
      int s;

      if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        PLOG(FATAL) << "Failed to link, socket";
      }

      Try<Nothing> nonblock = os::nonblock(s);
      if (nonblock.isError()) {
        LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
      }

      Try<Nothing> cloexec = os::cloexec(s);
      if (cloexec.isError()) {
        LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
      }

      sockets[s] = Socket(s);
      nodes[s] = node;

      // Mark this as the persistent socket to 'node'.
      persists[node] = s;

      // Allocate and initialize the decoder and watcher (we really
      // only "receive" on this socket so that we can react when it
      // gets closed and generate appropriate lost events).
      DataDecoder* decoder = new DataDecoder(sockets[s]);

      ev_io* watcher = new ev_io();
      watcher->data = decoder;

      // Try and connect to the node using this socket.
      sockaddr_in addr;
      memset(&addr, 0, sizeof(addr));
      addr.sin_family = PF_INET;
      addr.sin_port = htons(to.port);
      addr.sin_addr.s_addr = to.ip;

      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
        // A non-blocking connect normally reports EINPROGRESS; anything
        // else is treated as fatal.
        if (errno != EINPROGRESS) {
          PLOG(FATAL) << "Failed to link, connect";
        }

        // Wait for socket to be connected.
        ev_io_init(watcher, receiving_connect, s, EV_WRITE);
      } else {
        // Connected immediately: start receiving right away.
        ev_io_init(watcher, recv_data, s, EV_READ);
      }

      // Enqueue the watcher.
      synchronized (watchers) {
        watchers->push(watcher);
      }

      // Interrupt the loop.
      ev_async_send(loop, &async_watcher);
    }

    links[to].insert(process);
  }
}
-
-
-PID<HttpProxy> SocketManager::proxy(const Socket& socket)
-{
- HttpProxy* proxy = NULL;
-
- synchronized (this) {
- // This socket might have been asked to get closed (e.g., remote
- // side hang up) while a process is attempting to handle an HTTP
- // request. Thus, if there is no more socket, return an empty PID.
- if (sockets.count(socket) > 0) {
- if (proxies.count(socket) > 0) {
- return proxies[socket]->self();
- } else {
- proxy = new HttpProxy(sockets[socket]);
- proxies[socket] = proxy;
- }
- }
- }
-
- // Now check if we need to spawn a newly created proxy. Note that we
- // need to do this outside of the synchronized block above to avoid
- // a possible deadlock (because spawn eventually synchronizes on
- // ProcessManager and ProcessManager::cleanup synchronizes on
- // ProcessManager and then SocketManager, so a deadlock results if
- // we do spawn within the synchronized block above).
- if (proxy != NULL) {
- return spawn(proxy, true);
- }
-
- return PID<HttpProxy>();
-}
-
-
-void SocketManager::send(Encoder* encoder, bool persist)
-{
- CHECK(encoder != NULL);
-
- synchronized (this) {
- if (sockets.count(encoder->socket()) > 0) {
- // Update whether or not this socket should get disposed after
- // there is no more data to send.
- if (!persist) {
- dispose.insert(encoder->socket());
- }
-
- if (outgoing.count(encoder->socket()) > 0) {
- outgoing[encoder->socket()].push(encoder);
- } else {
- // Initialize the outgoing queue.
- outgoing[encoder->socket()];
-
- // Allocate and initialize the watcher.
- ev_io* watcher = new ev_io();
- watcher->data = encoder;
-
- ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
-
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- ev_async_send(loop, &async_watcher);
- }
- } else {
- VLOG(1) << "Attempting to send on a no longer valid socket!";
- delete encoder;
- }
- }
-}
-
-
-void SocketManager::send(
- const Response& response,
- const Request& request,
- const Socket& socket)
-{
- bool persist = request.keepAlive;
-
- // Don't persist the connection if the headers include
- // 'Connection: close'.
- if (response.headers.contains("Connection")) {
- if (response.headers.get("Connection").get() == "close") {
- persist = false;
- }
- }
-
- send(new HttpResponseEncoder(socket, response, request), persist);
-}
-
-
// Sends 'message' to the node named by 'message->to'. The message is
// handed to a MessageEncoder.
//
// If a persistent or temporary socket to that node already exists, the
// message is queued on it. A persistent socket takes precedence, and
// only persistent sockets are kept open afterwards.
//
// Otherwise a new temporary socket is created (non-blocking,
// close-on-exec) and registered in 'sockets', 'nodes' and 'temps'. It
// is marked for disposal once drained, and a watcher that first
// connects (or directly sends, if connect succeeded immediately) is
// handed to the event loop.
//
// Failing to create or configure the socket, or a connect error other
// than EINPROGRESS, is fatal.
void SocketManager::send(Message* message)
{
  CHECK(message != NULL);

  Node node(message->to.ip, message->to.port);

  synchronized (this) {
    // Check if there is already a socket.
    bool persist = persists.count(node) > 0;
    bool temp = temps.count(node) > 0;
    if (persist || temp) {
      int s = persist ? persists[node] : temps[node];
      CHECK(sockets.count(s) > 0);
      // Re-enters the (recursive) lock via send(Encoder*, bool).
      send(new MessageEncoder(sockets[s], message), persist);
    } else {
      // No persistent or temporary socket to the node currently
      // exists, so we create a temporary one.
      int s;

      if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        PLOG(FATAL) << "Failed to send, socket";
      }

      Try<Nothing> nonblock = os::nonblock(s);
      if (nonblock.isError()) {
        LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
      }

      Try<Nothing> cloexec = os::cloexec(s);
      if (cloexec.isError()) {
        LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
      }

      sockets[s] = Socket(s);
      nodes[s] = node;
      temps[node] = s;

      // Temporary sockets are disposed of once all data has been sent.
      dispose.insert(s);

      // Initialize the outgoing queue.
      outgoing[s];

      // Allocate and initialize the watcher.
      ev_io* watcher = new ev_io();
      watcher->data = new MessageEncoder(sockets[s], message);

      // Try and connect to the node using this socket.
      sockaddr_in addr;
      memset(&addr, 0, sizeof(addr));
      addr.sin_family = PF_INET;
      addr.sin_port = htons(message->to.port);
      addr.sin_addr.s_addr = message->to.ip;

      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
        if (errno != EINPROGRESS) {
          PLOG(FATAL) << "Failed to send, connect";
        }

        // Initialize watcher for connecting.
        ev_io_init(watcher, sending_connect, s, EV_WRITE);
      } else {
        // Initialize watcher for sending.
        ev_io_init(watcher, send_data, s, EV_WRITE);
      }

      // Enqueue the watcher.
      synchronized (watchers) {
        watchers->push(watcher);
      }

      // Wake up the event loop so it picks up the new watcher.
      ev_async_send(loop, &async_watcher);
    }
  }
}
-
-
// Returns the next encoder queued for socket 's' (caller takes it), or
// NULL when there is nothing more to send.
//
// When the queue runs dry, the queue is erased. If the socket was
// marked for disposal, the following also happens:
//   - Its temporary-node bookkeeping and proxy are removed.
//   - It is dropped from 'sockets'.
//   - Its receiving side is shut down.
// The descriptor itself is closed later by the last Socket reference.
// A proxy removed here is terminated after the lock is released.
Encoder* SocketManager::next(int s)
{
  HttpProxy* proxy = NULL; // Non-null if needs to be terminated.

  synchronized (this) {
    // We cannot assume 'sockets.count(s) > 0' here because it's
    // possible that 's' has been removed with a call to
    // SocketManager::close. For example, it could be the case that a
    // socket has gone to CLOSE_WAIT and the call to 'recv' in
    // recv_data returned 0 causing SocketManager::close to get
    // invoked. Later a call to 'send' or 'sendfile' (e.g., in
    // send_data or send_file) can "succeed" (because the socket is
    // not "closed" yet because there are still some Socket
    // references, namely the reference being used in send_data or
    // send_file!). However, when SocketManager::next is actually
    // invoked we find out that there is no more data and thus stop
    // sending.
    // TODO(benh): Should we actually finish sending the data!?
    if (sockets.count(s) > 0) {
      CHECK(outgoing.count(s) > 0);

      if (!outgoing[s].empty()) {
        // More messages!
        Encoder* encoder = outgoing[s].front();
        outgoing[s].pop();
        return encoder;
      } else {
        // No more messages ... erase the outgoing queue.
        outgoing.erase(s);

        if (dispose.count(s) > 0) {
          // This is either a temporary socket we created or it's a
          // socket that we were receiving data from and possibly
          // sending HTTP responses back on. Clean up either way.
          if (nodes.count(s) > 0) {
            const Node& node = nodes[s];
            // NOTE(review): assumes a disposable node socket is always
            // the temporary one for its node (enforced by the CHECK).
            CHECK(temps.count(node) > 0 && temps[node] == s);
            temps.erase(node);
            nodes.erase(s);
          }

          if (proxies.count(s) > 0) {
            proxy = proxies[s];
            proxies.erase(s);
          }

          dispose.erase(s);
          sockets.erase(s);

          // We don't actually close the socket (we wait for the Socket
          // abstraction to close it once there are no more references),
          // but we do shutdown the receiving end so any DataDecoder
          // will get cleaned up (which might have the last reference).
          shutdown(s, SHUT_RD);
        }
      }
    }
  }

  // We terminate the proxy outside the synchronized block to avoid
  // possible deadlock between the ProcessManager and SocketManager
  // (see comment in SocketManager::proxy for more information).
  if (proxy != NULL) {
    terminate(proxy);
  }

  return NULL;
}
-
-
// Forgets socket 's'. Requests for unknown sockets are ignored.
//
// For a known socket, this does the following:
//   - Deletes any encoders still queued for it.
//   - Removes its node bookkeeping. If it was the node's persistent
//     socket, ExitedEvents are generated for processes linked to that
//     node.
//   - Removes its HttpProxy (terminated after the lock is released).
//   - Drops it from 'dispose' and 'sockets'.
// The descriptor is not closed here (see the note at the end).
void SocketManager::close(int s)
{
  HttpProxy* proxy = NULL; // Non-null if needs to be terminated.

  synchronized (this) {
    // This socket might not be active if it was already asked to get
    // closed (e.g., a write on the socket failed so we try and close
    // it and then later the read side of the socket gets closed so we
    // try and close it again). Thus, ignore the request if we don't
    // know about the socket.
    if (sockets.count(s) > 0) {
      // Clean up any remaining encoders for this socket.
      if (outgoing.count(s) > 0) {
        while (!outgoing[s].empty()) {
          Encoder* encoder = outgoing[s].front();
          delete encoder;
          outgoing[s].pop();
        }

        outgoing.erase(s);
      }

      // Clean up after sockets used for node communication.
      if (nodes.count(s) > 0) {
        const Node& node = nodes[s];

        // Don't bother invoking exited unless socket was persistent.
        if (persists.count(node) > 0 && persists[node] == s) {
          persists.erase(node);
          exited(node); // Generate ExitedEvent(s)!
        } else if (temps.count(node) > 0 && temps[node] == s) {
          temps.erase(node);
        }

        // Erased last: 'node' above refers into 'nodes'.
        nodes.erase(s);
      }

      // Clean up any proxy associated with this socket.
      if (proxies.count(s) > 0) {
        proxy = proxies[s];
        proxies.erase(s);
      }

      dispose.erase(s);
      sockets.erase(s);
    }
  }

  // We terminate the proxy outside the synchronized block to avoid
  // possible deadlock between the ProcessManager and SocketManager.
  if (proxy != NULL) {
    terminate(proxy);
  }

  // Note that we don't actually:
  //
  //   close(s);
  //
  // Because, for example, there could be a race between an HttpProxy
  // trying to send a response with SocketManager::send() or a
  // process might be responding to another Request (e.g., trying
  // to do a sendfile) since these things may be happening
  // asynchronously we can't close the socket yet, because it might
  // get reused before any of the above things have finished, and then
  // we'll end up sending data on the wrong socket! Instead, we rely
  // on the last reference of our Socket object to close the
  // socket. Note, however, that since socket is no longer in
  // 'sockets' any attempt to send with it will just get ignored.
}
-
-
-void SocketManager::exited(const Node& node)
-{
- // TODO(benh): It would be cleaner if this routine could call back
- // into ProcessManager ... then we wouldn't have to convince
- // ourselves that the accesses to each Process object will always be
- // valid.
- synchronized (this) {
- list<UPID> removed;
- // Look up all linked processes.
- foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
- if (linkee.ip == node.ip && linkee.port == node.port) {
- foreach (ProcessBase* linker, processes) {
- linker->enqueue(new ExitedEvent(linkee));
- }
- removed.push_back(linkee);
- }
- }
-
- foreach (const UPID& pid, removed) {
- links.erase(pid);
- }
- }
-}
-
-
// Handles the termination of the local 'process'.
//
// It does three things:
//   - Removes 'process' from every link set.
//   - Sends an ExitedEvent to each process linked to it. When the clock
//     is paused, each linker's clock is first advanced to the exited
//     process' time.
//   - Drops the links whose linkee was 'process'.
void SocketManager::exited(ProcessBase* process)
{
  // An exited event is enough to cause the process to get deleted
  // (e.g., by the garbage collector), which means we can't
  // dereference process (or even use the address) after we enqueue at
  // least one exited event. Thus, we save the process pid.
  const UPID pid = process->pid;

  // Likewise, we need to save the current time of the process so we
  // can update the clocks of linked processes as appropriate.
  const Time time = Clock::now(process);

  synchronized (this) {
    // Iterate through the links, removing any links the process might
    // have had and creating exited events for any linked processes.
    foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
      processes.erase(process);

      if (linkee == pid) {
        foreach (ProcessBase* linker, processes) {
          CHECK(linker != process) << "Process linked with itself";
          // Preserve happens-before when testing with a paused clock.
          synchronized (timeouts) {
            if (Clock::paused()) {
              Clock::update(linker, time);
            }
          }
          linker->enqueue(new ExitedEvent(linkee));
        }
      }
    }

    links.erase(pid);
  }
}
-
-
-ProcessManager::ProcessManager(const string& _delegate)
- : delegate(_delegate)
-{
- synchronizer(processes) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
- synchronizer(runq) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
- running = 0;
- __sync_synchronize(); // Ensure write to 'running' visible in other threads.
-}
-
-
-ProcessManager::~ProcessManager() {}
-
-
-ProcessReference ProcessManager::use(const UPID& pid)
-{
- if (pid.ip == __ip__ && pid.port == __port__) {
- synchronized (processes) {
- if (processes.count(pid.id) > 0) {
- // Note that the ProcessReference constructor _must_ get
- // called while holding the lock on processes so that waiting
- // for references is atomic (i.e., race free).
- return ProcessReference(processes[pid.id]);
- }
- }
- }
-
- return ProcessReference(NULL);
-}
-
-
-bool ProcessManager::handle(
- const Socket& socket,
- Request* request)
-{
- CHECK(request != NULL);
-
- // Check if this is a libprocess request (i.e., 'User-Agent:
- // libprocess/id@ip:port') and if so, parse as a message.
- if (libprocess(request)) {
- Message* message = parse(request);
- if (message != NULL) {
- delete request;
- // TODO(benh): Use the sender PID in order to capture
- // happens-before timing relationships for testing.
- return deliver(message->to, new MessageEvent(message));
- }
-
- VLOG(1) << "Failed to handle libprocess request: "
- << request->method << " " << request->path
- << " (User-Agent: " << request->headers["User-Agent"] << ")";
-
- delete request;
- return false;
- }
-
- // Treat this as an HTTP request. Start by checking that the path
- // starts with a '/' (since the code below assumes as much).
- if (request->path.find('/') != 0) {
- VLOG(1) << "Returning '400 Bad Request' for '" << request->path << "'";
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(socket);
-
- // Enqueue the response with the HttpProxy so that it respects the
- // order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, BadRequest(), *request);
-
- // Cleanup request.
- delete request;
- return false;
- }
-
- // Ignore requests with relative paths (i.e., contain "/..").
- if (request->path.find("/..") != string::npos) {
- VLOG(1) << "Returning '404 Not Found' for '" << request->path
- << "' (ignoring requests with relative paths)";
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(socket);
-
- // Enqueue the response with the HttpProxy so that it respects the
- // order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
-
- // Cleanup request.
- delete request;
- return false;
- }
-
- // Split the path by '/'.
- vector<string> tokens = strings::tokenize(request->path, "/");
-
- // Try and determine a receiver, otherwise try and delegate.
- ProcessReference receiver;
-
- if (tokens.size() == 0 && delegate != "") {
- request->path = "/" + delegate;
- receiver = use(UPID(delegate, __ip__, __port__));
- } else if (tokens.size() > 0) {
- receiver = use(UPID(tokens[0], __ip__, __port__));
- }
-
- if (!receiver && delegate != "") {
- // Try and delegate the request.
- request->path = "/" + delegate + request->path;
- receiver = use(UPID(delegate, __ip__, __port__));
- }
-
- if (receiver) {
- // TODO(benh): Use the sender PID in order to capture
- // happens-before timing relationships for testing.
- return deliver(receiver, new HttpEvent(socket, request));
- }
-
- // This has no receiver, send error response.
- VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(socket);
-
- // Enqueue the response with the HttpProxy so that it respects the
- // order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
-
- // Cleanup request.
- delete request;
- return false;
-}
-
-
-bool ProcessManager::deliver(
- ProcessBase* receiver,
- Event* event,
- ProcessBase* sender)
-{
- CHECK(event != NULL);
-
- // If we are using a manual clock then update the current time of
- // the receiver using the sender if necessary to preserve the
- // happens-before relationship between the sender and receiver. Note
- // that the assumption is that the sender remains valid for at least
- // the duration of this routine (so that we can look up it's current
- // time).
- if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (sender != NULL) {
- Clock::order(sender, receiver);
- } else {
- Clock::update(receiver, Clock::now());
- }
- }
- }
- }
-
- receiver->enqueue(event);
-
- return true;
-}
-
-bool ProcessManager::deliver(
- const UPID& to,
- Event* event,
- ProcessBase* sender)
-{
- CHECK(event != NULL);
-
- if (ProcessReference receiver = use(to)) {
- return deliver(receiver, event, sender);
- }
-
- delete event;
- return false;
-}
-
-
-UPID ProcessManager::spawn(ProcessBase* process, bool manage)
-{
- CHECK(process != NULL);
-
- synchronized (processes) {
- if (processes.count(process->pid.id) > 0) {
- return UPID();
- } else {
- processes[process->pid.id] = process;
- }
- }
-
- // Use the garbage collector if requested.
- if (manage) {
- dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
- }
-
- // We save the PID before enqueueing the process to avoid the race
- // condition that occurs when a user has a very short process and
- // the process gets run and cleaned up before we return from enqueue
- // (e.g., when 'manage' is set to true).
- UPID pid = process->self();
-
- // Add process to the run queue (so 'initialize' will get invoked).
- enqueue(process);
-
- VLOG(2) << "Spawned process " << pid;
-
- return pid;
-}
-
-
-// Runs 'process' on the calling thread. On its first run (state BOTTOM)
-// it is initialized. Its queued events are then served until it either
-// runs out of events (becoming BLOCKED) or terminates, in which case it
-// is cleaned up. The caller is expected to have already incremented
-// 'running' (see ProcessManager::dequeue and ProcessManager::wait); it
-// is decremented again before returning.
-void ProcessManager::resume(ProcessBase* process)
-{
- __process__ = process;
-
- VLOG(2) << "Resuming " << process->pid << " at " << Clock::now();
-
- bool terminate = false;
- bool blocked = false;
-
- CHECK(process->state == ProcessBase::BOTTOM ||
- process->state == ProcessBase::READY);
-
- if (process->state == ProcessBase::BOTTOM) {
- process->state = ProcessBase::RUNNING;
- try {
- process->initialize();
- } catch (const std::exception& e) {
- std::cerr << "libprocess: " << process->pid
- << " terminating due to "
- << e.what() << std::endl;
- terminate = true;
- } catch (...) {
- std::cerr << "libprocess: " << process->pid
- << " terminating due to unknown exception" << std::endl;
- terminate = true;
- }
-
- // A process whose 'initialize' threw must still be cleaned up,
- // because the event loop below is skipped. Otherwise the process
- // would stay RUNNING forever:
- // - ProcessBase::enqueue only reschedules BLOCKED processes;
- // - it would never be removed from 'processes';
- // - its waiters would never be released.
- // 'process' must not be dereferenced after cleanup.
- if (terminate) {
- cleanup(process);
- }
- }
-
- while (!terminate && !blocked) {
- Event* event = NULL;
-
- // Pop the next event under the process lock. If there is none, mark
- // the process BLOCKED so that the next ProcessBase::enqueue puts it
- // back on the run queue.
- process->lock();
- {
- if (process->events.size() > 0) {
- event = process->events.front();
- process->events.pop_front();
- process->state = ProcessBase::RUNNING;
- } else {
- process->state = ProcessBase::BLOCKED;
- blocked = true;
- }
- }
- process->unlock();
-
- if (!blocked) {
- CHECK(event != NULL);
-
- // Determine if we should filter this event. An installed filterer
- // may drop the event entirely.
- synchronized (filterer) {
- if (filterer != NULL) {
- bool filter = false;
- struct FilterVisitor : EventVisitor
- {
- FilterVisitor(bool* _filter) : filter(_filter) {}
-
- virtual void visit(const MessageEvent& event)
- {
- *filter = filterer->filter(event);
- }
-
- virtual void visit(const DispatchEvent& event)
- {
- *filter = filterer->filter(event);
- }
-
- virtual void visit(const HttpEvent& event)
- {
- *filter = filterer->filter(event);
- }
-
- virtual void visit(const ExitedEvent& event)
- {
- *filter = filterer->filter(event);
- }
-
- bool* filter;
- } visitor(&filter);
-
- event->visit(&visitor);
-
- if (filter) {
- delete event;
- continue; // Try and execute the next event.
- }
- }
- }
-
- // Determine if we should terminate.
- terminate = event->is<TerminateEvent>();
-
- // Now service the event. Any exception escaping a handler
- // terminates the process.
- try {
- process->serve(*event);
- } catch (const std::exception& e) {
- std::cerr << "libprocess: " << process->pid
- << " terminating due to "
- << e.what() << std::endl;
- terminate = true;
- } catch (...) {
- std::cerr << "libprocess: " << process->pid
- << " terminating due to unknown exception" << std::endl;
- terminate = true;
- }
-
- delete event;
-
- if (terminate) {
- cleanup(process);
- }
- }
- }
-
- __process__ = NULL;
-
- CHECK_GE(running, 1);
- __sync_fetch_and_sub(&running, 1);
-}
-
-
-// Tears down a terminating process. In order, it:
-// - marks the process TERMINATING and deletes its pending events;
-// - waits for outstanding references to drop to zero;
-// - removes the process from 'processes' and marks it TERMINATED;
-// - tells the socket manager so that linked processes get exited events;
-// - opens the gate (if any) that ProcessManager::wait callers block on.
-// After the socket manager is notified, 'process' must not be
-// dereferenced.
-void ProcessManager::cleanup(ProcessBase* process)
-{
- VLOG(2) << "Cleaning up " << process->pid;
-
- // First, set the terminating state so no more events will get
- // enqueued (ProcessBase::enqueue deletes events sent to a TERMINATING
- // process) and delete all the pending events. We want to delete the
- // events before we hold the processes lock because deleting an
- // event could cause code outside libprocess to get executed which
- // might cause a deadlock with the processes lock. Likewise,
- // deleting the events now rather than later has the nice property
- // of making sure that any events that might have gotten enqueued on
- // the process we are cleaning up will get dropped (since it's
- // terminating) and eliminates the potential of enqueueing them on
- // another process that gets spawned with the same PID.
- deque<Event*> events;
-
- process->lock();
- {
- process->state = ProcessBase::TERMINATING;
- events = process->events;
- process->events.clear();
- }
- process->unlock();
-
- // Delete pending events (outside the process lock, see above).
- while (!events.empty()) {
- Event* event = events.front();
- events.pop_front();
- delete event;
- }
-
- // Gate (if any) that waiting threads, e.g. non-libprocess threads
- // in ProcessManager::wait, are blocked at.
- Gate* gate = NULL;
-
- // Remove process.
- synchronized (processes) {
- // Wait for all process references to get cleaned up: spin, with
- // the x86 'pause' hint and a full memory barrier, until 'refs'
- // drops to zero.
- while (process->refs > 0) {
- asm ("pause");
- __sync_synchronize();
- }
-
- process->lock();
- {
- CHECK(process->events.empty());
-
- processes.erase(process->pid.id);
-
- // Lookup gate to wake up waiting threads.
- map<ProcessBase*, Gate*>::iterator it = gates.find(process);
- if (it != gates.end()) {
- gate = it->second;
- // N.B. The last thread that leaves the gate also frees it (see
- // ProcessManager::wait).
- gates.erase(it);
- }
-
- CHECK(process->refs == 0);
- process->state = ProcessBase::TERMINATED;
- }
- process->unlock();
-
- // Note that we don't remove the process from the clock during
- // cleanup, but rather the clock is reset for a process when it is
- // created (see ProcessBase::ProcessBase). We do this so that
- // SocketManager::exited can access the current time of the
- // process to "order" exited events. TODO(benh): It might make
- // sense to consider storing the time of the process as a field of
- // the class instead.
-
- // Now we tell the socket manager about this process exiting so
- // that it can create exited events for linked processes. We
- // _must_ do this while synchronized on processes because
- // otherwise another process could attempt to link this process
- // and SocketManager::link would see that the process doesn't
- // exist when it attempts to get a ProcessReference (since we
- // removed the process above) thus causing an exited event, which
- // could cause the process to get deleted (e.g., the garbage
- // collector might link _after_ the process has already been
- // removed from processes thus getting an exited event but we
- // don't want that exited event to fire and actually delete the
- // process until after we have used the process in
- // SocketManager::exited).
- socket_manager->exited(process);
-
- // ***************************************************************
- // At this point we can no longer dereference the process since it
- // might already be deallocated (e.g., by the garbage collector).
- // ***************************************************************
-
- // Note that we need to open the gate while synchronized on
- // processes because otherwise we might _open_ the gate before
- // another thread _approaches_ the gate causing that thread to
- // wait on _arrival_ to the gate forever (see
- // ProcessManager::wait).
- if (gate != NULL) {
- gate->open();
- }
- }
-}
-
-
-// Links 'process' to 'to' so that 'process' is told (via an ExitedEvent)
-// when 'to' exits.
-// - Remote pids go straight to the socket manager.
-// - Local pids are resolved to their process first. If that process is
-// not alive, 'process' gets an ExitedEvent immediately.
-void ProcessManager::link(ProcessBase* process, const UPID& to)
-{
- const bool local = to.ip == __ip__ && to.port == __port__;
-
- if (!local) {
- socket_manager->link(process, to);
- return;
- }
-
- // Hold a reference to the local target for the duration of the
- // socket manager call, so that we can't miss an ExitedEvent that
- // would otherwise fire while the link is being recorded.
- if (ProcessReference reference = use(to)) {
- socket_manager->link(process, to);
- } else {
- // The target has already exited (or was never spawned), so report
- // its exit right away.
- process->enqueue(new ExitedEvent(to));
- }
-}
-
-
-// Asks the process identified by 'pid' to terminate by enqueueing a
-// TerminateEvent on it. The event goes to the front of the queue when
-// 'inject' is true, and is attributed to 'sender' when one is given.
-// Nothing happens if use(pid) yields no process. With a paused clock,
-// the target's time is first ordered after the sender's, or set to
-// Clock::now() when there is no sender.
-void ProcessManager::terminate(
- const UPID& pid,
- bool inject,
- ProcessBase* sender)
-{
- if (ProcessReference process = use(pid)) {
- // Cheap unlocked check first, then re-check while holding the
- // 'timeouts' lock in case the clock was resumed in between.
- if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (sender == NULL) {
- Clock::update(process, Clock::now());
- } else {
- Clock::order(sender, process);
- }
- }
- }
- }
-
- const UPID from = (sender == NULL) ? UPID() : sender->self();
- process->enqueue(new TerminateEvent(from), inject);
- }
-}
-
-
-// Blocks the calling thread until the local process named by 'pid' has
-// been cleaned up (ProcessManager::cleanup opens its gate). If that
-// process is runnable and still on the run queue, the calling thread is
-// first "donated" to run it once. Returns true after waiting at a gate,
-// or false right away if 'pid' is not in 'processes'.
-bool ProcessManager::wait(const UPID& pid)
-{
- // We use a gate for waiters. A gate is single use. That is, a new
- // gate is created when the first thread shows up and wants to wait
- // for a process that currently has no gate. Once that process
- // exits, the last thread to leave the gate will also clean it
- // up. Note that a gate will never get more threads waiting on it
- // after it has been opened, since the process should no longer be
- // valid and therefore will not have an entry in 'processes'.
-
- Gate* gate = NULL;
- Gate::state_t old; // Only assigned, and only read, when 'gate' is set.
-
- ProcessBase* process = NULL; // Set to non-null if we donate thread.
-
- // Try and approach the gate if necessary.
- synchronized (processes) {
- if (processes.count(pid.id) > 0) {
- process = processes[pid.id];
- CHECK(process->state != ProcessBase::TERMINATED);
-
- // Check and see if a gate already exists.
- if (gates.find(process) == gates.end()) {
- gates[process] = new Gate();
- }
-
- gate = gates[process];
- // Approach while still holding the 'processes' lock, so cleanup
- // cannot open the gate before we have approached it (see the end
- // of ProcessManager::cleanup).
- old = gate->approach();
-
- // Check if it is runnable in order to donate this thread.
- if (process->state == ProcessBase::BOTTOM ||
- process->state == ProcessBase::READY) {
- synchronized (runq) {
- // Taking the process off the run queue means dequeue() can no
- // longer hand it to another thread while we run it below.
- list<ProcessBase*>::iterator it =
- find(runq.begin(), runq.end(), process);
- if (it != runq.end()) {
- runq.erase(it);
- } else {
- // Another thread has resumed the process ...
- process = NULL;
- }
- }
- } else {
- // Process is not runnable, so no need to donate ...
- process = NULL;
- }
- }
- }
-
- if (process != NULL) {
- VLOG(2) << "Donating thread to " << process->pid << " while waiting";
- // Save and restore __process__, which resume() overwrites (and sets
- // to NULL on return).
- ProcessBase* donator = __process__;
- // resume() decrements 'running' when it returns, so count this run
- // the same way dequeue() does.
- __sync_fetch_and_add(&running, 1);
- process_manager->resume(process);
- __process__ = donator;
- }
-
- // TODO(benh): Donating only once may not be sufficient, so we might
- // still deadlock here ... perhaps warn if that's the case?
-
- // Now arrive at the gate and wait until it opens.
- if (gate != NULL) {
- gate->arrive(old);
-
- // The last thread to leave the gate frees it.
- if (gate->empty()) {
- delete gate;
- }
-
- return true;
- }
-
- return false;
-}
-
-
-// Makes 'process' runnable. It is appended to the run queue (it must
-// not already be queued), and 'gate' is opened to wake up a processing
-// thread.
-void ProcessManager::enqueue(ProcessBase* process)
-{
- CHECK(process != NULL);
-
- // TODO(benh): Check and see if this process has its own thread. If
- // it does, push it on that thread's runq, and wake up that thread if
- // it's not running. Otherwise, check and see which thread this
- // process was last running on, and put it on that thread's runq.
-
- synchronized (runq) {
- // Enqueueing a process that is already on the run queue is a bug.
- CHECK(find(runq.begin(), runq.end(), process) == runq.end());
- runq.push_back(process);
- }
-
- // Wake up the processing thread if necessary.
- gate->open();
-}
-
-
-// Removes and returns the process at the front of the run queue, or
-// NULL if the queue is empty. A returned process is counted in
-// 'running', which ProcessManager::resume decrements when it finishes.
-ProcessBase* ProcessManager::dequeue()
-{
- // TODO(benh): Remove a process from this thread's runq. If there
- // are no processes to run, and this is not a dedicated thread, then
- // steal one from another thread's runq.
-
- ProcessBase* next = NULL;
-
- synchronized (runq) {
- if (!runq.empty()) {
- next = runq.front();
- runq.pop_front();
-
- // 'running' backs Clock::settle(). Incrementing it while still
- // holding the 'runq' lock means settle(), which checks both under
- // that lock, never sees a process that is neither queued nor
- // counted as running.
- __sync_fetch_and_add(&running, 1);
- }
- }
-
- return next;
-}
-
-
-// Blocks (with the clock paused) until libprocess is quiescent:
-// - the run queue is empty;
-// - no process is currently running;
-// - no timeout has expired at the current clock time;
-// - no timers are pending.
-// Sleeps 10 milliseconds before each check.
-void ProcessManager::settle()
-{
- for (bool done = false; !done; ) {
- os::sleep(Milliseconds(10));
-
- // Hopefully this is the only place we acquire both these locks.
- synchronized (runq) {
- synchronized (timeouts) {
- CHECK(Clock::paused()); // Since another thread could resume the clock!
-
- const bool runnable = !runq.empty();
-
- __sync_synchronize(); // Read barrier for 'running'.
- const bool executing = running > 0;
-
- const bool expired = timeouts->size() > 0 &&
- timeouts->begin()->first <= clock::current;
-
- done = !runnable && !executing && !expired && !pending_timers;
- }
- }
- }
-}
-
-
-// Creates a timer that will run 'thunk' once 'duration' has elapsed
-// (measured from Clock::now()) and registers it in 'timeouts'. When the
-// new timer expires before every already-registered timeout, the event
-// loop is woken (via 'async_watcher') so it can reprogram its timer.
-Timer Timer::create(
- const Duration& duration,
- const lambda::function<void(void)>& thunk)
-{
- static uint64_t id = 1; // Start at 1 since Timer() instances start with 0.
-
- // Assumes Clock::now() does Clock::now(__process__).
- Timeout timeout = Timeout::in(duration);
-
- // Remember which process (if any) created the timer.
- UPID pid;
- if (__process__ != NULL) {
- pid = __process__->self();
- }
-
- Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
-
- VLOG(3) << "Created a timer for " << timeout.time();
-
- synchronized (timeouts) {
- const Time time = timer.timeout().time();
-
- // Does the new timer expire before everything already scheduled?
- const bool earliest =
- timeouts->size() == 0 || time < timeouts->begin()->first;
-
- if (!earliest) {
- // Timer repeat is adequate, just add the timeout.
- CHECK(timeouts->size() >= 1);
- }
-
- (*timeouts)[time].push_back(timer);
-
- if (earliest) {
- // Need to interrupt the loop to update/set timer repeat.
- update_timer = true;
- ev_async_send(loop, &async_watcher);
- }
- }
-
- return timer;
-}
-
-
-// Cancels 'timer' if it is still registered in 'timeouts'. Returns true
-// only if 'timer' itself was found and removed. Returns false when it is
-// no longer registered (e.g., it already fired or was canceled), even if
-// other timers share its expiration time.
-bool Timer::cancel(const Timer& timer)
-{
- bool canceled = false;
- synchronized (timeouts) {
- // Check if the timeout is still pending, and if so, erase it. In
- // addition, erase an empty list if we just removed the last
- // timeout.
- // NOTE(review): list::remove erases every element that compares
- // equal to 'timer'. If distinct timers can compare equal (this
- // depends on Timer::operator==), more than one may be erased.
- // Confirm.
- Time time = timer.timeout().time();
- if (timeouts->count(time) > 0) {
- // Other timers may share this expiration time, so the presence of
- // the list alone does not mean 'timer' is still pending. Only
- // report success if the removal actually shrank the list.
- size_t pending = (*timeouts)[time].size();
- (*timeouts)[time].remove(timer);
- canceled = (*timeouts)[time].size() < pending;
- if ((*timeouts)[time].empty()) {
- timeouts->erase(time);
- }
- }
- }
-
- return canceled;
-}
-
-
-// Constructs a process in the BOTTOM state, with zero references.
-// - Its pid uses 'id', or a generated id when 'id' is empty, plus this
-// libprocess instance's ip and port.
-// - Under a paused clock, the new process's time is ordered after its
-// creator's, or set to Clock::now() when there is no creator.
-ProcessBase::ProcessBase(const string& id)
-{
- process::initialize();
-
- state = ProcessBase::BOTTOM;
-
- // The per-process mutex is recursive, so the thread holding it may
- // lock it again.
- pthread_mutexattr_t attributes;
- pthread_mutexattr_init(&attributes);
- pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&m, &attributes);
- pthread_mutexattr_destroy(&attributes);
-
- refs = 0;
-
- if (id.empty()) {
- pid.id = ID::generate();
- } else {
- pid.id = id;
- }
- pid.ip = __ip__;
- pid.port = __port__;
-
- // If using a manual clock, try and set current time of process
- // using happens before relationship between creator and createe!
- if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- clock::currents->erase(this); // In case the address is reused!
- if (__process__ == NULL) {
- Clock::update(this, Clock::now());
- } else {
- Clock::order(__process__, this);
- }
- }
- }
- }
-}
-
-
-// The destructor performs no explicit cleanup.
-// NOTE(review): the recursive mutex 'm' set up by the constructor is
-// never passed to pthread_mutex_destroy. Confirm whether that is
-// intentional before adding it, since destroying a locked mutex is
-// undefined behavior.
-ProcessBase::~ProcessBase()
-{
-}
-
-
-// Adds 'event' to this process's event queue and takes ownership of it.
-// - It goes to the back of the queue, or to the front when 'inject' is
-// true.
-// - A BLOCKED process becomes READY and is handed to the process
-// manager's run queue.
-// - Events for a TERMINATING or TERMINATED process are deleted instead
-// of being queued.
-void ProcessBase::enqueue(Event* event, bool inject)
-{
- CHECK(event != NULL);
-
- lock();
- {
- if (state != TERMINATING && state != TERMINATED) {
- if (!inject) {
- events.push_back(event);
- } else {
- events.push_front(event);
- }
-
- // A BLOCKED process ran out of events and left the run queue
- // (see ProcessManager::resume), so schedule it again.
- if (state == BLOCKED) {
- state = READY;
- process_manager->enqueue(this);
- }
-
- CHECK(state == BOTTOM ||
- state == READY ||
- state == RUNNING);
- } else {
- // The process is shutting down; drop the event.
- delete event;
- }
- }
- unlock();
-}
-
-
-// Enqueues a message at the FRONT of this process's event queue, as if
-// it had been sent by 'from'. The message is named 'name' and carries
-// 'length' bytes starting at 'data'. Nothing happens when '!from'
-// holds.
-void ProcessBase::inject(const UPID& from, const string& name, const char* data, size_t length)
-{
- if (!from) {
- return;
- }
-
- const string body(data, length);
- Message* encoded = encode(from, pid, name, body);
- enqueue(new MessageEvent(encoded), true);
-}
-
-
-// Sends 'to' a message named 'name', carrying 'length' bytes starting
-// at 'data'. The message is encoded with this process as the sender
-// and handed to the transport. Nothing is sent when '!to' holds.
-void ProcessBase::send(const UPID& to, const string& name, const char* data, size_t length)
-{
- if (!to) {
- return;
- }
-
- const string body(data, length);
- Message* outgoing = encode(pid, to, name, body);
- transport(outgoing, this);
-}
-
-
-// Routes an incoming message by its name:
-// - if a handler is installed for that name, call it;
-// - otherwise, if a delegate is registered for that name, forward a
-// copy of the message to the delegate;
-// - otherwise, ignore the message.
-void ProcessBase::visit(const MessageEvent& event)
-{
- const string& name = event.message->name;
-
- if (handlers.message.count(name) > 0) {
- handlers.message[name](event.message->from, event.message->body);
- return;
- }
-
- if (delegates.count(name) > 0) {
- VLOG(1) << "Delegating message '" << name
- << "' to " << delegates[name];
- Message* forwarded = new Message(*event.message);
- forwarded->to = delegates[name];
- transport(forwarded, this);
- }
-}
-
-
-// Serves a DispatchEvent by invoking the function it carries ('f'),
-// passing this process as the argument.
-void ProcessBase::visit(const DispatchEvent& event)
-{
- (*event.f)(this);
-}
-
-
-// Handles an HTTP request routed to this process. The request path is
-// expected to be "/<pid.id>[/<name>[/...]]" (see ProcessManager::handle).
-// The request is answered by, in order of precedence:
-// 1. the HTTP handler registered under <name>;
-// 2. the static asset registered under <name>, with any remaining path
-// components appended to the asset's path;
-// 3. a '404 Not Found'.
-// Every response goes through the socket's HttpProxy so responses keep
-// the order of requests (HTTP/1.1 pipelining).
-void ProcessBase::visit(const HttpEvent& event)
-{
- VLOG(1) << "Handling HTTP event for process '" << pid.id << "'"
- << " with path: '" << event.request->path << "'";
-
- CHECK(event.request->path.find('/') == 0); // See ProcessManager::handle.
-
- // Split the path by '/'.
- vector<string> tokens = strings::tokenize(event.request->path, "/");
- CHECK(tokens.size() >= 1);
- CHECK(tokens[0] == pid.id);
-
- const string& name = tokens.size() > 1 ? tokens[1] : "";
-
- if (handlers.http.count(name) > 0) {
- // Create the promise to link with whatever gets returned, as well
- // as a future to wait for the response.
- std::tr1::shared_ptr<Promise<Response> > promise(
- new Promise<Response>());
-
- // NOTE(review): heap-allocated and handed to HttpProxy::handle,
- // which presumably takes ownership of it. Confirm.
- Future<Response>* future = new Future<Response>(promise->future());
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
-
- // Let the HttpProxy know about this request (via the future).
- dispatch(proxy, &HttpProxy::handle, future, *event.request);
-
- // Now call the handler and associate the response with the promise.
- promise->associate(handlers.http[name](*event.request));
- } else if (assets.count(name) > 0) {
- OK response;
- response.type = Response::PATH;
- response.path = assets[name].path;
-
- // Construct the final path by appending remaining tokens. The index
- // is a 'size_t' to match 'tokens.size()' and avoid a signed/unsigned
- // comparison.
- for (size_t i = 2; i < tokens.size(); i++) {
- response.path += "/" + tokens[i];
- }
-
- // Try and determine the Content-Type from an extension.
- Try<string> basename = os::basename(response.path);
- if (!basename.isError()) {
- size_t index = basename.get().find_last_of('.');
- if (index != string::npos) {
- string extension = basename.get().substr(index);
- if (assets[name].types.count(extension) > 0) {
- response.headers["Content-Type"] = assets[name].types[extension];
- }
- }
- }
-
- // TODO(benh): Use "text/plain" for assets that don't have an
- // extension or we don't have a mapping for? It might be better to
- // just let the browser guess (or do its own default).
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
-
- // Enqueue the response with the HttpProxy so that it respects the
- // order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, response, *event.request);
- } else {
- VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";
-
- // Get the HttpProxy pid for this socket.
- PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
-
- // Enqueue the response with the HttpProxy so that it respects the
- // order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, NotFound(), *event.request);
- }
-}
-
-
-// Forwards an exit notification to the virtual exited() hook, which
-// subclasses (e.g. WaitWaiter) may override.
-void ProcessBase::visit(const ExitedEvent& event)
-{
- const UPID& who = event.pid;
- exited(who);
-}
-
-
-// Gives the process a chance to run its finalize() hook. The actual
-// teardown is performed afterwards by ProcessManager::resume, which
-// calls ProcessManager::cleanup for TerminateEvents.
-void ProcessBase::visit(const TerminateEvent& event)
-{
- finalize();
-}
-
-
-// Links this process to 'to' (see ProcessManager::link) and returns
-// 'to' unchanged. When '!to' holds, no link is attempted.
-UPID ProcessBase::link(const UPID& to)
-{
- // Nothing to link to; hand the pid straight back.
- if (!to) {
- return to;
- }
-
- process_manager->link(this, to);
- return to;
-}
-
-
-// Spawns 'process' and returns its pid, or an empty UPID if 'process'
-// is NULL. 'manage' is forwarded to ProcessManager::spawn. When the
-// clock is paused, the new process's time is first ordered after the
-// spawning process's, or set to Clock::now() when called from outside
-// any process.
-UPID spawn(ProcessBase* process, bool manage)
-{
- process::initialize();
-
- if (process == NULL) {
- return UPID();
- }
-
- // If using a manual clock, try and set current time of process
- // using happens before relationship between spawner and spawnee!
- if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (__process__ == NULL) {
- Clock::update(process, Clock::now());
- } else {
- Clock::order(__process__, process);
- }
- }
- }
- }
-
- return process_manager->spawn(process, manage);
-}
-
-
-// Terminates the process identified by 'pid'. The calling process
-// (__process__, which may be NULL) is recorded as the sender.
-void terminate(const UPID& pid, bool inject)
-{
- ProcessBase* const sender = __process__;
- process_manager->terminate(pid, inject, sender);
-}
-
-
-// Helper process for wait(pid, duration). It links to 'pid' and
-// schedules a timeout after 'duration':
-// - when 'pid' exits, '*waited' is set to true;
-// - when the timeout fires, '*waited' is set to false.
-// Either way the waiter then terminates itself.
-class WaitWaiter : public Process<WaitWaiter>
-{
-public:
- WaitWaiter(const UPID& _pid, const Duration& _duration, bool* _waited)
- : ProcessBase(ID::generate("__waiter__")),
- pid(_pid),
- duration(_duration),
- waited(_waited) {}
-
- virtual void initialize()
- {
- VLOG(3) << "Running waiter process for " << pid;
- link(pid);
- delay(duration, self(), &WaitWaiter::timeout);
- }
-
-private:
- // 'pid' exited before the timeout.
- virtual void exited(const UPID&)
- {
- VLOG(3) << "Waiter process waited for " << pid;
- finish(true);
- }
-
- // The timeout fired.
- void timeout()
- {
- VLOG(3) << "Waiter process timed out waiting for " << pid;
- finish(false);
- }
-
- // Records the outcome for the caller and stops this waiter.
- void finish(bool outcome)
- {
- *waited = outcome;
- terminate(self());
- }
-
- const UPID pid;
- const Duration duration;
- bool* const waited;
-};
-
-
-// Waits for the process identified by 'pid' to exit.
-// - If 'duration' is Seconds(-1), waits without a limit via
-// ProcessManager::wait.
-// - Otherwise uses a WaitWaiter that gives up after 'duration'.
-// Returns false immediately when '!pid' holds; otherwise returns the
-// result of whichever wait mechanism is used.
-bool wait(const UPID& pid, const Duration& duration)
-{
- process::initialize();
-
- if (!pid) {
- return false;
- }
-
- // This could result in a deadlock if some code decides to wait on a
- // process that has invoked that code, so at least warn about it.
- const bool waiting_on_self =
- __process__ != NULL && __process__->self() == pid;
- if (waiting_on_self) {
- std::cerr << "\n**** DEADLOCK DETECTED! ****\nYou are waiting on process "
- << pid << " that it is currently executing." << std::endl;
- }
-
- if (duration == Seconds(-1)) {
- return process_manager->wait(pid);
- }
-
- // Bounded wait: a helper process links to 'pid' and races its exit
- // against a timeout, recording the outcome in 'waited'.
- bool waited = false;
- WaitWaiter waiter(pid, duration, &waited);
- spawn(waiter);
- wait(waiter);
-
- return waited;
-}
-
-
-void filter(Filter *filter)
-{
- process::initialize();
-
- synchronized (filterer) {
- filterer
<TRUNCATED>