You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/20 07:18:37 UTC

mesos git commit: Introduced an http::Pipe abstraction to simplify streaming HTTP Responses.

Repository: mesos
Updated Branches:
  refs/heads/master 9f8ab2866 -> e76954abb


Introduced an http::Pipe abstraction to simplify streaming HTTP Responses.

Review: https://reviews.apache.org/r/31930


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e76954ab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e76954ab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e76954ab

Branch: refs/heads/master
Commit: e76954abb37a30da5bb211829d7033e53d830a7f
Parents: 9f8ab28
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Mar 5 18:33:28 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Thu Mar 19 23:12:24 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 148 +++++++++++++++++++--
 3rdparty/libprocess/src/http.cpp             | 150 ++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp          | 110 +++++++---------
 3rdparty/libprocess/src/tests/http_tests.cpp | 102 +++++++++++++--
 4 files changed, 427 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 10143fd..2b36698 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -8,10 +8,13 @@
 #include <cctype>
 #include <cstdlib>
 #include <iomanip>
+#include <queue>
 #include <sstream>
 #include <string>
 #include <vector>
 
+#include <process/future.hpp>
+#include <process/owned.hpp>
 #include <process/pid.hpp>
 
 #include <stout/error.hpp>
@@ -19,7 +22,9 @@
 #include <stout/hashmap.hpp>
 #include <stout/ip.hpp>
 #include <stout/json.hpp>
+#include <stout/memory.hpp>
 #include <stout/none.hpp>
+#include <stout/nothing.hpp>
 #include <stout/numify.hpp>
 #include <stout/option.hpp>
 #include <stout/stringify.hpp>
@@ -121,6 +126,130 @@ struct Request
 };
 
 
+// Represents an asynchronous in-memory unbuffered Pipe, currently
+// used for streaming HTTP responses via chunked encoding. Note that
+// being an in-memory pipe means that this cannot be used across OS
+// processes.
+//
+// Much like unix pipes, data is read until end-of-file is
+// encountered; this occurs when the write-end of the pipe is
+// closed and there is no outstanding data left to read.
+//
+// Unlike unix pipes, if the read-end of the pipe is closed before
+// the write-end is closed, rather than receiving SIGPIPE or EPIPE
+// during a write, the writer is notified via a future. Like unix
+// pipes, the writer is not notified if the read-end is closed after
+// the write-end is closed, even if data remains in the pipe!
+//
+// No buffering means that each non-empty write to the pipe will
+// correspond to an equivalent read from the pipe, and the
+// reader must "keep up" with the writer in order to avoid
+// unbounded memory growth.
+//
+// TODO(bmahler): The writer needs to be able to induce a failure
+// on the reader to signal an error has occurred. For example, if
+// we are receiving a response but a disconnection occurs before
+// the response is completed, we want the reader to detect that a
+// disconnection occurred!
+//
+// TODO(bmahler): Consider aggregating writes into larger reads to
+// help the reader keep up (a process::Stream abstraction with
+// backpressure would obviate the need for this).
+//
+// TODO(bmahler): Add a more general process::Stream<T> abstraction
+// to represent asynchronous finite/infinite streams (possibly
+// with "backpressure" on the writer). This is broadly useful
+// (e.g. allocator can expose Stream<Allocation>, http::Pipe
+// becomes Stream<string>, process::Queue<T> is just an infinite
+// Stream<T> (i.e. completion and error semantics hidden)).
+class Pipe
+{
+private:
+  struct Data; // Forward declaration.
+
+public:
+  class Reader
+  {
+  public:
+    // Returns data written to the pipe.
+    // Returns an empty read when end-of-file is reached.
+    // Returns Failure if the read-end of the pipe is closed.
+    Future<std::string> read();
+
+    // Closing the read-end of the pipe before the write-end closes
+    // will notify the writer that the reader is no longer interested.
+    // Returns false if the read-end of the pipe was already closed.
+    bool close();
+
+  private:
+    friend class Pipe;
+    explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {}
+    memory::shared_ptr<Data> data;
+  };
+
+  class Writer
+  {
+  public:
+    // Returns false if the data could not be written because
+    // either end of the pipe was already closed. Note that an
+    // empty write has no effect.
+    bool write(const std::string& s);
+
+    // Closing the write-end of the pipe will send end-of-file
+    // to the reader. Returns false if the write-end of the pipe
+    // was already closed.
+    bool close();
+
+    // The returned future becomes ready (with Nothing) when the
+    // read-end of the pipe is closed before the write-end is closed,
+    // which means the reader was unable to continue reading!
+    Future<Nothing> readerClosed();
+
+  private:
+    friend class Pipe;
+    explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {}
+    memory::shared_ptr<Data> data;
+  };
+
+  Pipe() : data(new Data()) {}
+
+  Reader reader() const;
+  Writer writer() const;
+
+private:
+  enum State
+  {
+    OPEN,
+    CLOSED,
+  };
+
+  struct Data
+  {
+    Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {}
+
+    // Rather than use a process to serialize access to the pipe's
+    // internal data we use a low-level "lock" which we acquire and
+    // release using atomic builtins.
+    int lock;
+
+    State readEnd;
+    State writeEnd;
+
+    // Represents readers waiting for data from the pipe.
+    std::queue<Owned<Promise<std::string>>> reads;
+
+    // Represents unread writes in the pipe. Note that we omit
+    // empty strings as they serve as a signal for end-of-file.
+    std::queue<std::string> writes;
+
+    // Signals when the read-end is closed before the write-end.
+    Promise<Nothing> readerClosure;
+  };
+
+  memory::shared_ptr<Data> data;
+};
+
+
 struct Response
 {
   Response()
@@ -138,8 +267,8 @@ struct Response
   std::string status;
   hashmap<std::string, std::string> headers;
 
-  // Either provide a "body", an absolute "path" to a file, or a
-  // "pipe" for streaming a response. Distinguish between the cases
+  // Either provide a 'body', an absolute 'path' to a file, or a
+  // 'pipe' for streaming a response. Distinguish between the cases
   // using 'type' below.
   //
   // BODY: Uses 'body' as the body of the response. These may be
@@ -149,13 +278,12 @@ struct Response
   // PATH: Attempts to perform a 'sendfile' operation on the file
   // found at 'path'.
   //
-  // PIPE: Splices data from 'pipe' using 'Transfer-Encoding=chunked'.
-  // Note that the read end of the pipe will be closed by libprocess
-  // either after the write end has been closed or if the socket the
-  // data is being spliced to has been closed (i.e., nobody is
-  // listening any longer). This can cause writes to the pipe to
-  // generate a SIGPIPE (which will terminate your program unless you
-  // explicitly ignore them or handle them).
+  // PIPE: Splices data from the Pipe 'reader' using a "chunked"
+  // 'Transfer-Encoding'. The writer uses a Pipe::Writer to
+  // perform writes and to detect a closed read-end of the Pipe
+  // (i.e. nobody is listening any longer). Once the writer is
+  // finished, it should close its end of the pipe to signal end
+  // of file to the Reader.
   //
   // In all cases (BODY, PATH, PIPE), you are expected to properly
   // specify the 'Content-Type' header, but the 'Content-Length' and
@@ -169,7 +297,7 @@ struct Response
 
   std::string body;
   std::string path;
-  int pipe; // See comment above regarding the semantics for closing.
+  Option<Pipe::Reader> reader;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 7c0cee4..276cecd 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -2,14 +2,17 @@
 
 #include <stdint.h>
 
+#include <algorithm>
 #include <cstring>
 #include <deque>
 #include <iostream>
+#include <queue>
 #include <string>
 #include <vector>
 
 #include <process/future.hpp>
 #include <process/http.hpp>
+#include <process/internal.hpp>
 #include <process/owned.hpp>
 #include <process/socket.hpp>
 
@@ -23,6 +26,7 @@
 #include "decoder.hpp"
 
 using std::deque;
+using std::queue;
 using std::string;
 using std::vector;
 
@@ -35,6 +39,152 @@ using process::network::Socket;
 namespace process {
 namespace http {
 
+Pipe::Reader Pipe::reader() const
+{
+  return Pipe::Reader(data);
+}
+
+
+Pipe::Writer Pipe::writer() const
+{
+  return Pipe::Writer(data);
+}
+
+
+Future<string> Pipe::Reader::read()
+{
+  Future<string> future;
+
+  process::internal::acquire(&data->lock);
+  {
+    if (data->readEnd == CLOSED) {
+      future = Failure("closed");
+    } else if (!data->writes.empty()) {
+      future = data->writes.front();
+      data->writes.pop();
+    } else if (data->writeEnd == CLOSED) {
+      future = ""; // End-of-file.
+    } else {
+      data->reads.push(Owned<Promise<string>>(new Promise<string>()));
+      future = data->reads.back()->future();
+    }
+  }
+  process::internal::release(&data->lock);
+
+  return future;
+}
+
+
+bool Pipe::Reader::close()
+{
+  bool closed = false;
+  bool notify = false;
+  queue<Owned<Promise<string>>> reads;
+
+  process::internal::acquire(&data->lock);
+  {
+    if (data->readEnd == OPEN) {
+      // Throw away outstanding data.
+      while (!data->writes.empty()) {
+        data->writes.pop();
+      }
+
+      // Extract the pending reads so we can fail them.
+      std::swap(data->reads, reads);
+
+      closed = true;
+      data->readEnd = CLOSED;
+
+      // Notify if write-end is still open!
+      notify = data->writeEnd == OPEN;
+    }
+  }
+  process::internal::release(&data->lock);
+
+  // NOTE: We transition the promises outside the critical section
+  // to avoid triggering callbacks that try to reacquire the lock.
+  if (closed) {
+    while (!reads.empty()) {
+      reads.front()->fail("closed");
+      reads.pop();
+    }
+
+    if (notify) {
+      data->readerClosure.set(Nothing());
+    }
+  }
+
+  return closed;
+}
+
+
+bool Pipe::Writer::write(const string& s)
+{
+  bool written = false;
+  Owned<Promise<string>> read;
+
+  process::internal::acquire(&data->lock);
+  {
+    // Ignore writes if either end of the pipe is closed!
+    if (data->writeEnd == OPEN && data->readEnd == OPEN) {
+      // Don't bother surfacing empty writes to the readers.
+      if (!s.empty()) {
+        if (data->reads.empty()) {
+          data->writes.push(s);
+        } else {
+          read = data->reads.front();
+          data->reads.pop();
+        }
+      }
+      written = true;
+    }
+  }
+  process::internal::release(&data->lock);
+
+  // NOTE: We set the promise outside the critical section to avoid
+  // triggering callbacks that try to reacquire the lock.
+  if (read.get() != NULL) {
+    read->set(s);
+  }
+
+  return written;
+}
+
+
+bool Pipe::Writer::close()
+{
+  bool closed = false;
+  queue<Owned<Promise<string>>> reads;
+
+  process::internal::acquire(&data->lock);
+  {
+    if (data->writeEnd == OPEN) {
+      // Extract all the pending reads so we can complete them.
+      std::swap(data->reads, reads);
+
+      data->writeEnd = CLOSED;
+      closed = true;
+    }
+  }
+  process::internal::release(&data->lock);
+
+  // NOTE: We set the promises outside the critical section to avoid
+  // triggering callbacks that try to reacquire the lock.
+  while (!reads.empty()) {
+    reads.front()->set(string("")); // End-of-file.
+    reads.pop();
+  }
+
+  return closed;
+}
+
+
+Future<Nothing> Pipe::Writer::readerClosed()
+{
+  return data->readerClosure.future();
+}
+
+
 hashmap<uint16_t, string> statuses;
 
 namespace query {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e7b029b..10ad670 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -175,8 +175,8 @@ private:
   // Demuxes and handles a response.
   bool process(const Future<Response>& future, const Request& request);
 
-  // Handles stream (i.e., pipe) based responses.
-  void stream(const Future<short>& poll, const Request& request);
+  // Handles stream based responses.
+  void stream(const Request& request, const Future<string>& chunk);
 
   Socket socket; // Wrap the socket to keep it from getting closed.
 
@@ -194,12 +194,14 @@ private:
       delete future;
     }
 
-    // Helper for cleaning up a response (i.e., closing any open pipes
+    // Helper for cleaning up a response (i.e., closing any open Pipes
     // in the event Response::type is PIPE).
     static void cleanup(const Response& response)
     {
       if (response.type == Response::PIPE) {
-        os::close(response.pipe);
+        CHECK_SOME(response.reader);
+        http::Pipe::Reader reader = response.reader.get(); // Remove const.
+        reader.close();
       }
     }
 
@@ -209,7 +211,7 @@ private:
 
   queue<Item*> items;
 
-  Option<int> pipe; // Current pipe, if streaming.
+  Option<http::Pipe::Reader> pipe; // Current pipe, if streaming.
 };
 
 
@@ -939,7 +941,8 @@ HttpProxy::~HttpProxy()
   // Need to make sure response producers know not to continue to
   // create a response (streaming or otherwise).
   if (pipe.isSome()) {
-    os::close(pipe.get());
+    http::Pipe::Reader reader = pipe.get();
+    reader.close();
   }
   pipe = None();
 
@@ -1074,29 +1077,23 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
     // should be reported and no response sent.
     response.body.clear();
 
-    // Make sure the pipe is nonblocking.
-    Try<Nothing> nonblock = os::nonblock(response.pipe);
-    if (nonblock.isError()) {
-      const char* error = strerror(errno);
-      VLOG(1) << "Failed make pipe nonblocking: " << error;
-      socket_manager->send(InternalServerError(), request, socket);
-      return true; // All done, can process next response.
-    }
-
     // While the user is expected to properly set a 'Content-Type'
     // header, we fill in (or overwrite) 'Transfer-Encoding' header.
     response.headers["Transfer-Encoding"] = "chunked";
 
-    VLOG(1) << "Starting \"chunked\" streaming";
+    VLOG(3) << "Starting \"chunked\" streaming";
 
     socket_manager->send(
         new HttpResponseEncoder(socket, response, request),
         true);
 
-    pipe = response.pipe;
+    CHECK_SOME(response.reader);
+    http::Pipe::Reader reader = response.reader.get();
 
-    io::poll(pipe.get(), io::READ).onAny(
-        defer(self(), &Self::stream, lambda::_1, request));
+    pipe = reader;
+
+    reader.read()
+      .onAny(defer(self(), &Self::stream, request, lambda::_1));
 
     return false; // Streaming, don't process next response (yet)!
   } else {
@@ -1107,66 +1104,51 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
 }
 
 
-void HttpProxy::stream(const Future<short>& poll, const Request& request)
+void HttpProxy::stream(
+    const Request& request,
+    const Future<string>& chunk)
 {
-  // TODO(benh): Use 'splice' on Linux.
+  CHECK_SOME(pipe);
 
-  CHECK(pipe.isSome());
+  http::Pipe::Reader reader = pipe.get();
 
   bool finished = false; // Whether we're done streaming.
 
-  if (poll.isReady()) {
-    // Read and write.
-    CHECK(poll.get() == io::READ);
-    const size_t size = 4 * 1024; // 4K.
-    char data[size];
-    while (!finished) {
-      ssize_t length = ::read(pipe.get(), data, size);
-      if (length < 0 && (errno == EINTR)) {
-        // Interrupted, try again now.
-        continue;
-      } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-        // Might block, try again later.
-        io::poll(pipe.get(), io::READ).onAny(
-            defer(self(), &Self::stream, lambda::_1, request));
-        break;
-      } else {
-        std::ostringstream out;
-        if (length <= 0) {
-          // Error or closed, treat both as closed.
-          if (length < 0) {
-            // Error.
-            const char* error = strerror(errno);
-            VLOG(1) << "Read error while streaming: " << error;
-          }
-          out << "0\r\n" << "\r\n";
-          finished = true;
-        } else {
-          // Data!
-          out << std::hex << length << "\r\n";
-          out.write(data, length);
-          out << "\r\n";
-        }
+  if (chunk.isReady()) {
+    std::ostringstream out;
 
-        // We always persist the connection when we're not finished
-        // streaming.
-        socket_manager->send(
-            new DataEncoder(socket, out.str()),
-            finished ? request.keepAlive : true);
-      }
+    if (chunk.get().empty()) {
+      // Finished reading.
+      out << "0\r\n" << "\r\n";
+      finished = true;
+    } else {
+      out << std::hex << chunk.get().size() << "\r\n";
+      out << chunk.get();
+      out << "\r\n";
+
+      // Keep reading.
+      reader.read()
+        .onAny(defer(self(), &Self::stream, request, lambda::_1));
     }
-  } else if (poll.isFailed()) {
-    VLOG(1) << "Failed to poll: " << poll.failure();
+
+    // Always persist the connection when streaming is not finished.
+    socket_manager->send(
+        new DataEncoder(socket, out.str()),
+        finished ? request.keepAlive : true);
+  } else if (chunk.isFailed()) {
+    VLOG(1) << "Failed to read from stream: " << chunk.failure();
+    // TODO(bmahler): Have to close connection if headers were sent!
     socket_manager->send(InternalServerError(), request, socket);
     finished = true;
   } else {
-    VLOG(1) << "Unexpected discarded future while polling";
+    VLOG(1) << "Failed to read from stream: discarded";
+    // TODO(bmahler): Have to close connection if headers were sent!
     socket_manager->send(InternalServerError(), request, socket);
     finished = true;
   }
 
   if (finished) {
-    os::close(pipe.get());
+    reader.close();
     pipe = None();
     next();
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 800752a..17fb092 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -143,24 +143,24 @@ TEST(HTTP, Endpoints)
   AWAIT_EXPECT_EQ(response, socket.recv(response.size()));
 
   // Now hit '/pipe' (by using http::get).
-  int pipes[2];
-  ASSERT_NE(-1, ::pipe(pipes));
-
+  http::Pipe pipe;
   http::OK ok;
   ok.type = http::Response::PIPE;
-  ok.pipe = pipes[0];
+  ok.reader = pipe.reader();
 
-  Future<Nothing> pipe;
+  Future<Nothing> request;
   EXPECT_CALL(process, pipe(_))
-    .WillOnce(DoAll(FutureSatisfy(&pipe),
+    .WillOnce(DoAll(FutureSatisfy(&request),
                     Return(ok)));
 
   Future<http::Response> future = http::get(process.self(), "pipe");
 
-  AWAIT_READY(pipe);
+  AWAIT_READY(request);
 
-  ASSERT_SOME(os::write(pipes[1], "Hello World\n"));
-  ASSERT_SOME(os::close(pipes[1]));
+  // Write the response.
+  http::Pipe::Writer writer = pipe.writer();
+  EXPECT_TRUE(writer.write("Hello World\n"));
+  EXPECT_TRUE(writer.close());
 
   AWAIT_READY(future);
   EXPECT_EQ(http::statuses[200], future.get().status);
@@ -172,6 +172,90 @@ TEST(HTTP, Endpoints)
 }
 
 
+TEST(HTTP, PipeEOF)
+{
+  http::Pipe pipe;
+  http::Pipe::Reader reader = pipe.reader();
+  http::Pipe::Writer writer = pipe.writer();
+
+  // A 'read' on an empty pipe should block.
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Writing an empty string should have no effect.
+  EXPECT_TRUE(writer.write(""));
+  EXPECT_TRUE(read.isPending());
+
+  // After a 'write' the pending 'read' should complete.
+  EXPECT_TRUE(writer.write("hello"));
+  ASSERT_TRUE(read.isReady());
+  EXPECT_EQ("hello", read.get());
+
+  // After a 'write' a call to 'read' should be completed immediately.
+  ASSERT_TRUE(writer.write("world"));
+
+  read = reader.read();
+  ASSERT_TRUE(read.isReady());
+  EXPECT_EQ("world", read.get());
+
+  // Close the write end of the pipe and ensure the remaining
+  // data can be read.
+  EXPECT_TRUE(writer.write("!"));
+  EXPECT_TRUE(writer.close());
+  AWAIT_EQ("!", reader.read());
+
+  // End of file should be reached.
+  AWAIT_EQ("", reader.read());
+  AWAIT_EQ("", reader.read());
+
+  // Writes to a pipe with the write end closed are ignored.
+  EXPECT_FALSE(writer.write("!"));
+  AWAIT_EQ("", reader.read());
+
+  // The write end cannot be closed twice.
+  EXPECT_FALSE(writer.close());
+
+  // Close the read end; this should not notify the writer
+  // since the write end was already closed.
+  EXPECT_TRUE(reader.close());
+  EXPECT_TRUE(writer.readerClosed().isPending());
+}
+
+
+TEST(HTTP, PipeReaderCloses)
+{
+  http::Pipe pipe;
+  http::Pipe::Reader reader = pipe.reader();
+  http::Pipe::Writer writer = pipe.writer();
+
+  // If the read end of the pipe is closed,
+  // it should discard any unread data.
+  EXPECT_TRUE(writer.write("hello"));
+  EXPECT_TRUE(writer.write("world"));
+
+  // The writer should discover the closure.
+  Future<Nothing> closed = writer.readerClosed();
+  EXPECT_TRUE(reader.close());
+  EXPECT_TRUE(closed.isReady());
+
+  // The read end is closed, subsequent reads will fail.
+  AWAIT_FAILED(reader.read());
+
+  // The read end is closed, writes are ignored.
+  EXPECT_FALSE(writer.write("!"));
+  AWAIT_FAILED(reader.read());
+
+  // The read end cannot be closed twice.
+  EXPECT_FALSE(reader.close());
+
+  // Close the write end.
+  EXPECT_TRUE(writer.close());
+
+  // Reads should fail since the read end is closed.
+  AWAIT_FAILED(reader.read());
+}
+
+
 TEST(HTTP, Encode)
 {
   string unencoded = "a$&+,/:;=?@ \"<>#%{}|\\^~[]`\x19\x80\xFF";