You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/05/05 07:15:42 UTC
[1/2] git commit: Introduced 'Accepted' responses for libprocess
messages.
Repository: mesos
Updated Branches:
refs/heads/master c2e10ccc4 -> 9926af07b
Introduced 'Accepted' responses for libprocess messages.
Review: https://reviews.apache.org/r/20276
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5fe51cb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5fe51cb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5fe51cb
Branch: refs/heads/master
Commit: d5fe51cb5200862391d7bc1050399d678ecf1d18
Parents: c2e10cc
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Apr 11 13:42:38 2014 -0600
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun May 4 20:18:14 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 14 +++
3rdparty/libprocess/src/process.cpp | 123 +++++++++++++++++--
3rdparty/libprocess/src/tests/process_tests.cpp | 41 ++++++-
3 files changed, 159 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5fe51cb/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 0fe7219..06f2596 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -209,6 +209,20 @@ struct OK : Response
};
+struct Accepted : Response
+{
+ Accepted()
+ {
+ status = "202 Accepted";
+ }
+
+ explicit Accepted(const std::string& body) : Response(body)
+ {
+ status = "202 Accepted";
+ }
+};
+
+
struct TemporaryRedirect : Response
{
explicit TemporaryRedirect(const std::string& url)
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5fe51cb/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index da2d4ac..c14ecce 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -87,6 +87,7 @@ using namespace process::metrics::internal;
using process::wait; // Necessary on some OS's to disambiguate.
+using process::http::Accepted;
using process::http::BadRequest;
using process::http::InternalServerError;
using process::http::NotFound;
@@ -1050,6 +1051,52 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
}
+// A variant of 'recv_data' that doesn't do anything with the
+// data. Used by sockets created via SocketManager::link as well as
+// SocketManager::send(Message), where we don't care about the data
+// received; we mostly just want to know when the socket has been
+// closed.
+void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ Socket* socket = (Socket*) watcher->data;
+
+ int s = watcher->fd;
+
+ while (true) {
+ const ssize_t size = 80 * 1024;
+ ssize_t length = 0;
+
+ char data[size];
+
+ length = recv(s, data, size, 0);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ break;
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while receiving: " << error;
+ } else {
+ VLOG(1) << "Socket closed while receiving";
+ }
+ socket_manager->close(s);
+ ev_io_stop(loop, watcher);
+ delete socket;
+ delete watcher;
+ break;
+ } else {
+ VLOG(2) << "Ignoring " << length << " bytes of data received "
+ << "on socket used only for sending";
+ }
+ }
+}
+
+
void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
DataEncoder* encoder = (DataEncoder*) watcher->data;
@@ -1227,7 +1274,7 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
} else {
// We're connected! Now let's do some receiving.
ev_io_stop(loop, watcher);
- ev_io_init(watcher, recv_data, s, EV_READ);
+ ev_io_init(watcher, ignore_data, s, EV_READ);
ev_io_start(loop, watcher);
}
}
@@ -1931,13 +1978,13 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
persists[node] = s;
- // Allocate and initialize the decoder and watcher (we really
- // only "receive" on this socket so that we can react when it
- // gets closed and generate appropriate lost events).
- DataDecoder* decoder = new DataDecoder(sockets[s]);
-
+ // Allocate and initialize a watcher for reading data from this
+ // socket. Note that we don't expect to receive anything other
+ // than HTTP '202 Accepted' responses, which we ignore anyway.
+ // We do, however, want to react when it gets closed so we can
+ // generate appropriate lost events (since this is a 'link').
ev_io* watcher = new ev_io();
- watcher->data = decoder;
+ watcher->data = new Socket(sockets[s]);
// Try and connect to the node using this socket.
sockaddr_in addr;
@@ -1954,7 +2001,7 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
// Wait for socket to be connected.
ev_io_init(watcher, receiving_connect, s, EV_WRITE);
} else {
- ev_io_init(watcher, recv_data, s, EV_READ);
+ ev_io_init(watcher, ignore_data, s, EV_READ);
}
// Enqueue the watcher.
@@ -2103,8 +2150,21 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
- // Allocate and initialize the watcher.
+ // Allocate and initialize a watcher for reading data from this
+ // socket. Note that we don't expect to receive anything other
+ // than HTTP '202 Accepted' responses, which we ignore anyway.
ev_io* watcher = new ev_io();
+ watcher->data = new Socket(sockets[s]);
+
+ ev_io_init(watcher, ignore_data, s, EV_READ);
+
+ // Enqueue the watcher.
+ synchronized (watchers) {
+ watchers->push(watcher);
+ }
+
+ // Allocate and initialize a watcher for sending the message.
+ watcher = new ev_io();
watcher->data = new MessageEncoder(sockets[s], message);
// Try and connect to the node using this socket.
@@ -2250,6 +2310,16 @@ void SocketManager::close(int s)
proxies.erase(s);
}
+ // We need to stop any 'ignore_data' readers as they may have
+ // the last Socket reference so we shutdown reads but don't do a
+ // full close (since that will be taken care of by ~Socket, see
+ // comment below). Calling 'shutdown' will trigger 'ignore_data'
+ // which will get back a 0 (i.e., EOF) when it tries to read
+ // from the socket. Note we need to do this before we call
+ // 'sockets.erase(s)' to avoid the potential race with the last
+ // reference being in 'sockets'.
+ shutdown(s, SHUT_RD);
+
dispose.erase(s);
sockets.erase(s);
}
@@ -2275,6 +2345,9 @@ void SocketManager::close(int s)
// on the last reference of our Socket object to close the
// socket. Note, however, that since socket is no longer in
// 'sockets' any attempt to send with it will just get ignored.
+ // TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
+ // should keep the file descriptor valid until the last Socket
+ // reference does a close but force all libev watchers to stop?
}
@@ -2380,13 +2453,37 @@ bool ProcessManager::handle(
if (libprocess(request)) {
Message* message = parse(request);
if (message != NULL) {
+ // TODO(benh): Use the sender PID when delivering in order to
+ // capture happens-before timing relationships for testing.
+ bool accepted = deliver(message->to, new MessageEvent(message));
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(socket);
+
+ // Only send back an HTTP response if this isn't from libprocess
+ // (which we determine by looking at the User-Agent). This is
+ // necessary because older versions of libprocess would try and
+ // read the data and parse it as an HTTP request which would
+ // fail thus causing the socket to get closed (but now
+ // libprocess will ignore responses, see ignore_data).
+ Option<string> agent = request->headers.get("User-Agent");
+ if (agent.get("").find("libprocess/") == string::npos) {
+ if (accepted) {
+ VLOG(2) << "Accepted libprocess message to " << request->path;
+ dispatch(proxy, &HttpProxy::enqueue, Accepted(), *request);
+ } else {
+ VLOG(1) << "Failed to handle libprocess message to "
+ << request->path << ": not found";
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
+ }
+ }
+
delete request;
- // TODO(benh): Use the sender PID in order to capture
- // happens-before timing relationships for testing.
- return deliver(message->to, new MessageEvent(message));
+
+ return accepted;
}
- VLOG(1) << "Failed to handle libprocess request: "
+ VLOG(1) << "Failed to handle libprocess message: "
<< request->method << " " << request->path
<< " (User-Agent: " << request->headers["User-Agent"] << ")";
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5fe51cb/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 659b063..41b57a9 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1388,13 +1388,11 @@ TEST(Process, remote)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
RemoteProcess process;
+ spawn(process);
- volatile bool handlerCalled = false;
-
+ Future<Nothing> handler;
EXPECT_CALL(process, handler(_, _))
- .WillOnce(Assign(&handlerCalled, true));
-
- spawn(process);
+ .WillOnce(FutureSatisfy(&handler));
int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
@@ -1419,7 +1417,38 @@ TEST(Process, remote)
ASSERT_EQ(0, close(s));
- while (!handlerCalled);
+ AWAIT_READY(handler);
+
+ terminate(process);
+ wait(process);
+}
+
+
+// Like the 'remote' test but uses http::post.
+TEST(Process, http)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ RemoteProcess process;
+ spawn(process);
+
+ Future<UPID> pid;
+ Future<string> body;
+ EXPECT_CALL(process, handler(_, _))
+ .WillOnce(DoAll(FutureArg<0>(&pid),
+ FutureArg<1>(&body)));
+
+ hashmap<string, string> headers;
+ headers["User-Agent"] = "libprocess/";
+
+ Future<http::Response> response =
+ http::post(process.self(), "handler", headers, "hello world");
+
+ AWAIT_READY(body);
+ ASSERT_EQ("hello world", body.get());
+
+ AWAIT_READY(pid);
+ ASSERT_EQ(UPID(), pid.get());
terminate(process);
wait(process);
[2/2] git commit: Introduced 'Libprocess-From' to identify a
libprocess "message".
Posted by be...@apache.org.
Introduced 'Libprocess-From' to identify a libprocess "message".
Review: https://reviews.apache.org/r/20277
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9926af07
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9926af07
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9926af07
Branch: refs/heads/master
Commit: 9926af07bb33873a801dd86a8e1229d1ea6c5588
Parents: d5fe51c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Apr 11 18:17:22 2014 -0600
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun May 4 20:24:11 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/encoder.hpp | 12 ++-
3rdparty/libprocess/src/process.cpp | 81 ++++++++++++--------
3rdparty/libprocess/src/tests/process_tests.cpp | 75 +++++++++++++++++-
3 files changed, 131 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9926af07/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 7f4593b..9c5aa81 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -106,8 +106,16 @@ public:
std::ostringstream out;
if (message != NULL) {
- out << "POST /" << message->to.id << "/" << message->name
- << " HTTP/1.0\r\n"
+ out << "POST ";
+ // Nothing keeps the 'id' component of a PID from being an empty
+ // string, which would create a malformed path containing '//'
+ // unless we check for it explicitly.
+ // TODO(benh): Make the 'id' part of a PID optional so when it's
+ // missing it's clear that we're simply addressing an ip:port.
+ if (message->to.id != "") {
+ out << "/" << message->to.id;
+ }
+ out << "/" << message->name << " HTTP/1.0\r\n"
<< "User-Agent: libprocess/" << message->from << "\r\n"
<< "Connection: Keep-Alive\r\n";
http://git-wip-us.apache.org/repos/asf/mesos/blob/9926af07/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c14ecce..58bae5b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -813,9 +813,12 @@ static void transport(Message* message, ProcessBase* sender = NULL)
static bool libprocess(Request* request)
{
- return request->method == "POST" &&
- request->headers.count("User-Agent") > 0 &&
- request->headers["User-Agent"].find("libprocess/") == 0;
+ return
+ (request->method == "POST" &&
+ request->headers.contains("User-Agent") &&
+ request->headers["User-Agent"].find("libprocess/") == 0) ||
+ (request->method == "POST" &&
+ request->headers.contains("Libprocess-From"));
}
@@ -823,44 +826,54 @@ static Message* parse(Request* request)
{
// TODO(benh): Do better error handling (to deal with a malformed
// libprocess message, malicious or otherwise).
- const string& agent = request->headers["User-Agent"];
- const string& identifier = "libprocess/";
- size_t index = agent.find(identifier);
- if (index != string::npos) {
- // Okay, now determine 'from'.
- const UPID from(agent.substr(index + identifier.size(), agent.size()));
-
- // Now determine 'to'.
- index = request->path.find('/', 1);
- index = index != string::npos ? index - 1 : string::npos;
-
- // Decode possible percent-encoded 'to'.
- Try<string> decode = http::decode(request->path.substr(1, index));
-
- if (decode.isError()) {
- VLOG(2) << "Failed to decode URL path: " << decode.get();
- return NULL;
- }
- const UPID to(decode.get(), __ip__, __port__);
+ // First try and determine 'from'.
+ Option<UPID> from = None();
+
+ if (request->headers.contains("Libprocess-From")) {
+ from = UPID(strings::trim(request->headers["Libprocess-From"]));
+ } else {
+ // Try and get 'from' from the User-Agent.
+ const string& agent = request->headers["User-Agent"];
+ const string& identifier = "libprocess/";
+ size_t index = agent.find(identifier);
+ if (index != string::npos) {
+ from = UPID(agent.substr(index + identifier.size(), agent.size()));
+ }
+ }
- // And now determine 'name'.
- index = index != string::npos ? index + 2: request->path.size();
- const string& name = request->path.substr(index);
+ if (from.isNone()) {
+ return NULL;
+ }
- VLOG(2) << "Parsed message name '" << name
- << "' for " << to << " from " << from;
+ // Now determine 'to'.
+ size_t index = request->path.find('/', 1);
+ index = index != string::npos ? index - 1 : string::npos;
- Message* message = new Message();
- message->name = name;
- message->from = from;
- message->to = to;
- message->body = request->body;
+ // Decode possible percent-encoded 'to'.
+ Try<string> decode = http::decode(request->path.substr(1, index));
- return message;
+ if (decode.isError()) {
+ VLOG(2) << "Failed to decode URL path: " << decode.get();
+ return NULL;
}
- return NULL;
+ const UPID to(decode.get(), __ip__, __port__);
+
+ // And now determine 'name'.
+ index = index != string::npos ? index + 2: request->path.size();
+ const string& name = request->path.substr(index);
+
+ VLOG(2) << "Parsed message name '" << name
+ << "' for " << to << " from " << from.get();
+
+ Message* message = new Message();
+ message->name = name;
+ message->from = from.get();
+ message->to = to;
+ message->body = request->body;
+
+ return message;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9926af07/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 41b57a9..ebce48c 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1425,7 +1425,7 @@ TEST(Process, remote)
// Like the 'remote' test but uses http::post.
-TEST(Process, http)
+TEST(Process, http1)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
@@ -1455,6 +1455,79 @@ TEST(Process, http)
}
+// Like 'http1' but using a 'Libprocess-From' header.
+TEST(Process, http2)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ RemoteProcess process;
+ spawn(process);
+
+ // Create a receiving socket so we can get messages back.
+ int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+ ASSERT_LE(0, s);
+
+ // Set up socket.
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_addr.s_addr = INADDR_ANY;
+ addr.sin_port = 0;
+
+ ASSERT_EQ(0, ::bind(s, (sockaddr*) &addr, sizeof(addr)));
+
+ // Create a UPID for 'Libprocess-From' based on the IP and port we
+ // got assigned.
+ socklen_t addrlen = sizeof(addr);
+ ASSERT_EQ(0, getsockname(s, (sockaddr*) &addr, &addrlen));
+
+ UPID from("", addr.sin_addr.s_addr, ntohs(addr.sin_port));
+
+ ASSERT_EQ(0, listen(s, 1));
+
+ Future<UPID> pid;
+ Future<string> body;
+ EXPECT_CALL(process, handler(_, _))
+ .WillOnce(DoAll(FutureArg<0>(&pid),
+ FutureArg<1>(&body)));
+
+ hashmap<string, string> headers;
+ headers["Libprocess-From"] = stringify(from);
+
+ Future<http::Response> response =
+ http::post(process.self(), "handler", headers, "hello world");
+
+ AWAIT_READY(response);
+ ASSERT_EQ(http::statuses[202], response.get().status);
+
+ AWAIT_READY(body);
+ ASSERT_EQ("hello world", body.get());
+
+ AWAIT_READY(pid);
+ ASSERT_EQ(from, pid.get());
+
+ // Now post a message as though it came from the process.
+ const string name = "reply";
+ post(process.self(), from, name);
+
+ // Accept the incoming connection.
+ memset(&addr, 0, sizeof(addr));
+ addrlen = sizeof(addr);
+
+ int c = ::accept(s, (sockaddr*) &addr, &addrlen);
+ ASSERT_LT(0, c);
+
+ const string data = "POST /" + name + " HTTP/1.0";
+ EXPECT_SOME_EQ(data, os::read(c, data.size()));
+
+ close(c);
+ close(s);
+
+ terminate(process);
+ wait(process);
+}
+
+
int foo()
{
return 1;