You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/31 02:12:45 UTC
[3/3] mesos git commit: Added http::streaming::get/post for
client-side streaming responses.
Added http::streaming::get/post for client-side streaming responses.
Review: https://reviews.apache.org/r/32351
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f7fccce2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f7fccce2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f7fccce2
Branch: refs/heads/master
Commit: f7fccce2e3208cfc6481151c2bb016727e5ebdfc
Parents: 6ac8eb1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 14:45:28 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:59:19 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 44 ++++
3rdparty/libprocess/src/http.cpp | 237 ++++++++++++++++++++--
3rdparty/libprocess/src/tests/http_tests.cpp | 84 ++++++++
3 files changed, 344 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index a9ef5b7..07825b2 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -583,6 +583,50 @@ Future<Response> post(
const Option<std::string>& body = None(),
const Option<std::string>& contentType = None());
+
+namespace streaming {
+
+// Client-side variants of http::get / http::post that hand back the
+// response as soon as its headers arrive, with the body streamed
+// through a Pipe::Reader instead of being buffered.
+
+// Asynchronously sends an HTTP GET request to the specified URL
+// and returns the HTTP response of type 'PIPE' once the response
+// headers are received. The caller must read the response body
+// from the Pipe::Reader.
+Future<Response> get(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP GET request to the process with the
+// given UPID and returns the HTTP response of type 'PIPE' once the
+// response headers are received. The caller must read the response
+// body from the Pipe::Reader.
+// When 'path' is provided it is joined onto the URL path with '/'.
+// When 'query' is provided, a leading '?' is removed and the rest is
+// decoded into the URL's query parameters; if decoding fails, a
+// failed future is returned.
+Future<Response> get(
+ const UPID& upid,
+ const Option<std::string>& path = None(),
+ const Option<std::string>& query = None(),
+ const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP POST request to the specified URL
+// and returns the HTTP response of type 'PIPE' once the response
+// headers are received. The caller must read the response body
+// from the Pipe::Reader.
+// Returns a failure if 'contentType' is provided without a 'body'.
+Future<Response> post(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None(),
+ const Option<std::string>& body = None(),
+ const Option<std::string>& contentType = None());
+
+// Asynchronously sends an HTTP POST request to the process with the
+// given UPID and returns the HTTP response of type 'PIPE' once the
+// response headers are received. The caller must read the response
+// body from the Pipe::Reader.
+// When 'path' is provided it is joined onto the URL path with '/'.
+// Returns a failure if 'contentType' is provided without a 'body'.
+Future<Response> post(
+ const UPID& upid,
+ const Option<std::string>& path = None(),
+ const Option<hashmap<std::string, std::string>>& headers = None(),
+ const Option<std::string>& body = None(),
+ const Option<std::string>& contentType = None());
+
+} // namespace streaming {
+
} // namespace http {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index cd52cc8..7e6cbd3 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -548,35 +548,143 @@ ostream& operator << (
namespace internal {
-Future<Response> decode(const string& buffer)
+// Forward declarations.
+Future<string> _convert(
+ Pipe::Reader reader,
+ const memory::shared_ptr<string>& buffer,
+ const string& read);
+Response __convert(
+ const Response& pipeResponse,
+ const string& body);
+
+
+// Returns a 'BODY' response once the body of the provided
+// 'PIPE' response can be read completely.
+// The response must be of type 'PIPE' and carry a reader; both
+// are enforced with CHECKs below.
+Future<Response> convert(const Response& pipeResponse)
{
- ResponseDecoder decoder;
- deque<Response*> responses = decoder.decode(buffer.c_str(), buffer.length());
+ // Accumulates the body; shared with every chained '_convert' call.
+ memory::shared_ptr<string> buffer(new string());
- if (decoder.failed() || responses.empty()) {
- for (size_t i = 0; i < responses.size(); ++i) {
- delete responses[i];
+ CHECK_EQ(Response::PIPE, pipeResponse.type);
+ CHECK_SOME(pipeResponse.reader);
+
+ Pipe::Reader reader = pipeResponse.reader.get();
+
+ // Read the pipe to EOF via '_convert', then build the 'BODY'
+ // response from the accumulated data via '__convert'.
+ return reader.read()
+ .then(lambda::bind(&_convert, reader, buffer, lambda::_1))
+ .then(lambda::bind(&__convert, pipeResponse, lambda::_1));
+}
+
+
+// Continuation of 'convert': consumes one chunk read from the pipe.
+// Non-empty chunks are accumulated into 'buffer' and another read is
+// chained; an empty chunk yields the accumulated contents.
+Future<string> _convert(
+    Pipe::Reader reader,
+    const memory::shared_ptr<string>& buffer,
+    const string& read)
+{
+  if (!read.empty()) {
+    // More body data: stash it and keep reading.
+    *buffer += read;
+
+    return reader.read()
+      .then(lambda::bind(&_convert, reader, buffer, lambda::_1));
+  }
+
+  // EOF: everything read so far is the complete body.
+  return *buffer;
+}
+
+
+// Builds a 'BODY' response from 'pipeResponse': a copy of it whose
+// body is set to 'body' and whose reader is cleared.
+Response __convert(const Response& pipeResponse, const string& body)
+{
+  Response result = pipeResponse;
+
+  result.reader = None(); // A 'BODY' response carries no reader.
+  result.body = body;
+  result.type = Response::BODY;
+
+  return result;
+}
+
+
+// Forward declaration.
+void _decode(
+ Socket socket,
+ Owned<StreamingResponseDecoder> decoder,
+ const Future<string>& data);
+
+
+// Feeds 'data' read from 'socket' into 'decoder' and returns the
+// decoded response once one is available. Returns a failure if the
+// decoder fails or if more than one response is decoded. While no
+// response has been produced yet, another 'recv' is issued and
+// 'decode' is invoked again with its result.
+Future<Response> decode(
+ Socket socket,
+ Owned<StreamingResponseDecoder> decoder,
+ const string& data)
+{
+ deque<Response*> responses = decoder->decode(data.c_str(), data.length());
+
+ if (decoder->failed() || responses.size() > 1) {
+ foreach (Response* response, responses) {
+ delete response;
}
- return Failure("Failed to decode HTTP response:\n" + buffer + "\n");
- } else if (responses.size() > 1) {
- PLOG(ERROR) << "Received more than 1 HTTP Response";
+ return Failure(string("Failed to decode HTTP response") +
+ (responses.size() > 1 ? ": more than one response received" : ""));
}
- Response response = *responses[0];
- for (size_t i = 0; i < responses.size(); ++i) {
- delete responses[i];
+ // NOTE(review): an empty 'data' that yields neither a response nor
+ // a decoder failure also takes this branch and re-issues 'recv';
+ // confirm this cannot repeat indefinitely on a closed socket.
+ if (responses.empty()) {
+ // Keep reading until the headers are complete.
+ return socket.recv(None())
+ .then(lambda::bind(&decode, socket, decoder, lambda::_1));
+ }
+
+ // Keep feeding data to the decoder until EOF or a 'recv' failure.
+ if (!data.empty()) {
+ socket.recv(None())
+ .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
}
+ // Copy the response out, then free the heap object handed back by
+ // the decoder.
+ Response response = *responses[0];
+ delete responses[0];
return response;
}
+// Continuation used after 'decode' has produced the response: keeps
+// feeding data read from 'socket' into 'decoder'. A 'recv' that
+// failed or was discarded is passed to the decoder as EOF. Reading
+// stops on EOF, on a 'recv' failure or discard, on a decoder
+// failure, or if a further response is decoded.
+void _decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const Future<string>& data)
+{
+  deque<Response*> responses;
+
+  if (!data.isReady()) {
+    // Let the decoder process EOF if a failure
+    // or discard is encountered.
+    responses = decoder->decode("", 0);
+  } else {
+    responses = decoder->decode(data.get().c_str(), data.get().length());
+  }
+
+  // We're not expecting more responses to arrive on this socket!
+  if (!responses.empty() || decoder->failed()) {
+    // The first response was already produced by 'decode', so any
+    // response decoded here means more than one was received. The
+    // detail is only appended in that case, which keeps the message
+    // free of a doubled or dangling colon.
+    VLOG(1) << "Failed to decode HTTP response"
+            << (!responses.empty()
+                ? ": more than one response received"
+                : "");
+
+    foreach (Response* response, responses) {
+      delete response;
+    }
+
+    return;
+  }
+
+  // Keep reading if the socket has more data.
+  if (data.isReady() && !data.get().empty()) {
+    socket.recv(None())
+      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
+  }
+}
+
+
// Forward declaration.
+// 'streamedResponse' selects the kind of response returned: when
+// true the 'PIPE' response is returned as is, otherwise it is
+// converted to a 'BODY' response first.
Future<Response> _request(
Socket socket,
const Address& address,
const URL& url,
const string& method,
+ bool streamedResponse,
const Option<hashmap<string, string>>& _headers,
const Option<string>& body,
const Option<string>& contentType);
@@ -585,6 +693,7 @@ Future<Response> _request(
Future<Response> request(
const URL& url,
const string& method,
+ bool streamedResponse,
const Option<hashmap<string, string>>& headers,
const Option<string>& body,
const Option<string>& contentType)
@@ -626,6 +735,7 @@ Future<Response> request(
address,
url,
method,
+ streamedResponse,
headers,
body,
contentType));
@@ -637,6 +747,7 @@ Future<Response> _request(
const Address& address,
const URL& url,
const string& method,
+ bool streamedResponse,
const Option<hashmap<string, string>>& _headers,
const Option<string>& body,
const Option<string>& contentType)
@@ -685,6 +796,13 @@ Future<Response> _request(
headers["Content-Length"] = stringify(body.get().length());
}
+ // TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
+ // Currently this does not handle 'gzip' content encoding,
+ // unless the caller manually compresses the 'body'. For
+ // streaming requests we must wipe 'gzip' as an acceptable
+ // encoding as we don't currently have streaming gzip utilities
+ // to support decoding a streaming gzip response!
+
// Emit the headers.
foreachpair (const string& key, const string& value, headers) {
out << key << ": " << value << "\r\n";
@@ -699,13 +817,18 @@ Future<Response> _request(
// Need to disambiguate the Socket::recv for binding below.
Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv;
- // TODO(bmahler): For efficiency, this should properly use the
- // ResponseDecoder when reading, rather than parsing the full string
- // response.
- return socket.send(out.str())
- .then(lambda::function<Future<string>(void)>(
- lambda::bind(recv, socket, -1)))
- .then(lambda::bind(&internal::decode, lambda::_1));
+ Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder());
+
+ Future<Response> pipeResponse = socket.send(out.str())
+ .then(lambda::bind(recv, socket, None()))
+ .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1));
+
+ if (streamedResponse) {
+ return pipeResponse;
+ } else {
+ return pipeResponse
+ .then(lambda::bind(&internal::convert, lambda::_1));
+ }
}
} // namespace internal {
@@ -715,7 +838,7 @@ Future<Response> get(
const URL& url,
const Option<hashmap<string, string>>& headers)
{
- return internal::request(url, "GET", headers, None(), None());
+ return internal::request(url, "GET", false, headers, None(), None());
}
@@ -757,7 +880,7 @@ Future<Response> post(
return Failure("Attempted to do a POST with a Content-Type but no body");
}
- return internal::request(url, "POST", headers, body, contentType);
+ return internal::request(url, "POST", false, headers, body, contentType);
}
@@ -778,5 +901,77 @@ Future<Response> post(
return post(url, headers, body, contentType);
}
+
+namespace streaming {
+
+// Sends a GET to 'url' and asks 'internal::request' for a streamed
+// response.
+Future<Response> get(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers)
+{
+  const bool streamedResponse = true;
+
+  return internal::request(
+      url, "GET", streamedResponse, headers, None(), None());
+}
+
+
+// Builds the URL of the process identified by 'upid', optionally
+// extended by 'path' and 'query', and sends a streaming GET to it.
+// Returns a failure if 'query' cannot be decoded.
+Future<Response> get(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<string>& query,
+    const Option<hashmap<string, string>>& headers)
+{
+  URL target("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    target.path = strings::join("/", target.path, path.get());
+  }
+
+  // Without a query string the URL is already complete.
+  if (query.isNone()) {
+    return streaming::get(target, headers);
+  }
+
+  // A leading '?' is removed before decoding.
+  Try<hashmap<string, string>> parameters = http::query::decode(
+      strings::remove(query.get(), "?", strings::PREFIX));
+
+  if (parameters.isError()) {
+    return Failure(
+        "Failed to decode HTTP query string: " + parameters.error());
+  }
+
+  target.query = parameters.get();
+
+  return streaming::get(target, headers);
+}
+
+
+// Sends a POST to 'url' and asks 'internal::request' for a streamed
+// response. A 'contentType' without a 'body' is rejected.
+Future<Response> post(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  const bool typeWithoutBody = contentType.isSome() && body.isNone();
+
+  if (typeWithoutBody) {
+    return Failure("Attempted to do a POST with a Content-Type but no body");
+  }
+
+  const bool streamedResponse = true;
+
+  return internal::request(
+      url, "POST", streamedResponse, headers, body, contentType);
+}
+
+
+// Builds the URL of the process identified by 'upid', optionally
+// extended by 'path', and sends a streaming POST to it.
+Future<Response> post(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  URL target("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    target.path = strings::join("/", target.path, path.get());
+  }
+
+  return streaming::post(target, headers, body, contentType);
+}
+
+} // namespace streaming {
+
} // namespace http {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 83219da..dfdb233 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -414,6 +414,90 @@ TEST(HTTP, Get)
}
+// Verifies that a streaming GET yields a 'PIPE' response once the
+// headers have been sent, and that data written to the server-side
+// pipe can be read incrementally from the response's reader until
+// the writer is closed (EOF).
+TEST(HTTP, StreamingGetComplete)
+{
+ Http http;
+
+ // The handler answers with a 'PIPE' response backed by this pipe.
+ http::Pipe pipe;
+ http::OK ok;
+ ok.type = http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ EXPECT_CALL(*http.process, pipe(_))
+ .WillOnce(Return(ok));
+
+ Future<http::Response> response =
+ http::streaming::get(http.process->self(), "pipe");
+
+ // The response should be ready since the headers were sent.
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+ ASSERT_EQ(http::Response::PIPE, response.get().type);
+ ASSERT_SOME(response.get().reader);
+
+ http::Pipe::Reader reader = response.get().reader.get();
+
+ // There is no data to read yet.
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ // Stream data into the body and read it from the response.
+ http::Pipe::Writer writer = pipe.writer();
+ EXPECT_TRUE(writer.write("hello"));
+ AWAIT_EQ("hello", read);
+
+ EXPECT_TRUE(writer.write("goodbye"));
+ AWAIT_EQ("goodbye", reader.read());
+
+ // Complete the response.
+ EXPECT_TRUE(writer.close());
+ AWAIT_EQ("", reader.read()); // EOF.
+}
+
+
+// Verifies that a streaming GET yields a 'PIPE' response once the
+// headers have been sent, that streamed data can be read from the
+// response's reader, and that failing the server-side writer causes
+// the next read on the client side to fail.
+TEST(HTTP, StreamingGetFailure)
+{
+ Http http;
+
+ // The handler answers with a 'PIPE' response backed by this pipe.
+ http::Pipe pipe;
+ http::OK ok;
+ ok.type = http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ EXPECT_CALL(*http.process, pipe(_))
+ .WillOnce(Return(ok));
+
+ Future<http::Response> response =
+ http::streaming::get(http.process->self(), "pipe");
+
+ // The response should be ready since the headers were sent.
+ AWAIT_READY(response);
+
+ EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+ ASSERT_EQ(http::Response::PIPE, response.get().type);
+ ASSERT_SOME(response.get().reader);
+
+ http::Pipe::Reader reader = response.get().reader.get();
+
+ // There is no data to read yet.
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ // Stream data into the body and read it from the response.
+ http::Pipe::Writer writer = pipe.writer();
+ EXPECT_TRUE(writer.write("hello"));
+ AWAIT_EQ("hello", read);
+
+ EXPECT_TRUE(writer.write("goodbye"));
+ AWAIT_EQ("goodbye", reader.read());
+
+ // Fail the response.
+ EXPECT_TRUE(writer.fail("oops"));
+ AWAIT_FAILED(reader.read());
+}
+
+
http::Response validatePost(const http::Request& request)
{
EXPECT_EQ("POST", request.method);