You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/11/22 03:27:22 UTC
[02/13] mesos git commit: Added support for request streaming to the
connection abstraction.
Added support for request streaming to the connection abstraction.
This required modifications to the `encode()` method to return
a `Pipe::Reader` instead of the request body. The `send()` then
reads from this pipe to send the request via the socket.
Review: https://reviews.apache.org/r/53489/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a24cb498
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a24cb498
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a24cb498
Branch: refs/heads/master
Commit: a24cb4985c2333e2d15eeb8f971242f1754f81ab
Parents: d06d745
Author: Anand Mazumdar <an...@apache.org>
Authored: Mon Nov 21 18:08:18 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Mon Nov 21 19:27:08 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/http.cpp | 104 +++++++++++++++++++++++++++++++---
1 file changed, 97 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a24cb498/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 0c2dfd3..3f16f29 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -866,7 +866,14 @@ ostream& operator<<(ostream& stream, const URL& url)
namespace internal {
-string encode(const Request& request)
+void _encode(Pipe::Reader reader, Pipe::Writer writer); // Forward declaration.
+
+
+// Encodes the request by writing into a pipe; the caller can
+// read the encoded data from the returned read end of the pipe.
+// A pipe is used since the request body can be a pipe and must
+// be read asynchronously.
+Pipe::Reader encode(const Request& request)
{
// TODO(bmahler): Replace this with a RequestEncoder.
std::ostringstream out;
@@ -915,8 +922,18 @@ string encode(const Request& request)
headers["Connection"] = "close";
}
- // Make sure the Content-Length is set correctly.
- headers["Content-Length"] = stringify(request.body.length());
+ if (request.type == Request::PIPE) {
+ CHECK(!headers.contains("Content-Length"));
+
+ // Make sure the Transfer-Encoding header is set correctly
+ // for PIPE requests.
+ headers["Transfer-Encoding"] = "chunked";
+ } else {
+ CHECK_EQ(Request::BODY, request.type);
+
+ // Make sure the Content-Length header is set correctly.
+ headers["Content-Length"] = stringify(request.body.length());
+ }
// TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
// Currently this does not handle 'gzip' content encoding,
@@ -931,9 +948,54 @@ string encode(const Request& request)
}
out << "\r\n";
- out << request.body;
- return out.str();
+ Pipe pipe;
+ Pipe::Reader reader = pipe.reader();
+ Pipe::Writer writer = pipe.writer();
+
+ // Write the head of the request.
+ writer.write(out.str());
+
+ switch (request.type) {
+ case Request::BODY:
+ writer.write(request.body);
+ writer.close();
+ break;
+ case Request::PIPE:
+ CHECK_SOME(request.reader);
+ CHECK(request.body.empty());
+ _encode(request.reader.get(), writer);
+ break;
+ }
+
+ return reader;
+}
+
+
+// Copies `reader` into `writer` one read at a time, framing each
+// non-empty chunk as `<size in hex>\r\n<data>\r\n`. `encode()` calls
+// this for PIPE requests, for which it also sets the
+// 'Transfer-Encoding: chunked' header.
+//
+// An empty read is treated as EOF: "0\r\n\r\n" is written and
+// `writer` is closed. If the read does not become ready, `writer`
+// is failed with the read's failure message when it failed, and
+// with "discarded" otherwise. Nothing is returned; the function
+// calls itself again from the read callback after each chunk.
+void _encode(Pipe::Reader reader, Pipe::Writer writer)
+{
+ reader.read()
+ .onAny([reader, writer](const Future<string>& chunk) mutable {
+ // The read failed or was discarded: fail the encoded pipe and stop.
+ if (!chunk.isReady()) {
+ writer.fail(chunk.isFailed() ? chunk.failure() : "discarded");
+ return;
+ }
+
+ if (chunk->empty()) {
+ // EOF case: emit a zero-size chunk and close the encoded pipe.
+ writer.write("0\r\n\r\n");
+ writer.close();
+ return;
+ }
+
+ // Frame the chunk; its size is written in hexadecimal.
+ std::ostringstream out;
+ out << std::hex << chunk->size() << "\r\n";
+ out << chunk.get();
+ out << "\r\n";
+
+ // Forward the framed chunk, then continue with the next read.
+ writer.write(out.str());
+ _encode(reader, writer);
+ });
}
@@ -976,6 +1038,21 @@ public:
return Failure("Cannot pipeline after 'Connection: close'");
}
+    // Reject malformed PIPE requests with a `Failure` up front.
+    // `encode()` asserts these same three conditions with CHECKs.
+    if (request.type == Request::PIPE) {
+      if (request.reader.isNone()) {
+        return Failure("Request reader must be set for PIPE request");
+      }
+
+      if (!request.body.empty()) {
+        return Failure("Request body must be empty for PIPE request");
+      }
+
+      // `encode()` sets 'Transfer-Encoding: chunked' for PIPE requests,
+      // so a caller-supplied 'Content-Length' is not accepted.
+      if (request.headers.contains("Content-Length")) {
+        return Failure("'Content-Length' cannot be set for PIPE request");
+      }
+    }
+
if (!request.keepAlive) {
close = true;
}
@@ -985,8 +1062,8 @@ public:
Socket socket_ = socket;
sendChain = sendChain
- .then([socket_, request]() mutable {
- return socket_.send(encode(request));
+ .then([socket_, request]() {
+ return _send(socket_, encode(request));
});
// If we can't write to the socket, disconnect.
@@ -1045,6 +1122,19 @@ protected:
}
private:
+ // Drains `reader` to `socket`: reads one piece of data, sends it,
+ // and then calls itself again for the next piece. An empty read is
+ // treated as EOF and completes the returned future with `Nothing`.
+ // NOTE(review): a failed read or send presumably fails the returned
+ // future through `then()` - confirm against `Future::then`.
+ static Future<Nothing> _send(Socket socket, Pipe::Reader reader)
+ {
+ return reader.read()
+ .then([socket, reader](const string& data) mutable -> Future<Nothing> {
+ if (data.empty()) {
+ return Nothing(); // EOF.
+ }
+
+ // Send this piece, then continue with the next read.
+ return socket.send(data)
+ .then(lambda::bind(_send, socket, reader));
+ });
+ }
+
void read()
{
socket.recv()