You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/23 21:09:10 UTC

[11/13] mesos git commit: Added a `call()` method to the v1 scheduler library.

Added a `call()` method to the v1 scheduler library.

This patch adds a `call()` method to the scheduler library that allows
clients to send a `v1::scheduler::Call` to the master and receive a
`v1::scheduler::Response`.

It will be used to test operation state reconciliation.

Review: https://reviews.apache.org/r/66460/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c39ef695
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c39ef695
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c39ef695

Branch: refs/heads/master
Commit: c39ef69514e57ca7c90e764a4a617abf88cd144f
Parents: 949b44e
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:50:00 2018 -0700

----------------------------------------------------------------------
 include/mesos/v1/scheduler.hpp                  |  39 +++++++
 include/mesos/v1/scheduler/scheduler.proto      |  34 ++++++
 .../org_apache_mesos_v1_scheduler_V0Mesos.cpp   |   9 ++
 src/scheduler/scheduler.cpp                     | 111 +++++++++++++++++++
 4 files changed, 193 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler.hpp b/include/mesos/v1/scheduler.hpp
index d56e088..b1dfbb1 100644
--- a/include/mesos/v1/scheduler.hpp
+++ b/include/mesos/v1/scheduler.hpp
@@ -28,6 +28,10 @@
 
 #include <mesos/v1/scheduler/scheduler.hpp>
 
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+
 namespace mesos {
 
 namespace master {
@@ -48,6 +52,7 @@ public:
   // Empty virtual destructor (necessary to instantiate subclasses).
   virtual ~MesosBase() {}
   virtual void send(const Call& call) = 0;
+  virtual process::Future<APIResult> call(const Call& callMessage) = 0;
   virtual void reconnect() = 0;
 };
 
@@ -94,6 +99,40 @@ public:
   // disconnected).
   virtual void send(const Call& call) override;
 
+  // Attempts to send a call to the master, returning the response.
+  //
+  // The scheduler should only invoke this method once it has received the
+  // 'connected' callback. Otherwise, a `Failure` will be returned.
+  //
+  // Some local validation of calls is performed, and the request will not be
+  // sent to the master if the validation fails.
+  //
+  // A `Failure` will be returned on validation failures or if an error happens
+  // when sending the request to the master, e.g., a master disconnection, or a
+  // deserialization error.
+  //
+  // If it was possible to receive a response from the server, the returned
+  // object will contain the HTTP response status code.
+  //
+  // There are three cases to consider depending on the HTTP response status
+  // code:
+  //
+  //  (1) '202 ACCEPTED': Indicates the call was accepted for processing and
+  //      neither `APIResult::response` nor `APIResult::error` will be set.
+  //
+  //  (2) '200 OK': Indicates the call completed successfully.
+  //      `APIResult::response` will be set if the `scheduler::Call::Type`
+  //      has a corresponding `scheduler::Response::Type`, `APIResult::error`
+  //      will not be set.
+  //
+  //  (3) For all other HTTP status codes, the `APIResult::response` field will
+  //      not be set and the `APIResult::error` field may be set to provide more
+  //      information.
+  //
+  // Note: This method cannot be used to send `SUBSCRIBE` calls, use `send()`
+  // instead.
+  virtual process::Future<APIResult> call(const Call& callMessage) override;
+
   // Force a reconnection with the master.
   //
   // In the case of a one-way network partition, the connection between the

http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index b912901..fcfec5e 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -495,3 +495,37 @@ message Call {
   optional Request request = 11;
   optional Suppress suppress = 16;
 }
+
+/**
+ * This message is used by the C++ Scheduler HTTP API library as the return
+ * type of the `call()` method. The message includes the HTTP status code with
+ * which the master responded, and optionally a `scheduler::Response` message.
+ *
+ * There are three cases to consider depending on the HTTP response status code:
+ *
+ *  (1) '202 ACCEPTED': Indicates the call was accepted for processing and
+ *        neither `response` nor `error` will be set.
+ *
+ *  (2) '200 OK': Indicates the call completed successfully, and the `response`
+ *        field will be set if the `scheduler::Call::Type` has a corresponding
+ *        `scheduler::Response::Type`; `error` will not be set.
+ *
+ *  (3) For all other HTTP status codes, the `response` field will not be set
+ *      and the `error` field may be set to provide more information.
+ *
+ * NOTE: This message is used by the C++ Scheduler HTTP API library and is not
+ * part of the API specification.
+ */
+message APIResult {
+  // HTTP status code with which the master responded.
+  required uint32 status_code = 1;
+
+  // This field will only be set if the call completed successfully and the
+  // master responded with `200 OK` and a non-empty body.
+  optional Response response = 2;
+
+  // This field will only be set if the call did not complete successfully and
+  // the master responded with a status other than `202 Accepted` or `200 OK`,
+  // and with a non-empty body.
+  optional string error = 3;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
index 60b17b9..ea8d54f 100644
--- a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
@@ -28,6 +28,7 @@
 #include <process/clock.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -134,6 +135,14 @@ public:
     UNREACHABLE();
   }
 
+  virtual process::Future<v1::scheduler::APIResult> call(
+      const v1::scheduler::Call& callMessage) override
+  {
+    // The driver does not support sending a `v1::scheduler::Call` that returns
+    // a `v1::scheduler::Response`.
+    UNREACHABLE();
+  }
+
   process::Owned<V0ToV1AdapterProcess> process;
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index ecef916..c0dff53 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -114,6 +114,7 @@ using mesos::internal::recordio::Reader;
 using mesos::master::detector::MasterDetector;
 
 using process::collect;
+using process::Failure;
 using process::Owned;
 using process::wait; // Necessary on some OS's to disambiguate.
 
@@ -263,6 +264,49 @@ public:
       .onAny(defer(self(), &Self::_send, call, lambda::_1));
   }
 
+  Future<APIResult> call(const Call& callMessage)
+  {
+    Option<Error> error =
+      validation::scheduler::call::validate(devolve(callMessage));
+
+    if (error.isSome()) {
+      return Failure(error->message);
+    }
+
+    if (callMessage.type() == Call::SUBSCRIBE) {
+      return Failure("This method doesn't support SUBSCRIBE calls");
+    }
+
+    if (state != SUBSCRIBED) {
+      return Failure(
+          "Cannot perform calls until subscribed. Current state: " +
+          stringify(state));
+    }
+
+    VLOG(1) << "Sending " << callMessage.type() << " call to " << master.get();
+
+    // TODO(vinod): Add support for sending MESSAGE calls directly
+    // to the slave, instead of relaying it through the master, as
+    // the scheduler driver does.
+
+    process::http::Request request;
+    request.method = "POST";
+    request.url = master.get();
+    request.body = serialize(contentType, callMessage);
+    request.keepAlive = true;
+    request.headers = {{"Accept", stringify(contentType)},
+                       {"Content-Type", stringify(contentType)}};
+
+    // TODO(tillt): Add support for multi-step authentication protocols.
+    return authenticatee->authenticate(request, credential)
+      .recover([](const Future<process::http::Request>& future) {
+        return Failure(
+            stringify("HTTP authenticatee ") +
+            (future.isFailed() ? "failed: " + future.failure() : "discarded"));
+      })
+      .then(defer(self(), &Self::_call, callMessage, lambda::_1));
+  }
+
   void reconnect()
   {
     // Ignore the reconnection request if we are currently disconnected
@@ -675,6 +719,68 @@ protected:
           response->body + ") for " + stringify(call.type()));
   }
 
+  Future<APIResult> _call(
+      const Call& callMessage,
+      process::http::Request request)
+  {
+    if (connections.isNone()) {
+      return Failure("Connection to master interrupted");
+    }
+
+    Future<process::http::Response> response;
+
+    CHECK_SOME(streamId);
+
+    // Set the stream ID associated with this connection.
+    request.headers["Mesos-Stream-Id"] = streamId->toString();
+
+    CHECK_SOME(connectionId);
+
+    return connections->nonSubscribe.send(request)
+      .then(defer(self(),
+                  &Self::__call,
+                  callMessage,
+                  lambda::_1));
+  }
+
+  Future<APIResult> __call(
+      const Call& callMessage,
+      const process::http::Response& response)
+  {
+    APIResult result;
+
+    result.set_status_code(response.code);
+
+    if (response.code == process::http::Status::ACCEPTED) {
+      // "202 Accepted" responses are asynchronously processed, so the body
+      // should be empty.
+      if (!response.body.empty()) {
+        LOG(WARNING) << "Response for " << callMessage.type()
+                     << " unexpectedly included body: '" << response.body
+                     << "'";
+      }
+    } else if (response.code == process::http::Status::OK) {
+      if (!response.body.empty()) {
+        Try<Response> deserializedResponse =
+          deserialize<Response>(contentType, response.body);
+
+        if (deserializedResponse.isError()) {
+          return Failure(
+              "Failed to deserialize the response '" + response.status + "'" +
+              " (" + response.body + "): " + deserializedResponse.error());
+        }
+
+        *result.mutable_response() = deserializedResponse.get();
+      }
+    } else {
+      result.set_error(
+          "Received unexpected '" + response.status + "'" + " (" +
+          response.body + ")");
+    }
+
+    return result;
+  }
+
   void read()
   {
     subscribed->decoder->read()
@@ -917,6 +1023,11 @@ void Mesos::send(const Call& call)
   dispatch(process, &MesosProcess::send, call);
 }
 
+Future<APIResult> Mesos::call(const Call& callMessage)
+{
+  return dispatch(process, &MesosProcess::call, callMessage);
+}
+
 
 void Mesos::reconnect()
 {