You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/31 02:12:45 UTC

[3/3] mesos git commit: Added http::streaming::get/post for client-side streaming responses.

Added http::streaming::get/post for client-side streaming responses.

Review: https://reviews.apache.org/r/32351


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f7fccce2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f7fccce2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f7fccce2

Branch: refs/heads/master
Commit: f7fccce2e3208cfc6481151c2bb016727e5ebdfc
Parents: 6ac8eb1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 14:45:28 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:59:19 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp |  44 ++++
 3rdparty/libprocess/src/http.cpp             | 237 ++++++++++++++++++++--
 3rdparty/libprocess/src/tests/http_tests.cpp |  84 ++++++++
 3 files changed, 344 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index a9ef5b7..07825b2 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -583,6 +583,50 @@ Future<Response> post(
     const Option<std::string>& body = None(),
     const Option<std::string>& contentType = None());
 
+
+namespace streaming {
+
+// Asynchronously sends an HTTP GET request to the specified URL and
+// returns a 'PIPE' response once the response headers are received.
+// The caller must read the response body from the response's
+// Pipe::Reader; an empty read signals the end of the body.
+Future<Response> get(
+    const URL& url,
+    const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP GET request to the process with the
+// given UPID and returns a 'PIPE' response once the response headers
+// are received. The caller must read the body from the Pipe::Reader.
+// Fails if 'query' cannot be decoded as an HTTP query string.
+Future<Response> get(
+    const UPID& upid,
+    const Option<std::string>& path = None(),
+    const Option<std::string>& query = None(),
+    const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP POST request to the specified URL and
+// returns a 'PIPE' response once the response headers are received.
+// The caller must read the response body from the Pipe::Reader.
+// Fails if a 'contentType' is given without a 'body'.
+Future<Response> post(
+    const URL& url,
+    const Option<hashmap<std::string, std::string>>& headers = None(),
+    const Option<std::string>& body = None(),
+    const Option<std::string>& contentType = None());
+
+// Asynchronously sends an HTTP POST request to the process with the
+// given UPID and returns a 'PIPE' response once the response headers
+// are received. The caller must read the body from the Pipe::Reader.
+// Fails if a 'contentType' is given without a 'body'.
+Future<Response> post(
+    const UPID& upid,
+    const Option<std::string>& path = None(),
+    const Option<hashmap<std::string, std::string>>& headers = None(),
+    const Option<std::string>& body = None(),
+    const Option<std::string>& contentType = None());
+
+} // namespace streaming {
+
 } // namespace http {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index cd52cc8..7e6cbd3 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -548,35 +548,143 @@ ostream& operator << (
 
 namespace internal {
 
-Future<Response> decode(const string& buffer)
+// Forward declarations.
+Future<string> _convert(
+    Pipe::Reader reader,
+    const memory::shared_ptr<string>& buffer,
+    const string& read);
+Response __convert(
+    const Response& pipeResponse,
+    const string& body);
+
+
+// Returns a 'BODY' response once the body of the provided
+// 'PIPE' response can be read completely.
+Future<Response> convert(const Response& pipeResponse)
 {
-  ResponseDecoder decoder;
-  deque<Response*> responses = decoder.decode(buffer.c_str(), buffer.length());
+  memory::shared_ptr<string> buffer(new string());
 
-  if (decoder.failed() || responses.empty()) {
-    for (size_t i = 0; i < responses.size(); ++i) {
-      delete responses[i];
+  CHECK_EQ(Response::PIPE, pipeResponse.type);
+  CHECK_SOME(pipeResponse.reader);
+
+  Pipe::Reader reader = pipeResponse.reader.get();
+
+  return reader.read()
+    .then(lambda::bind(&_convert, reader, buffer, lambda::_1))
+    .then(lambda::bind(&__convert, pipeResponse, lambda::_1));
+}
+
+
+Future<string> _convert(
+    Pipe::Reader reader,
+    const memory::shared_ptr<string>& buffer,
+    const string& read)
+{
+  if (read.empty()) { // EOF.
+    return *buffer;
+  }
+
+  buffer->append(read);
+
+  return reader.read()
+    .then(lambda::bind(&_convert, reader, buffer, lambda::_1));
+}
+
+
+Response __convert(const Response& pipeResponse, const string& body)
+{
+  Response bodyResponse = pipeResponse;
+  bodyResponse.type = Response::BODY;
+  bodyResponse.body = body;
+  bodyResponse.reader = None(); // Remove the reader.
+  return bodyResponse;
+}
+
+
+// Forward declaration.
+void _decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const Future<string>& data);
+
+
+Future<Response> decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const string& data)
+{
+  deque<Response*> responses = decoder->decode(data.c_str(), data.length());
+
+  if (decoder->failed() || responses.size() > 1) {
+    foreach (Response* response, responses) {
+      delete response;
     }
-    return Failure("Failed to decode HTTP response:\n" + buffer + "\n");
-  } else if (responses.size() > 1) {
-    PLOG(ERROR) << "Received more than 1 HTTP Response";
+    return Failure(string("Failed to decode HTTP response") +
+        (responses.size() > 1 ? ": more than one response received" : ""));
   }
 
-  Response response = *responses[0];
-  for (size_t i = 0; i < responses.size(); ++i) {
-    delete responses[i];
+  if (responses.empty()) {
+    // Keep reading until the headers are complete.
+    return socket.recv(None())
+      .then(lambda::bind(&decode, socket, decoder, lambda::_1));
+  }
+
+  // Keep feeding data to the decoder until EOF or a 'recv' failure.
+  if (!data.empty()) {
+    socket.recv(None())
+      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
   }
 
+  Response response = *responses[0];
+  delete responses[0];
   return response;
 }
 
 
+void _decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const Future<string>& data)
+{
+  deque<Response*> responses;
+
+  if (!data.isReady()) {
+    // The 'recv' failed or was discarded: feed the decoder
+    // zero bytes so that it processes EOF.
+    responses = decoder->decode("", 0);
+  } else {
+    responses = decoder->decode(data.get().c_str(), data.get().length());
+  }
+
+  // 'decode' already returned the response; any more are unexpected!
+  if (!responses.empty() || decoder->failed()) {
+    VLOG(1) << "Failed to decode HTTP response"
+            << (!responses.empty()
+                ? ": more than one response received"
+                : "");
+
+    foreach (Response* response, responses) {
+      delete response;
+    }
+
+    return;
+  }
+
+  // An empty read means EOF; otherwise keep feeding the decoder.
+  if (data.isReady() && !data.get().empty()) {
+    socket.recv(None())
+      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
+  }
+}
+
+
 // Forward declaration.
 Future<Response> _request(
     Socket socket,
     const Address& address,
     const URL& url,
     const string& method,
+    bool streamedResponse,
     const Option<hashmap<string, string>>& _headers,
     const Option<string>& body,
     const Option<string>& contentType);
@@ -585,6 +693,7 @@ Future<Response> _request(
 Future<Response> request(
     const URL& url,
     const string& method,
+    bool streamedResponse,
     const Option<hashmap<string, string>>& headers,
     const Option<string>& body,
     const Option<string>& contentType)
@@ -626,6 +735,7 @@ Future<Response> request(
                        address,
                        url,
                        method,
+                       streamedResponse,
                        headers,
                        body,
                        contentType));
@@ -637,6 +747,7 @@ Future<Response> _request(
     const Address& address,
     const URL& url,
     const string& method,
+    bool streamedResponse,
     const Option<hashmap<string, string>>& _headers,
     const Option<string>& body,
     const Option<string>& contentType)
@@ -685,6 +796,13 @@ Future<Response> _request(
     headers["Content-Length"] = stringify(body.get().length());
   }
 
+  // TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
+  // Currently this does not handle 'gzip' content encoding,
+  // unless the caller manually compresses the 'body'. For
+  // streaming requests we must wipe 'gzip' as an acceptable
+  // encoding as we don't currently have streaming gzip utilities
+  // to support decoding a streaming gzip response!
+
   // Emit the headers.
   foreachpair (const string& key, const string& value, headers) {
     out << key << ": " << value << "\r\n";
@@ -699,13 +817,18 @@ Future<Response> _request(
   // Need to disambiguate the Socket::recv for binding below.
   Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv;
 
-  // TODO(bmahler): For efficiency, this should properly use the
-  // ResponseDecoder when reading, rather than parsing the full string
-  // response.
-  return socket.send(out.str())
-    .then(lambda::function<Future<string>(void)>(
-              lambda::bind(recv, socket, -1)))
-    .then(lambda::bind(&internal::decode, lambda::_1));
+  Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder());
+
+  Future<Response> pipeResponse = socket.send(out.str())
+    .then(lambda::bind(recv, socket, None()))
+    .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1));
+
+  if (streamedResponse) {
+    return pipeResponse;
+  } else {
+    return pipeResponse
+      .then(lambda::bind(&internal::convert, lambda::_1));
+  }
 }
 
 } // namespace internal {
@@ -715,7 +838,7 @@ Future<Response> get(
     const URL& url,
     const Option<hashmap<string, string>>& headers)
 {
-  return internal::request(url, "GET", headers, None(), None());
+  return internal::request(url, "GET", false, headers, None(), None());
 }
 
 
@@ -757,7 +880,7 @@ Future<Response> post(
     return Failure("Attempted to do a POST with a Content-Type but no body");
   }
 
-  return internal::request(url, "POST", headers, body, contentType);
+  return internal::request(url, "POST", false, headers, body, contentType);
 }
 
 
@@ -778,5 +901,77 @@ Future<Response> post(
   return post(url, headers, body, contentType);
 }
 
+
+namespace streaming {
+
+Future<Response> get(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers)
+{
+  return internal::request(url, "GET", true, headers, None(), None());
+}
+
+
+Future<Response> get(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<string>& query,
+    const Option<hashmap<string, string>>& headers)
+{
+  URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    url.path = strings::join("/", url.path, path.get());
+  }
+
+  if (query.isSome()) {
+    Try<hashmap<string, string>> decode = http::query::decode(
+        strings::remove(query.get(), "?", strings::PREFIX));
+
+    if (decode.isError()) {
+      return Failure("Failed to decode HTTP query string: " + decode.error());
+    }
+
+    url.query = decode.get();
+  }
+
+  return streaming::get(url, headers);
+}
+
+
+Future<Response> post(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  if (body.isNone() && contentType.isSome()) {
+    return Failure("Attempted to do a POST with a Content-Type but no body");
+  }
+
+  return internal::request(url, "POST", true, headers, body, contentType);
+}
+
+
+Future<Response> post(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    url.path = strings::join("/", url.path, path.get());
+  }
+
+  return streaming::post(url, headers, body, contentType);
+}
+
+} // namespace streaming {
+
 } // namespace http {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 83219da..dfdb233 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -414,6 +414,90 @@ TEST(HTTP, Get)
 }
 
 
+TEST(HTTP, StreamingGetComplete)
+{
+  Http http;
+
+  http::Pipe pipe;
+  http::OK ok;
+  ok.type = http::Response::PIPE;
+  ok.reader = pipe.reader();
+
+  EXPECT_CALL(*http.process, pipe(_))
+    .WillOnce(Return(ok));
+
+  Future<http::Response> response =
+    http::streaming::get(http.process->self(), "pipe");
+
+  // Ready once the headers arrive, before any body data is written.
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(http::Response::PIPE, response.get().type);
+  ASSERT_SOME(response.get().reader);
+
+  http::Pipe::Reader reader = response.get().reader.get();
+
+  // Nothing has been written yet, so the read must stay pending.
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Data written to the served pipe must arrive on the response's reader.
+  http::Pipe::Writer writer = pipe.writer();
+  EXPECT_TRUE(writer.write("hello"));
+  AWAIT_EQ("hello", read);
+
+  EXPECT_TRUE(writer.write("goodbye"));
+  AWAIT_EQ("goodbye", reader.read());
+
+  // Closing the writer completes the response: the reader sees EOF.
+  EXPECT_TRUE(writer.close());
+  AWAIT_EQ("", reader.read()); // EOF.
+}
+
+
+TEST(HTTP, StreamingGetFailure)
+{
+  Http http;
+
+  http::Pipe pipe;
+  http::OK ok;
+  ok.type = http::Response::PIPE;
+  ok.reader = pipe.reader();
+
+  EXPECT_CALL(*http.process, pipe(_))
+    .WillOnce(Return(ok));
+
+  Future<http::Response> response =
+    http::streaming::get(http.process->self(), "pipe");
+
+  // Ready once the headers arrive, before any body data is written.
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(http::Response::PIPE, response.get().type);
+  ASSERT_SOME(response.get().reader);
+
+  http::Pipe::Reader reader = response.get().reader.get();
+
+  // Nothing has been written yet, so the read must stay pending.
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Data written to the served pipe must arrive on the response's reader.
+  http::Pipe::Writer writer = pipe.writer();
+  EXPECT_TRUE(writer.write("hello"));
+  AWAIT_EQ("hello", read);
+
+  EXPECT_TRUE(writer.write("goodbye"));
+  AWAIT_EQ("goodbye", reader.read());
+
+  // Failing the writer must make the next read on the reader fail.
+  EXPECT_TRUE(writer.fail("oops"));
+  AWAIT_FAILED(reader.read());
+}
+
+
 http::Response validatePost(const http::Request& request)
 {
   EXPECT_EQ("POST", request.method);