You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/29 19:41:04 UTC
[26/35] Renamed 'third_party' to '3rdparty'.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gtest.hpp b/3rdparty/libprocess/include/process/gtest.hpp
new file mode 100644
index 0000000..8473452
--- /dev/null
+++ b/3rdparty/libprocess/include/process/gtest.hpp
@@ -0,0 +1,338 @@
+#ifndef __PROCESS_GTEST_HPP__
+#define __PROCESS_GTEST_HPP__
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/http.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/option.hpp>
+
+namespace process {
+
+// A simple test event listener that makes sure to resume the clock
+// after each test even if the previous test had a partial result
+// (i.e., an ASSERT_* failed).
+class ClockTestEventListener : public ::testing::EmptyTestEventListener
+{
+public:
+ // Returns the singleton instance of the listener.
+ static ClockTestEventListener* instance()
+ {
+ static ClockTestEventListener* listener = new ClockTestEventListener();
+ return listener;
+ }
+
+ virtual void OnTestEnd(const ::testing::TestInfo&)
+ {
+ if (process::Clock::paused()) {
+ process::Clock::resume();
+ }
+ }
+private:
+ ClockTestEventListener() {}
+};
+
+} // namespace process {
+
+template <typename T>
+::testing::AssertionResult AwaitAssertReady(
+ const char* expr,
+ const char*, // Unused string representation of 'duration'.
+ const process::Future<T>& actual,
+ const Duration& duration)
+{
+ if (!actual.await(duration)) {
+ return ::testing::AssertionFailure()
+ << "Failed to wait " << duration << " for " << expr;
+ } else if (actual.isDiscarded()) {
+ return ::testing::AssertionFailure()
+ << expr << " was discarded";
+ } else if (actual.isFailed()) {
+ return ::testing::AssertionFailure()
+ << "(" << expr << ").failure(): " << actual.failure();
+ }
+
+ return ::testing::AssertionSuccess();
+}
+
+
+template <typename T>
+::testing::AssertionResult AwaitAssertFailed(
+ const char* expr,
+ const char*, // Unused string representation of 'duration'.
+ const process::Future<T>& actual,
+ const Duration& duration)
+{
+ if (!actual.await(duration)) {
+ return ::testing::AssertionFailure()
+ << "Failed to wait " << duration << " for " << expr;
+ } else if (actual.isDiscarded()) {
+ return ::testing::AssertionFailure()
+ << expr << " was discarded";
+ } else if (actual.isReady()) {
+ return ::testing::AssertionFailure()
+ << expr << " is ready (" << ::testing::PrintToString(actual.get()) << ")";
+ }
+
+ return ::testing::AssertionSuccess();
+}
+
+
+template <typename T>
+::testing::AssertionResult AwaitAssertDiscarded(
+ const char* expr,
+ const char*, // Unused string representation of 'duration'.
+ const process::Future<T>& actual,
+ const Duration& duration)
+{
+ if (!actual.await(duration)) {
+ return ::testing::AssertionFailure()
+ << "Failed to wait " << duration << " for " << expr;
+ } else if (actual.isFailed()) {
+ return ::testing::AssertionFailure()
+ << "(" << expr << ").failure(): " << actual.failure();
+ } else if (actual.isReady()) {
+ return ::testing::AssertionFailure()
+ << expr << " is ready (" << ::testing::PrintToString(actual.get()) << ")";
+ }
+
+ return ::testing::AssertionSuccess();
+}
+
+
+template <typename T1, typename T2>
+::testing::AssertionResult AwaitAssertEq(
+ const char* expectedExpr,
+ const char* actualExpr,
+ const char* durationExpr,
+ const T1& expected,
+ const process::Future<T2>& actual,
+ const Duration& duration)
+{
+ const ::testing::AssertionResult result =
+ AwaitAssertReady(actualExpr, durationExpr, actual, duration);
+
+ if (result) {
+ if (expected == actual.get()) {
+ return ::testing::AssertionSuccess();
+ } else {
+ return ::testing::AssertionFailure()
+ << "Value of: (" << actualExpr << ").get()\n"
+ << " Actual: " << ::testing::PrintToString(actual.get()) << "\n"
+ << "Expected: " << expectedExpr << "\n"
+ << "Which is: " << ::testing::PrintToString(expected);
+ }
+ }
+
+ return result;
+}
+
+
+#define AWAIT_ASSERT_READY_FOR(actual, duration) \
+ ASSERT_PRED_FORMAT2(AwaitAssertReady, actual, duration)
+
+
+#define AWAIT_ASSERT_READY(actual) \
+ AWAIT_ASSERT_READY_FOR(actual, Seconds(5))
+
+
+#define AWAIT_READY_FOR(actual, duration) \
+ AWAIT_ASSERT_READY_FOR(actual, duration)
+
+
+#define AWAIT_READY(actual) \
+ AWAIT_ASSERT_READY(actual)
+
+
+#define AWAIT_EXPECT_READY_FOR(actual, duration) \
+ EXPECT_PRED_FORMAT2(AwaitAssertReady, actual, duration)
+
+
+#define AWAIT_EXPECT_READY(actual) \
+ AWAIT_EXPECT_READY_FOR(actual, Seconds(5))
+
+
+#define AWAIT_ASSERT_FAILED_FOR(actual, duration) \
+ ASSERT_PRED_FORMAT2(AwaitAssertFailed, actual, duration)
+
+
+#define AWAIT_ASSERT_FAILED(actual) \
+ AWAIT_ASSERT_FAILED_FOR(actual, Seconds(5))
+
+
+#define AWAIT_FAILED_FOR(actual, duration) \
+ AWAIT_ASSERT_FAILED_FOR(actual, duration)
+
+
+#define AWAIT_FAILED(actual) \
+ AWAIT_ASSERT_FAILED(actual)
+
+
+#define AWAIT_EXPECT_FAILED_FOR(actual, duration) \
+ EXPECT_PRED_FORMAT2(AwaitAssertFailed, actual, duration)
+
+
+#define AWAIT_EXPECT_FAILED(actual) \
+ AWAIT_EXPECT_FAILED_FOR(actual, Seconds(5))
+
+
+#define AWAIT_ASSERT_DISCARDED_FOR(actual, duration) \
+ ASSERT_PRED_FORMAT2(AwaitAssertDiscarded, actual, duration)
+
+
+#define AWAIT_ASSERT_DISCARDED(actual) \
+ AWAIT_ASSERT_DISCARDED_FOR(actual, Seconds(5))
+
+
+#define AWAIT_DISCARDED_FOR(actual, duration) \
+ AWAIT_ASSERT_DISCARDED_FOR(actual, duration)
+
+
+#define AWAIT_DISCARDED(actual) \
+ AWAIT_ASSERT_DISCARDED(actual)
+
+
+#define AWAIT_EXPECT_DISCARDED_FOR(actual, duration) \
+ EXPECT_PRED_FORMAT2(AwaitAssertDiscarded, actual, duration)
+
+
+#define AWAIT_EXPECT_DISCARDED(actual) \
+ AWAIT_EXPECT_DISCARDED_FOR(actual, Seconds(5))
+
+
+#define AWAIT_ASSERT_EQ_FOR(expected, actual, duration) \
+ ASSERT_PRED_FORMAT3(AwaitAssertEq, expected, actual, duration)
+
+
+#define AWAIT_ASSERT_EQ(expected, actual) \
+ AWAIT_ASSERT_EQ_FOR(expected, actual, Seconds(5))
+
+
+#define AWAIT_EQ(expected, actual) \
+ AWAIT_ASSERT_EQ(expected, actual)
+
+
+#define AWAIT_EXPECT_EQ_FOR(expected, actual, duration) \
+ EXPECT_PRED_FORMAT3(AwaitAssertEq, expected, actual, duration)
+
+
+#define AWAIT_EXPECT_EQ(expected, actual) \
+ AWAIT_EXPECT_EQ_FOR(expected, actual, Seconds(5))
+
+
+inline ::testing::AssertionResult AwaitAssertResponseStatusEq(
+ const char* expectedExpr,
+ const char* actualExpr,
+ const char* durationExpr,
+ const std::string& expected,
+ const process::Future<process::http::Response>& actual,
+ const Duration& duration)
+{
+ const ::testing::AssertionResult result =
+ AwaitAssertReady(actualExpr, durationExpr, actual, duration);
+
+ if (result) {
+ if (expected == actual.get().status) {
+ return ::testing::AssertionSuccess();
+ } else {
+ return ::testing::AssertionFailure()
+ << "Value of: (" << actualExpr << ").get().status\n"
+ << " Actual: " << ::testing::PrintToString(actual.get().status) << "\n"
+ << "Expected: " << expectedExpr << "\n"
+ << "Which is: " << ::testing::PrintToString(expected);
+ }
+ }
+
+ return result;
+}
+
+
+#define AWAIT_EXPECT_RESPONSE_STATUS_EQ_FOR(expected, actual, duration) \
+ EXPECT_PRED_FORMAT3(AwaitAssertResponseStatusEq, expected, actual, duration)
+
+
+#define AWAIT_EXPECT_RESPONSE_STATUS_EQ(expected, actual) \
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ_FOR(expected, actual, Seconds(5))
+
+
+inline ::testing::AssertionResult AwaitAssertResponseBodyEq(
+ const char* expectedExpr,
+ const char* actualExpr,
+ const char* durationExpr,
+ const std::string& expected,
+ const process::Future<process::http::Response>& actual,
+ const Duration& duration)
+{
+ const ::testing::AssertionResult result =
+ AwaitAssertReady(actualExpr, durationExpr, actual, duration);
+
+ if (result) {
+ if (expected == actual.get().body) {
+ return ::testing::AssertionSuccess();
+ } else {
+ return ::testing::AssertionFailure()
+ << "Value of: (" << actualExpr << ").get().body\n"
+ << " Actual: " << ::testing::PrintToString(actual.get().body) << "\n"
+ << "Expected: " << expectedExpr << "\n"
+ << "Which is: " << ::testing::PrintToString(expected);
+ }
+ }
+
+ return result;
+}
+
+
+#define AWAIT_EXPECT_RESPONSE_BODY_EQ_FOR(expected, actual, duration) \
+ EXPECT_PRED_FORMAT3(AwaitAssertResponseBodyEq, expected, actual, duration)
+
+
+#define AWAIT_EXPECT_RESPONSE_BODY_EQ(expected, actual) \
+ AWAIT_EXPECT_RESPONSE_BODY_EQ_FOR(expected, actual, Seconds(5))
+
+
+inline ::testing::AssertionResult AwaitAssertResponseHeaderEq(
+ const char* expectedExpr,
+ const char* keyExpr,
+ const char* actualExpr,
+ const char* durationExpr,
+ const std::string& expected,
+ const std::string& key,
+ const process::Future<process::http::Response>& actual,
+ const Duration& duration)
+{
+ const ::testing::AssertionResult result =
+ AwaitAssertReady(actualExpr, durationExpr, actual, duration);
+
+ if (result) {
+ const Option<std::string> value = actual.get().headers.get(key);
+ if (value.isNone()) {
+ return ::testing::AssertionFailure()
+ << "Response does not contain header '" << key << "'";
+ } else if (expected == value.get()) {
+ return ::testing::AssertionSuccess();
+ } else {
+ return ::testing::AssertionFailure()
+ << "Value of: (" << actualExpr << ").get().headers[" << keyExpr << "]\n"
+ << " Actual: " << ::testing::PrintToString(value.get()) << "\n"
+ << "Expected: " << expectedExpr << "\n"
+ << "Which is: " << ::testing::PrintToString(expected);
+ }
+ }
+
+ return result;
+}
+
+
+#define AWAIT_EXPECT_RESPONSE_HEADER_EQ_FOR(expected, key, actual, duration) \
+ EXPECT_PRED_FORMAT4(AwaitAssertResponseHeaderEq, expected, key, actual, duration)
+
+
+#define AWAIT_EXPECT_RESPONSE_HEADER_EQ(expected, key, actual) \
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ_FOR(expected, key, actual, Seconds(5))
+
+#endif // __PROCESS_GTEST_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
new file mode 100644
index 0000000..751cfb8
--- /dev/null
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -0,0 +1,468 @@
+#ifndef __PROCESS_HTTP_HPP__
+#define __PROCESS_HTTP_HPP__
+
+#include <cctype>
+#include <cstdlib>
+#include <iomanip>
+#include <sstream>
+#include <string>
+
+#include <limits.h>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/error.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+namespace http {
+
+struct Request
+{
+ // TODO(benh): Add major/minor version.
+ // TODO(bmahler): Header names are not case sensitive! Either make these
+ // case-insensitive, or add a variable for each header in HTTP 1.0/1.1 (like
+ // we've done here with keepAlive).
+ // Tracked by: https://issues.apache.org/jira/browse/MESOS-328.
+ hashmap<std::string, std::string> headers;
+ std::string method;
+ std::string path;
+ std::string url;
+ std::string fragment;
+ hashmap<std::string, std::string> query;
+ std::string body;
+ bool keepAlive;
+
+ // Returns whether the encoding is considered acceptable in the request.
+ // TODO(bmahler): Consider this logic being in decoder.hpp, and having the
+ // Request contain a member variable for each popular HTTP 1.0/1.1 header.
+ bool accepts(const std::string& encoding) const
+ {
+ // See RFC 2616, section 14.3 for the details.
+ Option<std::string> accepted = headers.get("Accept-Encoding");
+
+ if (accepted.isNone()) {
+ return false;
+ }
+
+ // Remove spaces and tabs for easier parsing.
+ accepted = strings::remove(accepted.get(), " ");
+ accepted = strings::remove(accepted.get(), "\t");
+ accepted = strings::remove(accepted.get(), "\n");
+
+ // From RFC 2616:
+ // 1. If the content-coding is one of the content-codings listed in
+ // the Accept-Encoding field, then it is acceptable, unless it is
+ // accompanied by a qvalue of 0. (As defined in section 3.9, a
+ // qvalue of 0 means "not acceptable.")
+ // 2. The special "*" symbol in an Accept-Encoding field matches any
+ // available content-coding not explicitly listed in the header
+ // field.
+
+ // First we'll look for the encoding specified explicitly, then '*'.
+ std::vector<std::string> candidates;
+ candidates.push_back(encoding); // Rule 1.
+ candidates.push_back("*"); // Rule 2.
+
+ foreach (std::string& candidate, candidates) {
+ // Is the candidate one of the accepted encodings?
+ foreach (const std::string& _encoding,
+ strings::tokenize(accepted.get(), ",")) {
+ if (strings::startsWith(_encoding, candidate)) {
+ // Is there a 0 q value? Ex: 'gzip;q=0.0'.
+ const std::map<std::string, std::vector<std::string> >& values =
+ strings::pairs(_encoding, ";", "=");
+
+ // Look for { "q": ["0"] }.
+ if (values.count("q") == 0 || values.find("q")->second.size() != 1) {
+ // No q value, or malformed q value.
+ return true;
+ }
+
+ // Is the q value > 0?
+ Try<double> value = numify<double>(values.find("q")->second[0]);
+ return value.isSome() && value.get() > 0;
+ }
+ }
+ }
+
+ // NOTE: 3 and 4 are partially ignored since we can only provide gzip.
+ // 3. If multiple content-codings are acceptable, then the acceptable
+ // content-coding with the highest non-zero qvalue is preferred.
+ // 4. The "identity" content-coding is always acceptable, unless
+ // specifically refused because the Accept-Encoding field includes
+ // "identity;q=0", or because the field includes "*;q=0" and does
+ // not explicitly include the "identity" content-coding. If the
+ // Accept-Encoding field-value is empty, then only the "identity"
+ // encoding is acceptable.
+ return false;
+ }
+};
+
+
+struct Response
+{
+ Response()
+ : type(NONE)
+ {}
+
+ Response(const std::string& _body)
+ : type(BODY),
+ body(_body)
+ {
+ headers["Content-Length"] = stringify(body.size());
+ }
+
+ // TODO(benh): Add major/minor version.
+ std::string status;
+ hashmap<std::string, std::string> headers;
+
+ // Either provide a "body", an absolute "path" to a file, or a
+ // "pipe" for streaming a response. Distinguish between the cases
+ // using 'type' below.
+ //
+ // BODY: Uses 'body' as the body of the response. These may be
+ // encoded using gzip for efficiency, if 'Content-Encoding' is not
+ // already specified.
+ //
+ // PATH: Attempts to perform a 'sendfile' operation on the file
+ // found at 'path'.
+ //
+ // PIPE: Splices data from 'pipe' using 'Transfer-Encoding=chunked'.
+ // Note that the read end of the pipe will be closed by libprocess
+ // either after the write end has been closed or if the socket the
+ // data is being spliced to has been closed (i.e., nobody is
+ // listening any longer). This can cause writes to the pipe to
+ // generate a SIGPIPE (which will terminate your program unless you
+ // explicitly ignore them or handle them).
+ //
+ // In all cases (BODY, PATH, PIPE), you are expected to properly
+ // specify the 'Content-Type' header, but the 'Content-Length' and
+ // or 'Transfer-Encoding' headers will be filled in for you.
+ enum {
+ NONE,
+ BODY,
+ PATH,
+ PIPE
+ } type;
+
+ std::string body;
+ std::string path;
+ int pipe; // See comment above regarding the semantics for closing.
+};
+
+
+struct OK : Response
+{
+ OK()
+ {
+ status = "200 OK";
+ }
+
+ OK(const char* body) : Response(std::string(body))
+ {
+ status = "200 OK";
+ }
+
+ OK(const std::string& body) : Response(body)
+ {
+ status = "200 OK";
+ }
+
+ OK(const JSON::Value& value, const Option<std::string>& jsonp = None())
+ {
+ type = BODY;
+
+ status = "200 OK";
+
+ std::ostringstream out;
+
+ if (jsonp.isSome()) {
+ out << jsonp.get() << "(";
+ }
+
+ JSON::render(out, value);
+
+ if (jsonp.isSome()) {
+ out << ");";
+ headers["Content-Type"] = "text/javascript";
+ } else {
+ headers["Content-Type"] = "application/json";
+ }
+
+ headers["Content-Length"] = stringify(out.str().size());
+ body = out.str().data();
+ }
+};
+
+
+struct TemporaryRedirect : Response
+{
+ TemporaryRedirect(const std::string& url)
+ {
+ status = "307 Temporary Redirect";
+ headers["Location"] = url;
+ }
+};
+
+
+struct BadRequest : Response
+{
+ BadRequest()
+ {
+ status = "400 Bad Request";
+ }
+
+ BadRequest(const std::string& body)
+ : Response(body)
+ {
+ status = "400 Bad Request";
+ }
+};
+
+
+struct NotFound : Response
+{
+ NotFound()
+ {
+ status = "404 Not Found";
+ }
+
+ NotFound(const std::string& body) : Response(body)
+ {
+ status = "404 Not Found";
+ }
+};
+
+
+struct InternalServerError : Response
+{
+ InternalServerError()
+ {
+ status = "500 Internal Server Error";
+ }
+
+ InternalServerError(const std::string& body) : Response(body)
+ {
+ status = "500 Internal Server Error";
+ }
+};
+
+
+struct ServiceUnavailable : Response
+{
+ ServiceUnavailable()
+ {
+ status = "503 Service Unavailable";
+ }
+
+ ServiceUnavailable(const std::string& body) : Response(body)
+ {
+ status = "503 Service Unavailable";
+ }
+};
+
+
+namespace query {
+
+// Parses an HTTP query string into a map. For example:
+//
+// parse("foo=1;bar=2;baz;foo=3")
+//
+// Would return a map with the following:
+// bar: "2"
+// baz: ""
+// foo: "3"
+//
+// We use the last value for a key for simplicity, since the RFC does not
+// specify how to handle duplicate keys:
+// http://en.wikipedia.org/wiki/Query_string
+// TODO(bmahler): If needed, investigate populating the query map inline
+// for better performance.
+inline hashmap<std::string, std::string> parse(const std::string& query)
+{
+ hashmap<std::string, std::string> result;
+
+ const std::vector<std::string>& tokens = strings::tokenize(query, ";&");
+ foreach (const std::string& token, tokens) {
+ const std::vector<std::string>& pairs = strings::split(token, "=");
+ if (pairs.size() == 2) {
+ result[pairs[0]] = pairs[1];
+ } else if (pairs.size() == 1) {
+ result[pairs[0]] = "";
+ }
+ }
+
+ return result;
+}
+
+} // namespace query {
+
+
+// Returns a percent-encoded string according to RFC 3986.
+// The input string must not already be percent encoded.
+inline std::string encode(const std::string& s)
+{
+ std::ostringstream out;
+
+ foreach (unsigned char c, s) {
+ switch (c) {
+ // Reserved characters.
+ case '$':
+ case '&':
+ case '+':
+ case ',':
+ case '/':
+ case ':':
+ case ';':
+ case '=':
+ case '?':
+ case '@':
+ // Unsafe characters.
+ case ' ':
+ case '"':
+ case '<':
+ case '>':
+ case '#':
+ case '%':
+ case '{':
+ case '}':
+ case '|':
+ case '\\':
+ case '^':
+ case '~':
+ case '[':
+ case ']':
+ case '`':
+ // NOTE: The cast to unsigned int is needed.
+ out << '%' << std::setfill('0') << std::setw(2) << std::hex
+ << std::uppercase << (unsigned int) c;
+ break;
+ default:
+ // ASCII control characters and non-ASCII characters.
+ // NOTE: The cast to unsigned int is needed.
+ if (c < 0x20 || c > 0x7F) {
+ out << '%' << std::setfill('0') << std::setw(2) << std::hex
+ << std::uppercase << (unsigned int) c;
+ } else {
+ out << c;
+ }
+ break;
+ }
+ }
+
+ return out.str();
+}
+
+
+// Decodes a percent-encoded string according to RFC 3986.
+// The input string must not already be decoded.
+// Returns error on the occurrence of a malformed % escape in s.
+inline Try<std::string> decode(const std::string& s)
+{
+ std::ostringstream out;
+
+ for (size_t i = 0; i < s.length(); ++i) {
+ if (s[i] != '%') {
+ out << s[i];
+ continue;
+ }
+
+ // We now expect two more characters: % HEXDIG HEXDIG
+ if (i + 2 >= s.length() || !isxdigit(s[i+1]) || !isxdigit(s[i+2])) {
+ return Error(
+ "Malformed % escape in '" + s + "': '" + s.substr(i, 3) + "'");
+ }
+
+ // Convert from HEXDIG HEXDIG to char value.
+ std::istringstream in(s.substr(i + 1, 2));
+ unsigned long l;
+ in >> std::hex >> l;
+ if (l > UCHAR_MAX) {
+ std::cerr << "Unexpected conversion from hex string: "
+ << s.substr(i + 1, 2) << " to unsigned long: "
+ << l << std::endl;
+ abort();
+ }
+ out << static_cast<unsigned char>(l);
+
+ i += 2;
+ }
+
+ return out.str();
+}
+
+
+// Sends a blocking HTTP GET request to the process with the given upid.
+// Returns the HTTP response from the process, read asynchronously.
+//
+// TODO(bmahler): Have the request sent asynchronously as well.
+// TODO(bmahler): For efficiency, this should properly use the ResponseDecoder
+// on the read stream, rather than parsing the full string response at the end.
+Future<Response> get(
+ const UPID& upid,
+ const std::string& path = "",
+ const std::string& query = "");
+
+
+// Status code reason strings, from the HTTP1.1 RFC:
+// http://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html
+extern hashmap<uint16_t, std::string> statuses;
+
+
+inline void initialize()
+{
+ statuses[100] = "100 Continue";
+ statuses[101] = "101 Switching Protocols";
+ statuses[200] = "200 OK";
+ statuses[201] = "201 Created";
+ statuses[202] = "202 Accepted";
+ statuses[203] = "203 Non-Authoritative Information";
+ statuses[204] = "204 No Content";
+ statuses[205] = "205 Reset Content";
+ statuses[206] = "206 Partial Content";
+ statuses[300] = "300 Multiple Choices";
+ statuses[301] = "301 Moved Permanently";
+ statuses[302] = "302 Found";
+ statuses[303] = "303 See Other";
+ statuses[304] = "304 Not Modified";
+ statuses[305] = "305 Use Proxy";
+ statuses[307] = "307 Temporary Redirect";
+ statuses[400] = "400 Bad Request";
+ statuses[401] = "401 Unauthorized";
+ statuses[402] = "402 Payment Required";
+ statuses[403] = "403 Forbidden";
+ statuses[404] = "404 Not Found";
+ statuses[405] = "405 Method Not Allowed";
+ statuses[406] = "406 Not Acceptable";
+ statuses[407] = "407 Proxy Authentication Required";
+ statuses[408] = "408 Request Time-out";
+ statuses[409] = "409 Conflict";
+ statuses[410] = "410 Gone";
+ statuses[411] = "411 Length Required";
+ statuses[412] = "412 Precondition Failed";
+ statuses[413] = "413 Request Entity Too Large";
+ statuses[414] = "414 Request-URI Too Large";
+ statuses[415] = "415 Unsupported Media Type";
+ statuses[416] = "416 Requested range not satisfiable";
+ statuses[417] = "417 Expectation Failed";
+ statuses[500] = "500 Internal Server Error";
+ statuses[501] = "501 Not Implemented";
+ statuses[502] = "502 Bad Gateway";
+ statuses[503] = "503 Service Unavailable";
+ statuses[504] = "504 Gateway Time-out";
+ statuses[505] = "505 HTTP Version not supported";
+}
+
+
+} // namespace http {
+} // namespace process {
+
+#endif // __PROCESS_HTTP_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/id.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/id.hpp b/3rdparty/libprocess/include/process/id.hpp
new file mode 100644
index 0000000..8c256b9
--- /dev/null
+++ b/3rdparty/libprocess/include/process/id.hpp
@@ -0,0 +1,16 @@
+#ifndef __PROCESS_ID_HPP__
+#define __PROCESS_ID_HPP__
+
+#include <string>
+
+namespace process {
+namespace ID {
+
+// Returns 'prefix(N)' where N represents the number of instances
+// where this prefix has been used to generate an ID.
+std::string generate(const std::string& prefix = "");
+
+} // namespace ID {
+} // namespace process {
+
+#endif // __PROCESS_ID_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
new file mode 100644
index 0000000..8cf3244
--- /dev/null
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -0,0 +1,44 @@
+#ifndef __PROCESS_IO_HPP__
+#define __PROCESS_IO_HPP__
+
+#include <cstring> // For size_t.
+#include <string>
+
+#include <process/future.hpp>
+
+namespace process {
+namespace io {
+
+// Possible events for polling.
+const short READ = 0x01;
+const short WRITE = 0x02;
+
+// Buffered read chunk size. Roughly 16 pages.
+const size_t BUFFERED_READ_SIZE = 16*4096;
+
+// TODO(benh): Add a version which takes multiple file descriptors.
+// Returns the events (a subset of the events specified) that can be
+// performed on the specified file descriptor without blocking.
+Future<short> poll(int fd, short events);
+
+
+// Performs a single non-blocking read by polling on the specified file
+// descriptor until any data can be be read. The future will become ready when
+// some data is read (may be less than that specified by size). A future failure
+// will be returned if an error is detected. If end-of-file is reached, value
+// zero will be returned. Note that the return type of this function differs
+// from the standard 'read'. In particular, this function returns the number of
+// bytes read or zero on end-of-file (an error is indicated by failing the
+// future, thus only a 'size_t' is necessary rather than a 'ssize_t').
+Future<size_t> read(int fd, void* data, size_t size);
+
+
+// Performs a series of asynchronous reads, until EOF is reached.
+// NOTE: When using this, ensure the sender will close the connection
+// so that EOF can be reached.
+Future<std::string> read(int fd);
+
+} // namespace io {
+} // namespace process {
+
+#endif // __PROCESS_IO_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/latch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/latch.hpp b/3rdparty/libprocess/include/process/latch.hpp
new file mode 100644
index 0000000..5170aa8
--- /dev/null
+++ b/3rdparty/libprocess/include/process/latch.hpp
@@ -0,0 +1,33 @@
+#ifndef __PROCESS_LATCH_HPP__
+#define __PROCESS_LATCH_HPP__
+
+#include <process/pid.hpp>
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+class Latch
+{
+public:
+ Latch();
+ virtual ~Latch();
+
+ bool operator == (const Latch& that) const { return pid == that.pid; }
+ bool operator < (const Latch& that) const { return pid < that.pid; }
+
+ void trigger();
+ bool await(const Duration& duration = Seconds(-1));
+
+private:
+ // Not copyable, not assignable.
+ Latch(const Latch& that);
+ Latch& operator = (const Latch& that);
+
+ bool triggered;
+ UPID pid;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_LATCH_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/logging.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/logging.hpp b/3rdparty/libprocess/include/process/logging.hpp
new file mode 100644
index 0000000..cba2fd4
--- /dev/null
+++ b/3rdparty/libprocess/include/process/logging.hpp
@@ -0,0 +1,111 @@
+#ifndef __PROCESS_LOGGING_HPP__
+#define __PROCESS_LOGGING_HPP__
+
+#include <glog/logging.h>
+
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/numify.hpp>
+#include <stout/option.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+
+class Logging : public Process<Logging>
+{
+public:
+ Logging()
+ : ProcessBase("logging"),
+ original(FLAGS_v)
+ {
+ // Make sure all reads/writes can be done atomically (i.e., to
+ // make sure VLOG(*) statements don't read partial writes).
+ // TODO(benh): Use "atomics" primitives for doing reads/writes of
+ // FLAGS_v anyway to account for proper memory barriers.
+ CHECK(sizeof(FLAGS_v) == sizeof(int32_t));
+ }
+
+ virtual ~Logging() {}
+
+protected:
+ virtual void initialize()
+ {
+ route("/toggle", &This::toggle);
+ }
+
+private:
+ Future<http::Response> toggle(const http::Request& request)
+ {
+ Option<std::string> level = request.query.get("level");
+ Option<std::string> duration = request.query.get("duration");
+
+ if (level.isNone() && duration.isNone()) {
+ return http::OK(stringify(FLAGS_v) + "\n");
+ }
+
+ if (level.isSome() && duration.isNone()) {
+ return http::BadRequest("Expecting 'duration=value' in query.\n");
+ } else if (level.isNone() && duration.isSome()) {
+ return http::BadRequest("Expecting 'level=value' in query.\n");
+ }
+
+ Try<int> v = numify<int>(level.get());
+
+ if (v.isError()) {
+ return http::BadRequest(v.error() + ".\n");
+ }
+
+ if (v.get() < 0) {
+ return http::BadRequest("Invalid level '" + stringify(v.get()) + "'.\n");
+ } else if (v.get() < original) {
+ return http::BadRequest("'" + stringify(v.get()) + "' < original level.\n");
+ }
+
+ Try<Duration> d = Duration::parse(duration.get());
+
+ if (d.isError()) {
+ return http::BadRequest(d.error() + ".\n");
+ }
+
+ // Set the logging level.
+ set(v.get());
+
+ // Start a revert timer (if necessary).
+ if (v.get() != original) {
+ timeout = d.get();
+ delay(timeout.remaining(), this, &This::revert);
+ }
+
+ return http::OK();
+ }
+
+ void set(int v)
+ {
+ if (FLAGS_v != v) {
+ VLOG(FLAGS_v) << "Setting verbose logging level to " << v;
+ FLAGS_v = v;
+ __sync_synchronize(); // Ensure 'FLAGS_v' visible in other threads.
+ }
+ }
+
+ void revert()
+ {
+ if (timeout.remaining() == Seconds(0)) {
+ set(original);
+ }
+ }
+
+ Timeout timeout;
+
+ const int32_t original; // Original value of FLAGS_v.
+};
+
+} // namespace process {
+
+#endif // __PROCESS_LOGGING_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/message.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/message.hpp b/3rdparty/libprocess/include/process/message.hpp
new file mode 100644
index 0000000..c67c5e1
--- /dev/null
+++ b/3rdparty/libprocess/include/process/message.hpp
@@ -0,0 +1,20 @@
+#ifndef __PROCESS_MESSAGE_HPP__
+#define __PROCESS_MESSAGE_HPP__
+
+#include <string>
+
+#include <process/pid.hpp>
+
+namespace process {
+
+struct Message
+{
+ std::string name;
+ UPID from;
+ UPID to;
+ std::string body;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_MESSAGE_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/mime.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/mime.hpp b/3rdparty/libprocess/include/process/mime.hpp
new file mode 100644
index 0000000..0abeac1
--- /dev/null
+++ b/3rdparty/libprocess/include/process/mime.hpp
@@ -0,0 +1,145 @@
+#ifndef __PROCESS_MIME_HPP__
+#define __PROCESS_MIME_HPP__
+
+namespace process {
+namespace mime {
+
+extern std::map<std::string, std::string> types;
+
+inline void initialize()
+{
+ // These MIME types were collected via:
+ /*
+ python -c '
+ import mimetypes
+ for extension, type in mimetypes.types_map.iteritems():
+ print "types[\"%s\"] = \"%s\";" % (extension, type)
+ '
+ */
+
+ types[".obj"] = "application/octet-stream";
+ types[".ra"] = "audio/x-pn-realaudio";
+ types[".wsdl"] = "application/xml";
+ types[".dll"] = "application/octet-stream";
+ types[".ras"] = "image/x-cmu-raster";
+ types[".ram"] = "application/x-pn-realaudio";
+ types[".bcpio"] = "application/x-bcpio";
+ types[".sh"] = "application/x-sh";
+ types[".m1v"] = "video/mpeg";
+ types[".xwd"] = "image/x-xwindowdump";
+ types[".doc"] = "application/msword";
+ types[".bmp"] = "image/x-ms-bmp";
+ types[".shar"] = "application/x-shar";
+ types[".js"] = "application/x-javascript";
+ types[".src"] = "application/x-wais-source";
+ types[".dvi"] = "application/x-dvi";
+ types[".aif"] = "audio/x-aiff";
+ types[".ksh"] = "text/plain";
+ types[".dot"] = "application/msword";
+ types[".mht"] = "message/rfc822";
+ types[".p12"] = "application/x-pkcs12";
+ types[".css"] = "text/css";
+ types[".csh"] = "application/x-csh";
+ types[".pwz"] = "application/vnd.ms-powerpoint";
+ types[".pdf"] = "application/pdf";
+ types[".cdf"] = "application/x-netcdf";
+ types[".pl"] = "text/plain";
+ types[".ai"] = "application/postscript";
+ types[".jpe"] = "image/jpeg";
+ types[".jpg"] = "image/jpeg";
+ types[".py"] = "text/x-python";
+ types[".xml"] = "text/xml";
+ types[".jpeg"] = "image/jpeg";
+ types[".ps"] = "application/postscript";
+ types[".gtar"] = "application/x-gtar";
+ types[".xpm"] = "image/x-xpixmap";
+ types[".hdf"] = "application/x-hdf";
+ types[".nws"] = "message/rfc822";
+ types[".tsv"] = "text/tab-separated-values";
+ types[".xpdl"] = "application/xml";
+ types[".p7c"] = "application/pkcs7-mime";
+ types[".eps"] = "application/postscript";
+ types[".ief"] = "image/ief";
+ types[".so"] = "application/octet-stream";
+ types[".xlb"] = "application/vnd.ms-excel";
+ types[".pbm"] = "image/x-portable-bitmap";
+ types[".texinfo"] = "application/x-texinfo";
+ types[".xls"] = "application/vnd.ms-excel";
+ types[".tex"] = "application/x-tex";
+ types[".rtx"] = "text/richtext";
+ types[".html"] = "text/html";
+ types[".aiff"] = "audio/x-aiff";
+ types[".aifc"] = "audio/x-aiff";
+ types[".exe"] = "application/octet-stream";
+ types[".sgm"] = "text/x-sgml";
+ types[".tif"] = "image/tiff";
+ types[".mpeg"] = "video/mpeg";
+ types[".ustar"] = "application/x-ustar";
+ types[".gif"] = "image/gif";
+ types[".ppt"] = "application/vnd.ms-powerpoint";
+ types[".pps"] = "application/vnd.ms-powerpoint";
+ types[".sgml"] = "text/x-sgml";
+ types[".ppm"] = "image/x-portable-pixmap";
+ types[".latex"] = "application/x-latex";
+ types[".bat"] = "text/plain";
+ types[".mov"] = "video/quicktime";
+ types[".ppa"] = "application/vnd.ms-powerpoint";
+ types[".tr"] = "application/x-troff";
+ types[".rdf"] = "application/xml";
+ types[".xsl"] = "application/xml";
+ types[".eml"] = "message/rfc822";
+ types[".nc"] = "application/x-netcdf";
+ types[".sv4cpio"] = "application/x-sv4cpio";
+ types[".bin"] = "application/octet-stream";
+ types[".h"] = "text/plain";
+ types[".tcl"] = "application/x-tcl";
+ types[".wiz"] = "application/msword";
+ types[".o"] = "application/octet-stream";
+ types[".a"] = "application/octet-stream";
+ types[".c"] = "text/plain";
+ types[".wav"] = "audio/x-wav";
+ types[".vcf"] = "text/x-vcard";
+ types[".xbm"] = "image/x-xbitmap";
+ types[".txt"] = "text/plain";
+ types[".au"] = "audio/basic";
+ types[".t"] = "application/x-troff";
+ types[".tiff"] = "image/tiff";
+ types[".texi"] = "application/x-texinfo";
+ types[".oda"] = "application/oda";
+ types[".ms"] = "application/x-troff-ms";
+ types[".rgb"] = "image/x-rgb";
+ types[".me"] = "application/x-troff-me";
+ types[".sv4crc"] = "application/x-sv4crc";
+ types[".qt"] = "video/quicktime";
+ types[".mpa"] = "video/mpeg";
+ types[".mpg"] = "video/mpeg";
+ types[".mpe"] = "video/mpeg";
+ types[".avi"] = "video/x-msvideo";
+ types[".pgm"] = "image/x-portable-graymap";
+ types[".pot"] = "application/vnd.ms-powerpoint";
+ types[".mif"] = "application/x-mif";
+ types[".roff"] = "application/x-troff";
+ types[".htm"] = "text/html";
+ types[".man"] = "application/x-troff-man";
+ types[".etx"] = "text/x-setext";
+ types[".zip"] = "application/zip";
+ types[".movie"] = "video/x-sgi-movie";
+ types[".pyc"] = "application/x-python-code";
+ types[".png"] = "image/png";
+ types[".pfx"] = "application/x-pkcs12";
+ types[".mhtml"] = "message/rfc822";
+ types[".tar"] = "application/x-tar";
+ types[".pnm"] = "image/x-portable-anymap";
+ types[".pyo"] = "application/x-python-code";
+ types[".snd"] = "audio/basic";
+ types[".cpio"] = "application/x-cpio";
+ types[".swf"] = "application/x-shockwave-flash";
+ types[".mp3"] = "audio/mpeg";
+ types[".mp2"] = "audio/mpeg";
+ types[".mp4"] = "video/mp4";
+}
+
+} // } namespace mime {
+} // } namespace process {
+
+#endif // __PROCESS_MIME_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/once.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/once.hpp b/3rdparty/libprocess/include/process/once.hpp
new file mode 100644
index 0000000..e85b382
--- /dev/null
+++ b/3rdparty/libprocess/include/process/once.hpp
@@ -0,0 +1,48 @@
+#ifndef __PROCESS_ONCE_HPP__
+#define __PROCESS_ONCE_HPP__
+
+#include <process/future.hpp>
+
+#include <stout/nothing.hpp>
+
+namespace process {
+
+// Provides a _blocking_ abstraction that's useful for performing a
+// task exactly once.
+class Once
+{
+public:
+ Once() {}
+
+ // Returns true if this Once instance has already transitioned to a
+ // 'done' state (i.e., the action you wanted to perform "once" has
+ // been completed). Note that this BLOCKS until Once::done has been
+ // called.
+ bool once()
+ {
+ if (!outer.set(&inner)) {
+ inner.future().await();
+ return true;
+ }
+
+ return false;
+ }
+
+ // Transitions this Once instance to a 'done' state.
+ void done()
+ {
+ inner.set(Nothing());
+ }
+
+private:
+ // Not copyable, not assignable.
+ Once(const Once& that);
+ Once& operator = (const Once& that);
+
+ Promise<Nothing> inner;
+ Promise<Promise<Nothing>*> outer;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_ONCE_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
new file mode 100644
index 0000000..5a77dbc
--- /dev/null
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -0,0 +1,121 @@
+#ifndef __PROCESS_PID_HPP__
+#define __PROCESS_PID_HPP__
+
+#include <stdint.h>
+
+#include <iostream>
+#include <sstream>
+#include <string>
+
+
+namespace process {
+
+// Forward declaration to break cyclic dependencies.
+class ProcessBase;
+
+
+struct UPID
+{
+ UPID()
+ : ip(0), port(0) {}
+
+ UPID(const UPID& that)
+ : id(that.id), ip(that.ip), port(that.port) {}
+
+ UPID(const char* id_, uint32_t ip_, uint16_t port_)
+ : id(id_), ip(ip_), port(port_) {}
+
+ UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
+ : id(id_), ip(ip_), port(port_) {}
+
+ UPID(const char* s);
+
+ UPID(const std::string& s);
+
+ UPID(const ProcessBase& process);
+
+ operator std::string () const;
+
+ operator bool () const
+ {
+ return id != "" && ip != 0 && port != 0;
+ }
+
+ bool operator ! () const
+ {
+ return id == "" && ip == 0 && port == 0;
+ }
+
+ bool operator < (const UPID& that) const
+ {
+ if (this != &that) {
+ if (ip == that.ip && port == that.port)
+ return id < that.id;
+ else if (ip == that.ip && port != that.port)
+ return port < that.port;
+ else
+ return ip < that.ip;
+ }
+
+ return false;
+ }
+
+ bool operator == (const UPID& that) const
+ {
+ if (this != &that) {
+ return (id == that.id &&
+ ip == that.ip &&
+ port == that.port);
+ }
+
+ return true;
+ }
+
+ bool operator != (const UPID& that) const
+ {
+ return !(this->operator == (that));
+ }
+
+ std::string id;
+ uint32_t ip;
+ uint16_t port;
+};
+
+
+template <typename T = ProcessBase>
+struct PID : UPID
+{
+ PID() : UPID() {}
+
+ PID(const T* t) : UPID(static_cast<const ProcessBase&>(*t)) {}
+ PID(const T& t) : UPID(static_cast<const ProcessBase&>(t)) {}
+
+ template <typename Base>
+ operator PID<Base> () const
+ {
+ // Only allow upcasts!
+ T* t = NULL;
+ Base* base = t;
+ (void)base; // Eliminate unused base warning.
+ PID<Base> pid;
+ pid.id = id;
+ pid.ip = ip;
+ pid.port = port;
+ return pid;
+ }
+};
+
+
+// Outputing UPIDs and generating UPIDs using streams.
+std::ostream& operator << (std::ostream&, const UPID&);
+std::istream& operator >> (std::istream&, UPID&);
+
+
+// UPID hash value (for example, to use in Boost's unordered maps).
+std::size_t hash_value(const UPID&);
+
+} // namespace process {
+
+
+
+#endif // __PROCESS_PID_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
new file mode 100644
index 0000000..8228e4f
--- /dev/null
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -0,0 +1,370 @@
+#ifndef __PROCESS_PROCESS_HPP__
+#define __PROCESS_PROCESS_HPP__
+
+#include <stdint.h>
+#include <pthread.h>
+
+#include <map>
+#include <queue>
+
+#include <tr1/functional>
+
+#include <process/clock.hpp>
+#include <process/event.hpp>
+#include <process/filter.hpp>
+#include <process/http.hpp>
+#include <process/message.hpp>
+#include <process/mime.hpp>
+#include <process/pid.hpp>
+#include <process/thread.hpp>
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+class ProcessBase : public EventVisitor
+{
+public:
+ ProcessBase(const std::string& id = "");
+
+ virtual ~ProcessBase();
+
+ UPID self() const { return pid; }
+
+protected:
+ // Invoked when an event is serviced.
+ virtual void serve(const Event& event)
+ {
+ event.visit(this);
+ }
+
+ // Callbacks used to visit (i.e., handle) a specific event.
+ virtual void visit(const MessageEvent& event);
+ virtual void visit(const DispatchEvent& event);
+ virtual void visit(const HttpEvent& event);
+ virtual void visit(const ExitedEvent& event);
+ virtual void visit(const TerminateEvent& event);
+
+ // Invoked when a process gets spawned.
+ virtual void initialize() {}
+
+ // Invoked when a process is terminated (unless visit is overriden).
+ virtual void finalize() {}
+
+ // Invoked when a linked process has exited (see link).
+ virtual void exited(const UPID& pid) {}
+
+ // Invoked when a linked process can no longer be monitored (see link).
+ virtual void lost(const UPID& pid) {}
+
+ // Puts a message at front of queue.
+ void inject(
+ const UPID& from,
+ const std::string& name,
+ const char* data = NULL,
+ size_t length = 0);
+
+ // Sends a message with data to PID.
+ void send(
+ const UPID& to,
+ const std::string& name,
+ const char* data = NULL,
+ size_t length = 0);
+
+ // Links with the specified PID. Linking with a process from within
+ // the same "operating system process" is gauranteed to give you
+ // perfect monitoring of that process. However, linking with a
+ // process on another machine might result in receiving lost
+ // callbacks due to the nature of a distributed environment.
+ UPID link(const UPID& pid);
+
+ // The default visit implementation for message events invokes
+ // installed message handlers, or delegates the message to another
+ // process (a delegate can be installed below but a message handler
+ // always takes precedence over delegating). A message handler is
+ // any function which takes two arguments, the "from" pid and the
+ // message body.
+ typedef std::tr1::function<void(const UPID&, const std::string&)>
+ MessageHandler;
+
+ // Setup a handler for a message.
+ void install(
+ const std::string& name,
+ const MessageHandler& handler)
+ {
+ handlers.message[name] = handler;
+ }
+
+ template <typename T>
+ void install(
+ const std::string& name,
+ void (T::*method)(const UPID&, const std::string&))
+ {
+ // Note that we use dynamic_cast here so a process can use
+ // multiple inheritance if it sees so fit (e.g., to implement
+ // multiple callback interfaces).
+ MessageHandler handler =
+ std::tr1::bind(method,
+ dynamic_cast<T*>(this),
+ std::tr1::placeholders::_1,
+ std::tr1::placeholders::_2);
+ install(name, handler);
+ }
+
+ // Delegate incoming message's with the specified name to pid.
+ void delegate(const std::string& name, const UPID& pid)
+ {
+ delegates[name] = pid;
+ }
+
+ // The default visit implementation for HTTP events invokes
+ // installed HTTP handlers. A HTTP handler is any function which
+ // takes an http::Request object and returns an http::Response.
+ typedef std::tr1::function<Future<http::Response>(const http::Request&)>
+ HttpRequestHandler;
+
+ // Setup a handler for an HTTP request.
+ bool route(
+ const std::string& name,
+ const HttpRequestHandler& handler)
+ {
+ if (name.find('/') != 0) {
+ return false;
+ }
+ handlers.http[name.substr(1)] = handler;
+ return true;
+ }
+
+ template <typename T>
+ bool route(
+ const std::string& name,
+ Future<http::Response> (T::*method)(const http::Request&))
+ {
+ // Note that we use dynamic_cast here so a process can use
+ // multiple inheritance if it sees so fit (e.g., to implement
+ // multiple callback interfaces).
+ HttpRequestHandler handler =
+ std::tr1::bind(method, dynamic_cast<T*>(this),
+ std::tr1::placeholders::_1);
+ return route(name, handler);
+ }
+
+ // Provide the static asset(s) at the specified _absolute_ path for
+ // the specified name. For example, assuming the process named
+ // "server" invoked 'provide("name", "path")' then an HTTP request
+ // for '/server/name' would return the asset found at 'path'. If the
+ // specified path is a directory then an HTTP request for
+ // '/server/name/file' would return the asset found at
+ // '/path/file'. The 'Content-Type' header of the HTTP response will
+ // be set to the specified type given the file extension (you can
+ // manipulate this via the optional 'types' parameter).
+ void provide(
+ const std::string& name,
+ const std::string& path,
+ const std::map<std::string, std::string>& types = mime::types)
+ {
+ // TODO(benh): Check that name is only alphanumeric (i.e., has no
+ // '/') and that path is absolute.
+ Asset asset;
+ asset.path = path;
+ asset.types = types;
+ assets[name] = asset;
+ }
+
+private:
+ friend class SocketManager;
+ friend class ProcessManager;
+ friend class ProcessReference;
+ friend void* schedule(void*);
+
+ // Process states.
+ enum { BOTTOM,
+ READY,
+ RUNNING,
+ BLOCKED,
+ TERMINATING,
+ TERMINATED } state;
+
+ // Mutex protecting internals.
+ // TODO(benh): Consider replacing with a spinlock, on multi-core systems.
+ pthread_mutex_t m;
+ void lock() { pthread_mutex_lock(&m); }
+ void unlock() { pthread_mutex_unlock(&m); }
+
+ // Enqueue the specified message, request, or function call.
+ void enqueue(Event* event, bool inject = false);
+
+ // Queue of received events.
+ std::deque<Event*> events;
+
+ // Delegates for messages.
+ std::map<std::string, UPID> delegates;
+
+ // Handlers for messages and HTTP requests.
+ struct {
+ std::map<std::string, MessageHandler> message;
+ std::map<std::string, HttpRequestHandler> http;
+ } handlers;
+
+ // Definition of a static asset.
+ struct Asset
+ {
+ std::string path;
+ std::map<std::string, std::string> types;
+ };
+
+ // Static assets(s) to provide.
+ std::map<std::string, Asset> assets;
+
+ // Active references.
+ int refs;
+
+ // Process PID.
+ UPID pid;
+};
+
+
+template <typename T>
+class Process : public virtual ProcessBase {
+public:
+ virtual ~Process() {}
+
+ // Returns pid of process; valid even before calling spawn.
+ PID<T> self() const { return PID<T>(dynamic_cast<const T*>(this)); }
+
+protected:
+ // Useful typedefs for dispatch/delay/defer to self()/this.
+ typedef T Self;
+ typedef T This;
+};
+
+
+/**
+ * Initialize the library. Note that libprocess uses Google's glog and
+ * you can specify options for it (e.g., a logging directory) via
+ * environment variables (see the glog documentation for more
+ * information).
+ *
+ * @param delegate process to receive root HTTP requests
+ */
+void initialize(const std::string& delegate = "");
+
+
+/**
+ * Returns the IP address associated with this instance of the
+ * library.
+ */
+uint32_t ip();
+
+
+/**
+ * Returns the port associated with this instance of the library.
+ */
+uint16_t port();
+
+
+/**
+ * Spawn a new process.
+ *
+ * @param process process to be spawned
+ * @param manage boolean whether process should get garbage collected
+ */
+UPID spawn(ProcessBase* process, bool manage = false);
+
+template <typename T>
+PID<T> spawn(T* t, bool manage = false)
+{
+ // We save the pid before spawn is called because it's possible that
+ // the process has already been deleted after spawn returns (e.g.,
+ // if 'manage' is true).
+ PID<T> pid(t);
+
+ if (!spawn(static_cast<ProcessBase*>(t), manage)) {
+ return PID<T>();
+ }
+
+ return pid;
+}
+
+template <typename T>
+PID<T> spawn(T& t, bool manage = false)
+{
+ return spawn(&t, manage);
+}
+
+
+/**
+ * Send a TERMINATE message to a process, injecting the message ahead
+ * of all other messages queued up for that process if requested. Note
+ * that currently terminate only works for local processes (in the
+ * future we plan to make this more explicit via the use of a PID
+ * instead of a UPID).
+ *
+ * @param inject if true message will be put on front of message queue
+ */
+void terminate(const UPID& pid, bool inject = true);
+void terminate(const ProcessBase& process, bool inject = true);
+void terminate(const ProcessBase* process, bool inject = true);
+
+
+/**
+ * Wait for process to exit no more than specified seconds (returns
+ * true if actually waited on a process).
+ *
+ * @param PID id of the process
+ * @param secs max time to wait, 0 implies wait for ever
+ */
+bool wait(const UPID& pid, const Duration& duration = Seconds(-1));
+bool wait(const ProcessBase& process, const Duration& duration = Seconds(-1));
+bool wait(const ProcessBase* process, const Duration& duration = Seconds(-1));
+
+
+/**
+ * Sends a message with data without a return address.
+ *
+ * @param to receiver
+ * @param name message name
+ * @param data data to send (gets copied)
+ * @param length length of data
+ */
+void post(const UPID& to,
+ const std::string& name,
+ const char* data = NULL,
+ size_t length = 0);
+
+
+// Inline implementations of above.
+inline void terminate(const ProcessBase& process, bool inject)
+{
+ terminate(process.self(), inject);
+}
+
+
+inline void terminate(const ProcessBase* process, bool inject)
+{
+ terminate(process->self(), inject);
+}
+
+
+inline bool wait(const ProcessBase& process, const Duration& duration)
+{
+ return process::wait(process.self(), duration); // Explicit to disambiguate.
+}
+
+
+inline bool wait(const ProcessBase* process, const Duration& duration)
+{
+ return process::wait(process->self(), duration); // Explicit to disambiguate.
+}
+
+
+// Per thread process pointer. The extra level of indirection from
+// _process_ to __process__ is used in order to take advantage of the
+// ThreadLocal operators without needing the extra dereference.
+extern ThreadLocal<ProcessBase>* _process_;
+
+#define __process__ (*_process_)
+
+} // namespace process {
+
+#endif // __PROCESS_PROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/profiler.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/profiler.hpp b/3rdparty/libprocess/include/process/profiler.hpp
new file mode 100644
index 0000000..64cf622
--- /dev/null
+++ b/3rdparty/libprocess/include/process/profiler.hpp
@@ -0,0 +1,116 @@
+#ifndef __PROCESS_PROFILER_HPP__
+#define __PROCESS_PROFILER_HPP__
+
+#include <glog/logging.h>
+
+#ifdef HAS_GPERFTOOLS
+#include <gperftools/profiler.h>
+#endif
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+#include <stout/format.hpp>
+#include <stout/os.hpp>
+
+namespace process {
+
+const std::string PROFILE_FILE = "perftools.out";
+
+class Profiler : public Process<Profiler>
+{
+public:
+ Profiler() : ProcessBase("profiler"), started(false) {}
+
+ virtual ~Profiler() {}
+
+protected:
+ virtual void initialize()
+ {
+ route("/start", &Profiler::start);
+ route("/stop", &Profiler::stop);
+ }
+
+private:
+ // HTTP endpoints.
+
+ // Starts the profiler. There are no request parameters.
+ Future<http::Response> start(const http::Request& request)
+ {
+#ifdef HAS_GPERFTOOLS
+ if (os::getenv("LIBPROCESS_ENABLE_PROFILER", false) != "1") {
+ return http::BadRequest(
+ "The profiler is not enabled. To enable the profiler, libprocess "
+ "must be started with LIBPROCESS_ENABLE_PROFILER=1 in the "
+ "environment.\n");
+ }
+
+ if (started) {
+ return http::BadRequest("Profiler already started.\n");
+ }
+
+ LOG(INFO) << "Starting Profiler";
+
+ // WARNING: If using libunwind < 1.0.1, profiling should not be used, as
+ // there are reports of crashes.
+ // WARNING: If using libunwind 1.0.1, profiling should not be turned on
+ // when it's possible for new threads to be created.
+ // This may cause a deadlock. The workaround used in libprocess is described
+ // here:
+ // https://groups.google.com/d/topic/google-perftools/Df10Uy4Djrg/discussion
+ // NOTE: We have not tested this with libunwind > 1.0.1.
+ if (!ProfilerStart(PROFILE_FILE.c_str())) {
+ std::string error =
+ strings::format("Failed to start profiler: %s", strerror(errno)).get();
+ LOG(ERROR) << error;
+ return http::InternalServerError(error);
+ }
+
+ started = true;
+ return http::OK("Profiler started.\n");
+#else
+ return http::BadRequest(
+ "Perftools is disabled. To enable perftools, "
+ "configure libprocess with --enable-perftools.\n");
+#endif
+ }
+
+ // Stops the profiler. There are no request parameters.
+ // This returns the profile output, it will also remain present
+ // in the working directory.
+ Future<http::Response> stop(const http::Request& request)
+ {
+#ifdef HAS_GPERFTOOLS
+ if (!started) {
+ return http::BadRequest("Profiler not running.\n");
+ }
+
+ LOG(INFO) << "Stopping Profiler";
+
+ ProfilerStop();
+
+ http::OK response;
+ response.type = response.PATH;
+ response.path = "perftools.out";
+ response.headers["Content-Type"] = "application/octet-stream";
+ response.headers["Content-Disposition"] =
+ strings::format("attachment; filename=%s", PROFILE_FILE).get();
+
+ started = false;
+ return response;
+#else
+ return http::BadRequest(
+ "Perftools is disabled. To enable perftools, "
+ "configure libprocess with --enable-perftools.\n");
+#endif
+ }
+
+ bool started;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_PROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/protobuf.hpp b/3rdparty/libprocess/include/process/protobuf.hpp
new file mode 100644
index 0000000..580c87a
--- /dev/null
+++ b/3rdparty/libprocess/include/process/protobuf.hpp
@@ -0,0 +1,415 @@
+#ifndef __PROCESS_PROTOBUF_HPP__
+#define __PROCESS_PROTOBUF_HPP__
+
+#include <glog/logging.h>
+
+#include <google/protobuf/message.h>
+#include <google/protobuf/repeated_field.h>
+
+#include <set>
+#include <vector>
+
+#include <tr1/functional>
+#include <tr1/unordered_map>
+
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
+
+// Provides an implementation of process::post that for a protobuf.
+namespace process {
+
+inline void post(const process::UPID& to,
+ const google::protobuf::Message& message)
+{
+ std::string data;
+ message.SerializeToString(&data);
+ post(to, message.GetTypeName(), data.data(), data.size());
+}
+
+} // namespace process {
+
+
+// The rest of this file provides libprocess "support" for using
+// protocol buffers. In particular, this file defines a subclass of
+// Process (ProtobufProcess) that allows you to install protocol
+// buffer handlers in addition to normal message and HTTP
+// handlers. Note that this header file assumes you will be linking
+// against BOTH libprotobuf and libglog.
+
+namespace google { namespace protobuf {
+
+// Type conversions helpful for changing between protocol buffer types
+// and standard C++ types (for parameters).
+template <typename T>
+const T& convert(const T& t)
+{
+ return t;
+}
+
+
+template <typename T>
+std::vector<T> convert(const google::protobuf::RepeatedPtrField<T>& items)
+{
+ std::vector<T> result;
+ for (int i = 0; i < items.size(); i++) {
+ result.push_back(items.Get(i));
+ }
+
+ return result;
+}
+
+}} // namespace google { namespace protobuf {
+
+
+template <typename T>
+class ProtobufProcess : public process::Process<T>
+{
+public:
+ virtual ~ProtobufProcess() {}
+
+protected:
+ virtual void visit(const process::MessageEvent& event)
+ {
+ if (protobufHandlers.count(event.message->name) > 0) {
+ from = event.message->from; // For 'reply'.
+ protobufHandlers[event.message->name](event.message->body);
+ from = process::UPID();
+ } else {
+ process::Process<T>::visit(event);
+ }
+ }
+
+ void send(const process::UPID& to,
+ const google::protobuf::Message& message)
+ {
+ std::string data;
+ message.SerializeToString(&data);
+ process::Process<T>::send(to, message.GetTypeName(),
+ data.data(), data.size());
+ }
+
+ using process::Process<T>::send;
+
+ void reply(const google::protobuf::Message& message)
+ {
+ CHECK(from) << "Attempting to reply without a sender";
+ std::string data;
+ message.SerializeToString(&data);
+ send(from, message);
+ }
+
+ template <typename M>
+ void install(void (T::*method)(const M&))
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handlerM<M>,
+ t, method,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ template <typename M>
+ void install(void (T::*method)())
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handler0,
+ t, method,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ template <typename M,
+ typename P1, typename P1C>
+ void install(void (T::*method)(P1C),
+ P1 (M::*param1)() const)
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handler1<M, P1, P1C>,
+ t, method, param1,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C>
+ void install(void (T::*method)(P1C, P2C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const)
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handler2<M, P1, P1C, P2, P2C>,
+ t, method, p1, p2,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C,
+ typename P3, typename P3C>
+ void install(void (T::*method)(P1C, P2C, P3C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ P3 (M::*p3)() const)
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handler3<M, P1, P1C, P2, P2C, P3, P3C>,
+ t, method, p1, p2, p3,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C,
+ typename P3, typename P3C,
+ typename P4, typename P4C>
+ void install(void (T::*method)(P1C, P2C, P3C, P4C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ P3 (M::*p3)() const,
+ P4 (M::*p4)() const)
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handler4<M, P1, P1C, P2, P2C, P3, P3C, P4, P4C>,
+ t, method, p1, p2, p3, p4,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C,
+ typename P3, typename P3C,
+ typename P4, typename P4C,
+ typename P5, typename P5C>
+ void install(void (T::*method)(P1C, P2C, P3C, P4C, P5C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ P3 (M::*p3)() const,
+ P4 (M::*p4)() const,
+ P5 (M::*p5)() const)
+ {
+ google::protobuf::Message* m = new M();
+ T* t = static_cast<T*>(this);
+ protobufHandlers[m->GetTypeName()] =
+ std::tr1::bind(&handler5<M, P1, P1C, P2, P2C, P3, P3C, P4, P4C, P5, P5C>,
+ t, method, p1, p2, p3, p4, p5,
+ std::tr1::placeholders::_1);
+ delete m;
+ }
+
+ using process::Process<T>::install;
+
+ process::UPID from; // Sender of "current" message, accessible by subclasses.
+
+private:
+ template <typename M>
+ static void handlerM(T* t, void (T::*method)(const M&),
+ const std::string& data)
+ {
+ M m;
+ m.ParseFromString(data);
+ if (m.IsInitialized()) {
+ (t->*method)(m);
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << m.InitializationErrorString();
+ }
+ }
+
+ static void handler0(T* t, void (T::*method)(),
+ const std::string& data)
+ {
+ (t->*method)();
+ }
+
+ template <typename M,
+ typename P1, typename P1C>
+ static void handler1(T* t, void (T::*method)(P1C),
+ P1 (M::*p1)() const,
+ const std::string& data)
+ {
+ M m;
+ m.ParseFromString(data);
+ if (m.IsInitialized()) {
+ (t->*method)(google::protobuf::convert((&m->*p1)()));
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << m.InitializationErrorString();
+ }
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C>
+ static void handler2(T* t, void (T::*method)(P1C, P2C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ const std::string& data)
+ {
+ M m;
+ m.ParseFromString(data);
+ if (m.IsInitialized()) {
+ (t->*method)(google::protobuf::convert((&m->*p1)()),
+ google::protobuf::convert((&m->*p2)()));
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << m.InitializationErrorString();
+ }
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C,
+ typename P3, typename P3C>
+ static void handler3(T* t, void (T::*method)(P1C, P2C, P3C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ P3 (M::*p3)() const,
+ const std::string& data)
+ {
+ M m;
+ m.ParseFromString(data);
+ if (m.IsInitialized()) {
+ (t->*method)(google::protobuf::convert((&m->*p1)()),
+ google::protobuf::convert((&m->*p2)()),
+ google::protobuf::convert((&m->*p3)()));
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << m.InitializationErrorString();
+ }
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C,
+ typename P3, typename P3C,
+ typename P4, typename P4C>
+ static void handler4(T* t, void (T::*method)(P1C, P2C, P3C, P4C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ P3 (M::*p3)() const,
+ P4 (M::*p4)() const,
+ const std::string& data)
+ {
+ M m;
+ m.ParseFromString(data);
+ if (m.IsInitialized()) {
+ (t->*method)(google::protobuf::convert((&m->*p1)()),
+ google::protobuf::convert((&m->*p2)()),
+ google::protobuf::convert((&m->*p3)()),
+ google::protobuf::convert((&m->*p4)()));
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << m.InitializationErrorString();
+ }
+ }
+
+ template <typename M,
+ typename P1, typename P1C,
+ typename P2, typename P2C,
+ typename P3, typename P3C,
+ typename P4, typename P4C,
+ typename P5, typename P5C>
+ static void handler5(T* t, void (T::*method)(P1C, P2C, P3C, P4C, P5C),
+ P1 (M::*p1)() const,
+ P2 (M::*p2)() const,
+ P3 (M::*p3)() const,
+ P4 (M::*p4)() const,
+ P5 (M::*p5)() const,
+ const std::string& data)
+ {
+ M m;
+ m.ParseFromString(data);
+ if (m.IsInitialized()) {
+ (t->*method)(google::protobuf::convert((&m->*p1)()),
+ google::protobuf::convert((&m->*p2)()),
+ google::protobuf::convert((&m->*p3)()),
+ google::protobuf::convert((&m->*p4)()),
+ google::protobuf::convert((&m->*p5)()));
+ } else {
+ LOG(WARNING) << "Initialization errors: "
+ << m.InitializationErrorString();
+ }
+ }
+
+ typedef std::tr1::function<void(const std::string&)> handler;
+ std::tr1::unordered_map<std::string, handler> protobufHandlers;
+};
+
+
+// Implements a process for sending protobuf "requests" to a process
+// and waiting for a protobuf "response", but uses futures so that
+// this can be done without needing to implement a process.
+template <typename Req, typename Res>
+class ReqResProcess : public ProtobufProcess<ReqResProcess<Req, Res> >
+{
+public:
+ ReqResProcess(const process::UPID& _pid, const Req& _req)
+ : pid(_pid), req(_req)
+ {
+ ProtobufProcess<ReqResProcess<Req, Res> >::template
+ install<Res>(&ReqResProcess<Req, Res>::response);
+ }
+
+ process::Future<Res> run()
+ {
+ // Terminate this process if no one cares about the response
+ // (note, we need to disambiguate the process::terminate).
+ void (*terminate)(const process::UPID&, bool) = &process::terminate;
+ promise.future().onDiscarded(
+ std::tr1::bind(terminate, process::ProcessBase::self(), true));
+
+ ProtobufProcess<ReqResProcess<Req, Res> >::send(pid, req);
+
+ return promise.future();
+ }
+
+private:
+ void response(const Res& res)
+ {
+ promise.set(res);
+ process::terminate(process::ProcessBase::self());
+ }
+
+ const process::UPID pid;
+ const Req req;
+ process::Promise<Res> promise;
+};
+
+
+// Allows you to describe request/response protocols and then use
+// those for sending requests and getting back responses.
+template <typename Req, typename Res>
+struct Protocol
+{
+ process::Future<Res> operator () (
+ const process::UPID& pid,
+ const Req& req) const
+ {
+ // Help debugging by adding some "type constraints".
+ { Req* req = NULL; google::protobuf::Message* m = req; (void)m; }
+ { Res* res = NULL; google::protobuf::Message* m = res; (void)m; }
+
+ ReqResProcess<Req, Res>* process = new ReqResProcess<Req, Res>(pid, req);
+ process::spawn(process, true);
+ return process::dispatch(process, &ReqResProcess<Req, Res>::run);
+ }
+};
+
+#endif // __PROCESS_PROTOBUF_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/run.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/run.hpp b/3rdparty/libprocess/include/process/run.hpp
new file mode 100644
index 0000000..a245b70
--- /dev/null
+++ b/3rdparty/libprocess/include/process/run.hpp
@@ -0,0 +1,80 @@
+#ifndef __PROCESS_RUN_HPP__
+#define __PROCESS_RUN_HPP__
+
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+
+#include <process/process.hpp>
+
+#include <stout/preprocessor.hpp>
+
+namespace process {
+
+namespace internal {
+
+template <typename R>
+class ThunkProcess : public Process<ThunkProcess<R> >
+{
+public:
+ ThunkProcess(std::tr1::shared_ptr<std::tr1::function<R(void)> > _thunk,
+ std::tr1::shared_ptr<Promise<R> > _promise)
+ : thunk(_thunk),
+ promise(_promise) {}
+
+ virtual ~ThunkProcess() {}
+
+protected:
+ virtual void serve(const Event& event)
+ {
+ promise->set((*thunk)());
+ }
+
+private:
+ std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk;
+ std::tr1::shared_ptr<Promise<R> > promise;
+};
+
+} // namespace internal {
+
+
+template <typename R>
+Future<R> run(R (*method)(void))
+{
+ std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk(
+ new std::tr1::function<R(void)>(
+ std::tr1::bind(method)));
+
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>());
+ Future<R> future = promise->future();
+
+ terminate(spawn(new internal::ThunkProcess<R>(thunk, promise), true));
+
+ return future;
+}
+
+
+#define TEMPLATE(Z, N, DATA) \
+ template <typename R, \
+ ENUM_PARAMS(N, typename P), \
+ ENUM_PARAMS(N, typename A)> \
+ Future<R> run( \
+ R (*method)(ENUM_PARAMS(N, P)), \
+ ENUM_BINARY_PARAMS(N, A, a)) \
+ { \
+ std::tr1::shared_ptr<std::tr1::function<R(void)> > thunk( \
+ new std::tr1::function<R(void)>( \
+ std::tr1::bind(method, ENUM_PARAMS(N, a)))); \
+ \
+ std::tr1::shared_ptr<Promise<R> > promise(new Promise<R>()); \
+ Future<R> future = promise->future(); \
+ \
+ terminate(spawn(new internal::ThunkProcess<R>(thunk, promise), true)); \
+ \
+ return future; \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args A0 -> A9.
+#undef TEMPLATE
+
+} // namespace process {
+
+#endif // __PROCESS_RUN_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
new file mode 100644
index 0000000..669a333
--- /dev/null
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -0,0 +1,84 @@
+#ifndef __PROCESS_SOCKET_HPP__
+#define __PROCESS_SOCKET_HPP__
+
+#include <assert.h>
+#include <unistd.h> // For close.
+
+#include <iostream>
+
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+// An abstraction around a socket (file descriptor) that provides
+// reference counting such that the socket is only closed (and thus,
+// has the possiblity of being reused) after there are no more
+// references.
+
+class Socket
+{
+public:
+ Socket()
+ : refs(new int(1)), s(-1) {}
+
+ explicit Socket(int _s)
+ : refs(new int(1)), s(_s) {}
+
+ ~Socket()
+ {
+ cleanup();
+ }
+
+ Socket(const Socket& that)
+ {
+ copy(that);
+ }
+
+ Socket& operator = (const Socket& that)
+ {
+ if (this != &that) {
+ cleanup();
+ copy(that);
+ }
+ return *this;
+ }
+
+ bool operator == (const Socket& that) const
+ {
+ return s == that.s && refs == that.refs;
+ }
+
+ operator int () const
+ {
+ return s;
+ }
+
+private:
+ void copy(const Socket& that)
+ {
+ assert(that.refs > 0);
+ __sync_fetch_and_add(that.refs, 1);
+ refs = that.refs;
+ s = that.s;
+ }
+
+ void cleanup()
+ {
+ assert(refs != NULL);
+ if (__sync_sub_and_fetch(refs, 1) == 0) {
+ delete refs;
+ if (s >= 0) {
+ Try<Nothing> close = os::close(s);
+ if (close.isError()) {
+ std::cerr << "Failed to close socket: " << close.error() << std::endl;
+ abort();
+ }
+ }
+ }
+ }
+
+ int* refs;
+ int s;
+};
+
+#endif // __PROCESS_SOCKET_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/statistics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/statistics.hpp b/3rdparty/libprocess/include/process/statistics.hpp
new file mode 100644
index 0000000..fbae641
--- /dev/null
+++ b/3rdparty/libprocess/include/process/statistics.hpp
@@ -0,0 +1,160 @@
+#ifndef __PROCESS_STATISTICS_HPP__
+#define __PROCESS_STATISTICS_HPP__
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/time.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/owned.hpp>
+
+namespace process {
+
+// Forward declarations.
+class Statistics;
+class StatisticsProcess;
+
+namespace meters {
+ class Meter;
+ class TimeRate;
+}
+
+
+// Libprocess statistics handle.
+// To be used from anywhere to manage statistics.
+//
+// Ex: process::statistics->increment("http", "num_requests");
+// process::statistics->set("http", "response_size", response.size());
+//
+// Statistics are exposed via JSON for external visibility.
+extern Statistics* statistics;
+
+const Duration STATISTICS_TRUNCATION_INTERVAL = Minutes(5);
+
+// Provides an in-memory time series of statistics over some window
+// (values are truncated outside of the window, but no limit is
+// currently placed on the number of values within a window).
+//
+// TODO(bmahler): Time series granularity should be coarsened over
+// time. This means, for high-frequency statistics, we keep a lot of
+// recent data points (fine granularity), and keep fewer older data
+// points (coarse granularity). The tunable bit here could be the
+// total number of data points to keep around, which informs how
+// often to delete older data points, while still keeping a window
+// worth of data.
+class Statistics
+{
+public:
+ Statistics(const Duration& window);
+ ~Statistics();
+
+ // Returns the time series of a statistic.
+ process::Future<std::map<Time, double> > timeseries(
+ const std::string& context,
+ const std::string& name,
+ const Option<Time>& start = None(),
+ const Option<Time>& stop = None());
+
+ // Returns the latest value of a statistic.
+ process::Future<Option<double> > get(
+ const std::string& context,
+ const std::string& name);
+
+ // Returns the latest values of all statistics in the context.
+ process::Future<std::map<std::string, double> > get(
+ const std::string& context);
+
+ // Adds a meter for the statistic with the provided context and name.
+ // get(context, meter->name) will return the metered time series.
+ // Returns an error if:
+ // -meter->name == name, or
+ // -The meter already exists.
+ Future<Try<Nothing> > meter(
+ const std::string& context,
+ const std::string& name,
+ Owned<meters::Meter> meter);
+
+ // Sets the current value of a statistic at the current clock time
+ // or at a specified time.
+ void set(
+ const std::string& context,
+ const std::string& name,
+ double value,
+ const Time& time = Clock::now());
+
+ // Archives the provided statistic time series, and any meters associated
+ // with it. This means three things:
+ // 1. The statistic will no longer be part of the snapshot.
+ // 2. However, the time series will be retained until the window expiration.
+ // 3. All meters associated with this statistic will be removed, both
+ // (1) and (2) will apply to the metered time series as well.
+ void archive(const std::string& context, const std::string& name);
+
+ // Increments the current value of a statistic. If no statistic was
+ // previously present, an initial value of 0.0 is used.
+ void increment(const std::string& context, const std::string& name);
+
+ // Decrements the current value of a statistic. If no statistic was
+ // previously present, an initial value of 0.0 is used.
+ void decrement(const std::string& context, const std::string& name);
+
+private:
+ StatisticsProcess* process;
+};
+
+
+namespace meters {
+
+// This is the interface for statistical meters.
+// Meters provide additional metering on top of the raw statistical
+// value. Ex: Track the maximum, average, rate, etc.
+class Meter
+{
+protected:
+ Meter(const std::string& _name) : name(_name) {}
+
+public:
+ virtual ~Meter() {}
+
+ // Updates the meter with another input value.
+ // Returns the new metered value, or none if no metered value can be produced.
+ virtual Option<double> update(const Time& time, double value) = 0;
+
+ const std::string name;
+};
+
+
+// Tracks the percent of time 'used' since the last update.
+// Input values to this meter must be in seconds.
+class TimeRate : public Meter
+{
+public:
+ TimeRate(const std::string& name)
+ : Meter(name), time(None()), value(0) {}
+
+ virtual ~TimeRate() {}
+
+ virtual Option<double> update(const Time& _time, double _value)
+ {
+ Option<double> rate;
+ if (time.isSome()) {
+ rate = (_value - value) / (_time - time.get()).secs();
+ }
+
+ time = _time;
+ value = _value;
+ return rate;
+ }
+
+private:
+ Option<Time> time;
+ double value;
+};
+
+} // namespace meters {
+} // namespace process {
+
+#endif // __PROCESS_STATISTICS_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/thread.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/thread.hpp b/3rdparty/libprocess/include/process/thread.hpp
new file mode 100644
index 0000000..3e3b5d2
--- /dev/null
+++ b/3rdparty/libprocess/include/process/thread.hpp
@@ -0,0 +1,49 @@
+#ifndef __PROCESS_THREAD_HPP__
+#define __PROCESS_THREAD_HPP__
+
+#include <pthread.h>
+#include <stdio.h> // For perror.
+#include <stdlib.h> // For abort.
+
+template <typename T>
+struct ThreadLocal
+{
+ ThreadLocal()
+ {
+ if (pthread_key_create(&key, NULL) != 0) {
+ perror("Failed to create thread local, pthread_key_create");
+ abort();
+ }
+ }
+
+ ThreadLocal<T>& operator = (T* t)
+ {
+ if (pthread_setspecific(key, t) != 0) {
+ perror("Failed to set thread local, pthread_setspecific");
+ abort();
+ }
+ return *this;
+ }
+
+ operator T* () const
+ {
+ return reinterpret_cast<T*>(pthread_getspecific(key));
+ }
+
+ T* operator -> () const
+ {
+ return reinterpret_cast<T*>(pthread_getspecific(key));
+ }
+
+private:
+ // Not expecting any other operators to be used (and the rest?).
+ bool operator * (const ThreadLocal<T>&) const;
+ bool operator == (const ThreadLocal<T>&) const;
+ bool operator != (const ThreadLocal<T>&) const;
+ bool operator < (const ThreadLocal<T>&) const;
+ bool operator > (const ThreadLocal<T>&) const;
+
+ pthread_key_t key;
+};
+
+#endif // __PROCESS_THREAD_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/time.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/time.hpp b/3rdparty/libprocess/include/process/time.hpp
new file mode 100644
index 0000000..307fd2c
--- /dev/null
+++ b/3rdparty/libprocess/include/process/time.hpp
@@ -0,0 +1,124 @@
+#ifndef __PROCESS_TIME_HPP__
+#define __PROCESS_TIME_HPP__
+
+#include <iomanip>
+
+#include <glog/logging.h>
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+// Represents an instant in time.
+class Time
+{
+public:
+ // Constructs a time at the Epoch. It is needed because collections
+ // (e.g., std::map) require a default constructor to construct
+ // empty values.
+ Time() : sinceEpoch(Duration::zero()) {}
+
+ static Time EPOCH;
+ static Time MAX;
+
+ static Try<Time> create(double secs)
+ {
+ Try<Duration> duration = Duration::create(secs);
+ if (duration.isSome()) {
+ return Time(duration.get());
+ } else {
+ return Error("Argument too large for Time: " + duration.error());
+ }
+ }
+
+ Duration duration() const { return sinceEpoch; }
+
+ double secs() const { return sinceEpoch.secs(); }
+
+ bool operator < (const Time& t) const { return sinceEpoch < t.sinceEpoch; }
+ bool operator <= (const Time& t) const { return sinceEpoch <= t.sinceEpoch; }
+ bool operator > (const Time& t) const { return sinceEpoch > t.sinceEpoch; }
+ bool operator >= (const Time& t) const { return sinceEpoch >= t.sinceEpoch; }
+ bool operator == (const Time& t) const { return sinceEpoch == t.sinceEpoch; }
+ bool operator != (const Time& t) const { return sinceEpoch != t.sinceEpoch; }
+
+ Time& operator += (const Duration& d)
+ {
+ sinceEpoch += d;
+ return *this;
+ }
+
+ Time& operator -= (const Duration& d)
+ {
+ sinceEpoch -= d;
+ return *this;
+ }
+
+ Duration operator - (const Time& that) const
+ {
+ return sinceEpoch - that.sinceEpoch;
+ }
+
+ Time operator + (const Duration& duration) const
+ {
+ Time new_ = *this;
+ new_ += duration;
+ return new_;
+ }
+
+ Time operator - (const Duration& duration) const
+ {
+ Time new_ = *this;
+ new_ -= duration;
+ return new_;
+ }
+
+private:
+ Duration sinceEpoch;
+
+ // Made it private to avoid the confusion between Time and Duration.
+ // Users should explicitly use Clock::now() and Time::create() to
+ // create a new time instance.
+ Time(const Duration& _sinceEpoch) : sinceEpoch(_sinceEpoch) {}
+};
+
+
+// Outputs the time in RFC 3339 Format.
+inline std::ostream& operator << (std::ostream& stream, const Time& time)
+{
+ // Round down the secs to use it with strftime and then append the
+ // fraction part.
+ long secs = static_cast<long>(time.secs());
+ char date[64];
+
+ // The RFC 3339 Format.
+ tm* tm_ = gmtime(&secs);
+ if (tm_ == NULL) {
+ LOG(ERROR) << "Cannot convert the 'time' to a tm struct using gmtime(): "
+ << errno;
+ return stream;
+ }
+
+ strftime(date, 64, "%Y-%m-%d %H:%M:%S", tm_);
+ stream << date;
+
+ // Append the fraction part in nanoseconds.
+ int64_t nsecs = (time.duration() - Seconds(secs)).ns();
+
+ if (nsecs != 0) {
+ char prev = stream.fill();
+
+ // 9 digits for nanosecond level precision.
+ stream << "." << std::setfill('0') << std::setw(9) << nsecs;
+
+ // Return the stream to original formatting state.
+ stream.fill(prev);
+ }
+
+ stream << "+00:00";
+ return stream;
+}
+
+} // namespace process {
+
+#endif // __PROCESS_TIME_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/include/process/timeout.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timeout.hpp b/3rdparty/libprocess/include/process/timeout.hpp
new file mode 100644
index 0000000..4634b9f
--- /dev/null
+++ b/3rdparty/libprocess/include/process/timeout.hpp
@@ -0,0 +1,84 @@
+#ifndef __PROCESS_TIMEOUT_HPP__
+#define __PROCESS_TIMEOUT_HPP__
+
+#include <process/process.hpp>
+
+#include <process/time.hpp>
+
+#include <stout/duration.hpp>
+
+
+namespace process {
+
+class Timeout
+{
+public:
+ Timeout() : timeout(Clock::now()) {}
+
+ Timeout(const Time& time) : timeout(time) {}
+
+ Timeout(const Timeout& that) : timeout(that.timeout) {}
+
+ // Constructs a Timeout instance from a Time that is the 'duration'
+ // from now.
+ static Timeout in(const Duration& duration)
+ {
+ return Timeout(Clock::now() + duration);
+ }
+
+ Timeout& operator = (const Timeout& that)
+ {
+ if (this != &that) {
+ timeout = that.timeout;
+ }
+
+ return *this;
+ }
+
+ Timeout& operator = (const Duration& duration)
+ {
+ timeout = Clock::now() + duration;
+ return *this;
+ }
+
+ bool operator == (const Timeout& that) const
+ {
+ return timeout == that.timeout;
+ }
+
+ bool operator < (const Timeout& that) const
+ {
+ return timeout < that.timeout;
+ }
+
+ bool operator <= (const Timeout& that) const
+ {
+ return timeout <= that.timeout;
+ }
+
+ // Returns the value of the timeout as a Time object.
+ Time time() const
+ {
+ return timeout;
+ }
+
+ // Returns the amount of time remaining.
+ Duration remaining() const
+ {
+ Duration remaining = timeout - Clock::now();
+ return remaining > Duration::zero() ? remaining : Duration::zero();
+ }
+
+ // Returns true if the timeout expired.
+ bool expired() const
+ {
+ return timeout <= Clock::now();
+ }
+
+private:
+ Time timeout;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_TIMEOUT_HPP__