You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/11/29 06:15:32 UTC
[3/6] mesos git commit: Added streaming support to `/api/v1` handler
on the agent.
Added streaming support to `/api/v1` handler on the agent.
Note that this change only updates the handler to correctly
handle non-streaming calls given it receives `PIPE` requests.
Handling streaming calls will come later.
Review: https://reviews.apache.org/r/53994
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad786c58
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad786c58
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad786c58
Branch: refs/heads/master
Commit: ad786c5831bd0adb199c701f6aeef482623c5ffb
Parents: 95030c5
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Nov 15 20:09:17 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Nov 28 22:14:01 2016 -0800
----------------------------------------------------------------------
src/slave/http.cpp | 85 +++++++++++++++++++++++++++++++-----------------
src/slave/slave.cpp | 7 ++--
src/slave/slave.hpp | 7 ++++
3 files changed, 68 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad786c58/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index aa9f492..cca88b8 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -310,47 +310,42 @@ Future<Response> Slave::Http::api(
return MethodNotAllowed({"POST"}, request.method);
}
- v1::agent::Call v1Call;
-
- Option<string> contentType = request.headers.get("Content-Type");
- if (contentType.isNone()) {
+ Option<string> contentType_ = request.headers.get("Content-Type");
+ if (contentType_.isNone()) {
return BadRequest("Expecting 'Content-Type' to be present");
}
- if (contentType.get() == APPLICATION_PROTOBUF) {
- if (!v1Call.ParseFromString(request.body)) {
- return BadRequest("Failed to parse body into Call protobuf");
- }
- } else if (contentType.get() == APPLICATION_JSON) {
- Try<JSON::Value> value = JSON::parse(request.body);
- if (value.isError()) {
- return BadRequest("Failed to parse body into JSON: " + value.error());
- }
-
- Try<v1::agent::Call> parse =
- ::protobuf::parse<v1::agent::Call>(value.get());
-
- if (parse.isError()) {
- return BadRequest("Failed to convert JSON into Call protobuf: " +
- parse.error());
- }
-
- v1Call = parse.get();
+ ContentType contentType;
+ if (contentType_.get() == APPLICATION_JSON) {
+ contentType = ContentType::JSON;
+ } else if (contentType_.get() == APPLICATION_PROTOBUF) {
+ contentType = ContentType::PROTOBUF;
} else {
return UnsupportedMediaType(
string("Expecting 'Content-Type' of ") +
APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
}
- agent::Call call = devolve(v1Call);
+ // This lambda deserializes a string into a valid `Call`
+ // based on the content type.
+ auto deserializer = [](const string& body, ContentType contentType)
+ -> Try<agent::Call> {
+ Try<v1::agent::Call> v1Call =
+ deserialize<v1::agent::Call>(contentType, body);
+
+ if (v1Call.isError()) {
+ return Error(v1Call.error());
+ }
- Option<Error> error = validation::agent::call::validate(call);
+ agent::Call call = devolve(v1Call.get());
- if (error.isSome()) {
- return BadRequest("Failed to validate agent::Call: " + error.get().message);
- }
+ Option<Error> error = validation::agent::call::validate(call);
+ if (error.isSome()) {
+ return Error("Failed to validate agent::Call: " + error.get().message);
+ }
- LOG(INFO) << "Processing call " << call.type();
+ return call;
+ };
ContentType acceptType;
if (request.acceptsMediaType(APPLICATION_JSON)) {
@@ -363,6 +358,38 @@ Future<Response> Slave::Http::api(
"'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
}
+ if (contentType == ContentType::JSON ||
+ contentType == ContentType::PROTOBUF) {
+ CHECK_EQ(http::Request::PIPE, request.type);
+ CHECK_SOME(request.reader);
+
+ Pipe::Reader reader = request.reader.get(); // Remove const.
+
+ return reader.readAll()
+ .then(defer(
+ slave->self(),
+ [=](const string& body) -> Future<Response> {
+ Try<agent::Call> call = deserializer(body, contentType);
+ if (call.isError()) {
+ return BadRequest(call.error());
+ }
+ return _api(call.get(), contentType, acceptType, principal);
+ }));
+ } else {
+ // TODO(vinod): Add support for 'streaming' content type.
+ UNREACHABLE();
+ }
+}
+
+
+Future<Response> Slave::Http::_api(
+ const agent::Call& call,
+ ContentType contentType,
+ ContentType acceptType,
+ const Option<string>& principal) const
+{
+ LOG(INFO) << "Processing call " << call.type();
+
switch (call.type()) {
case agent::Call::UNKNOWN:
return NotImplemented();
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad786c58/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 521f08d..1273b0f 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -648,7 +648,9 @@ void Slave::initialize()
&Slave::ping,
&PingSlaveMessage::connected);
-
+ // Setup the '/api/v1' handler for streaming requests.
+ RouteOptions options;
+ options.requestStreaming = true;
route("/api/v1",
// TODO(benh): Is this authentication realm sufficient or do
// we need some kind of hybrid if we expect both executors
@@ -659,7 +661,8 @@ void Slave::initialize()
const Option<string>& principal) {
Http::log(request);
return http.api(request, principal);
- });
+ },
+ options);
route("/api/v1/executor",
Http::EXECUTOR_HELP(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad786c58/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0cc1054..0ede185 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -523,6 +523,13 @@ private:
private:
JSON::Object _flags() const;
+ // Continuation for `/api` endpoint that handles non-streaming requests.
+ process::Future<process::http::Response> _api(
+ const agent::Call& call,
+ ContentType contentTye,
+ ContentType acceptType,
+ const Option<std::string>& principal) const;
+
// Make continuation for `statistics` `static` as it might
// execute when the invoking `Http` is already destructed.
process::http::Response _statistics(