You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/13 02:55:08 UTC

svn commit: r1397753 - in /incubator/mesos/trunk/third_party/libprocess/src: process.cpp tests.cpp

Author: benh
Date: Sat Oct 13 00:55:08 2012
New Revision: 1397753

URL: http://svn.apache.org/viewvc?rev=1397753&view=rev
Log:
Added streaming HTTP responses (via pipes).

Review: https://reviews.apache.org/r/7544

Modified:
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp
    incubator/mesos/trunk/third_party/libprocess/src/tests.cpp

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1397753&r1=1397752&r2=1397753&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Sat Oct 13 00:55:08 2012
@@ -259,9 +259,17 @@ public:
   void handle(Future<Response>* future, bool persist);
 
 private:
+  // Starts "waiting" on the next available future response.
   void next();
+
+  // Invoked once a future response has been satisfied.
   void waited(const Future<Response>& future);
-  void process(Future<Response>* future, bool persist);
+
+  // Demuxes and handles a response.
+  bool process(const Future<Response>& future, bool persist);
+
+  // Handles stream (i.e., pipe) based responses.
+  void stream(const Future<short>& poll, bool persist);
 
   Socket socket; // Wrap the socket to keep it from getting closed.
 
@@ -282,6 +290,8 @@ private:
   };
 
   queue<Item*> items;
+
+  Option<int> pipe; // Current pipe, if streaming.
 };
 
 
@@ -1415,8 +1425,27 @@ HttpProxy::HttpProxy(const Socket& _sock
 
 HttpProxy::~HttpProxy()
 {
+  // Need to make sure response producers know not to continue to
+  // create a response (streaming or otherwise).
+  if (pipe.isSome()) {
+    close(pipe.get());
+  }
+  pipe = Option<int>::none();
+
   while (!items.empty()) {
     Item* item = items.front();
+
+    // Attempt to discard the future.
+    item->future->discard();
+
+    // But it might have already been ready ...
+    if (item->future->isReady()) {
+      const Response& response = item->future->get();
+      if (response.type == Response::PIPE) {
+        close(response.pipe);
+      }
+    }
+
     items.pop();
     delete item;
   }
@@ -1453,28 +1482,32 @@ void HttpProxy::waited(const Future<Resp
 {
   CHECK(items.size() > 0);
   Item* item = items.front();
+
   CHECK(future == *item->future);
 
-  if (item->future->isReady()) {
-    process(item->future, item->persist);
-  } else {
-    // TODO(benh): Consider handling other "states" of future
-    // (discarded, failed, etc) with different HTTP statuses.
-    socket_manager->send(ServiceUnavailable(), socket, item->persist);
-  }
+  // Process the item and determine if we're done or not (so we know
+  // whether to start waiting on the next responses).
+  bool processed = process(*item->future, item->persist);
 
   items.pop();
   delete item;
 
-  next();
+  if (processed) {
+    next();
+  }
 }
 
 
-void HttpProxy::process(Future<Response>* future, bool persist)
+bool HttpProxy::process(const Future<Response>& future, bool persist)
 {
-  CHECK(future->isReady());
+  if (!future.isReady()) {
+    // TODO(benh): Consider handling other "states" of future
+    // (discarded, failed, etc) with different HTTP statuses.
+    socket_manager->send(ServiceUnavailable(), socket, persist);
+    return true; // All done, can process next response.
+  }
 
-  Response response = future->get();
+  Response response = future.get();
 
   // Don't persist connection if headers include 'Connection: close'.
   if (response.headers.count("Connection") > 0) {
@@ -1512,14 +1545,14 @@ void HttpProxy::process(Future<Response>
         socket_manager->send(NotFound(), socket, persist);
       } else {
         // While the user is expected to properly set a 'Content-Type'
-        // header, we fill in the 'Content-Length' header.
+        // header, we fill in (or overwrite) 'Content-Length' header.
         stringstream out;
         out << s.st_size;
         response.headers["Content-Length"] = out.str();
 
         if (s.st_size == 0) {
           socket_manager->send(response, socket, persist);
-          return;
+          return true; // All done, can process next request.
         }
 
         VLOG(1) << "Sending file at '" << path << "' with length " << s.st_size;
@@ -1533,9 +1566,100 @@ void HttpProxy::process(Future<Response>
         socket_manager->send(encoder, socket, persist);
       }
     }
+  } else if (response.type == Response::PIPE) {
+    // Make sure no body is sent (this is really an error and
+    // should be reported and no response sent.
+    response.body.clear();
+
+    // Make sure the pipe is nonblocking.
+    Try<Nothing> nonblock = os::nonblock(response.pipe);
+    if (nonblock.isError()) {
+      const char* error = strerror(errno);
+      VLOG(1) << "Failed make pipe nonblocking: " << error;
+      socket_manager->send(InternalServerError(), socket, persist);
+      return true; // All done, can process next response.
+    }
+
+    // While the user is expected to properly set a 'Content-Type'
+    // header, we fill in (or overwrite) 'Transfer-Encoding' header.
+    response.headers["Transfer-Encoding"] = "chunked";
+
+    VLOG(1) << "Starting \"chunked\" streaming";
+
+    socket_manager->send(response, socket, true);
+
+    pipe = response.pipe;
+
+    io::poll(pipe.get(), io::READ).onAny(
+        defer(self(), &Self::stream, lambda::_1, persist));
+
+    return false; // Streaming, don't process next response (yet)!
   } else {
     socket_manager->send(response, socket, persist);
   }
+
+  return true; // All done, can process next response.
+}
+
+
+void HttpProxy::stream(const Future<short>& poll, bool persist)
+{
+  // TODO(benh): Use 'splice' on Linux.
+
+  CHECK(pipe.isSome());
+
+  bool finished = false; // Whether or not we're done streaming.
+
+  if (poll.isReady()) {
+    // Read and write.
+    CHECK(poll.get() == io::READ);
+    const size_t size = 4 * 1024; // 4K.
+    char data[size];
+    while (!finished) {
+      ssize_t length = ::read(pipe.get(), data, size);
+      if (length < 0 && (errno == EINTR)) {
+        // Interrupted, try again now.
+        continue;
+      } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+        // Might block, try again later.
+        io::poll(pipe.get(), io::READ).onAny(
+            defer(self(), &Self::stream, lambda::_1, persist));
+        break;
+      } else {
+        std::ostringstream out;
+        if (length <= 0) {
+          // Error or closed, treat both as closed.
+          if (length < 0) {
+            // Error.
+            const char* error = strerror(errno);
+            VLOG(1) << "Read error while streaming: " << error;
+          }
+          out << "0\r\n" << "\r\n";
+          finished = true;
+        } else {
+          // Data!
+          out << std::hex << length << "\r\n";
+          out.write(data, length);
+          out << "\r\n";
+        }
+        socket_manager->send(new DataEncoder(out.str()), socket, persist);
+      }
+    }
+  } else if (poll.isFailed()) {
+    VLOG(1) << "Failed to poll: " << poll.failure();
+    socket_manager->send(InternalServerError(), socket, persist);
+    finished = true;
+  } else {
+    VLOG(1) << "Unexpected discarded future while polling";
+    socket_manager->send(InternalServerError(), socket, persist);
+    finished = true;
+  }
+
+  if (finished) {
+    close(pipe.get());
+    pipe = Option<int>::none();
+    next();
+  }
 }
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1397753&r1=1397752&r2=1397753&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Sat Oct 13 00:55:08 2012
@@ -32,6 +32,7 @@ using namespace process;
 
 using testing::_;
 using testing::Assign;
+using testing::DoAll;
 using testing::Return;
 using testing::ReturnArg;
 
@@ -1092,10 +1093,12 @@ class HttpProcess : public Process<HttpP
 public:
   HttpProcess()
   {
-    route("/handler", &HttpProcess::handler);
+    route("/body", &HttpProcess::body);
+    route("/pipe", &HttpProcess::pipe);
   }
 
-  MOCK_METHOD1(handler, Future<http::Response>(const http::Request&));
+  MOCK_METHOD1(body, Future<http::Response>(const http::Request&));
+  MOCK_METHOD1(pipe, Future<http::Response>(const http::Request&));
 };
 
 
@@ -1105,11 +1108,9 @@ TEST(Process, http)
 
   HttpProcess process;
 
-  EXPECT_CALL(process, handler(_))
-    .WillOnce(Return(http::OK()));
-
   spawn(process);
 
+  // First hit '/body' (using explicit sockets and HTTP/1.0).
   int s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
 
   ASSERT_LE(0, s);
@@ -1123,14 +1124,16 @@ TEST(Process, http)
   ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
 
   std::ostringstream out;
-
-  out << "GET /" << process.self().id << "/" << "handler"
+  out << "GET /" << process.self().id << "/body"
       << " HTTP/1.0\r\n"
       << "Connection: Keep-Alive\r\n"
       << "\r\n";
 
   const std::string& data = out.str();
 
+  EXPECT_CALL(process, body(_))
+    .WillOnce(Return(http::OK()));
+
   ASSERT_EQ(data.size(), write(s, data.data(), data.size()));
 
   std::string response = "HTTP/1.1 200 OK";
@@ -1143,6 +1146,33 @@ TEST(Process, http)
 
   ASSERT_EQ(0, close(s));
 
+  // Now hit '/pipe' (by using http::get).
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  http::OK ok;
+  ok.type = http::Response::PIPE;
+  ok.pipe = pipes[0];
+
+  volatile bool pipeCalled = false;
+
+  EXPECT_CALL(process, pipe(_))
+    .WillOnce(DoAll(Assign(&pipeCalled, true),
+                    Return(ok)));
+
+  Future<http::Response> future = http::get(process.self(), "pipe");
+
+  while (!pipeCalled);
+
+  ASSERT_TRUE(os::write(pipes[1], "Hello World\n").isSome());
+  ASSERT_TRUE(os::close(pipes[1]).isSome());
+
+  future.await(Seconds(1.0));
+  ASSERT_TRUE(future.isReady());
+  ASSERT_EQ(http::statuses[200], future.get().status);
+  ASSERT_EQ("chunked", future.get().headers["Transfer-Encoding"]);
+  ASSERT_EQ("Hello World\n", future.get().body);
+
   terminate(process);
   wait(process);
 }