You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/31 02:12:43 UTC

[1/3] mesos git commit: Added failure semantics for http::Pipe::Writer.

Repository: mesos
Updated Branches:
  refs/heads/master f98f26fa5 -> f7fccce2e


Added failure semantics for http::Pipe::Writer.

Review: https://reviews.apache.org/r/32346


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b0bba19
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b0bba19
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b0bba19

Branch: refs/heads/master
Commit: 8b0bba195058c9f209e9b4bb9716fb805161e847
Parents: f98f26f
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 11:32:04 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:38:21 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 46 +++++++++++++++------
 3rdparty/libprocess/src/http.cpp             | 50 +++++++++++++++++++----
 3rdparty/libprocess/src/tests/http_tests.cpp | 32 +++++++++++++++
 3 files changed, 106 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index faffae7..a9ef5b7 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -117,17 +117,26 @@ public:
   public:
     // Returns data written to the pipe.
     // Returns an empty read when end-of-file is reached.
-    // Returns Failure if the read-end of the pipe is closed.
+    // Returns Failure if the writer failed, or the read-end
+    // is closed.
     Future<std::string> read();
 
     // Closing the read-end of the pipe before the write-end closes
-    // will notify the writer that the reader is no longer interested.
-    // Returns false if the read-end of the pipe was already closed.
+    // or fails will notify the writer that the reader is no longer
+    // interested. Returns false if the read-end was already closed.
     bool close();
 
   private:
     friend class Pipe;
+
+    enum State // State of the read-end of the pipe.
+    {
+      OPEN,   // Initial state (see the 'Data' constructor).
+      CLOSED, // Set by Reader::close(); reads then fail with "closed".
+    };
+
     explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {}
+
     memory::shared_ptr<Data> data;
   };
 
@@ -141,9 +150,14 @@ public:
 
     // Closing the write-end of the pipe will send end-of-file
     // to the reader. Returns false if the write-end of the pipe
-    // was already closed.
+    // was already closed or failed.
     bool close();
 
+    // Closes the write-end of the pipe but sends a failure
+    // to the reader rather than end-of-file. Returns false
+    // if the write-end of the pipe was already closed or failed.
+    bool fail(const std::string& message);
+
     // Returns Nothing when the read-end of the pipe is closed
     // before the write-end is closed, which means the reader
     // was unable to continue reading!
@@ -151,7 +165,16 @@ public:
 
   private:
     friend class Pipe;
+
+    enum State // State of the write-end of the pipe.
+    {
+      OPEN,   // Initial state (see the 'Data' constructor).
+      CLOSED, // Set by Writer::close(); the reader sees end-of-file.
+      FAILED, // Set by Writer::fail(); the reader sees the failure.
+    };
+
     explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {}
+
     memory::shared_ptr<Data> data;
   };
 
@@ -161,23 +184,17 @@ public:
   Writer writer() const;
 
 private:
-  enum State
-  {
-    OPEN,
-    CLOSED,
-  };
-
   struct Data
   {
-    Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {}
+    Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
 
     // Rather than use a process to serialize access to the pipe's
     // internal data we use a low-level "lock" which we acquire and
     // release using atomic builtins.
     int lock;
 
-    State readEnd;
-    State writeEnd;
+    Reader::State readEnd;
+    Writer::State writeEnd;
 
     // Represents readers waiting for data from the pipe.
     std::queue<Owned<Promise<std::string>>> reads;
@@ -188,6 +205,9 @@ private:
 
     // Signals when the read-end is closed before the write-end.
     Promise<Nothing> readerClosure;
+
+    // Failure reason when the 'writeEnd' is FAILED.
+    Option<Failure> failure;
   };
 
   memory::shared_ptr<Data> data;

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 150ff33..cd52cc8 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -180,13 +180,16 @@ Future<string> Pipe::Reader::read()
 
   process::internal::acquire(&data->lock);
   {
-    if (data->readEnd == CLOSED) {
+    if (data->readEnd == Reader::CLOSED) {
       future = Failure("closed");
     } else if (!data->writes.empty()) {
       future = data->writes.front();
       data->writes.pop();
-    } else if (data->writeEnd == CLOSED) {
+    } else if (data->writeEnd == Writer::CLOSED) {
       future = ""; // End-of-file.
+    } else if (data->writeEnd == Writer::FAILED) {
+      CHECK_SOME(data->failure);
+      future = data->failure.get();
     } else {
       data->reads.push(Owned<Promise<string>>(new Promise<string>()));
       future = data->reads.back()->future();
@@ -206,7 +209,7 @@ bool Pipe::Reader::close()
 
   process::internal::acquire(&data->lock);
   {
-    if (data->readEnd == OPEN) {
+    if (data->readEnd == Reader::OPEN) {
       // Throw away outstanding data.
       while (!data->writes.empty()) {
         data->writes.pop();
@@ -216,10 +219,10 @@ bool Pipe::Reader::close()
       std::swap(data->reads, reads);
 
       closed = true;
-      data->readEnd = CLOSED;
+      data->readEnd = Reader::CLOSED;
 
       // Notify if write-end is still open!
-      notify = data->writeEnd == OPEN;
+      notify = data->writeEnd == Writer::OPEN;
     }
   }
   process::internal::release(&data->lock);
@@ -248,8 +251,8 @@ bool Pipe::Writer::write(const string& s)
 
   process::internal::acquire(&data->lock);
   {
-    // Ignore writes if either end of the pipe is closed!
-    if (data->writeEnd == OPEN && data->readEnd == OPEN) {
+    // Ignore writes if either end of the pipe is closed or failed!
+    if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
       // Don't bother surfacing empty writes to the readers.
       if (!s.empty()) {
         if (data->reads.empty()) {
@@ -281,11 +284,11 @@ bool Pipe::Writer::close()
 
   process::internal::acquire(&data->lock);
   {
-    if (data->writeEnd == OPEN) {
+    if (data->writeEnd == Writer::OPEN) {
       // Extract all the pending reads so we can complete them.
       std::swap(data->reads, reads);
 
-      data->writeEnd = CLOSED;
+      data->writeEnd = Writer::CLOSED;
       closed = true;
     }
   }
@@ -302,6 +305,35 @@ bool Pipe::Writer::close()
 }
 
 
+bool Pipe::Writer::fail(const string& message)
+{
+  bool failed = false; // True only if this call moved the pipe to FAILED.
+  queue<Owned<Promise<string>>> reads; // Pending reads taken from the pipe.
+
+  process::internal::acquire(&data->lock);
+  {
+    if (data->writeEnd == Writer::OPEN) { // No-op if closed or failed.
+      // Take over all the pending reads so they can be failed below.
+      std::swap(data->reads, reads);
+
+      data->writeEnd = Writer::FAILED;
+      data->failure = Failure(message); // Returned by later reads.
+      failed = true;
+    }
+  }
+  process::internal::release(&data->lock);
+
+  // NOTE: We fail the promises outside the critical section to avoid
+  // triggering callbacks that try to reacquire the lock.
+  while (!reads.empty()) {
+    reads.front()->fail(message);
+    reads.pop();
+  }
+
+  return failed;
+}
+
+
 Future<Nothing> Pipe::Writer::readerClosed()
 {
   return data->readerClosure.future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index e254506..83219da 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -230,6 +230,38 @@ TEST(HTTP, PipeEOF)
 }
 
 
+TEST(HTTP, PipeFailure)
+{
+  http::Pipe pipe;
+  http::Pipe::Reader reader = pipe.reader();
+  http::Pipe::Writer writer = pipe.writer();
+
+  // Buffer two writes, then fail the write-end of the pipe.
+  EXPECT_TRUE(writer.write("hello"));
+  EXPECT_TRUE(writer.write("world"));
+
+  EXPECT_TRUE(writer.fail("disconnected!"));
+
+  // Buffered data is still readable; the failure comes afterwards.
+  AWAIT_EQ("hello", reader.read());
+  AWAIT_EQ("world", reader.read());
+
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isFailed());
+  EXPECT_EQ("disconnected!", read.failure());
+
+  // A failed write-end can neither be closed nor failed again.
+  EXPECT_FALSE(writer.close());
+  EXPECT_FALSE(writer.fail("not again"));
+
+  // The writer shouldn't be notified of the reader closing,
+  // since the writer had already failed.
+  EXPECT_TRUE(reader.close());
+  EXPECT_TRUE(writer.readerClosed().isPending());
+}
+
+
+
 TEST(HTTP, PipeReaderCloses)
 {
   http::Pipe pipe;


[2/3] mesos git commit: Added a StreamingResponseDecoder.

Posted by bm...@apache.org.
Added a StreamingResponseDecoder.

Review: https://reviews.apache.org/r/32347


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6ac8eb1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6ac8eb1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6ac8eb1c

Branch: refs/heads/master
Commit: 6ac8eb1c524dc4adbc09d20dcb8e4e31d60eeb56
Parents: 8b0bba1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 11:44:51 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:38:22 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/decoder.hpp             | 232 +++++++++++++++++++
 3rdparty/libprocess/src/tests/decoder_tests.cpp |  91 ++++++++
 2 files changed, 323 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index b3a667c..56adde0 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -14,6 +14,7 @@
 
 #include <stout/foreach.hpp>
 #include <stout/gzip.hpp>
+#include <stout/option.hpp>
 #include <stout/try.hpp>
 
 
@@ -469,6 +470,237 @@ private:
 };
 
 
+// Provides a response decoder that returns 'PIPE' responses once
+// the response headers are received, but before the body data
+// is received. Callers are expected to read the body from the
+// Pipe::Reader in the response.
+//
+// Every Response returned from 'decode' is heap-allocated and is
+// owned by the caller. A response whose headers are not complete
+// yet stays owned by the decoder and is released on destruction.
+//
+// TODO(bmahler): Consolidate streaming and non-streaming response
+// decoders.
+class StreamingResponseDecoder
+{
+public:
+  // Registers the parser callbacks and initializes the parser for
+  // decoding HTTP responses.
+  StreamingResponseDecoder()
+    : failure(false), header(HEADER_FIELD), response(NULL)
+  {
+    settings.on_message_begin =
+      &StreamingResponseDecoder::on_message_begin;
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+    settings.on_path =
+      &StreamingResponseDecoder::on_path;
+    settings.on_fragment =
+      &StreamingResponseDecoder::on_fragment;
+    settings.on_query_string =
+      &StreamingResponseDecoder::on_query_string;
+#endif
+
+    settings.on_url =
+      &StreamingResponseDecoder::on_url;
+    settings.on_header_field =
+      &StreamingResponseDecoder::on_header_field;
+    settings.on_header_value =
+      &StreamingResponseDecoder::on_header_value;
+    settings.on_headers_complete =
+      &StreamingResponseDecoder::on_headers_complete;
+    settings.on_body =
+      &StreamingResponseDecoder::on_body;
+    settings.on_message_complete =
+      &StreamingResponseDecoder::on_message_complete;
+
+    http_parser_init(&parser, HTTP_RESPONSE);
+
+    // Lets the static callbacks find their decoder instance.
+    parser.data = this;
+  }
+
+  ~StreamingResponseDecoder()
+  {
+    // Release a response that was started by 'on_message_begin' but
+    // never handed to the caller, e.g. because decoding failed or
+    // the decoder is destroyed before the headers were complete.
+    // NOTE: 'responses' needs no cleanup, 'decode' always drains it.
+    delete response;
+  }
+
+  // Feeds 'length' bytes of 'data' to the parser; the tests feed a
+  // 'length' of 0 to signal EOF. Returns the responses whose headers
+  // were completed by this call; the caller takes ownership of them.
+  // On a parse error 'failed()' becomes true and, if a body was still
+  // being streamed, its Pipe::Writer is failed to notify the reader.
+  std::deque<http::Response*> decode(const char* data, size_t length)
+  {
+    size_t parsed = http_parser_execute(&parser, &settings, data, length);
+
+    if (parsed != length) {
+      // TODO(bmahler): joyent/http-parser exposes error reasons.
+      failure = true;
+
+      // If we're still writing the body, fail the writer!
+      if (writer.isSome()) {
+        http::Pipe::Writer writer_ = writer.get(); // Remove const.
+        writer_.fail("failed to decode body");
+        writer = None();
+      }
+    }
+
+    // Hand over everything that became ready, leaving 'responses' empty.
+    std::deque<http::Response*> result;
+    result.swap(responses);
+    return result;
+  }
+
+  // Returns true once a decoding error was encountered.
+  bool failed() const
+  {
+    return failure;
+  }
+
+private:
+  // Not copyable: the decoder owns 'response' and the parser holds
+  // a pointer back to this object. Declared but not defined.
+  StreamingResponseDecoder(const StreamingResponseDecoder&);
+  StreamingResponseDecoder& operator=(const StreamingResponseDecoder&);
+
+  // Starts a fresh 'PIPE' response and resets the header state.
+  static int on_message_begin(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK(!decoder->failure);
+
+    decoder->header = HEADER_FIELD;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    // The previous message must have been fully handed off.
+    CHECK(decoder->response == NULL);
+    CHECK(decoder->writer.isNone());
+
+    decoder->response = new http::Response();
+    decoder->response->type = http::Response::PIPE;
+
+    return 0;
+  }
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+  // Request-only callbacks; nothing to do when decoding responses.
+  static int on_path(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_query_string(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_fragment(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+#endif // !(HTTP_PARSER_VERSION_MAJOR >= 2)
+
+  static int on_url(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  // Accumulates a (possibly partial) header field name. Seeing a
+  // field right after a value means the previous header is complete.
+  static int on_header_field(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    if (decoder->header != HEADER_FIELD) {
+      decoder->response->headers[decoder->field] = decoder->value;
+      decoder->field.clear();
+      decoder->value.clear();
+    }
+
+    decoder->field.append(data, length);
+    decoder->header = HEADER_FIELD;
+
+    return 0;
+  }
+
+  // Accumulates a (possibly partial) header value.
+  static int on_header_value(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    decoder->value.append(data, length);
+    decoder->header = HEADER_VALUE;
+    return 0;
+  }
+
+  // Finalizes the headers and status, attaches a pipe for the body
+  // and queues the response for the caller of 'decode'. Returning
+  // non-zero flags an error; the unfinished response then stays
+  // owned by the decoder and is released by the destructor.
+  static int on_headers_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    // Add final header.
+    decoder->response->headers[decoder->field] = decoder->value;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    // Get the response status string.
+    if (http::statuses.contains(decoder->parser.status_code)) {
+      decoder->response->status = http::statuses[decoder->parser.status_code];
+    } else {
+      decoder->failure = true;
+      return 1;
+    }
+
+    // We cannot provide streaming gzip decompression!
+    Option<std::string> encoding =
+      decoder->response->headers.get("Content-Encoding");
+    if (encoding.isSome() && encoding.get() == "gzip") {
+      decoder->failure = true;
+      return 1;
+    }
+
+    CHECK(decoder->writer.isNone());
+
+    http::Pipe pipe;
+    decoder->writer = pipe.writer();
+    decoder->response->reader = pipe.reader();
+
+    // Send the response to the caller, but keep a Pipe::Writer for
+    // streaming the body content into the response.
+    decoder->responses.push_back(decoder->response);
+    decoder->response = NULL;
+
+    return 0;
+  }
+
+  // Streams a chunk of the body into the response's pipe.
+  static int on_body(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.write(std::string(data, length));
+
+    return 0;
+  }
+
+  // Closes the pipe so that the reader observes end-of-file.
+  static int on_message_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.close();
+
+    decoder->writer = None();
+
+    return 0;
+  }
+
+  bool failure;
+
+  http_parser parser;
+  http_parser_settings settings;
+
+  // Whether the last header callback delivered a field or a value.
+  enum {
+    HEADER_FIELD,
+    HEADER_VALUE
+  } header;
+
+  std::string field;
+  std::string value;
+
+  // Response under construction (owned) until its headers complete.
+  http::Response* response;
+
+  // Write-end of the pipe of the response whose body is streaming.
+  Option<http::Pipe::Writer> writer;
+
+  // Completed-header responses not yet returned by 'decode'.
+  std::deque<http::Response*> responses;
+};
+
+
 }  // namespace process {
 
 #endif // __DECODER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index b996f00..efe364a 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -133,3 +133,94 @@ TEST(Decoder, Response)
 
   delete response;
 }
+
+
+TEST(Decoder, StreamingResponse)
+{
+  StreamingResponseDecoder decoder;
+
+  const string& headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  const string& body = "hi";
+
+  // The response is returned as soon as the headers are decoded.
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  // No body data was decoded yet, so the read must be pending.
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  // Feeding EOF to the decoder should be ok.
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("hi", read.get());
+
+  // Response should be complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("", read.get());
+
+  // The decoder hands ownership of the response to the caller.
+  delete response;
+}
+
+
+TEST(Decoder, StreamingResponseFailure)
+{
+  StreamingResponseDecoder decoder;
+
+  const string& headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  // The body is shorter than the content length!
+  const string& body = "1";
+
+  // The response is returned as soon as the headers are decoded.
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  // No body data was decoded yet, so the read must be pending.
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("1", read.get());
+
+  // Body is not yet complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Feeding EOF to the decoder should trigger a failure!
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isFailed());
+  EXPECT_EQ("failed to decode body", read.failure());
+
+  // The decoder hands ownership of the response to the caller.
+  delete response;
+}


[3/3] mesos git commit: Added http::streaming::get/post for client-side streaming responses.

Posted by bm...@apache.org.
Added http::streaming::get/post for client-side streaming responses.

Review: https://reviews.apache.org/r/32351


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f7fccce2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f7fccce2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f7fccce2

Branch: refs/heads/master
Commit: f7fccce2e3208cfc6481151c2bb016727e5ebdfc
Parents: 6ac8eb1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 14:45:28 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:59:19 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp |  44 ++++
 3rdparty/libprocess/src/http.cpp             | 237 ++++++++++++++++++++--
 3rdparty/libprocess/src/tests/http_tests.cpp |  84 ++++++++
 3 files changed, 344 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index a9ef5b7..07825b2 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -583,6 +583,50 @@ Future<Response> post(
     const Option<std::string>& body = None(),
     const Option<std::string>& contentType = None());
 
+
+namespace streaming {
+
+// Asynchronously sends an HTTP GET request to the specified URL
+// and returns the HTTP response of type 'PIPE' once the response
+// headers are received. The caller must read the response body
+// from the response's Pipe::Reader until an empty (EOF) read.
+Future<Response> get(
+    const URL& url,
+    const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP GET request to the process with the
+// given UPID and returns the HTTP response of type 'PIPE' once the
+// response headers are received. The caller must read the response
+// body from the response's Pipe::Reader until an empty (EOF) read.
+Future<Response> get(
+    const UPID& upid,
+    const Option<std::string>& path = None(),
+    const Option<std::string>& query = None(),
+    const Option<hashmap<std::string, std::string>>& headers = None());
+
+// Asynchronously sends an HTTP POST request to the specified URL
+// and returns the HTTP response of type 'PIPE' once the response
+// headers are received. The caller must read the response body
+// from the response's Pipe::Reader until an empty (EOF) read.
+Future<Response> post(
+    const URL& url,
+    const Option<hashmap<std::string, std::string>>& headers = None(),
+    const Option<std::string>& body = None(),
+    const Option<std::string>& contentType = None());
+
+// Asynchronously sends an HTTP POST request to the process with the
+// given UPID and returns the HTTP response of type 'PIPE' once the
+// response headers are received. The caller must read the response
+// body from the response's Pipe::Reader until an empty (EOF) read.
+Future<Response> post(
+    const UPID& upid,
+    const Option<std::string>& path = None(),
+    const Option<hashmap<std::string, std::string>>& headers = None(),
+    const Option<std::string>& body = None(),
+    const Option<std::string>& contentType = None());
+
+} // namespace streaming {
+
 } // namespace http {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index cd52cc8..7e6cbd3 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -548,35 +548,143 @@ ostream& operator << (
 
 namespace internal {
 
-Future<Response> decode(const string& buffer)
+// Forward declarations.
+Future<string> _convert(
+    Pipe::Reader reader,
+    const memory::shared_ptr<string>& buffer,
+    const string& read);
+Response __convert(
+    const Response& pipeResponse,
+    const string& body);
+
+
+// Returns a 'BODY' response once the body of the provided
+// 'PIPE' response can be read completely.
+Future<Response> convert(const Response& pipeResponse)
 {
-  ResponseDecoder decoder;
-  deque<Response*> responses = decoder.decode(buffer.c_str(), buffer.length());
+  memory::shared_ptr<string> buffer(new string());
 
-  if (decoder.failed() || responses.empty()) {
-    for (size_t i = 0; i < responses.size(); ++i) {
-      delete responses[i];
+  CHECK_EQ(Response::PIPE, pipeResponse.type);
+  CHECK_SOME(pipeResponse.reader);
+
+  Pipe::Reader reader = pipeResponse.reader.get();
+
+  return reader.read()
+    .then(lambda::bind(&_convert, reader, buffer, lambda::_1))
+    .then(lambda::bind(&__convert, pipeResponse, lambda::_1));
+}
+
+
+Future<string> _convert(
+    Pipe::Reader reader,
+    const memory::shared_ptr<string>& buffer,
+    const string& read)
+{
+  if (read.empty()) { // EOF: everything accumulated so far is the body.
+    return *buffer;
+  }
+
+  buffer->append(read); // Accumulate this chunk of the body.
+
+  return reader.read() // Feed the next read back into '_convert'.
+    .then(lambda::bind(&_convert, reader, buffer, lambda::_1));
+}
+
+
+Response __convert(const Response& pipeResponse, const string& body)
+{
+  Response bodyResponse = pipeResponse; // Copy; other fields are kept.
+  bodyResponse.type = Response::BODY;
+  bodyResponse.body = body;
+  bodyResponse.reader = None(); // Remove the reader.
+  return bodyResponse;
+}
+
+
+// Forward declaration.
+void _decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const Future<string>& data);
+
+
+Future<Response> decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const string& data)
+{
+  deque<Response*> responses = decoder->decode(data.c_str(), data.length());
+
+  if (decoder->failed() || responses.size() > 1) {
+    foreach (Response* response, responses) {
+      delete response;
     }
-    return Failure("Failed to decode HTTP response:\n" + buffer + "\n");
-  } else if (responses.size() > 1) {
-    PLOG(ERROR) << "Received more than 1 HTTP Response";
+    return Failure(string("Failed to decode HTTP response") +
+        (responses.size() > 1 ? ": more than one response received" : ""));
   }
 
-  Response response = *responses[0];
-  for (size_t i = 0; i < responses.size(); ++i) {
-    delete responses[i];
+  if (responses.empty()) {
+    // Keep reading until the headers are complete.
+    return socket.recv(None())
+      .then(lambda::bind(&decode, socket, decoder, lambda::_1));
+  }
+
+  // Keep feeding data to the decoder until EOF or a 'recv' failure.
+  if (!data.empty()) {
+    socket.recv(None())
+      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
   }
 
+  Response response = *responses[0];
+  delete responses[0];
   return response;
 }
 
 
+// Continuation for data received on 'socket' after the response
+// headers were already returned to the caller by 'decode': feeds
+// 'data' to the decoder and keeps reading until EOF, a 'recv'
+// failure or discard, or a decoding failure.
+void _decode(
+    Socket socket,
+    Owned<StreamingResponseDecoder> decoder,
+    const Future<string>& data)
+{
+  deque<Response*> responses;
+
+  if (!data.isReady()) {
+    // Let the decoder process EOF if a failure
+    // or discard is encountered.
+    responses = decoder->decode("", 0);
+  } else {
+    responses = decoder->decode(data.get().c_str(), data.get().length());
+  }
+
+  // We're not expecting more responses to arrive on this socket!
+  // The first response was already returned by 'decode', so any
+  // response showing up here is an unexpected additional one.
+  if (!responses.empty() || decoder->failed()) {
+    VLOG(1) << "Failed to decode HTTP response"
+            << (!responses.empty()
+                ? ": more than one response received"
+                : "");
+
+    foreach (Response* response, responses) {
+      delete response;
+    }
+
+    return;
+  }
+
+  // Keep reading if the socket has more data.
+  if (data.isReady() && !data.get().empty()) {
+    socket.recv(None())
+      .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1));
+  }
+}
+
+
 // Forward declaration.
 Future<Response> _request(
     Socket socket,
     const Address& address,
     const URL& url,
     const string& method,
+    bool streamedResponse, // If true, the 'PIPE' response is returned as-is.
     const Option<hashmap<string, string>>& _headers,
     const Option<string>& body,
     const Option<string>& contentType);
@@ -585,6 +693,7 @@ Future<Response> _request(
 Future<Response> request(
     const URL& url,
     const string& method,
+    bool streamedResponse,
     const Option<hashmap<string, string>>& headers,
     const Option<string>& body,
     const Option<string>& contentType)
@@ -626,6 +735,7 @@ Future<Response> request(
                        address,
                        url,
                        method,
+                       streamedResponse,
                        headers,
                        body,
                        contentType));
@@ -637,6 +747,7 @@ Future<Response> _request(
     const Address& address,
     const URL& url,
     const string& method,
+    bool streamedResponse,
     const Option<hashmap<string, string>>& _headers,
     const Option<string>& body,
     const Option<string>& contentType)
@@ -685,6 +796,13 @@ Future<Response> _request(
     headers["Content-Length"] = stringify(body.get().length());
   }
 
+  // TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
+  // Currently this does not handle 'gzip' content encoding,
+  // unless the caller manually compresses the 'body'. For
+  // requests with a streamed response we must wipe 'gzip' as
+  // an acceptable encoding, as we don't currently have streaming
+  // gzip utilities to support decoding a streaming gzip response!
+
   // Emit the headers.
   foreachpair (const string& key, const string& value, headers) {
     out << key << ": " << value << "\r\n";
@@ -699,13 +817,18 @@ Future<Response> _request(
   // Need to disambiguate the Socket::recv for binding below.
   Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv;
 
-  // TODO(bmahler): For efficiency, this should properly use the
-  // ResponseDecoder when reading, rather than parsing the full string
-  // response.
-  return socket.send(out.str())
-    .then(lambda::function<Future<string>(void)>(
-              lambda::bind(recv, socket, -1)))
-    .then(lambda::bind(&internal::decode, lambda::_1));
+  Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder());
+
+  Future<Response> pipeResponse = socket.send(out.str())
+    .then(lambda::bind(recv, socket, None()))
+    .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1));
+
+  if (streamedResponse) {
+    return pipeResponse;
+  } else {
+    return pipeResponse
+      .then(lambda::bind(&internal::convert, lambda::_1));
+  }
 }
 
 } // namespace internal {
@@ -715,7 +838,7 @@ Future<Response> get(
     const URL& url,
     const Option<hashmap<string, string>>& headers)
 {
-  return internal::request(url, "GET", headers, None(), None());
+  return internal::request(url, "GET", false, headers, None(), None());
 }
 
 
@@ -757,7 +880,7 @@ Future<Response> post(
     return Failure("Attempted to do a POST with a Content-Type but no body");
   }
 
-  return internal::request(url, "POST", headers, body, contentType);
+  return internal::request(url, "POST", false, headers, body, contentType);
 }
 
 
@@ -778,5 +901,77 @@ Future<Response> post(
   return post(url, headers, body, contentType);
 }
 
+
+namespace streaming {
+
+Future<Response> get(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers)
+{
+  return internal::request(url, "GET", true, headers, None(), None());
+}
+
+
+Future<Response> get(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<string>& query,
+    const Option<hashmap<string, string>>& headers)
+{
+  URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    url.path = strings::join("/", url.path, path.get());
+  }
+
+  if (query.isSome()) {
+    Try<hashmap<string, string>> decode = http::query::decode(
+        strings::remove(query.get(), "?", strings::PREFIX));
+
+    if (decode.isError()) {
+      return Failure("Failed to decode HTTP query string: " + decode.error());
+    }
+
+    url.query = decode.get();
+  }
+
+  return streaming::get(url, headers);
+}
+
+
+Future<Response> post(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  if (body.isNone() && contentType.isSome()) {
+    return Failure("Attempted to do a POST with a Content-Type but no body");
+  }
+
+  return internal::request(url, "POST", true, headers, body, contentType);
+}
+
+
+Future<Response> post(
+    const UPID& upid,
+    const Option<string>& path,
+    const Option<hashmap<string, string>>& headers,
+    const Option<string>& body,
+    const Option<string>& contentType)
+{
+  URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+  if (path.isSome()) {
+    // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+    url.path = strings::join("/", url.path, path.get());
+  }
+
+  return streaming::post(url, headers, body, contentType);
+}
+
+} // namespace streaming {
+
 } // namespace http {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 83219da..dfdb233 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -414,6 +414,90 @@ TEST(HTTP, Get)
 }
 
 
+TEST(HTTP, StreamingGetComplete)
+{
+  Http http;
+
+  http::Pipe pipe;
+  http::OK ok;
+  ok.type = http::Response::PIPE;
+  ok.reader = pipe.reader();
+
+  EXPECT_CALL(*http.process, pipe(_))
+    .WillOnce(Return(ok));
+
+  Future<http::Response> response =
+    http::streaming::get(http.process->self(), "pipe");
+
+  // The response should be ready since the headers were sent.
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(http::Response::PIPE, response.get().type);
+  ASSERT_SOME(response.get().reader);
+
+  http::Pipe::Reader reader = response.get().reader.get();
+
+  // There is no data to read yet.
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Stream data into the body and read it from the response.
+  http::Pipe::Writer writer = pipe.writer();
+  EXPECT_TRUE(writer.write("hello"));
+  AWAIT_EQ("hello", read);
+
+  EXPECT_TRUE(writer.write("goodbye"));
+  AWAIT_EQ("goodbye", reader.read());
+
+  // Complete the response.
+  EXPECT_TRUE(writer.close());
+  AWAIT_EQ("", reader.read()); // EOF.
+}
+
+
+TEST(HTTP, StreamingGetFailure)
+{
+  Http http;
+
+  http::Pipe pipe;
+  http::OK ok;
+  ok.type = http::Response::PIPE;
+  ok.reader = pipe.reader();
+
+  EXPECT_CALL(*http.process, pipe(_))
+    .WillOnce(Return(ok));
+
+  Future<http::Response> response =
+    http::streaming::get(http.process->self(), "pipe");
+
+  // The response should be ready since the headers were sent.
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding"));
+  ASSERT_EQ(http::Response::PIPE, response.get().type);
+  ASSERT_SOME(response.get().reader);
+
+  http::Pipe::Reader reader = response.get().reader.get();
+
+  // There is no data to read yet.
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Stream data into the body and read it from the response.
+  http::Pipe::Writer writer = pipe.writer();
+  EXPECT_TRUE(writer.write("hello"));
+  AWAIT_EQ("hello", read);
+
+  EXPECT_TRUE(writer.write("goodbye"));
+  AWAIT_EQ("goodbye", reader.read());
+
+  // Fail the response.
+  EXPECT_TRUE(writer.fail("oops"));
+  AWAIT_FAILED(reader.read());
+}
+
+
 http::Response validatePost(const http::Request& request)
 {
   EXPECT_EQ("POST", request.method);