You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/20 07:18:37 UTC
mesos git commit: Introduced an http::Pipe abstraction to simplify
streaming HTTP Responses.
Repository: mesos
Updated Branches:
refs/heads/master 9f8ab2866 -> e76954abb
Introduced an http::Pipe abstraction to simplify streaming HTTP Responses.
Review: https://reviews.apache.org/r/31930
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e76954ab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e76954ab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e76954ab
Branch: refs/heads/master
Commit: e76954abb37a30da5bb211829d7033e53d830a7f
Parents: 9f8ab28
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Mar 5 18:33:28 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Thu Mar 19 23:12:24 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 148 +++++++++++++++++++--
3rdparty/libprocess/src/http.cpp | 150 ++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 110 +++++++---------
3rdparty/libprocess/src/tests/http_tests.cpp | 102 +++++++++++++--
4 files changed, 427 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 10143fd..2b36698 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -8,10 +8,13 @@
#include <cctype>
#include <cstdlib>
#include <iomanip>
+#include <queue>
#include <sstream>
#include <string>
#include <vector>
+#include <process/future.hpp>
+#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/error.hpp>
@@ -19,7 +22,9 @@
#include <stout/hashmap.hpp>
#include <stout/ip.hpp>
#include <stout/json.hpp>
+#include <stout/memory.hpp>
#include <stout/none.hpp>
+#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/stringify.hpp>
@@ -121,6 +126,130 @@ struct Request
};
+// Represents an asynchronous in-memory unbuffered Pipe, currently
+// used for streaming HTTP responses via chunked encoding. Note that
+// being an in-memory pipe means that this cannot be used across OS
+// processes.
+//
+// Much like unix pipes, data is read until end-of-file is
+// encountered; this occurs when the the write-end of the pipe is
+// closed and there is no outstanding data left to read.
+//
+// Unlike unix pipes, if the read-end of the pipe is closed before
+// the write-end is closed, rather than receiving SIGPIPE or EPIPE
+// during a write, the writer is notified via a future. Like unix
+// pipes, we are not notified if the read-end is closed after the
+// write-end is closed, even if data is remaining in the pipe!
+//
+// No buffering means that each non-empty write to the pipe will
+// correspond to to an equivalent read from the pipe, and the
+// reader must "keep up" with the writer in order to avoid
+// unbounded memory growth.
+//
+// TODO(bmahler): The writer needs to be able to induce a failure
+// on the reader to signal an error has occurred. For example, if
+// we are receiving a response but a disconnection occurs before
+// the response is completed, we want the reader to detect that a
+// disconnection occurred!
+//
+// TODO(bmahler): Consider aggregating writes into larger reads to
+// help the reader keep up (a process::Stream abstraction with
+// backpressure would obviate the need for this).
+//
+// TODO(bmahler): Add a more general process::Stream<T> abstraction
+// to represent asynchronous finite/infinite streams (possibly
+// with "backpressure" on the writer). This is broadly useful
+// (e.g. allocator can expose Stream<Allocation>, http::Pipe
+// becomes Stream<string>, process::Queue<T> is just an infinite
+// Stream<T> (i.e. completion and error semantics hidden)).
+class Pipe
+{
+private:
+ struct Data; // Forward declaration.
+
+public:
+ class Reader
+ {
+ public:
+ // Returns data written to the pipe.
+ // Returns an empty read when end-of-file is reached.
+ // Returns Failure if the read-end of the pipe is closed.
+ Future<std::string> read();
+
+ // Closing the read-end of the pipe before the write-end closes
+ // will notify the writer that the reader is no longer interested.
+ // Returns false if the read-end of the pipe was already closed.
+ bool close();
+
+ private:
+ friend class Pipe;
+ explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {}
+ memory::shared_ptr<Data> data;
+ };
+
+ class Writer
+ {
+ public:
+ // Returns false if the data could not be written because
+ // either end of the pipe was already closed. Note that an
+ // empty write has no effect.
+ bool write(const std::string& s);
+
+ // Closing the write-end of the pipe will send end-of-file
+ // to the reader. Returns false if the write-end of the pipe
+ // was already closed.
+ bool close();
+
+ // Returns Nothing when the read-end of the pipe is closed
+ // before the write-end is closed, which means the reader
+ // was unable to continue reading!
+ Future<Nothing> readerClosed();
+
+ private:
+ friend class Pipe;
+ explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {}
+ memory::shared_ptr<Data> data;
+ };
+
+ Pipe() : data(new Data()) {}
+
+ Reader reader() const;
+ Writer writer() const;
+
+private:
+ enum State
+ {
+ OPEN,
+ CLOSED,
+ };
+
+ struct Data
+ {
+ Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {}
+
+ // Rather than use a process to serialize access to the pipe's
+ // internal data we use a low-level "lock" which we acquire and
+ // release using atomic builtins.
+ int lock;
+
+ State readEnd;
+ State writeEnd;
+
+ // Represents readers waiting for data from the pipe.
+ std::queue<Owned<Promise<std::string>>> reads;
+
+ // Represents unread writes in the pipe. Note that we omit
+ // empty strings as they serve as a signal for end-of-file.
+ std::queue<std::string> writes;
+
+ // Signals when the read-end is closed before the write-end.
+ Promise<Nothing> readerClosure;
+ };
+
+ memory::shared_ptr<Data> data;
+};
+
+
struct Response
{
Response()
@@ -138,8 +267,8 @@ struct Response
std::string status;
hashmap<std::string, std::string> headers;
- // Either provide a "body", an absolute "path" to a file, or a
- // "pipe" for streaming a response. Distinguish between the cases
+ // Either provide a 'body', an absolute 'path' to a file, or a
+ // 'pipe' for streaming a response. Distinguish between the cases
// using 'type' below.
//
// BODY: Uses 'body' as the body of the response. These may be
@@ -149,13 +278,12 @@ struct Response
// PATH: Attempts to perform a 'sendfile' operation on the file
// found at 'path'.
//
- // PIPE: Splices data from 'pipe' using 'Transfer-Encoding=chunked'.
- // Note that the read end of the pipe will be closed by libprocess
- // either after the write end has been closed or if the socket the
- // data is being spliced to has been closed (i.e., nobody is
- // listening any longer). This can cause writes to the pipe to
- // generate a SIGPIPE (which will terminate your program unless you
- // explicitly ignore them or handle them).
+ // PIPE: Splices data from the Pipe 'reader' using a "chunked"
+ // 'Transfer-Encoding'. The writer uses a Pipe::Writer to
+ // perform writes and to detect a closed read-end of the Pipe
+ // (i.e. nobody is listening any longer). Once the writer is
+ // finished, it will close its end of the pipe to signal end
+ // of file to the Reader.
//
// In all cases (BODY, PATH, PIPE), you are expected to properly
// specify the 'Content-Type' header, but the 'Content-Length' and
@@ -169,7 +297,7 @@ struct Response
std::string body;
std::string path;
- int pipe; // See comment above regarding the semantics for closing.
+ Option<Pipe::Reader> reader;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 7c0cee4..276cecd 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -2,14 +2,17 @@
#include <stdint.h>
+#include <algorithm>
#include <cstring>
#include <deque>
#include <iostream>
+#include <queue>
#include <string>
#include <vector>
#include <process/future.hpp>
#include <process/http.hpp>
+#include <process/internal.hpp>
#include <process/owned.hpp>
#include <process/socket.hpp>
@@ -23,6 +26,7 @@
#include "decoder.hpp"
using std::deque;
+using std::queue;
using std::string;
using std::vector;
@@ -35,6 +39,152 @@ using process::network::Socket;
namespace process {
namespace http {
+Pipe::Reader Pipe::reader() const
+{
+ return Pipe::Reader(data);
+}
+
+
+Pipe::Writer Pipe::writer() const
+{
+ return Pipe::Writer(data);
+}
+
+
+Future<string> Pipe::Reader::read()
+{
+ Future<string> future;
+
+ process::internal::acquire(&data->lock);
+ {
+ if (data->readEnd == CLOSED) {
+ future = Failure("closed");
+ } else if (!data->writes.empty()) {
+ future = data->writes.front();
+ data->writes.pop();
+ } else if (data->writeEnd == CLOSED) {
+ future = ""; // End-of-file.
+ } else {
+ data->reads.push(Owned<Promise<string>>(new Promise<string>()));
+ future = data->reads.back()->future();
+ }
+ }
+ process::internal::release(&data->lock);
+
+ return future;
+}
+
+
+bool Pipe::Reader::close()
+{
+ bool closed = false;
+ bool notify = false;
+ queue<Owned<Promise<string>>> reads;
+
+ process::internal::acquire(&data->lock);
+ {
+ if (data->readEnd == OPEN) {
+ // Throw away outstanding data.
+ while (!data->writes.empty()) {
+ data->writes.pop();
+ }
+
+ // Extract the pending reads so we can fail them.
+ std::swap(data->reads, reads);
+
+ closed = true;
+ data->readEnd = CLOSED;
+
+ // Notify if write-end is still open!
+ notify = data->writeEnd == OPEN;
+ }
+ }
+ process::internal::release(&data->lock);
+
+ // NOTE: We transition the promises outside the critical section
+ // to avoid triggering callbacks that try to reacquire the lock.
+ if (closed) {
+ while (!reads.empty()) {
+ reads.front()->fail("closed");
+ reads.pop();
+ }
+
+ if (notify) {
+ data->readerClosure.set(Nothing());
+ }
+ }
+
+ return closed;
+}
+
+
+bool Pipe::Writer::write(const string& s)
+{
+ bool written = false;
+ Owned<Promise<string>> read;
+
+ process::internal::acquire(&data->lock);
+ {
+ // Ignore writes if either end of the pipe is closed!
+ if (data->writeEnd == OPEN && data->readEnd == OPEN) {
+ // Don't bother surfacing empty writes to the readers.
+ if (!s.empty()) {
+ if (data->reads.empty()) {
+ data->writes.push(s);
+ } else {
+ read = data->reads.front();
+ data->reads.pop();
+ }
+ }
+ written = true;
+ }
+ }
+ process::internal::release(&data->lock);
+
+ // NOTE: We set the promise outside the critical section to avoid
+ // triggering callbacks that try to reacquire the lock.
+ if (read.get() != NULL) {
+ read->set(s);
+ }
+
+ return written;
+}
+
+
+bool Pipe::Writer::close()
+{
+ bool closed = false;
+ queue<Owned<Promise<string>>> reads;
+
+ process::internal::acquire(&data->lock);
+ {
+ if (data->writeEnd == OPEN) {
+ // Extract all the pending reads so we can complete them.
+ std::swap(data->reads, reads);
+
+ data->writeEnd = CLOSED;
+ closed = true;
+ }
+ }
+ process::internal::release(&data->lock);
+
+ // NOTE: We set the promises outside the critical section to avoid
+ // triggering callbacks that try to reacquire the lock.
+ while (!reads.empty()) {
+ reads.front()->set(string("")); // End-of-file.
+ reads.pop();
+ }
+
+ return closed;
+}
+
+
+Future<Nothing> Pipe::Writer::readerClosed()
+{
+ return data->readerClosure.future();
+}
+
+
hashmap<uint16_t, string> statuses;
namespace query {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e7b029b..10ad670 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -175,8 +175,8 @@ private:
// Demuxes and handles a response.
bool process(const Future<Response>& future, const Request& request);
- // Handles stream (i.e., pipe) based responses.
- void stream(const Future<short>& poll, const Request& request);
+ // Handles stream based responses.
+ void stream(const Request& request, const Future<string>& chunk);
Socket socket; // Wrap the socket to keep it from getting closed.
@@ -194,12 +194,14 @@ private:
delete future;
}
- // Helper for cleaning up a response (i.e., closing any open pipes
+ // Helper for cleaning up a response (i.e., closing any open Pipes
// in the event Response::type is PIPE).
static void cleanup(const Response& response)
{
if (response.type == Response::PIPE) {
- os::close(response.pipe);
+ CHECK_SOME(response.reader);
+ http::Pipe::Reader reader = response.reader.get(); // Remove const.
+ reader.close();
}
}
@@ -209,7 +211,7 @@ private:
queue<Item*> items;
- Option<int> pipe; // Current pipe, if streaming.
+ Option<http::Pipe::Reader> pipe; // Current pipe, if streaming.
};
@@ -939,7 +941,8 @@ HttpProxy::~HttpProxy()
// Need to make sure response producers know not to continue to
// create a response (streaming or otherwise).
if (pipe.isSome()) {
- os::close(pipe.get());
+ http::Pipe::Reader reader = pipe.get();
+ reader.close();
}
pipe = None();
@@ -1074,29 +1077,23 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
// should be reported and no response sent.
response.body.clear();
- // Make sure the pipe is nonblocking.
- Try<Nothing> nonblock = os::nonblock(response.pipe);
- if (nonblock.isError()) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed make pipe nonblocking: " << error;
- socket_manager->send(InternalServerError(), request, socket);
- return true; // All done, can process next response.
- }
-
// While the user is expected to properly set a 'Content-Type'
// header, we fill in (or overwrite) 'Transfer-Encoding' header.
response.headers["Transfer-Encoding"] = "chunked";
- VLOG(1) << "Starting \"chunked\" streaming";
+ VLOG(3) << "Starting \"chunked\" streaming";
socket_manager->send(
new HttpResponseEncoder(socket, response, request),
true);
- pipe = response.pipe;
+ CHECK_SOME(response.reader);
+ http::Pipe::Reader reader = response.reader.get();
- io::poll(pipe.get(), io::READ).onAny(
- defer(self(), &Self::stream, lambda::_1, request));
+ pipe = reader;
+
+ reader.read()
+ .onAny(defer(self(), &Self::stream, request, lambda::_1));
return false; // Streaming, don't process next response (yet)!
} else {
@@ -1107,66 +1104,51 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
}
-void HttpProxy::stream(const Future<short>& poll, const Request& request)
+void HttpProxy::stream(
+ const Request& request,
+ const Future<string>& chunk)
{
- // TODO(benh): Use 'splice' on Linux.
+ CHECK_SOME(pipe);
- CHECK(pipe.isSome());
+ http::Pipe::Reader reader = pipe.get();
bool finished = false; // Whether we're done streaming.
- if (poll.isReady()) {
- // Read and write.
- CHECK(poll.get() == io::READ);
- const size_t size = 4 * 1024; // 4K.
- char data[size];
- while (!finished) {
- ssize_t length = ::read(pipe.get(), data, size);
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- io::poll(pipe.get(), io::READ).onAny(
- defer(self(), &Self::stream, lambda::_1, request));
- break;
- } else {
- std::ostringstream out;
- if (length <= 0) {
- // Error or closed, treat both as closed.
- if (length < 0) {
- // Error.
- const char* error = strerror(errno);
- VLOG(1) << "Read error while streaming: " << error;
- }
- out << "0\r\n" << "\r\n";
- finished = true;
- } else {
- // Data!
- out << std::hex << length << "\r\n";
- out.write(data, length);
- out << "\r\n";
- }
+ if (chunk.isReady()) {
+ std::ostringstream out;
- // We always persist the connection when we're not finished
- // streaming.
- socket_manager->send(
- new DataEncoder(socket, out.str()),
- finished ? request.keepAlive : true);
- }
+ if (chunk.get().empty()) {
+ // Finished reading.
+ out << "0\r\n" << "\r\n";
+ finished = true;
+ } else {
+ out << std::hex << chunk.get().size() << "\r\n";
+ out << chunk.get();
+ out << "\r\n";
+
+ // Keep reading.
+ reader.read()
+ .onAny(defer(self(), &Self::stream, request, lambda::_1));
}
- } else if (poll.isFailed()) {
- VLOG(1) << "Failed to poll: " << poll.failure();
+
+ // Always persist the connection when streaming is not finished.
+ socket_manager->send(
+ new DataEncoder(socket, out.str()),
+ finished ? request.keepAlive : true);
+ } else if (chunk.isFailed()) {
+ VLOG(1) << "Failed to read from stream: " << chunk.failure();
+ // TODO(bmahler): Have to close connection if headers were sent!
socket_manager->send(InternalServerError(), request, socket);
finished = true;
} else {
- VLOG(1) << "Unexpected discarded future while polling";
+ VLOG(1) << "Failed to read from stream: discarded";
+ // TODO(bmahler): Have to close connection if headers were sent!
socket_manager->send(InternalServerError(), request, socket);
finished = true;
}
if (finished) {
- os::close(pipe.get());
+ reader.close();
pipe = None();
next();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 800752a..17fb092 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -143,24 +143,24 @@ TEST(HTTP, Endpoints)
AWAIT_EXPECT_EQ(response, socket.recv(response.size()));
// Now hit '/pipe' (by using http::get).
- int pipes[2];
- ASSERT_NE(-1, ::pipe(pipes));
-
+ http::Pipe pipe;
http::OK ok;
ok.type = http::Response::PIPE;
- ok.pipe = pipes[0];
+ ok.reader = pipe.reader();
- Future<Nothing> pipe;
+ Future<Nothing> request;
EXPECT_CALL(process, pipe(_))
- .WillOnce(DoAll(FutureSatisfy(&pipe),
+ .WillOnce(DoAll(FutureSatisfy(&request),
Return(ok)));
Future<http::Response> future = http::get(process.self(), "pipe");
- AWAIT_READY(pipe);
+ AWAIT_READY(request);
- ASSERT_SOME(os::write(pipes[1], "Hello World\n"));
- ASSERT_SOME(os::close(pipes[1]));
+ // Write the response.
+ http::Pipe::Writer writer = pipe.writer();
+ EXPECT_TRUE(writer.write("Hello World\n"));
+ EXPECT_TRUE(writer.close());
AWAIT_READY(future);
EXPECT_EQ(http::statuses[200], future.get().status);
@@ -172,6 +172,90 @@ TEST(HTTP, Endpoints)
}
+TEST(HTTP, PipeEOF)
+{
+ http::Pipe pipe;
+ http::Pipe::Reader reader = pipe.reader();
+ http::Pipe::Writer writer = pipe.writer();
+
+ // A 'read' on an empty pipe should block.
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ // Writing an empty string should have no effect.
+ EXPECT_TRUE(writer.write(""));
+ EXPECT_TRUE(read.isPending());
+
+ // After a 'write' the pending 'read' should complete.
+ EXPECT_TRUE(writer.write("hello"));
+ ASSERT_TRUE(read.isReady());
+ EXPECT_EQ("hello", read.get());
+
+ // After a 'write' a call to 'read' should be completed immediately.
+ ASSERT_TRUE(writer.write("world"));
+
+ read = reader.read();
+ ASSERT_TRUE(read.isReady());
+ EXPECT_EQ("world", read.get());
+
+ // Close the write end of the pipe and ensure the remaining
+ // data can be read.
+ EXPECT_TRUE(writer.write("!"));
+ EXPECT_TRUE(writer.close());
+ AWAIT_EQ("!", reader.read());
+
+ // End of file should be reached.
+ AWAIT_EQ("", reader.read());
+ AWAIT_EQ("", reader.read());
+
+ // Writes to a pipe with the write end closed are ignored.
+ EXPECT_FALSE(writer.write("!"));
+ AWAIT_EQ("", reader.read());
+
+ // The write end cannot be closed twice.
+ EXPECT_FALSE(writer.close());
+
+ // Close the read end, this should not notify the writer
+ // since the write end was already closed.
+ EXPECT_TRUE(reader.close());
+ EXPECT_TRUE(writer.readerClosed().isPending());
+}
+
+
+TEST(HTTP, PipeReaderCloses)
+{
+ http::Pipe pipe;
+ http::Pipe::Reader reader = pipe.reader();
+ http::Pipe::Writer writer = pipe.writer();
+
+ // If the read end of the pipe is closed,
+ // it should discard any unread data.
+ EXPECT_TRUE(writer.write("hello"));
+ EXPECT_TRUE(writer.write("world"));
+
+ // The writer should discover the closure.
+ Future<Nothing> closed = writer.readerClosed();
+ EXPECT_TRUE(reader.close());
+ EXPECT_TRUE(closed.isReady());
+
+ // The read end is closed, subsequent reads will fail.
+ AWAIT_FAILED(reader.read());
+
+ // The read end is closed, writes are ignored.
+ EXPECT_FALSE(writer.write("!"));
+ AWAIT_FAILED(reader.read());
+
+ // The read end cannot be closed twice.
+ EXPECT_FALSE(reader.close());
+
+ // Close the write end.
+ EXPECT_TRUE(writer.close());
+
+ // Reads should fail since the read end is closed.
+ AWAIT_FAILED(reader.read());
+}
+
+
TEST(HTTP, Encode)
{
string unencoded = "a$&+,/:;=?@ \"<>#%{}|\\^~[]`\x19\x80\xFF";