You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/05/05 07:15:42 UTC

[1/2] git commit: Introduced 'Accepted' responses for libprocess messages.

Repository: mesos
Updated Branches:
  refs/heads/master c2e10ccc4 -> 9926af07b


Introduced 'Accepted' responses for libprocess messages.

Review: https://reviews.apache.org/r/20276


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5fe51cb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5fe51cb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5fe51cb

Branch: refs/heads/master
Commit: d5fe51cb5200862391d7bc1050399d678ecf1d18
Parents: c2e10cc
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Apr 11 13:42:38 2014 -0600
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun May 4 20:18:14 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp    |  14 +++
 3rdparty/libprocess/src/process.cpp             | 123 +++++++++++++++++--
 3rdparty/libprocess/src/tests/process_tests.cpp |  41 ++++++-
 3 files changed, 159 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d5fe51cb/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 0fe7219..06f2596 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -209,6 +209,20 @@ struct OK : Response
 };
 
 
+struct Accepted : Response
+{
+  Accepted()
+  {
+    status = "202 Accepted";
+  }
+
+  explicit Accepted(const std::string& body) : Response(body)
+  {
+    status = "202 Accepted";
+  }
+};
+
+
 struct TemporaryRedirect : Response
 {
   explicit TemporaryRedirect(const std::string& url)

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5fe51cb/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index da2d4ac..c14ecce 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -87,6 +87,7 @@ using namespace process::metrics::internal;
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
+using process::http::Accepted;
 using process::http::BadRequest;
 using process::http::InternalServerError;
 using process::http::NotFound;
@@ -1050,6 +1051,52 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
+// A variant of 'recv_data' that doesn't do anything with the
+// data. Used by sockets created via SocketManager::link as well as
+// SocketManager::send(Message), where we don't care about the data
+// received; we mostly just want to know when the socket has been
+// closed.
+void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+  Socket* socket = (Socket*) watcher->data;
+
+  int s = watcher->fd;
+
+  while (true) {
+    const ssize_t size = 80 * 1024;
+    ssize_t length = 0;
+
+    char data[size];
+
+    length = recv(s, data, size, 0);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      break;
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while receiving: " << error;
+      } else {
+        VLOG(1) << "Socket closed while receiving";
+      }
+      socket_manager->close(s);
+      ev_io_stop(loop, watcher);
+      delete socket;
+      delete watcher;
+      break;
+    } else {
+      VLOG(2) << "Ignoring " << length << " bytes of data received "
+              << "on socket used only for sending";
+    }
+  }
+}
+
+
 void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   DataEncoder* encoder = (DataEncoder*) watcher->data;
@@ -1227,7 +1274,7 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
   } else {
     // We're connected! Now let's do some receiving.
     ev_io_stop(loop, watcher);
-    ev_io_init(watcher, recv_data, s, EV_READ);
+    ev_io_init(watcher, ignore_data, s, EV_READ);
     ev_io_start(loop, watcher);
   }
 }
@@ -1931,13 +1978,13 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       persists[node] = s;
 
-      // Allocate and initialize the decoder and watcher (we really
-      // only "receive" on this socket so that we can react when it
-      // gets closed and generate appropriate lost events).
-      DataDecoder* decoder = new DataDecoder(sockets[s]);
-
+      // Allocate and initialize a watcher for reading data from this
+      // socket. Note that we don't expect to receive anything other
+      // than HTTP '202 Accepted' responses, which we ignore anyway.
+      // We do, however, want to react when it gets closed so we can
+      // generate appropriate lost events (since this is a 'link').
       ev_io* watcher = new ev_io();
-      watcher->data = decoder;
+      watcher->data = new Socket(sockets[s]);
 
       // Try and connect to the node using this socket.
       sockaddr_in addr;
@@ -1954,7 +2001,7 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
         // Wait for socket to be connected.
         ev_io_init(watcher, receiving_connect, s, EV_WRITE);
       } else {
-        ev_io_init(watcher, recv_data, s, EV_READ);
+        ev_io_init(watcher, ignore_data, s, EV_READ);
       }
 
       // Enqueue the watcher.
@@ -2103,8 +2150,21 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      // Allocate and initialize the watcher.
+      // Allocate and initialize a watcher for reading data from this
+      // socket. Note that we don't expect to receive anything other
+      // than HTTP '202 Accepted' responses, which we ignore anyway.
       ev_io* watcher = new ev_io();
+      watcher->data = new Socket(sockets[s]);
+
+      ev_io_init(watcher, ignore_data, s, EV_READ);
+
+      // Enqueue the watcher.
+      synchronized (watchers) {
+        watchers->push(watcher);
+      }
+
+      // Allocate and initialize a watcher for sending the message.
+      watcher = new ev_io();
       watcher->data = new MessageEncoder(sockets[s], message);
 
       // Try and connect to the node using this socket.
@@ -2250,6 +2310,16 @@ void SocketManager::close(int s)
         proxies.erase(s);
       }
 
+      // We need to stop any 'ignore_data' readers, as they may hold
+      // the last Socket reference, so we shut down reads but don't do
+      // a full close (since that will be taken care of by ~Socket, see
+      // comment below). Calling 'shutdown' will trigger 'ignore_data'
+      // which will get back a 0 (i.e., EOF) when it tries to read
+      // from the socket. Note we need to do this before we call
+      // 'sockets.erase(s)' to avoid the potential race with the last
+      // reference being in 'sockets'.
+      shutdown(s, SHUT_RD);
+
       dispose.erase(s);
       sockets.erase(s);
     }
@@ -2275,6 +2345,9 @@ void SocketManager::close(int s)
   // on the last reference of our Socket object to close the
   // socket. Note, however, that since socket is no longer in
   // 'sockets' any attempt to send with it will just get ignored.
+  // TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
+  // should keep the file descriptor valid until the last Socket
+  // reference does a close but force all libev watchers to stop?
 }
 
 
@@ -2380,13 +2453,37 @@ bool ProcessManager::handle(
   if (libprocess(request)) {
     Message* message = parse(request);
     if (message != NULL) {
+      // TODO(benh): Use the sender PID when delivering in order to
+      // capture happens-before timing relationships for testing.
+      bool accepted = deliver(message->to, new MessageEvent(message));
+
+      // Get the HttpProxy pid for this socket.
+      PID<HttpProxy> proxy = socket_manager->proxy(socket);
+
+      // Only send back an HTTP response if this isn't from libprocess
+      // (which we determine by looking at the User-Agent). This is
+      // necessary because older versions of libprocess would try and
+      // read the data and parse it as an HTTP request which would
+      // fail thus causing the socket to get closed (but now
+      // libprocess will ignore responses, see ignore_data).
+      Option<string> agent = request->headers.get("User-Agent");
+      if (agent.get("").find("libprocess/") == string::npos) {
+        if (accepted) {
+          VLOG(2) << "Accepted libprocess message to " << request->path;
+          dispatch(proxy, &HttpProxy::enqueue, Accepted(), *request);
+        } else {
+          VLOG(1) << "Failed to handle libprocess message to "
+                  << request->path << ": not found";
+          dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
+        }
+      }
+
       delete request;
-      // TODO(benh): Use the sender PID in order to capture
-      // happens-before timing relationships for testing.
-      return deliver(message->to, new MessageEvent(message));
+
+      return accepted;
     }
 
-    VLOG(1) << "Failed to handle libprocess request: "
+    VLOG(1) << "Failed to handle libprocess message: "
             << request->method << " " << request->path
             << " (User-Agent: " << request->headers["User-Agent"] << ")";
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5fe51cb/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 659b063..41b57a9 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1388,13 +1388,11 @@ TEST(Process, remote)
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
   RemoteProcess process;
+  spawn(process);
 
-  volatile bool handlerCalled = false;
-
+  Future<Nothing> handler;
   EXPECT_CALL(process, handler(_, _))
-    .WillOnce(Assign(&handlerCalled, true));
-
-  spawn(process);
+    .WillOnce(FutureSatisfy(&handler));
 
   int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
 
@@ -1419,7 +1417,38 @@ TEST(Process, remote)
 
   ASSERT_EQ(0, close(s));
 
-  while (!handlerCalled);
+  AWAIT_READY(handler);
+
+  terminate(process);
+  wait(process);
+}
+
+
+// Like the 'remote' test but uses http::post.
+TEST(Process, http)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  RemoteProcess process;
+  spawn(process);
+
+  Future<UPID> pid;
+  Future<string> body;
+  EXPECT_CALL(process, handler(_, _))
+    .WillOnce(DoAll(FutureArg<0>(&pid),
+                    FutureArg<1>(&body)));
+
+  hashmap<string, string> headers;
+  headers["User-Agent"] = "libprocess/";
+
+  Future<http::Response> response =
+    http::post(process.self(), "handler", headers, "hello world");
+
+  AWAIT_READY(body);
+  ASSERT_EQ("hello world", body.get());
+
+  AWAIT_READY(pid);
+  ASSERT_EQ(UPID(), pid.get());
 
   terminate(process);
   wait(process);


[2/2] git commit: Introduced 'Libprocess-From' to identify a libprocess "message".

Posted by be...@apache.org.
Introduced 'Libprocess-From' to identify a libprocess "message".

Review: https://reviews.apache.org/r/20277


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9926af07
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9926af07
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9926af07

Branch: refs/heads/master
Commit: 9926af07bb33873a801dd86a8e1229d1ea6c5588
Parents: d5fe51c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Apr 11 18:17:22 2014 -0600
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun May 4 20:24:11 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp             | 12 ++-
 3rdparty/libprocess/src/process.cpp             | 81 ++++++++++++--------
 3rdparty/libprocess/src/tests/process_tests.cpp | 75 +++++++++++++++++-
 3 files changed, 131 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9926af07/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 7f4593b..9c5aa81 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -106,8 +106,16 @@ public:
     std::ostringstream out;
 
     if (message != NULL) {
-      out << "POST /" << message->to.id << "/" << message->name
-          << " HTTP/1.0\r\n"
+      out << "POST ";
+      // Nothing keeps the 'id' component of a PID from being an empty
+      // string, which would create a malformed path containing a
+      // double slash ('//') unless we check for it explicitly.
+      // TODO(benh): Make the 'id' part of a PID optional so when it's
+      // missing it's clear that we're simply addressing an ip:port.
+      if (message->to.id != "") {
+        out << "/" << message->to.id;
+      }
+      out << "/" << message->name << " HTTP/1.0\r\n"
           << "User-Agent: libprocess/" << message->from << "\r\n"
           << "Connection: Keep-Alive\r\n";
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9926af07/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c14ecce..58bae5b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -813,9 +813,12 @@ static void transport(Message* message, ProcessBase* sender = NULL)
 
 static bool libprocess(Request* request)
 {
-  return request->method == "POST" &&
-    request->headers.count("User-Agent") > 0 &&
-    request->headers["User-Agent"].find("libprocess/") == 0;
+  return
+    (request->method == "POST" &&
+     request->headers.contains("User-Agent") &&
+     request->headers["User-Agent"].find("libprocess/") == 0) ||
+    (request->method == "POST" &&
+     request->headers.contains("Libprocess-From"));
 }
 
 
@@ -823,44 +826,54 @@ static Message* parse(Request* request)
 {
   // TODO(benh): Do better error handling (to deal with a malformed
   // libprocess message, malicious or otherwise).
-  const string& agent = request->headers["User-Agent"];
-  const string& identifier = "libprocess/";
-  size_t index = agent.find(identifier);
-  if (index != string::npos) {
-    // Okay, now determine 'from'.
-    const UPID from(agent.substr(index + identifier.size(), agent.size()));
-
-    // Now determine 'to'.
-    index = request->path.find('/', 1);
-    index = index != string::npos ? index - 1 : string::npos;
-
-    // Decode possible percent-encoded 'to'.
-    Try<string> decode = http::decode(request->path.substr(1, index));
-
-    if (decode.isError()) {
-      VLOG(2) << "Failed to decode URL path: " << decode.get();
-      return NULL;
-    }
 
-    const UPID to(decode.get(), __ip__, __port__);
+  // First try and determine 'from'.
+  Option<UPID> from = None();
+
+  if (request->headers.contains("Libprocess-From")) {
+    from = UPID(strings::trim(request->headers["Libprocess-From"]));
+  } else {
+    // Try and get 'from' from the User-Agent.
+    const string& agent = request->headers["User-Agent"];
+    const string& identifier = "libprocess/";
+    size_t index = agent.find(identifier);
+    if (index != string::npos) {
+      from = UPID(agent.substr(index + identifier.size(), agent.size()));
+    }
+  }
 
-    // And now determine 'name'.
-    index = index != string::npos ? index + 2: request->path.size();
-    const string& name = request->path.substr(index);
+  if (from.isNone()) {
+    return NULL;
+  }
 
-    VLOG(2) << "Parsed message name '" << name
-            << "' for " << to << " from " << from;
+  // Now determine 'to'.
+  size_t index = request->path.find('/', 1);
+  index = index != string::npos ? index - 1 : string::npos;
 
-    Message* message = new Message();
-    message->name = name;
-    message->from = from;
-    message->to = to;
-    message->body = request->body;
+  // Decode possible percent-encoded 'to'.
+  Try<string> decode = http::decode(request->path.substr(1, index));
 
-    return message;
+  if (decode.isError()) {
+    VLOG(2) << "Failed to decode URL path: " << decode.get();
+    return NULL;
   }
 
-  return NULL;
+  const UPID to(decode.get(), __ip__, __port__);
+
+  // And now determine 'name'.
+  index = index != string::npos ? index + 2: request->path.size();
+  const string& name = request->path.substr(index);
+
+  VLOG(2) << "Parsed message name '" << name
+          << "' for " << to << " from " << from.get();
+
+  Message* message = new Message();
+  message->name = name;
+  message->from = from.get();
+  message->to = to;
+  message->body = request->body;
+
+  return message;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9926af07/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 41b57a9..ebce48c 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1425,7 +1425,7 @@ TEST(Process, remote)
 
 
 // Like the 'remote' test but uses http::post.
-TEST(Process, http)
+TEST(Process, http1)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
@@ -1455,6 +1455,79 @@ TEST(Process, http)
 }
 
 
+// Like 'http1' but using a 'Libprocess-From' header.
+TEST(Process, http2)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  RemoteProcess process;
+  spawn(process);
+
+  // Create a receiving socket so we can get messages back.
+  int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+  ASSERT_LE(0, s);
+
+  // Set up socket.
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = PF_INET;
+  addr.sin_addr.s_addr = INADDR_ANY;
+  addr.sin_port = 0;
+
+  ASSERT_EQ(0, ::bind(s, (sockaddr*) &addr, sizeof(addr)));
+
+  // Create a UPID for 'Libprocess-From' based on the IP and port we
+  // got assigned.
+  socklen_t addrlen = sizeof(addr);
+  ASSERT_EQ(0, getsockname(s, (sockaddr*) &addr, &addrlen));
+
+  UPID from("", addr.sin_addr.s_addr, ntohs(addr.sin_port));
+
+  ASSERT_EQ(0, listen(s, 1));
+
+  Future<UPID> pid;
+  Future<string> body;
+  EXPECT_CALL(process, handler(_, _))
+    .WillOnce(DoAll(FutureArg<0>(&pid),
+                    FutureArg<1>(&body)));
+
+  hashmap<string, string> headers;
+  headers["Libprocess-From"] = stringify(from);
+
+  Future<http::Response> response =
+    http::post(process.self(), "handler", headers, "hello world");
+
+  AWAIT_READY(response);
+  ASSERT_EQ(http::statuses[202], response.get().status);
+
+  AWAIT_READY(body);
+  ASSERT_EQ("hello world", body.get());
+
+  AWAIT_READY(pid);
+  ASSERT_EQ(from, pid.get());
+
+  // Now post a message as though it came from the process.
+  const string name = "reply";
+  post(process.self(), from, name);
+
+  // Accept the incoming connection.
+  memset(&addr, 0, sizeof(addr));
+  addrlen = sizeof(addr);
+
+  int c = ::accept(s, (sockaddr*) &addr, &addrlen);
+  ASSERT_LT(0, c);
+
+  const string data = "POST /" + name + " HTTP/1.0";
+  EXPECT_SOME_EQ(data, os::read(c, data.size()));
+
+  close(c);
+  close(s);
+
+  terminate(process);
+  wait(process);
+}
+
+
 int foo()
 {
   return 1;