You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/23 21:09:06 UTC
[07/13] mesos git commit: Implemented operation status reconciliation.
Implemented operation status reconciliation.
Review: https://reviews.apache.org/r/66464/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2616908
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2616908
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2616908
Branch: refs/heads/master
Commit: d26169081699b6bc654113ae7ea980e55cd5f67d
Parents: c7c3848
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:41 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:49:07 2018 -0700
----------------------------------------------------------------------
include/mesos/v1/scheduler/scheduler.proto | 6 ++
src/master/http.cpp | 32 +++++--
src/master/master.cpp | 108 +++++++++++++++++++++++-
src/master/master.hpp | 21 ++++-
src/messages/messages.proto | 7 +-
5 files changed, 159 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index 25ebcfc..b912901 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -426,7 +426,13 @@ message Call {
message ReconcileOperations {
message Operation {
required OperationID operation_id = 1;
+
+ // If `agent_id` is not set and the master doesn't know the operation,
+ // then it will return `OPERATION_UNKNOWN`; if `agent_id` is set, it can
+ // return more fine-grained states depending on the state of the
+ // corresponding agent.
optional AgentID agent_id = 2;
+
optional ResourceProviderID resource_provider_id = 3;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 34c9023..135ae43 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -956,12 +956,14 @@ Future<Response> Master::Http::scheduler(
return BadRequest("Failed to validate scheduler::Call: " + error->message);
}
- if (call.type() == scheduler::Call::SUBSCRIBE) {
- // We default to JSON 'Content-Type' in the response since an
- // empty 'Accept' header results in all media types considered
- // acceptable.
- ContentType acceptType = ContentType::JSON;
+ ContentType acceptType;
+ // Ideally this handler would be consistent with the Operator API handler
+ // and determine the accept type regardless of the type of request.
+ // However, to maintain backwards compatibility, it determines the accept
+ // type only if the response will not be empty.
+ if (call.type() == scheduler::Call::SUBSCRIBE ||
+ call.type() == scheduler::Call::RECONCILE_OPERATIONS) {
if (request.acceptsMediaType(APPLICATION_JSON)) {
acceptType = ContentType::JSON;
} else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
@@ -971,7 +973,9 @@ Future<Response> Master::Http::scheduler(
string("Expecting 'Accept' to allow ") +
"'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
}
+ }
+ if (call.type() == scheduler::Call::SUBSCRIBE) {
// Make sure that a stream ID was not included in the request headers.
if (request.headers.contains("Mesos-Stream-Id")) {
return BadRequest(
@@ -1110,9 +1114,9 @@ Future<Response> Master::Http::scheduler(
master->reconcile(framework, std::move(*call.mutable_reconcile()));
return Accepted();
- // TODO(greggomann): Implement operation reconciliation.
case scheduler::Call::RECONCILE_OPERATIONS:
- return Forbidden("Operation reconciliation is not yet implemented");
+ return reconcileOperations(
+ framework, call.reconcile_operations(), acceptType);
case scheduler::Call::MESSAGE:
master->message(framework, std::move(*call.mutable_message()));
@@ -5089,6 +5093,20 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const
});
}
+
+Future<Response> Master::Http::reconcileOperations(
+ Framework* framework,
+ const scheduler::Call::ReconcileOperations& call,
+ ContentType contentType) const // Accept type negotiated by scheduler().
+{
+ mesos::scheduler::Response response; // Becomes the OK() response body.
+ response.set_type(mesos::scheduler::Response::RECONCILE_OPERATIONS);
+ *response.mutable_reconcile_operations() =
+ master->reconcileOperations(framework, call);
+
+ return OK(serialize(contentType, evolve(response)), stringify(contentType));
+}
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ada7709..545a4d7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8837,10 +8837,112 @@ void Master::reconcile(
}
-// TODO(greggomann): Implement operation update reconciliation.
-void Master::reconcileOperations(
+scheduler::Response::ReconcileOperations Master::reconcileOperations(
Framework* framework,
- const scheduler::Call::ReconcileOperations& reconcile) {}
+ const scheduler::Call::ReconcileOperations& reconcile)
+{
+ CHECK_NOTNULL(framework);
+
+ ++metrics->messages_reconcile_operations;
+
+ scheduler::Response::ReconcileOperations response;
+
+ if (reconcile.operations_size() == 0) {
+ // Implicit reconciliation.
+ LOG(INFO) << "Performing implicit operation state reconciliation"
+ " for framework " << *framework;
+
+ response.mutable_operation_statuses()->Reserve(
+ framework->operations.size());
+
+ foreachvalue (Operation* operation, framework->operations) {
+ if (operation->statuses().empty()) {
+ // This can happen if the operation is pending.
+ response.add_operation_statuses()->CopyFrom(operation->latest_status());
+ } else {
+ response.add_operation_statuses()->CopyFrom(
+ *operation->statuses().rbegin());
+ }
+ }
+
+ return response;
+ }
+
+ // Explicit reconciliation.
+ LOG(INFO) << "Performing explicit operation state reconciliation for "
+ << reconcile.operations_size() << " operations of framework "
+ << *framework;
+
+ // Explicit reconciliation occurs for the following cases:
+ // (1) Operation is known: the latest status sent to the framework.
+ // (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING.
+ // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
+ // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
+ // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
+ // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
+ // (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN.
+
+ foreach (const scheduler::Call::ReconcileOperations::Operation& operation,
+ reconcile.operations()) {
+ Option<SlaveID> slaveId = None();
+ if (operation.has_slave_id()) {
+ slaveId = operation.slave_id();
+ }
+
+ Option<Operation*> frameworkOperation =
+ framework->getOperation(operation.operation_id());
+
+ OperationStatus* status = response.add_operation_statuses();
+ if (frameworkOperation.isSome()) {
+ // (1) Operation is known: resend the latest status sent to the framework.
+ if (frameworkOperation.get()->statuses().empty()) {
+ // This can happen if the operation is pending.
+ *status = frameworkOperation.get()->latest_status();
+ } else {
+ *status = *frameworkOperation.get()->statuses().rbegin();
+ }
+ } else if (slaveId.isSome() && slaves.recovered.contains(slaveId.get())) {
+ // (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING.
+ *status = protobuf::createOperationStatus(
+ OperationState::OPERATION_RECOVERING,
+ operation.operation_id(),
+ "Reconciliation: Agent is recovered but has not re-registered");
+ } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
+ // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
+ *status = protobuf::createOperationStatus(
+ OperationState::OPERATION_UNKNOWN,
+ operation.operation_id(),
+ "Reconciliation: Operation is unknown");
+ } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
+ // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
+ *status = protobuf::createOperationStatus(
+ OperationState::OPERATION_UNREACHABLE,
+ operation.operation_id(),
+ "Reconciliation: Agent is unreachable");
+ } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
+ // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
+ *status = protobuf::createOperationStatus(
+ OperationState::OPERATION_GONE_BY_OPERATOR,
+ operation.operation_id(),
+ "Reconciliation: Agent marked gone by operator");
+ } else if (slaveId.isSome()) {
+ // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
+ *status = protobuf::createOperationStatus(
+ OperationState::OPERATION_UNKNOWN,
+ operation.operation_id(),
+ "Reconciliation: Both operation and agent are unknown");
+ } else {
+ // (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN.
+ *status = protobuf::createOperationStatus(
+ OperationState::OPERATION_UNKNOWN,
+ operation.operation_id(),
+ "Reconciliation: Operation is unknown and no 'agent_id' was"
+ " provided");
+ }
+ }
+
+ return response;
+}
void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0d9620d..a7cadd9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1076,7 +1076,7 @@ private:
Framework* framework,
scheduler::Call::Reconcile&& reconcile);
- void reconcileOperations(
+ scheduler::Response::ReconcileOperations reconcileOperations(
Framework* framework,
const scheduler::Call::ReconcileOperations& reconcile);
@@ -1734,6 +1734,11 @@ private:
process::Future<process::http::Response> _markAgentGone(
const SlaveID& slaveId) const;
+ process::Future<process::http::Response> reconcileOperations(
+ Framework* framework,
+ const scheduler::Call::ReconcileOperations& call,
+ ContentType contentType) const;
+
Master* master;
// NOTE: The quota specific pieces of the Operator API are factored
@@ -2549,6 +2554,20 @@ struct Framework
}
}
+ Option<Operation*> getOperation(const OperationID& id) {
+ Option<UUID> uuid = operationUUIDs.get(id); // OperationID -> UUID index.
+
+ if (uuid.isNone()) {
+ return None(); // `id` is not present in `operationUUIDs`.
+ }
+
+ Option<Operation*> operation = operations.get(uuid.get());
+
+ CHECK_SOME(operation); // Both maps must be kept consistent.
+
+ return operation;
+ }
+
void recoverResources(Operation* operation)
{
CHECK(operation->has_slave_id())
http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 556801d..41e6a8a 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -471,10 +471,9 @@ message ReconcileTasksMessage {
/**
- * The master uses this message to query an agent about the state of
- * one or more operations. This is useful to resolve
- * discrepancies between the master and agent's view after agent
- * reregistration.
+ * The master uses this message to query an agent about the state of one or
+ * more operations. This is useful to resolve discrepancies between the master
+ * and agent's view after agent reregistration.
*/
message ReconcileOperationsMessage {
message Operation {