You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/23 21:09:06 UTC

[07/13] mesos git commit: Implemented operation status reconciliation.

Implemented operation status reconciliation.

Review: https://reviews.apache.org/r/66464/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2616908
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2616908
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2616908

Branch: refs/heads/master
Commit: d26169081699b6bc654113ae7ea980e55cd5f67d
Parents: c7c3848
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:41 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:49:07 2018 -0700

----------------------------------------------------------------------
 include/mesos/v1/scheduler/scheduler.proto |   6 ++
 src/master/http.cpp                        |  32 +++++--
 src/master/master.cpp                      | 108 +++++++++++++++++++++++-
 src/master/master.hpp                      |  21 ++++-
 src/messages/messages.proto                |   7 +-
 5 files changed, 159 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index 25ebcfc..b912901 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -426,7 +426,13 @@ message Call {
   message ReconcileOperations {
     message Operation {
       required OperationID operation_id = 1;
+
+      // If `agent_id` is not set and the master doesn't know the operation,
+      // then it will return `OPERATION_UNKNOWN`; if `agent_id` is set, it can
+      // return more fine-grained states depending on the state of the
+      // corresponding agent.
       optional AgentID agent_id = 2;
+
       optional ResourceProviderID resource_provider_id = 3;
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 34c9023..135ae43 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -956,12 +956,14 @@ Future<Response> Master::Http::scheduler(
     return BadRequest("Failed to validate scheduler::Call: " + error->message);
   }
 
-  if (call.type() == scheduler::Call::SUBSCRIBE) {
-    // We default to JSON 'Content-Type' in the response since an
-    // empty 'Accept' header results in all media types considered
-    // acceptable.
-    ContentType acceptType = ContentType::JSON;
+  ContentType acceptType;
 
+  // Ideally this handler would be consistent with the Operator API handler
+  // and determine the accept type regardless of the type of request.
+  // However, to maintain backwards compatibility, it determines the accept
+  // type only if the response will not be empty.
+  if (call.type() == scheduler::Call::SUBSCRIBE ||
+      call.type() == scheduler::Call::RECONCILE_OPERATIONS) {
     if (request.acceptsMediaType(APPLICATION_JSON)) {
       acceptType = ContentType::JSON;
     } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
@@ -971,7 +973,9 @@ Future<Response> Master::Http::scheduler(
           string("Expecting 'Accept' to allow ") +
           "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
     }
+  }
 
+  if (call.type() == scheduler::Call::SUBSCRIBE) {
     // Make sure that a stream ID was not included in the request headers.
     if (request.headers.contains("Mesos-Stream-Id")) {
       return BadRequest(
@@ -1110,9 +1114,9 @@ Future<Response> Master::Http::scheduler(
       master->reconcile(framework, std::move(*call.mutable_reconcile()));
       return Accepted();
 
-    // TODO(greggomann): Implement operation reconciliation.
     case scheduler::Call::RECONCILE_OPERATIONS:
-      return Forbidden("Operation reconciliation is not yet implemented");
+      return reconcileOperations(
+          framework, call.reconcile_operations(), acceptType);
 
     case scheduler::Call::MESSAGE:
       master->message(framework, std::move(*call.mutable_message()));
@@ -5089,6 +5093,20 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const
   });
 }
 
+
+Future<Response> Master::Http::reconcileOperations(
+    Framework* framework,
+    const scheduler::Call::ReconcileOperations& call,
+    ContentType contentType) const
+{
+  mesos::scheduler::Response response;  // Body of the OK reply.
+  response.set_type(mesos::scheduler::Response::RECONCILE_OPERATIONS);
+  *response.mutable_reconcile_operations() =
+    master->reconcileOperations(framework, call);  // Computed inline.
+
+  return OK(serialize(contentType, evolve(response)), stringify(contentType));
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ada7709..545a4d7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8837,10 +8837,112 @@ void Master::reconcile(
 }
 
 
-// TODO(greggomann): Implement operation update reconciliation.
-void Master::reconcileOperations(
+scheduler::Response::ReconcileOperations Master::reconcileOperations(
     Framework* framework,
-    const scheduler::Call::ReconcileOperations& reconcile) {}
+    const scheduler::Call::ReconcileOperations& reconcile)
+{
+  CHECK_NOTNULL(framework);
+
+  ++metrics->messages_reconcile_operations;
+
+  scheduler::Response::ReconcileOperations response;
+
+  if (reconcile.operations_size() == 0) {
+    // Implicit reconciliation: report every operation of this framework.
+    LOG(INFO) << "Performing implicit operation state reconciliation"
+                 " for framework " << *framework;
+
+    response.mutable_operation_statuses()->Reserve(
+        framework->operations.size());
+
+    foreachvalue (Operation* operation, framework->operations) {
+      if (operation->statuses().empty()) {
+        // This can happen if the operation is pending.
+        response.add_operation_statuses()->CopyFrom(operation->latest_status());
+      } else {
+        response.add_operation_statuses()->CopyFrom(
+            *operation->statuses().rbegin());
+      }
+    }
+
+    return response;
+  }
+
+  // Explicit reconciliation: one status per operation in the request.
+  LOG(INFO) << "Performing explicit operation state reconciliation for "
+            << reconcile.operations_size() << " operations of framework "
+            << *framework;
+
+  // Explicit reconciliation occurs for the following cases:
+  //   (1) Operation is known: the latest status sent to the framework.
+  //   (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING.
+  //   (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
+  //   (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
+  //   (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
+  //   (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
+  //   (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN.
+
+  foreach (const scheduler::Call::ReconcileOperations::Operation& operation,
+           reconcile.operations()) {
+    Option<SlaveID> slaveId = None();
+    if (operation.has_slave_id()) {
+      slaveId = operation.slave_id();
+    }
+
+    Option<Operation*> frameworkOperation =
+      framework->getOperation(operation.operation_id());
+
+    OperationStatus* status = response.add_operation_statuses();
+    if (frameworkOperation.isSome()) {
+      // (1) Operation is known: resend the latest status sent to the framework.
+      if (frameworkOperation.get()->statuses().empty()) {
+        // This can happen if the operation is pending.
+        *status = frameworkOperation.get()->latest_status();
+      } else {
+        *status = *frameworkOperation.get()->statuses().rbegin();
+      }
+    } else if (slaveId.isSome() && slaves.recovered.contains(slaveId.get())) {
+      // (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_RECOVERING,
+          operation.operation_id(),
+          "Reconciliation: Agent is recovered but has not re-registered");
+    } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
+      // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNKNOWN,
+          operation.operation_id(),
+          "Reconciliation: Operation is unknown");
+    } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
+      // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNREACHABLE,
+          operation.operation_id(),
+          "Reconciliation: Agent is unreachable");
+    } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
+      // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_GONE_BY_OPERATOR,
+          operation.operation_id(),
+          "Reconciliation: Agent marked gone by operator");
+    } else if (slaveId.isSome()) {
+      // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNKNOWN,
+          operation.operation_id(),
+          "Reconciliation: Both operation and agent are unknown");
+    } else {
+      // (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNKNOWN,
+          operation.operation_id(),
+          "Reconciliation: Operation is unknown and no 'agent_id' was"
+          " provided");
+    }
+  }
+
+  return response;
+}
 
 
 void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0d9620d..a7cadd9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1076,7 +1076,7 @@ private:
       Framework* framework,
       scheduler::Call::Reconcile&& reconcile);
 
-  void reconcileOperations(
+  scheduler::Response::ReconcileOperations reconcileOperations(
       Framework* framework,
       const scheduler::Call::ReconcileOperations& reconcile);
 
@@ -1734,6 +1734,11 @@ private:
     process::Future<process::http::Response> _markAgentGone(
         const SlaveID& slaveId) const;
 
+    process::Future<process::http::Response> reconcileOperations(
+        Framework* framework,
+        const scheduler::Call::ReconcileOperations& call,
+        ContentType contentType) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored
@@ -2549,6 +2554,20 @@ struct Framework
     }
   }
 
+  Option<Operation*> getOperation(const OperationID& id) {
+    Option<UUID> uuid = operationUUIDs.get(id);
+
+    if (uuid.isNone()) {
+      return None();  // No UUID is recorded for this operation ID.
+    }
+
+    Option<Operation*> operation = operations.get(uuid.get());
+
+    CHECK_SOME(operation);  // A recorded UUID must map to an operation.
+
+    return operation;
+  }
+
   void recoverResources(Operation* operation)
   {
     CHECK(operation->has_slave_id())

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 556801d..41e6a8a 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -471,10 +471,9 @@ message ReconcileTasksMessage {
 
 
 /**
- * The master uses this message to query an agent about the state of
- * one or more operations. This is useful to resolve
- * discrepancies between the master and agent's view after agent
- * reregistration.
+ * The master uses this message to query an agent about the state of one or
+ * more operations. This is useful to resolve discrepancies between the master
+ * and agent's view after agent reregistration.
  */
 message ReconcileOperationsMessage {
   message Operation {