You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/11/22 03:27:22 UTC

[02/13] mesos git commit: Added support for request streaming to the connection abstraction.

Added support for request streaming to the connection abstraction.

This required modifying `encode()` to return a `Pipe::Reader` for the
encoded request instead of the encoded request string. `send()` then
reads from this pipe and writes the data to the socket.

Review: https://reviews.apache.org/r/53489/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a24cb498
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a24cb498
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a24cb498

Branch: refs/heads/master
Commit: a24cb4985c2333e2d15eeb8f971242f1754f81ab
Parents: d06d745
Author: Anand Mazumdar <an...@apache.org>
Authored: Mon Nov 21 18:08:18 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Mon Nov 21 19:27:08 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/http.cpp | 104 +++++++++++++++++++++++++++++++---
 1 file changed, 97 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a24cb498/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 0c2dfd3..3f16f29 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -866,7 +866,14 @@ ostream& operator<<(ostream& stream, const URL& url)
 
 namespace internal {
 
-string encode(const Request& request)
+void _encode(Pipe::Reader reader, Pipe::Writer writer); // Forward declaration.
+
+
+// Encodes the request by writing into a pipe; the caller can
+// read the encoded data from the returned read end of the pipe.
+// A pipe is used since the request body can be a pipe and must
+// be read asynchronously.
+Pipe::Reader encode(const Request& request)
 {
   // TODO(bmahler): Replace this with a RequestEncoder.
   std::ostringstream out;
@@ -915,8 +922,18 @@ string encode(const Request& request)
     headers["Connection"] = "close";
   }
 
-  // Make sure the Content-Length is set correctly.
-  headers["Content-Length"] = stringify(request.body.length());
+  if (request.type == Request::PIPE) {
+    CHECK(!headers.contains("Content-Length"));
+
+    // Make sure the Transfer-Encoding header is set correctly
+    // for PIPE requests.
+    headers["Transfer-Encoding"] = "chunked";
+  } else {
+    CHECK_EQ(Request::BODY, request.type);
+
+    // Make sure the Content-Length header is set correctly.
+    headers["Content-Length"] = stringify(request.body.length());
+  }
 
   // TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
   // Currently this does not handle 'gzip' content encoding,
@@ -931,9 +948,54 @@ string encode(const Request& request)
   }
 
   out << "\r\n";
-  out << request.body;
 
-  return out.str();
+  Pipe pipe;
+  Pipe::Reader reader = pipe.reader();
+  Pipe::Writer writer = pipe.writer();
+
+  // Write the head of the request.
+  writer.write(out.str());
+
+  switch (request.type) {
+    case Request::BODY:
+      writer.write(request.body);
+      writer.close();
+      break;
+    case Request::PIPE:
+      CHECK_SOME(request.reader);
+      CHECK(request.body.empty());
+      _encode(request.reader.get(), writer);
+      break;
+  }
+
+  return reader;
+}
+
+
+void _encode(Pipe::Reader reader, Pipe::Writer writer)
+{
+  reader.read()
+    .onAny([reader, writer](const Future<string>& chunk) mutable {
+      if (!chunk.isReady()) {
+        writer.fail(chunk.isFailed() ? chunk.failure() : "discarded");
+        return;
+      }
+
+      if (chunk->empty()) {
+        // EOF case.
+        writer.write("0\r\n\r\n");
+        writer.close();
+        return;
+      }
+
+      std::ostringstream out;
+      out << std::hex << chunk->size() << "\r\n";
+      out << chunk.get();
+      out << "\r\n";
+
+      writer.write(out.str());
+      _encode(reader, writer);
+    });
 }
 
 
@@ -976,6 +1038,21 @@ public:
       return Failure("Cannot pipeline after 'Connection: close'");
     }
 
+    if (request.type == Request::PIPE) {
+      if (request.reader.isNone()) {
+        return Failure("Request reader must be set for PIPE request");
+      }
+
+      if (!request.body.empty()) {
+        return Failure("Request body must be empty for PIPE request");
+      }
+
+      Option<string> contentLength = request.headers.get("Content-Length");
+      if (request.headers.contains("Content-Length")) {
+        return Failure("'Content-Length' cannot be set for PIPE request");
+      }
+    }
+
     if (!request.keepAlive) {
       close = true;
     }
@@ -985,8 +1062,8 @@ public:
     Socket socket_ = socket;
 
     sendChain = sendChain
-      .then([socket_, request]() mutable {
-        return socket_.send(encode(request));
+      .then([socket_, request]() {
+        return _send(socket_, encode(request));
       });
 
     // If we can't write to the socket, disconnect.
@@ -1045,6 +1122,19 @@ protected:
   }
 
 private:
+  static Future<Nothing> _send(Socket socket, Pipe::Reader reader)
+  {
+    return reader.read()
+      .then([socket, reader](const string& data) mutable -> Future<Nothing> {
+        if (data.empty()) {
+          return Nothing(); // EOF.
+        }
+
+        return socket.send(data)
+          .then(lambda::bind(_send, socket, reader));
+      });
+  }
+
   void read()
   {
     socket.recv()