You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/12/02 18:58:38 UTC

[1/2] mesos git commit: Made `http::serv()` use the streaming request decoder.

Repository: mesos
Updated Branches:
  refs/heads/master 4cec92ff8 -> 2a73d956a


Made `http::serv()` use the streaming request decoder.

This also means that any call sites using this would need to
read the request body via the `reader` and `request.body` would
be empty.

Review: https://reviews.apache.org/r/54274/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a528186
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a528186
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a528186

Branch: refs/heads/master
Commit: 2a5281867b802dcdd3558cacef8ee8018aba4eee
Parents: 4cec92f
Author: Anand Mazumdar <an...@apache.org>
Authored: Thu Dec 1 16:02:29 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Dec 2 10:55:30 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 3 +++
 3rdparty/libprocess/src/http.cpp             | 2 +-
 3rdparty/libprocess/src/tests/http_tests.cpp | 8 +++++++-
 3 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2a528186/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index f0f849c..7f44292 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -895,6 +895,9 @@ Future<Nothing> serve(
 // NOTE: HTTP pipelining is automatically performed. If you don't want
 // pipelining you must explicitly sequence/serialize the requests to
 // wait for previous responses yourself.
+//
+// NOTE: The `Request` passed to the handler is of type `PIPE` and should
+// always be read using `Request.reader`.
 template <typename F>
 Future<Nothing> serve(const network::Socket& s, F&& f)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2a528186/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index dd55034..fc55bda 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1673,7 +1673,7 @@ Future<Nothing> receive(
   const size_t size = io::BUFFERED_READ_SIZE;
   char* data = new char[size];
 
-  DataDecoder* decoder = new DataDecoder();
+  StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
 
   return loop(
       [=]() {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2a528186/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 4a0d852..7f54257 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -1957,7 +1957,13 @@ TEST_F(HttpServeTest, Unix)
   Future<Nothing> serve = http::serve(
     socket,
     [](const http::Request& request) {
-      return http::OK(request.body);
+      CHECK_SOME(request.reader);
+      http::Pipe::Reader reader = request.reader.get(); // Remove const.
+
+      return reader.readAll()
+        .then([](const string& body) -> Future<http::Response> {
+          return http::OK(body);
+        });
     });
 
   http::Request request;


[2/2] mesos git commit: Updated the IOSwitchboard http handler to work with streaming requests.

Posted by an...@apache.org.
Updated the IOSwitchboard http handler to work with streaming requests.

Review: https://reviews.apache.org/r/54296/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a73d956
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a73d956
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a73d956

Branch: refs/heads/master
Commit: 2a73d956af1cb0615d4e66de126ab554fdabb0b5
Parents: 2a52818
Author: Kevin Klues <kl...@gmail.com>
Authored: Fri Dec 2 10:14:45 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Dec 2 10:55:49 2016 -0800

----------------------------------------------------------------------
 .../containerizer/mesos/io/switchboard.cpp      | 125 +++++++++++--------
 1 file changed, 71 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2a73d956/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 594c37e..778367a 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -47,6 +47,7 @@
 #include <mesos/slave/container_logger.hpp>
 
 #include "common/http.hpp"
+#include "common/recordio.hpp"
 
 #include "slave/flags.hpp"
 #include "slave/state.hpp"
@@ -488,13 +489,10 @@ private:
 
   // Handle `ATTACH_CONTAINER_INPUT` calls.
   Future<http::Response> attachContainerInput(
-    const agent::Call& call,
-    const ContentType& contentType);
+      const Owned<recordio::Reader<agent::Call>>& reader);
 
   // Handle `ATTACH_CONTAINER_OUTPUT` calls.
-  Future<http::Response> attachContainerOutput(
-    const agent::Call& call,
-    const ContentType& contentType);
+  Future<http::Response> attachContainerOutput(ContentType acceptType);
 
   // Asynchronously receive data as we read it from our
   // `stdoutFromFd` and `stdoutFromFd` file descriptors.
@@ -746,85 +744,104 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
     return http::MethodNotAllowed({"POST"}, request.method);
   }
 
-  agent::Call call;
-
-  Option<string> contentType =
-    request.headers.get(strings::lower("Content-Type"));
-
-  if (contentType.isNone()) {
+  Option<string> contentType_ = request.headers.get("Content-Type");
+  if (contentType_.isNone()) {
     return http::BadRequest("Expecting 'Content-Type' to be present");
   }
 
-  if (contentType.get() == APPLICATION_PROTOBUF) {
-    if (!call.ParseFromString(request.body)) {
-      return http::BadRequest("Failed to parse body into Call protobuf");
-    }
-  } else if (contentType.get() == APPLICATION_JSON) {
-    Try<JSON::Value> value = JSON::parse(request.body);
-
-    if (value.isError()) {
-      return http::BadRequest("Failed to parse body into JSON:"
-                              " " + value.error());
-    }
-
-    Try<agent::Call> parse = ::protobuf::parse<agent::Call>(value.get());
-    if (parse.isError()) {
-      return http::BadRequest("Failed to convert JSON into Call protobuf:"
-                              " " + parse.error());
-    }
-
-    call = parse.get();
+  ContentType contentType;
+  if (contentType_.get() == APPLICATION_JSON) {
+    contentType = ContentType::JSON;
+  } else if (contentType_.get() == APPLICATION_PROTOBUF) {
+    contentType = ContentType::PROTOBUF;
+  } else if (contentType_.get() == APPLICATION_STREAMING_JSON) {
+    contentType = ContentType::STREAMING_JSON;
+  } else if (contentType_.get() == APPLICATION_STREAMING_PROTOBUF) {
+    contentType = ContentType::STREAMING_PROTOBUF;
   } else {
     return http::UnsupportedMediaType(
-        "Expecting 'Content-Type' of '" + stringify(APPLICATION_JSON) +
-        "' or '" + stringify(APPLICATION_PROTOBUF) + "'");
+        string("Expecting 'Content-Type' of ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF + " or " +
+        APPLICATION_STREAMING_JSON + " or " + APPLICATION_STREAMING_PROTOBUF);
   }
 
   ContentType acceptType;
-  if (request.acceptsMediaType(APPLICATION_JSON)) {
+  if (request.acceptsMediaType(APPLICATION_STREAMING_PROTOBUF)) {
+    acceptType = ContentType::STREAMING_PROTOBUF;
+  } else if (request.acceptsMediaType(APPLICATION_STREAMING_JSON)) {
+    acceptType = ContentType::STREAMING_JSON;
+  } else if (request.acceptsMediaType(APPLICATION_JSON)) {
     acceptType = ContentType::JSON;
   } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
     acceptType = ContentType::PROTOBUF;
   } else {
     return http::NotAcceptable(
-        "Expecting 'Accept' to allow '" + stringify(APPLICATION_JSON) + "'"
-        " or '" + stringify(APPLICATION_PROTOBUF) + "'");
-  }
+        string("Expecting 'Accept' to allow ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF + " or " +
+        APPLICATION_STREAMING_JSON + " or "  + APPLICATION_STREAMING_PROTOBUF);
+  }
+
+  CHECK_EQ(http::Request::PIPE, request.type);
+  CHECK_SOME(request.reader);
+
+  if (requestStreaming(contentType)) {
+    Owned<recordio::Reader<agent::Call>> reader(
+        new recordio::Reader<agent::Call>(
+            ::recordio::Decoder<agent::Call>(lambda::bind(
+                deserialize<agent::Call>, contentType, lambda::_1)),
+            request.reader.get()));
+
+    return reader->read()
+      .then(defer(
+          self(),
+          [=](const Result<agent::Call>& call) -> Future<http::Response> {
+            if (call.isNone()) {
+              return http::BadRequest(
+                  "Received EOF while reading request body");
+            }
+
+            if (call.isError()) {
+              return Failure(call.error());
+            }
+
+            CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call->type());
+
+            return attachContainerInput(reader);
+          }));
+  } else {
+    http::Pipe::Reader reader = request.reader.get();  // Remove const.
 
-  switch (call.type()) {
-    case agent::Call::ATTACH_CONTAINER_INPUT:
-      return attachContainerInput(call, acceptType);
+    return reader.readAll()
+      .then(defer(
+          self(),
+          [=](const string& body) -> Future<http::Response> {
+            Try<agent::Call> call = deserialize<agent::Call>(contentType, body);
+            if (call.isError()) {
+              return http::BadRequest(call.error());
+            }
 
-    case agent::Call::ATTACH_CONTAINER_OUTPUT:
-      return attachContainerOutput(call, acceptType);
+            CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call->type());
 
-    default:
-      return http::NotImplemented();
+            return attachContainerOutput(acceptType);
+          }));
   }
-
-  UNREACHABLE();
 }
 
 
 Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
-    const agent::Call& call,
-    const ContentType& contentType)
+    const Owned<recordio::Reader<agent::Call>>& reader)
 {
-  CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call.type());
   return http::NotImplemented("ATTACH_CONTAINER_INPUT");
 }
 
 
 Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
-    const agent::Call& call,
-    const ContentType& contentType)
+    ContentType acceptType)
 {
-  CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call.type());
-
   http::Pipe pipe;
   http::OK ok;
 
-  ok.headers["Content-Type"] = stringify(contentType);
+  ok.headers["Content-Type"] = stringify(acceptType);
   ok.type = http::Response::PIPE;
   ok.reader = pipe.reader();
 
@@ -832,7 +849,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
   // calls to `receiveOutput()` to actually push data out over the
   // connection. If we ever detect a connection has been closed,
   // we remove it from this list.
-  HttpConnection connection(pipe.writer(), contentType);
+  HttpConnection connection(pipe.writer(), acceptType);
   auto iterator = connections.insert(connections.end(), connection);
 
   // We use the `startRedirect` promise to indicate when we should