You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/23 21:09:10 UTC
[11/13] mesos git commit: Added a `call()` method to the v1 scheduler
library.
Added a `call()` method to the v1 scheduler library.
This patch adds a `call()` method to the scheduler library that allows
clients to send a `v1::scheduler::Call` to the master and receive a
`v1::scheduler::Response`.
It will be used to test operation state reconciliation.
Review: https://reviews.apache.org/r/66460/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c39ef695
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c39ef695
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c39ef695
Branch: refs/heads/master
Commit: c39ef69514e57ca7c90e764a4a617abf88cd144f
Parents: 949b44e
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:50:00 2018 -0700
----------------------------------------------------------------------
include/mesos/v1/scheduler.hpp | 39 +++++++
include/mesos/v1/scheduler/scheduler.proto | 34 ++++++
.../org_apache_mesos_v1_scheduler_V0Mesos.cpp | 9 ++
src/scheduler/scheduler.cpp | 111 +++++++++++++++++++
4 files changed, 193 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler.hpp b/include/mesos/v1/scheduler.hpp
index d56e088..b1dfbb1 100644
--- a/include/mesos/v1/scheduler.hpp
+++ b/include/mesos/v1/scheduler.hpp
@@ -28,6 +28,10 @@
#include <mesos/v1/scheduler/scheduler.hpp>
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+
namespace mesos {
namespace master {
@@ -48,6 +52,7 @@ public:
// Empty virtual destructor (necessary to instantiate subclasses).
virtual ~MesosBase() {}
virtual void send(const Call& call) = 0;
+ virtual process::Future<APIResult> call(const Call& callMessage) = 0;
virtual void reconnect() = 0;
};
@@ -94,6 +99,40 @@ public:
// disconnected).
virtual void send(const Call& call) override;
+ // Attempts to send a call to the master, returning the response.
+ //
+ // The scheduler should only invoke this method once it has received the
+ // 'connected' callback and is subscribed with the master. Otherwise, a
+ // `Failure` will be returned.
+ //
+ // Some local validation of calls is performed, and the request will not be
+ // sent to the master if the validation fails.
+ //
+ // A `Failure` will be returned on validation failures or if an error happens
+ // when sending the request to the master, e.g., a master disconnection, or a
+ // deserialization error.
+ //
+ // If it was possible to receive a response from the server, the returned
+ // object will contain the HTTP response status code.
+ //
+ // There are three cases to consider depending on the HTTP response status
+ // code:
+ //
+ // (1) '202 ACCEPTED': Indicates the call was accepted for processing and
+ // neither `APIResult::response` nor `APIResult::error` will be set.
+ //
+ // (2) '200 OK': Indicates the call completed successfully.
+ // `APIResult::response` will be set if the `scheduler::Call::Type`
+ // has a corresponding `scheduler::Response::Type`, `APIResult::error`
+ // will not be set.
+ //
+ // (3) For all other HTTP status codes, the `APIResult::response` field will
+ // not be set and the `APIResult::error` field may be set to provide more
+ // information.
+ //
+ // Note: This method cannot be used to send `SUBSCRIBE` calls, use `send()`
+ // instead.
+ virtual process::Future<APIResult> call(const Call& callMessage) override;
+
// Force a reconnection with the master.
//
// In the case of a one-way network partition, the connection between the
http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index b912901..fcfec5e 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -495,3 +495,37 @@ message Call {
optional Request request = 11;
optional Suppress suppress = 16;
}
+
+/**
+ * This message is used by the C++ Scheduler HTTP API library as the return
+ * type of the `call()` method. The message includes the HTTP status code with
+ * which the master responded, and optionally a `scheduler::Response` message.
+ *
+ * There are three cases to consider depending on the HTTP response status code:
+ *
+ * (1) '202 ACCEPTED': Indicates the call was accepted for processing and
+ * neither `response` nor `error` will be set.
+ *
+ * (2) '200 OK': Indicates the call completed successfully, and the `response`
+ * field will be set if the `scheduler::Call::Type` has a corresponding
+ * `scheduler::Response::Type`; `error` will not be set.
+ *
+ * (3) For all other HTTP status codes, the `response` field will not be set
+ * and the `error` field may be set to provide more information.
+ *
+ * NOTE: This message is used by the C++ Scheduler HTTP API library and is not
+ * part of the API specification.
+ */
+message APIResult {
+ // HTTP status code with which the master responded.
+ required uint32 status_code = 1;
+
+ // This field will only be set if the call completed successfully and the
+ // master responded with `200 OK` and a non-empty body.
+ optional Response response = 2;
+
+ // This field will only be set if the call did not complete successfully,
+ // i.e., the master responded with a status other than `202 Accepted` or
+ // `200 OK`.
+ //
+ // NOTE(review): the C++ scheduler library sets this field for every such
+ // status, even when the response body is empty; the value it stores is
+ // built from the HTTP status line and the (possibly empty) body.
+ optional string error = 3;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
index 60b17b9..ea8d54f 100644
--- a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
@@ -28,6 +28,7 @@
#include <process/clock.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
+#include <process/future.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
@@ -134,6 +135,14 @@ public:
UNREACHABLE();
}
+ // Override of the `call()` interface method for the V0-to-V1 adapter.
+ // It provides no implementation: the body consists solely of
+ // `UNREACHABLE()`, so this method must never be invoked on the adapter.
+ virtual process::Future<v1::scheduler::APIResult> call(
+ const v1::scheduler::Call& callMessage) override
+ {
+ // The driver does not support sending a `v1::scheduler::Call` that returns
+ // a `v1::scheduler::Response`.
+ UNREACHABLE();
+ }
+
process::Owned<V0ToV1AdapterProcess> process;
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index ecef916..c0dff53 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -114,6 +114,7 @@ using mesos::internal::recordio::Reader;
using mesos::master::detector::MasterDetector;
using process::collect;
+using process::Failure;
using process::Owned;
using process::wait; // Necessary on some OS's to disambiguate.
@@ -263,6 +264,49 @@ public:
.onAny(defer(self(), &Self::_send, call, lambda::_1));
}
+ // Sends `callMessage` to the master and returns a future for the outcome.
+ //
+ // A `Failure` is returned right away, without contacting the master, if the
+ // call does not pass local validation, if it is a `SUBSCRIBE` call, or if
+ // the library is not in the `SUBSCRIBED` state. Otherwise a POST request
+ // carrying the serialized call is built, handed to the HTTP authenticatee,
+ // and the resulting request is passed on to `_call()`.
+ Future<APIResult> call(const Call& callMessage)
+ {
+ // Validate locally first so that malformed calls never reach the master.
+ Option<Error> error =
+ validation::scheduler::call::validate(devolve(callMessage));
+
+ if (error.isSome()) {
+ return Failure(error->message);
+ }
+
+ // `SUBSCRIBE` calls are rejected by this method.
+ if (callMessage.type() == Call::SUBSCRIBE) {
+ return Failure("This method doesn't support SUBSCRIBE calls");
+ }
+
+ // Any state other than `SUBSCRIBED` is rejected; the current state is
+ // included in the failure message.
+ if (state != SUBSCRIBED) {
+ return Failure(
+ "Cannot perform calls until subscribed. Current state: " +
+ stringify(state));
+ }
+
+ VLOG(1) << "Sending " << callMessage.type() << " call to " << master.get();
+
+ // TODO(vinod): Add support for sending MESSAGE calls directly
+ // to the slave, instead of relaying it through the master, as
+ // the scheduler driver does.
+
+ // The same content type is used for the request body and for the
+ // `Accept` header.
+ process::http::Request request;
+ request.method = "POST";
+ request.url = master.get();
+ request.body = serialize(contentType, callMessage);
+ request.keepAlive = true;
+ request.headers = {{"Accept", stringify(contentType)},
+ {"Content-Type", stringify(contentType)}};
+
+ // The `recover` handler turns a failed or discarded authentication into a
+ // `Failure` whose message says which of the two happened; on success the
+ // request returned by the authenticatee is forwarded to `_call()`.
+ // TODO(tillt): Add support for multi-step authentication protocols.
+ return authenticatee->authenticate(request, credential)
+ .recover([](const Future<process::http::Request>& future) {
+ return Failure(
+ stringify("HTTP authenticatee ") +
+ (future.isFailed() ? "failed: " + future.failure() : "discarded"));
+ })
+ .then(defer(self(), &Self::_call, callMessage, lambda::_1));
+ }
+
void reconnect()
{
// Ignore the reconnection request if we are currently disconnected
@@ -675,6 +719,68 @@ protected:
response->body + ") for " + stringify(call.type()));
}
+ // Continuation of `call()`, invoked with the request returned by the HTTP
+ // authenticatee.
+ //
+ // Fails if there is no connection to the master. Otherwise the request is
+ // tagged with the current stream ID, sent over the non-subscribe
+ // connection, and the HTTP response is handed to `__call()`.
+ //
+ // `callMessage` is only forwarded to `__call()`. `request` is taken by
+ // value because a header is added to it before sending.
+ Future<APIResult> _call(
+     const Call& callMessage,
+     process::http::Request request)
+ {
+   if (connections.isNone()) {
+     return Failure("Connection to master interrupted");
+   }
+
+   CHECK_SOME(streamId);
+
+   // Set the stream ID associated with this connection.
+   request.headers["Mesos-Stream-Id"] = streamId->toString();
+
+   // NOTE(review): `connectionId` is asserted here but not otherwise used
+   // by this method.
+   CHECK_SOME(connectionId);
+
+   return connections->nonSubscribe.send(request)
+     .then(defer(self(),
+                 &Self::__call,
+                 callMessage,
+                 lambda::_1));
+ }
+
+ // Converts the master's HTTP response to a call into an `APIResult`.
+ //
+ // The status code is always recorded. A "200 OK" response with a body has
+ // that body deserialized into `APIResult::response`; a failure to
+ // deserialize yields a `Failure`. Any status other than "202 Accepted" or
+ // "200 OK" is described in `APIResult::error`. `callMessage` is only used
+ // for logging.
+ Future<APIResult> __call(
+     const Call& callMessage,
+     const process::http::Response& response)
+ {
+   APIResult apiResult;
+   apiResult.set_status_code(response.code);
+
+   const bool hasBody = !response.body.empty();
+
+   if (response.code == process::http::Status::ACCEPTED) {
+     // "202 Accepted" responses are processed asynchronously, so no body
+     // is expected; one that shows up anyway is merely logged.
+     if (hasBody) {
+       LOG(WARNING) << "Response for " << callMessage.type()
+                    << " unexpectedly included body: '" << response.body
+                    << "'";
+     }
+
+     return apiResult;
+   }
+
+   if (response.code != process::http::Status::OK) {
+     // Every remaining status is surfaced through the `error` field.
+     apiResult.set_error(
+         "Received unexpected '" + response.status + "'" + " (" +
+         response.body + ")");
+
+     return apiResult;
+   }
+
+   // "200 OK" without a body: there is nothing to deserialize.
+   if (!hasBody) {
+     return apiResult;
+   }
+
+   Try<Response> parsed = deserialize<Response>(contentType, response.body);
+
+   if (parsed.isError()) {
+     return Failure(
+         "Failed to deserialize the response '" + response.status + "'" +
+         " (" + response.body + "): " + parsed.error());
+   }
+
+   *apiResult.mutable_response() = parsed.get();
+
+   return apiResult;
+ }
+
void read()
{
subscribed->decoder->read()
@@ -917,6 +1023,11 @@ void Mesos::send(const Call& call)
dispatch(process, &MesosProcess::send, call);
}
+// Forwards `callMessage` to `MesosProcess::call` on the underlying process
+// via `dispatch`, and returns the future produced by that dispatch.
+Future<APIResult> Mesos::call(const Call& callMessage)
+{
+ return dispatch(process, &MesosProcess::call, callMessage);
+}
+
void Mesos::reconnect()
{