You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/31 02:12:43 UTC
[1/3] mesos git commit: Added failure semantics for http::Pipe::Writer.
Repository: mesos
Updated Branches:
refs/heads/master f98f26fa5 -> f7fccce2e
Added failure semantics for http::Pipe::Writer.
Review: https://reviews.apache.org/r/32346
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b0bba19
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b0bba19
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b0bba19
Branch: refs/heads/master
Commit: 8b0bba195058c9f209e9b4bb9716fb805161e847
Parents: f98f26f
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 11:32:04 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:38:21 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 46 +++++++++++++++------
3rdparty/libprocess/src/http.cpp | 50 +++++++++++++++++++----
3rdparty/libprocess/src/tests/http_tests.cpp | 32 +++++++++++++++
3 files changed, 106 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index faffae7..a9ef5b7 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -117,17 +117,26 @@ public:
public:
// Returns data written to the pipe.
// Returns an empty read when end-of-file is reached.
- // Returns Failure if the read-end of the pipe is closed.
+ // Returns Failure if the writer failed, or the read-end
+ // is closed.
Future<std::string> read();
// Closing the read-end of the pipe before the write-end closes
- // will notify the writer that the reader is no longer interested.
- // Returns false if the read-end of the pipe was already closed.
+ // or fails will notify the writer that the reader is no longer
+ // interested. Returns false if the read-end was already closed.
bool close();
private:
friend class Pipe;
+
+ enum State
+ {
+ OPEN,
+ CLOSED,
+ };
+
explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {}
+
memory::shared_ptr<Data> data;
};
@@ -141,9 +150,14 @@ public:
// Closing the write-end of the pipe will send end-of-file
// to the reader. Returns false if the write-end of the pipe
- // was already closed.
+ // was already closed or failed.
bool close();
+ // Closes the write-end of the pipe but sends a failure
+ // to the reader rather than end-of-file. Returns false
+ // if the write-end of the pipe was already closed or failed.
+ bool fail(const std::string& message);
+
// Returns Nothing when the read-end of the pipe is closed
// before the write-end is closed, which means the reader
// was unable to continue reading!
@@ -151,7 +165,16 @@ public:
private:
friend class Pipe;
+
+ enum State
+ {
+ OPEN,
+ CLOSED,
+ FAILED,
+ };
+
explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {}
+
memory::shared_ptr<Data> data;
};
@@ -161,23 +184,17 @@ public:
Writer writer() const;
private:
- enum State
- {
- OPEN,
- CLOSED,
- };
-
struct Data
{
- Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {}
+ Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
// Rather than use a process to serialize access to the pipe's
// internal data we use a low-level "lock" which we acquire and
// release using atomic builtins.
int lock;
- State readEnd;
- State writeEnd;
+ Reader::State readEnd;
+ Writer::State writeEnd;
// Represents readers waiting for data from the pipe.
std::queue<Owned<Promise<std::string>>> reads;
@@ -188,6 +205,9 @@ private:
// Signals when the read-end is closed before the write-end.
Promise<Nothing> readerClosure;
+
+ // Failure reason when the 'writeEnd' is FAILED.
+ Option<Failure> failure;
};
memory::shared_ptr<Data> data;
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 150ff33..cd52cc8 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -180,13 +180,16 @@ Future<string> Pipe::Reader::read()
process::internal::acquire(&data->lock);
{
- if (data->readEnd == CLOSED) {
+ if (data->readEnd == Reader::CLOSED) {
future = Failure("closed");
} else if (!data->writes.empty()) {
future = data->writes.front();
data->writes.pop();
- } else if (data->writeEnd == CLOSED) {
+ } else if (data->writeEnd == Writer::CLOSED) {
future = ""; // End-of-file.
+ } else if (data->writeEnd == Writer::FAILED) {
+ CHECK_SOME(data->failure);
+ future = data->failure.get();
} else {
data->reads.push(Owned<Promise<string>>(new Promise<string>()));
future = data->reads.back()->future();
@@ -206,7 +209,7 @@ bool Pipe::Reader::close()
process::internal::acquire(&data->lock);
{
- if (data->readEnd == OPEN) {
+ if (data->readEnd == Reader::OPEN) {
// Throw away outstanding data.
while (!data->writes.empty()) {
data->writes.pop();
@@ -216,10 +219,10 @@ bool Pipe::Reader::close()
std::swap(data->reads, reads);
closed = true;
- data->readEnd = CLOSED;
+ data->readEnd = Reader::CLOSED;
// Notify if write-end is still open!
- notify = data->writeEnd == OPEN;
+ notify = data->writeEnd == Writer::OPEN;
}
}
process::internal::release(&data->lock);
@@ -248,8 +251,8 @@ bool Pipe::Writer::write(const string& s)
process::internal::acquire(&data->lock);
{
- // Ignore writes if either end of the pipe is closed!
- if (data->writeEnd == OPEN && data->readEnd == OPEN) {
+ // Ignore writes if either end of the pipe is closed or failed!
+ if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
// Don't bother surfacing empty writes to the readers.
if (!s.empty()) {
if (data->reads.empty()) {
@@ -281,11 +284,11 @@ bool Pipe::Writer::close()
process::internal::acquire(&data->lock);
{
- if (data->writeEnd == OPEN) {
+ if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can complete them.
std::swap(data->reads, reads);
- data->writeEnd = CLOSED;
+ data->writeEnd = Writer::CLOSED;
closed = true;
}
}
@@ -302,6 +305,35 @@ bool Pipe::Writer::close()
}
+// Moves the write-end from OPEN to FAILED, recording 'message' as the
+// failure, and fails every read that was waiting on the pipe.
+// Returns true only if this call performed the transition; a pipe
+// whose write-end is already closed or failed is left untouched.
+bool Pipe::Writer::fail(const string& message)
+{
+  // Reads that were waiting when the pipe failed.
+  queue<Owned<Promise<string>>> pending;
+  bool transitioned = false;
+
+  process::internal::acquire(&data->lock);
+  {
+    transitioned = (data->writeEnd == Writer::OPEN);
+
+    if (transitioned) {
+      data->writeEnd = Writer::FAILED;
+      data->failure = Failure(message);
+
+      // Take the waiting reads out of the shared state so that they
+      // can be failed once the lock has been released.
+      std::swap(pending, data->reads);
+    }
+  }
+  process::internal::release(&data->lock);
+
+  // NOTE: The promises are failed outside the critical section so
+  // that callbacks which try to reacquire the lock are not triggered
+  // while it is held.
+  for (; !pending.empty(); pending.pop()) {
+    pending.front()->fail(message);
+  }
+
+  return transitioned;
+}
+
+
Future<Nothing> Pipe::Writer::readerClosed()
{
return data->readerClosure.future();
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index e254506..83219da 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -230,6 +230,38 @@ TEST(HTTP, PipeEOF)
}
+// Checks the writer failure semantics of http::Pipe: data written
+// before the failure is still delivered, subsequent reads fail with
+// the writer's message, a failed write-end cannot be closed or failed
+// again, and closing the read-end afterwards does not complete
+// 'readerClosed()'.
+TEST(HTTP, PipeFailure)
+{
+ http::Pipe pipe;
+ http::Pipe::Reader reader = pipe.reader();
+ http::Pipe::Writer writer = pipe.writer();
+
+ // Fail the writer after writing some data.
+ EXPECT_TRUE(writer.write("hello"));
+ EXPECT_TRUE(writer.write("world"));
+
+ EXPECT_TRUE(writer.fail("disconnected!"));
+
+ // The reader should read the data, followed by the failure.
+ AWAIT_EQ("hello", reader.read());
+ AWAIT_EQ("world", reader.read());
+
+ // No await here: the test expects this read to be failed by the
+ // time 'read()' returns.
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isFailed());
+ EXPECT_EQ("disconnected!", read.failure());
+
+ // The writer cannot close or fail an already failed pipe.
+ EXPECT_FALSE(writer.close());
+ EXPECT_FALSE(writer.fail("not again"));
+
+ // The writer shouldn't be notified of the reader closing,
+ // since the writer had already failed.
+ EXPECT_TRUE(reader.close());
+ EXPECT_TRUE(writer.readerClosed().isPending());
+}
+
+
+
TEST(HTTP, PipeReaderCloses)
{
http::Pipe pipe;
[2/3] mesos git commit: Added a StreamingResponseDecoder.
Posted by bm...@apache.org.
Added a StreamingResponseDecoder.
Review: https://reviews.apache.org/r/32347
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6ac8eb1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6ac8eb1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6ac8eb1c
Branch: refs/heads/master
Commit: 6ac8eb1c524dc4adbc09d20dcb8e4e31d60eeb56
Parents: 8b0bba1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 11:44:51 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:38:22 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/decoder.hpp | 232 +++++++++++++++++++
3rdparty/libprocess/src/tests/decoder_tests.cpp | 91 ++++++++
2 files changed, 323 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index b3a667c..56adde0 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -14,6 +14,7 @@
#include <stout/foreach.hpp>
#include <stout/gzip.hpp>
+#include <stout/option.hpp>
#include <stout/try.hpp>
@@ -469,6 +470,237 @@ private:
};
+// Provides a response decoder that returns 'PIPE' responses once
+// the response headers are received, but before the body data
+// is received. Callers are expected to read the body from the
+// Pipe::Reader in the response.
+//
+// Ownership: every Response* returned from 'decode' belongs to the
+// caller, who is responsible for deleting it. A response that is
+// still being assembled when decoding fails remains owned by the
+// decoder and is released by the destructor.
+//
+// TODO(bmahler): Consolidate streaming and non-streaming response
+// decoders.
+class StreamingResponseDecoder
+{
+public:
+  StreamingResponseDecoder()
+    : failure(false), header(HEADER_FIELD), response(NULL)
+  {
+    settings.on_message_begin =
+      &StreamingResponseDecoder::on_message_begin;
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+    settings.on_path =
+      &StreamingResponseDecoder::on_path;
+    settings.on_fragment =
+      &StreamingResponseDecoder::on_fragment;
+    settings.on_query_string =
+      &StreamingResponseDecoder::on_query_string;
+#endif
+
+    settings.on_url =
+      &StreamingResponseDecoder::on_url;
+    settings.on_header_field =
+      &StreamingResponseDecoder::on_header_field;
+    settings.on_header_value =
+      &StreamingResponseDecoder::on_header_value;
+    settings.on_headers_complete =
+      &StreamingResponseDecoder::on_headers_complete;
+    settings.on_body =
+      &StreamingResponseDecoder::on_body;
+    settings.on_message_complete =
+      &StreamingResponseDecoder::on_message_complete;
+
+    http_parser_init(&parser, HTTP_RESPONSE);
+
+    // The static parser callbacks recover the decoder from 'data'.
+    parser.data = this;
+  }
+
+  ~StreamingResponseDecoder()
+  {
+    // A response whose headers never completed successfully was never
+    // handed to a caller, so the decoder still owns it.
+    delete response;
+
+    // If a body is still being streamed, tell the reader that no more
+    // data will arrive rather than leaving its reads pending.
+    if (writer.isSome()) {
+      http::Pipe::Writer writer_ = writer.get(); // Remove const.
+      writer_.fail("decoder is being deleted");
+    }
+
+    // 'decode' drains this queue before returning, so this loop is
+    // only a safety net.
+    foreach (http::Response* response_, responses) {
+      delete response_;
+    }
+  }
+
+  // Feeds 'length' bytes of 'data' to the parser and returns the
+  // responses whose headers completed during this call (ownership is
+  // transferred to the caller). A zero 'length' is how callers in
+  // this change signal EOF. On a parse error 'failed()' becomes true
+  // and a body that was still streaming is failed.
+  std::deque<http::Response*> decode(const char* data, size_t length)
+  {
+    size_t parsed = http_parser_execute(&parser, &settings, data, length);
+
+    if (parsed != length) {
+      // TODO(bmahler): joyent/http-parser exposes error reasons.
+      failure = true;
+
+      // If we're still writing the body, fail the writer!
+      if (writer.isSome()) {
+        http::Pipe::Writer writer_ = writer.get(); // Remove const.
+        writer_.fail("failed to decode body");
+        writer = None();
+      }
+    }
+
+    // Hand over everything that was queued, leaving the queue empty.
+    std::deque<http::Response*> result;
+    std::swap(result, responses);
+    return result;
+  }
+
+  // Returns true once decoding has failed.
+  bool failed() const
+  {
+    return failure;
+  }
+
+private:
+  // Not copyable: the decoder owns 'response', and 'parser.data'
+  // points back at this instance. Declared but not defined.
+  StreamingResponseDecoder(const StreamingResponseDecoder&);
+  StreamingResponseDecoder& operator=(const StreamingResponseDecoder&);
+
+  // Starts a new response; no response or body may be in progress.
+  static int on_message_begin(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK(!decoder->failure);
+
+    decoder->header = HEADER_FIELD;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    CHECK(decoder->response == NULL);
+    CHECK(decoder->writer.isNone());
+
+    decoder->response = new http::Response();
+    decoder->response->type = http::Response::PIPE;
+
+    return 0;
+  }
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+  static int on_path(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_query_string(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_fragment(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+#endif // !(HTTP_PARSER_VERSION_MAJOR >= 2)
+
+  static int on_url(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  // Accumulates a header name. If a value was being read, the
+  // previous (field, value) pair is complete and is stored first.
+  static int on_header_field(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    if (decoder->header != HEADER_FIELD) {
+      decoder->response->headers[decoder->field] = decoder->value;
+      decoder->field.clear();
+      decoder->value.clear();
+    }
+
+    decoder->field.append(data, length);
+    decoder->header = HEADER_FIELD;
+
+    return 0;
+  }
+
+  // Accumulates a header value for the current field.
+  static int on_header_value(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    decoder->value.append(data, length);
+    decoder->header = HEADER_VALUE;
+    return 0;
+  }
+
+  // Finalizes the headers, creates the pipe for the body and queues
+  // the response for the caller of 'decode'. Returns non-zero (and
+  // marks the decoder failed) for an unknown status code or a gzip
+  // encoded body; in that case the partially built response stays
+  // owned by the decoder until destruction.
+  static int on_headers_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    // Add final header.
+    decoder->response->headers[decoder->field] = decoder->value;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    // Get the response status string.
+    if (http::statuses.contains(decoder->parser.status_code)) {
+      decoder->response->status = http::statuses[decoder->parser.status_code];
+    } else {
+      decoder->failure = true;
+      return 1;
+    }
+
+    // We cannot provide streaming gzip decompression!
+    Option<std::string> encoding =
+      decoder->response->headers.get("Content-Encoding");
+    if (encoding.isSome() && encoding.get() == "gzip") {
+      decoder->failure = true;
+      return 1;
+    }
+
+    CHECK(decoder->writer.isNone());
+
+    http::Pipe pipe;
+    decoder->writer = pipe.writer();
+    decoder->response->reader = pipe.reader();
+
+    // Send the response to the caller, but keep a Pipe::Writer for
+    // streaming the body content into the response.
+    decoder->responses.push_back(decoder->response);
+    decoder->response = NULL;
+
+    return 0;
+  }
+
+  // Streams a chunk of body data into the response's pipe.
+  static int on_body(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.write(std::string(data, length));
+
+    return 0;
+  }
+
+  // Closes the pipe so that the reader observes end-of-file.
+  static int on_message_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.close();
+
+    decoder->writer = None();
+
+    return 0;
+  }
+
+  bool failure;
+
+  http_parser parser;
+  http_parser_settings settings;
+
+  // Which part of a header line was seen last.
+  enum {
+    HEADER_FIELD,
+    HEADER_VALUE
+  } header;
+
+  std::string field;
+  std::string value;
+
+  // Response being assembled; NULL outside of header parsing.
+  http::Response* response;
+
+  // Write-end of the current response's body; Some only between
+  // 'on_headers_complete' and 'on_message_complete' (or a failure).
+  Option<http::Pipe::Writer> writer;
+
+  // Responses ready to be returned from 'decode'.
+  std::deque<http::Response*> responses;
+};
+
} // namespace process {
#endif // __DECODER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index b996f00..efe364a 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -133,3 +133,94 @@ TEST(Decoder, Response)
delete response;
}
+
+
+// Decodes a response whose headers and body arrive in separate
+// chunks: the PIPE response must be available as soon as the headers
+// are decoded, and the body must then stream through its reader
+// followed by end-of-file.
+TEST(Decoder, StreamingResponse)
+{
+  StreamingResponseDecoder decoder;
+
+  const string& headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  const string& body = "hi";
+
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  // Feeding EOF to the decoder should be ok.
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("hi", read.get());
+
+  // Response should be complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("", read.get());
+
+  // The decoder hands ownership of decoded responses to the caller.
+  delete response;
+}
+
+
+// Decodes a response whose body is shorter than its Content-Length:
+// the data that did arrive is delivered, and feeding EOF to the
+// decoder must fail the outstanding read on the response's reader.
+TEST(Decoder, StreamingResponseFailure)
+{
+  StreamingResponseDecoder decoder;
+
+  const string& headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  // The body is shorter than the content length!
+  const string& body = "1";
+
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("1", read.get());
+
+  // Body is not yet complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Feeding EOF to the decoder should trigger a failure!
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isFailed());
+  EXPECT_EQ("failed to decode body", read.failure());
+
+  // The decoder hands ownership of decoded responses to the caller.
+  delete response;
+}
[3/3] mesos git commit: Added http::streaming::get/post for client-side streaming responses.
Posted by bm...@apache.org.
Added http::streaming::get/post for client-side streaming responses.
Review: https://reviews.apache.org/r/32351
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f7fccce2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f7fccce2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f7fccce2
Branch: refs/heads/master
Commit: f7fccce2e3208cfc6481151c2bb016727e5ebdfc
Parents: 6ac8eb1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 14:45:28 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:59:19 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 44 ++++
3rdparty/libprocess/src/http.cpp | 237 ++++++++++++++++++++--
3rdparty/libprocess/src/tests/http_tests.cpp | 84 ++++++++
3 files changed, 344 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index a9ef5b7..07825b2 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -583,6 +583,50 @@ Future<Response> post(
const Option<std::string>& body = None(),
const Option<std::string>& contentType = None());
+
+// Streaming variants of 'get' and 'post': the returned future is
+// satisfied with a 'PIPE' response and the body is consumed
+// incrementally through the response's Pipe::Reader.
+namespace streaming {
+
+// Asynchronously sends an HTTP GET request to the specified URL
+// and returns the HTTP response of type 'PIPE' once the response
+// headers are received. The caller must read the response body
+// from the Pipe::Reader.
+Future<Response> get(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP GET request to the process with the
+// given UPID and returns the HTTP response of type 'PIPE' once the
+// response headers are received. The caller must read the response
+// body from the Pipe::Reader.
+// Returns a Failure if 'query' cannot be decoded as an HTTP query
+// string.
+Future<Response> get(
+ const UPID& upid,
+ const Option<std::string>& path = None(),
+ const Option<std::string>& query = None(),
+ const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP POST request to the specified URL
+// and returns the HTTP response of type 'PIPE' once the response
+// headers are received. The caller must read the response body
+// from the Pipe::Reader.
+// Returns a Failure if 'contentType' is provided without a 'body'.
+Future<Response> post(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None(),
+ const Option<std::string>& body = None(),
+ const Option<std::string>& contentType = None());
+
+// Asynchronously sends an HTTP POST request to the process with the
+// given UPID and returns the HTTP response of type 'PIPE' once the
+// response headers are received. The caller must read the response
+// body from the Pipe::Reader.
+// Returns a Failure if 'contentType' is provided without a 'body'.
+Future<Response> post(
+ const UPID& upid,
+ const Option<std::string>& path = None(),
+ const Option<hashmap<std::string, std::string>>& headers = None(),
+ const Option<std::string>& body = None(),
+ const Option<std::string>& contentType = None());
+
+} // namespace streaming {
+
} // namespace http {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index cd52cc8..7e6cbd3 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -548,35 +548,143 @@ ostream& operator << (
namespace internal {
-Future<Response> decode(const string& buffer)
+// Forward declarations.
+Future<string> _convert(
+ Pipe::Reader reader,
+ const memory::shared_ptr<string>& buffer,
+ const string& read);
+Response __convert(
+ const Response& pipeResponse,
+ const string& body);
+
+
+// Returns a 'BODY' response once the body of the provided
+// 'PIPE' response can be read completely.
+Future<Response> convert(const Response& pipeResponse)
{
- ResponseDecoder decoder;
- deque<Response*> responses = decoder.decode(buffer.c_str(), buffer.length());
+ memory::shared_ptr<string> buffer(new string());
- if (decoder.failed() || responses.empty()) {
- for (size_t i = 0; i < responses.size(); ++i) {
- delete responses[i];
+ CHECK_EQ(Response::PIPE, pipeResponse.type);
+ CHECK_SOME(pipeResponse.reader);
+
+ Pipe::Reader reader = pipeResponse.reader.get();
+
+ return reader.read()
+ .then(lambda::bind(&_convert, reader, buffer, lambda::_1))
+ .then(lambda::bind(&__convert, pipeResponse, lambda::_1));
+}
+
+
+// Continuation used to drain a pipe: appends each chunk that was read
+// to 'buffer' and chains another read, until an empty read (EOF)
+// arrives, at which point the accumulated contents are returned.
+Future<string> _convert(
+    Pipe::Reader reader,
+    const memory::shared_ptr<string>& buffer,
+    const string& read)
+{
+  if (!read.empty()) {
+    // More body data: accumulate it and keep reading.
+    buffer->append(read);
+
+    return reader.read()
+      .then(lambda::bind(&_convert, reader, buffer, lambda::_1));
+  }
+
+  // EOF.
+  return *buffer;
+}
+
+
+// Builds the 'BODY' equivalent of 'pipeResponse': a copy of it that
+// carries 'body' as its contents and no longer has a reader.
+Response __convert(const Response& pipeResponse, const string& body)
+{
+  Response converted(pipeResponse);
+
+  converted.reader = None(); // Remove the reader.
+  converted.body = body;
+  converted.type = Response::BODY;
+
+  return converted;
+}
+
+
+// Forward declaration.
+void _decode(
+ Socket socket,
+ Owned<StreamingResponseDecoder> decoder,
+ const Future<string>& data);
+
+
+Future<Response> decode(
+ Socket socket,
+ Owned<StreamingResponseDecoder> decoder,
+ const string& data)
+{
+ deque<Response*> responses = decoder->decode(data.c_str(), data.length());
+
+ if (decoder->failed() || responses.size() > 1) {
+ foreach (Response* response, responses) {
+ delete response;
}
- return Failure("Failed to decode HTTP response:\n" + buffer + "\n");
- } else if (responses.size() > 1) {
- PLOG(ERROR) << "Received more than 1 HTTP Response";
+ return Failure(string("Failed to decode HTTP response") +
+ (responses.size() > 1 ? ": more than one response received" : ""));
}
- Response response = *responses[0];
- for (size_t i = 0; i < responses.size(); ++i) {
- delete responses[i];
+ if (responses.empty()) {
+ // Keep reading until the headers are complete.
+ return socket.recv(None())
+ .then(lambda::bind(&decode, socket, decoder, lambda::_1));
+ }
+
+ // Keep feeding data to the decoder until EOF or a 'recv' failure.
+ if (!data.empty()) {
+ socket.recv(None())
+ .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
}
+ Response response = *responses[0];
+ delete responses[0];
return response;
}
+// Continuation invoked with the result of each 'recv' issued after
+// the response has already been returned to the caller. It keeps
+// feeding socket data into 'decoder' and stops on EOF, on a 'recv'
+// failure or discard, on a decoding failure, or if another response
+// shows up on the socket.
+void _decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const Future<string>& data)
+{
+  deque<Response*> responses;
+
+  if (!data.isReady()) {
+    // Let the decoder process EOF if a failure
+    // or discard is encountered.
+    responses = decoder->decode("", 0);
+  } else {
+    responses = decoder->decode(data.get().c_str(), data.get().length());
+  }
+
+  // We're not expecting more responses to arrive on this socket!
+  if (!responses.empty() || decoder->failed()) {
+    // Any response decoded here is in addition to the one that was
+    // already returned, so a non-empty result is what "more than one
+    // response" means at this point.
+    VLOG(1) << "Failed to decode HTTP response"
+            << (!responses.empty()
+                ? ": more than one response received"
+                : "");
+
+    foreach (Response* response, responses) {
+      delete response;
+    }
+
+    return;
+  }
+
+  // Keep reading if the socket has more data.
+  if (data.isReady() && !data.get().empty()) {
+    socket.recv(None())
+      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
+  }
+}
+
+
// Forward declaration.
Future<Response> _request(
Socket socket,
const Address& address,
const URL& url,
const string& method,
+ bool streamingResponse,
const Option<hashmap<string, string>>& _headers,
const Option<string>& body,
const Option<string>& contentType);
@@ -585,6 +693,7 @@ Future<Response> _request(
Future<Response> request(
const URL& url,
const string& method,
+ bool streamedResponse,
const Option<hashmap<string, string>>& headers,
const Option<string>& body,
const Option<string>& contentType)
@@ -626,6 +735,7 @@ Future<Response> request(
address,
url,
method,
+ streamedResponse,
headers,
body,
contentType));
@@ -637,6 +747,7 @@ Future<Response> _request(
const Address& address,
const URL& url,
const string& method,
+ bool streamedResponse,
const Option<hashmap<string, string>>& _headers,
const Option<string>& body,
const Option<string>& contentType)
@@ -685,6 +796,13 @@ Future<Response> _request(
headers["Content-Length"] = stringify(body.get().length());
}
+ // TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
+ // Currently this does not handle 'gzip' content encoding,
+ // unless the caller manually compresses the 'body'. For
+ // streaming requests we must wipe 'gzip' as an acceptable
+ // encoding as we don't currently have streaming gzip utilities
+ // to support decoding a streaming gzip response!
+
// Emit the headers.
foreachpair (const string& key, const string& value, headers) {
out << key << ": " << value << "\r\n";
@@ -699,13 +817,18 @@ Future<Response> _request(
// Need to disambiguate the Socket::recv for binding below.
Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv;
- // TODO(bmahler): For efficiency, this should properly use the
- // ResponseDecoder when reading, rather than parsing the full string
- // response.
- return socket.send(out.str())
- .then(lambda::function<Future<string>(void)>(
- lambda::bind(recv, socket, -1)))
- .then(lambda::bind(&internal::decode, lambda::_1));
+ Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder());
+
+ Future<Response> pipeResponse = socket.send(out.str())
+ .then(lambda::bind(recv, socket, None()))
+ .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1));
+
+ if (streamedResponse) {
+ return pipeResponse;
+ } else {
+ return pipeResponse
+ .then(lambda::bind(&internal::convert, lambda::_1));
+ }
}
} // namespace internal {
@@ -715,7 +838,7 @@ Future<Response> get(
const URL& url,
const Option<hashmap<string, string>>& headers)
{
- return internal::request(url, "GET", headers, None(), None());
+ return internal::request(url, "GET", false, headers, None(), None());
}
@@ -757,7 +880,7 @@ Future<Response> post(
return Failure("Attempted to do a POST with a Content-Type but no body");
}
- return internal::request(url, "POST", headers, body, contentType);
+ return internal::request(url, "POST", false, headers, body, contentType);
}
@@ -778,5 +901,77 @@ Future<Response> post(
return post(url, headers, body, contentType);
}
+
+namespace streaming {
+
+// GET of 'url' that asks for a streamed ('PIPE') response.
+Future<Response> get(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers)
+{
+  const bool streamedResponse = true;
+
+  return internal::request(
+      url,
+      "GET",
+      streamedResponse,
+      headers,
+      None(),
+      None());
+}
+
+
+// GET against the process identified by 'upid': builds an "http" URL
+// from the UPID's address and id, optionally extended by 'path' and
+// 'query', and delegates to the URL overload. Returns a Failure if
+// 'query' cannot be decoded.
+Future<Response> get(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<string>& query,
+    const Option<hashmap<string, string>>& headers)
+{
+  URL target("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    target.path = strings::join("/", target.path, path.get());
+  }
+
+  if (query.isNone()) {
+    return streaming::get(target, headers);
+  }
+
+  // A leading '?' is not part of the query string proper.
+  Try<hashmap<string, string>> decoded = http::query::decode(
+      strings::remove(query.get(), "?", strings::PREFIX));
+
+  if (decoded.isError()) {
+    return Failure("Failed to decode HTTP query string: " + decoded.error());
+  }
+
+  target.query = decoded.get();
+
+  return streaming::get(target, headers);
+}
+
+
+// POST to 'url' that asks for a streamed ('PIPE') response. A content
+// type is only meaningful together with a body, so supplying one
+// without the other is rejected.
+Future<Response> post(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  if (contentType.isSome() && body.isNone()) {
+    return Failure("Attempted to do a POST with a Content-Type but no body");
+  }
+
+  const bool streamedResponse = true;
+
+  return internal::request(
+      url,
+      "POST",
+      streamedResponse,
+      headers,
+      body,
+      contentType);
+}
+
+
+// POST against the process identified by 'upid': builds an "http" URL
+// from the UPID's address and id, optionally extended by 'path', and
+// delegates to the URL overload.
+Future<Response> post(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  URL target("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    target.path = strings::join("/", target.path, path.get());
+  }
+
+  return streaming::post(target, headers, body, contentType);
+}
+
+} // namespace streaming {
+
} // namespace http {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 83219da..dfdb233 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -414,6 +414,90 @@ TEST(HTTP, Get)
}
+// Exercises http::streaming::get against an endpoint that answers
+// with a PIPE response: the client-side response must be ready before
+// any body data exists, data written on the server-side pipe must
+// become readable on the client-side reader, and closing the writer
+// must surface as end-of-file.
+TEST(HTTP, StreamingGetComplete)
+{
+ Http http;
+
+ // The mocked 'pipe' handler returns a PIPE response whose body is
+ // fed from 'pipe'.
+ http::Pipe pipe;
+ http::OK ok;
+ ok.type = http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ EXPECT_CALL(*http.process, pipe(_))
+ .WillOnce(Return(ok));
+
+ Future<http::Response> response =
+ http::streaming::get(http.process->self(), "pipe");
+
+ // The response should be ready since the headers were sent.
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+ ASSERT_EQ(http::Response::PIPE, response.get().type);
+ ASSERT_SOME(response.get().reader);
+
+ // NOTE: This is the reader of the client-side response, not of the
+ // local 'pipe' above.
+ http::Pipe::Reader reader = response.get().reader.get();
+
+ // There is no data to read yet.
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ // Stream data into the body and read it from the response.
+ http::Pipe::Writer writer = pipe.writer();
+ EXPECT_TRUE(writer.write("hello"));
+ AWAIT_EQ("hello", read);
+
+ EXPECT_TRUE(writer.write("goodbye"));
+ AWAIT_EQ("goodbye", reader.read());
+
+ // Complete the response.
+ EXPECT_TRUE(writer.close());
+ AWAIT_EQ("", reader.read()); // EOF.
+}
+
+
+// Same setup as a successful streaming GET, but the server-side
+// writer is failed instead of closed: the client-side reader must
+// then observe a failed read rather than end-of-file.
+TEST(HTTP, StreamingGetFailure)
+{
+ Http http;
+
+ // The mocked 'pipe' handler returns a PIPE response whose body is
+ // fed from 'pipe'.
+ http::Pipe pipe;
+ http::OK ok;
+ ok.type = http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ EXPECT_CALL(*http.process, pipe(_))
+ .WillOnce(Return(ok));
+
+ Future<http::Response> response =
+ http::streaming::get(http.process->self(), "pipe");
+
+ // The response should be ready since the headers were sent.
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+ ASSERT_EQ(http::Response::PIPE, response.get().type);
+ ASSERT_SOME(response.get().reader);
+
+ // NOTE: This is the reader of the client-side response, not of the
+ // local 'pipe' above.
+ http::Pipe::Reader reader = response.get().reader.get();
+
+ // There is no data to read yet.
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ // Stream data into the body and read it from the response.
+ http::Pipe::Writer writer = pipe.writer();
+ EXPECT_TRUE(writer.write("hello"));
+ AWAIT_EQ("hello", read);
+
+ EXPECT_TRUE(writer.write("goodbye"));
+ AWAIT_EQ("goodbye", reader.read());
+
+ // Fail the response.
+ EXPECT_TRUE(writer.fail("oops"));
+ AWAIT_FAILED(reader.read());
+}
+
+
http::Response validatePost(const http::Request& request)
{
EXPECT_EQ("POST", request.method);