You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/12/02 18:58:38 UTC
[1/2] mesos git commit: Made `http::serve()` use the streaming request
decoder.
Repository: mesos
Updated Branches:
refs/heads/master 4cec92ff8 -> 2a73d956a
Made `http::serve()` use the streaming request decoder.
This also means that any call site using `http::serve()` now needs to
read the request body via `request.reader`, because `request.body`
will be empty.
Review: https://reviews.apache.org/r/54274/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a528186
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a528186
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a528186
Branch: refs/heads/master
Commit: 2a5281867b802dcdd3558cacef8ee8018aba4eee
Parents: 4cec92f
Author: Anand Mazumdar <an...@apache.org>
Authored: Thu Dec 1 16:02:29 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Dec 2 10:55:30 2016 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 3 +++
3rdparty/libprocess/src/http.cpp | 2 +-
3rdparty/libprocess/src/tests/http_tests.cpp | 8 +++++++-
3 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2a528186/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index f0f849c..7f44292 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -895,6 +895,9 @@ Future<Nothing> serve(
// NOTE: HTTP pipelining is automatically performed. If you don't want
// pipelining you must explicitly sequence/serialize the requests to
// wait for previous responses yourself.
+//
+// NOTE: The `Request` passed to the handler is of type `PIPE` and should
+// always be read using `Request.reader`.
template <typename F>
Future<Nothing> serve(const network::Socket& s, F&& f)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/2a528186/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index dd55034..fc55bda 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1673,7 +1673,7 @@ Future<Nothing> receive(
const size_t size = io::BUFFERED_READ_SIZE;
char* data = new char[size];
- DataDecoder* decoder = new DataDecoder();
+ StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
return loop(
[=]() {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2a528186/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 4a0d852..7f54257 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -1957,7 +1957,13 @@ TEST_F(HttpServeTest, Unix)
Future<Nothing> serve = http::serve(
socket,
[](const http::Request& request) {
- return http::OK(request.body);
+ CHECK_SOME(request.reader);
+ http::Pipe::Reader reader = request.reader.get(); // Remove const.
+
+ return reader.readAll()
+ .then([](const string& body) -> Future<http::Response> {
+ return http::OK(body);
+ });
});
http::Request request;
[2/2] mesos git commit: Updated the IOSwitchboard http handler to
work with streaming requests.
Posted by an...@apache.org.
Updated the IOSwitchboard http handler to work with streaming requests.
Review: https://reviews.apache.org/r/54296/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a73d956
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a73d956
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a73d956
Branch: refs/heads/master
Commit: 2a73d956af1cb0615d4e66de126ab554fdabb0b5
Parents: 2a52818
Author: Kevin Klues <kl...@gmail.com>
Authored: Fri Dec 2 10:14:45 2016 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Dec 2 10:55:49 2016 -0800
----------------------------------------------------------------------
.../containerizer/mesos/io/switchboard.cpp | 125 +++++++++++--------
1 file changed, 71 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2a73d956/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 594c37e..778367a 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -47,6 +47,7 @@
#include <mesos/slave/container_logger.hpp>
#include "common/http.hpp"
+#include "common/recordio.hpp"
#include "slave/flags.hpp"
#include "slave/state.hpp"
@@ -488,13 +489,10 @@ private:
// Handle `ATTACH_CONTAINER_INPUT` calls.
Future<http::Response> attachContainerInput(
- const agent::Call& call,
- const ContentType& contentType);
+ const Owned<recordio::Reader<agent::Call>>& reader);
// Handle `ATTACH_CONTAINER_OUTPUT` calls.
- Future<http::Response> attachContainerOutput(
- const agent::Call& call,
- const ContentType& contentType);
+ Future<http::Response> attachContainerOutput(ContentType acceptType);
// Asynchronously receive data as we read it from our
// `stdoutFromFd` and `stderrFromFd` file descriptors.
@@ -746,85 +744,104 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
return http::MethodNotAllowed({"POST"}, request.method);
}
- agent::Call call;
-
- Option<string> contentType =
- request.headers.get(strings::lower("Content-Type"));
-
- if (contentType.isNone()) {
+ Option<string> contentType_ = request.headers.get("Content-Type");
+ if (contentType_.isNone()) {
return http::BadRequest("Expecting 'Content-Type' to be present");
}
- if (contentType.get() == APPLICATION_PROTOBUF) {
- if (!call.ParseFromString(request.body)) {
- return http::BadRequest("Failed to parse body into Call protobuf");
- }
- } else if (contentType.get() == APPLICATION_JSON) {
- Try<JSON::Value> value = JSON::parse(request.body);
-
- if (value.isError()) {
- return http::BadRequest("Failed to parse body into JSON:"
- " " + value.error());
- }
-
- Try<agent::Call> parse = ::protobuf::parse<agent::Call>(value.get());
- if (parse.isError()) {
- return http::BadRequest("Failed to convert JSON into Call protobuf:"
- " " + parse.error());
- }
-
- call = parse.get();
+ ContentType contentType;
+ if (contentType_.get() == APPLICATION_JSON) {
+ contentType = ContentType::JSON;
+ } else if (contentType_.get() == APPLICATION_PROTOBUF) {
+ contentType = ContentType::PROTOBUF;
+ } else if (contentType_.get() == APPLICATION_STREAMING_JSON) {
+ contentType = ContentType::STREAMING_JSON;
+ } else if (contentType_.get() == APPLICATION_STREAMING_PROTOBUF) {
+ contentType = ContentType::STREAMING_PROTOBUF;
} else {
return http::UnsupportedMediaType(
- "Expecting 'Content-Type' of '" + stringify(APPLICATION_JSON) +
- "' or '" + stringify(APPLICATION_PROTOBUF) + "'");
+ string("Expecting 'Content-Type' of ") +
+ APPLICATION_JSON + " or " + APPLICATION_PROTOBUF + " or " +
+ APPLICATION_STREAMING_JSON + " or " + APPLICATION_STREAMING_PROTOBUF);
}
ContentType acceptType;
- if (request.acceptsMediaType(APPLICATION_JSON)) {
+ if (request.acceptsMediaType(APPLICATION_STREAMING_PROTOBUF)) {
+ acceptType = ContentType::STREAMING_PROTOBUF;
+ } else if (request.acceptsMediaType(APPLICATION_STREAMING_JSON)) {
+ acceptType = ContentType::STREAMING_JSON;
+ } else if (request.acceptsMediaType(APPLICATION_JSON)) {
acceptType = ContentType::JSON;
} else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
acceptType = ContentType::PROTOBUF;
} else {
return http::NotAcceptable(
- "Expecting 'Accept' to allow '" + stringify(APPLICATION_JSON) + "'"
- " or '" + stringify(APPLICATION_PROTOBUF) + "'");
- }
+ string("Expecting 'Accept' to allow ") +
+ APPLICATION_JSON + " or " + APPLICATION_PROTOBUF + " or " +
+ APPLICATION_STREAMING_JSON + " or " + APPLICATION_STREAMING_PROTOBUF);
+ }
+
+ CHECK_EQ(http::Request::PIPE, request.type);
+ CHECK_SOME(request.reader);
+
+ if (requestStreaming(contentType)) {
+ Owned<recordio::Reader<agent::Call>> reader(
+ new recordio::Reader<agent::Call>(
+ ::recordio::Decoder<agent::Call>(lambda::bind(
+ deserialize<agent::Call>, contentType, lambda::_1)),
+ request.reader.get()));
+
+ return reader->read()
+ .then(defer(
+ self(),
+ [=](const Result<agent::Call>& call) -> Future<http::Response> {
+ if (call.isNone()) {
+ return http::BadRequest(
+ "Received EOF while reading request body");
+ }
+
+ if (call.isError()) {
+ return Failure(call.error());
+ }
+
+ CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call->type());
+
+ return attachContainerInput(reader);
+ }));
+ } else {
+ http::Pipe::Reader reader = request.reader.get(); // Remove const.
- switch (call.type()) {
- case agent::Call::ATTACH_CONTAINER_INPUT:
- return attachContainerInput(call, acceptType);
+ return reader.readAll()
+ .then(defer(
+ self(),
+ [=](const string& body) -> Future<http::Response> {
+ Try<agent::Call> call = deserialize<agent::Call>(contentType, body);
+ if (call.isError()) {
+ return http::BadRequest(call.error());
+ }
- case agent::Call::ATTACH_CONTAINER_OUTPUT:
- return attachContainerOutput(call, acceptType);
+ CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call->type());
- default:
- return http::NotImplemented();
+ return attachContainerOutput(acceptType);
+ }));
}
-
- UNREACHABLE();
}
Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
- const agent::Call& call,
- const ContentType& contentType)
+ const Owned<recordio::Reader<agent::Call>>& reader)
{
- CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call.type());
return http::NotImplemented("ATTACH_CONTAINER_INPUT");
}
Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
- const agent::Call& call,
- const ContentType& contentType)
+ ContentType acceptType)
{
- CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call.type());
-
http::Pipe pipe;
http::OK ok;
- ok.headers["Content-Type"] = stringify(contentType);
+ ok.headers["Content-Type"] = stringify(acceptType);
ok.type = http::Response::PIPE;
ok.reader = pipe.reader();
@@ -832,7 +849,7 @@ Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
// calls to `receiveOutput()` to actually push data out over the
// connection. If we ever detect a connection has been closed,
// we remove it from this list.
- HttpConnection connection(pipe.writer(), contentType);
+ HttpConnection connection(pipe.writer(), acceptType);
auto iterator = connections.insert(connections.end(), connection);
// We use the `startRedirect` promise to indicate when we should