You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/11/29 06:15:32 UTC

[3/6] mesos git commit: Added streaming support to `/api/v1` handler on the agent.

Added streaming support to `/api/v1` handler on the agent.

Note that this change only updates the handler to correctly handle
non-streaming calls now that it receives `PIPE` requests. Support
for streaming calls will come later.

Review: https://reviews.apache.org/r/53994


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad786c58
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad786c58
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad786c58

Branch: refs/heads/master
Commit: ad786c5831bd0adb199c701f6aeef482623c5ffb
Parents: 95030c5
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Nov 15 20:09:17 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Nov 28 22:14:01 2016 -0800

----------------------------------------------------------------------
 src/slave/http.cpp  | 85 +++++++++++++++++++++++++++++++-----------------
 src/slave/slave.cpp |  7 ++--
 src/slave/slave.hpp |  7 ++++
 3 files changed, 68 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ad786c58/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index aa9f492..cca88b8 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -310,47 +310,42 @@ Future<Response> Slave::Http::api(
     return MethodNotAllowed({"POST"}, request.method);
   }
 
-  v1::agent::Call v1Call;
-
-  Option<string> contentType = request.headers.get("Content-Type");
-  if (contentType.isNone()) {
+  Option<string> contentType_ = request.headers.get("Content-Type");
+  if (contentType_.isNone()) {
     return BadRequest("Expecting 'Content-Type' to be present");
   }
 
-  if (contentType.get() == APPLICATION_PROTOBUF) {
-    if (!v1Call.ParseFromString(request.body)) {
-      return BadRequest("Failed to parse body into Call protobuf");
-    }
-  } else if (contentType.get() == APPLICATION_JSON) {
-    Try<JSON::Value> value = JSON::parse(request.body);
-    if (value.isError()) {
-      return BadRequest("Failed to parse body into JSON: " + value.error());
-    }
-
-    Try<v1::agent::Call> parse =
-      ::protobuf::parse<v1::agent::Call>(value.get());
-
-    if (parse.isError()) {
-      return BadRequest("Failed to convert JSON into Call protobuf: " +
-                        parse.error());
-    }
-
-    v1Call = parse.get();
+  ContentType contentType;
+  if (contentType_.get() == APPLICATION_JSON) {
+    contentType = ContentType::JSON;
+  } else if (contentType_.get() == APPLICATION_PROTOBUF) {
+    contentType = ContentType::PROTOBUF;
   } else {
     return UnsupportedMediaType(
         string("Expecting 'Content-Type' of ") +
         APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
   }
 
-  agent::Call call = devolve(v1Call);
+  // This lambda deserializes a string into a valid `Call`
+  // based on the content type.
+  auto deserializer = [](const string& body, ContentType contentType)
+      -> Try<agent::Call> {
+    Try<v1::agent::Call> v1Call =
+      deserialize<v1::agent::Call>(contentType, body);
+
+    if (v1Call.isError()) {
+      return Error(v1Call.error());
+    }
 
-  Option<Error> error = validation::agent::call::validate(call);
+    agent::Call call = devolve(v1Call.get());
 
-  if (error.isSome()) {
-    return BadRequest("Failed to validate agent::Call: " + error.get().message);
-  }
+    Option<Error> error = validation::agent::call::validate(call);
+    if (error.isSome()) {
+      return Error("Failed to validate agent::Call: " + error.get().message);
+    }
 
-  LOG(INFO) << "Processing call " << call.type();
+    return call;
+  };
 
   ContentType acceptType;
   if (request.acceptsMediaType(APPLICATION_JSON)) {
@@ -363,6 +358,38 @@ Future<Response> Slave::Http::api(
         "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
   }
 
+  if (contentType == ContentType::JSON ||
+      contentType == ContentType::PROTOBUF) {
+    CHECK_EQ(http::Request::PIPE, request.type);
+    CHECK_SOME(request.reader);
+
+    Pipe::Reader reader = request.reader.get();  // Remove const.
+
+    return reader.readAll()
+      .then(defer(
+          slave->self(),
+          [=](const string& body) -> Future<Response> {
+            Try<agent::Call> call = deserializer(body, contentType);
+            if (call.isError()) {
+              return BadRequest(call.error());
+            }
+            return _api(call.get(), contentType, acceptType, principal);
+          }));
+  } else {
+    // TODO(vinod): Add support for 'streaming' content type.
+    UNREACHABLE();
+  }
+}
+
+
+Future<Response> Slave::Http::_api(
+    const agent::Call& call,
+    ContentType contentType,
+    ContentType acceptType,
+    const Option<string>& principal) const
+{
+  LOG(INFO) << "Processing call " << call.type();
+
   switch (call.type()) {
     case agent::Call::UNKNOWN:
       return NotImplemented();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad786c58/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 521f08d..1273b0f 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -648,7 +648,9 @@ void Slave::initialize()
       &Slave::ping,
       &PingSlaveMessage::connected);
 
-
+  // Set up the '/api/v1' handler for streaming requests.
+  RouteOptions options;
+  options.requestStreaming = true;
   route("/api/v1",
         // TODO(benh): Is this authentication realm sufficient or do
         // we need some kind of hybrid if we expect both executors
@@ -659,7 +661,8 @@ void Slave::initialize()
                const Option<string>& principal) {
           Http::log(request);
           return http.api(request, principal);
-        });
+        },
+        options);
 
   route("/api/v1/executor",
         Http::EXECUTOR_HELP(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad786c58/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0cc1054..0ede185 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -523,6 +523,13 @@ private:
   private:
     JSON::Object _flags() const;
 
+    // Continuation for `/api` endpoint that handles non-streaming requests.
+    process::Future<process::http::Response> _api(
+        const agent::Call& call,
+        ContentType contentType,
+        ContentType acceptType,
+        const Option<std::string>& principal) const;
+
     // Make continuation for `statistics` `static` as it might
     // execute when the invoking `Http` is already destructed.
     process::http::Response _statistics(