You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/11/30 17:35:57 UTC

[4/8] mesos git commit: Reconciled pending resource provider operations in agent.

Reconciled pending resource provider operations in agent.

When resource providers update their state they send a list of
pending or unacknowledged operations to the agent. This patch adds
tracking for such operations to the agent. The agent can then forward
these operations to the master, e.g., for calculating the unused
resources behind an agent.

We track an operation until we either receive an updated list of
pending or unacknowledged operations from a resource provider, or
until we see an acknowledgement from a framework. This keeps the list
of operations bounded and ensures that we maintain the latest
information in the agent.

Review: https://reviews.apache.org/r/63731/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42c14e1e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42c14e1e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42c14e1e

Branch: refs/heads/master
Commit: 42c14e1efc81ff617502cd360ad460d729802b2c
Parents: 0deabd3
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:38 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/slave/slave.cpp | 159 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 138 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42c14e1e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a415894..a9aa987 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1283,6 +1283,13 @@ void Slave::registered(
     message.mutable_resource_categories()->set_total(true);
     message.mutable_total_resources()->CopyFrom(totalResources);
 
+    UpdateSlaveMessage::OfferOperations* operations =
+      message.mutable_offer_operations();
+
+    foreachvalue (const OfferOperation* operation, offerOperations) {
+      operations->add_operations()->CopyFrom(*operation);
+    }
+
     sendUpdateSlaveMessage = true;
   }
 
@@ -1389,6 +1396,13 @@ void Slave::reregistered(
     message.mutable_resource_categories()->set_total(true);
     message.mutable_total_resources()->CopyFrom(totalResources);
 
+    UpdateSlaveMessage::OfferOperations* operations =
+      message.mutable_offer_operations();
+
+    foreachvalue (const OfferOperation* operation, offerOperations) {
+      operations->add_operations()->CopyFrom(*operation);
+    }
+
     sendUpdateSlaveMessage = true;
   }
 
@@ -6798,21 +6812,115 @@ void Slave::handleResourceProviderMessage(
           return resource.provider_id() == resourceProviderId;
         });
 
-      // Ignore the update if it contained no new information.
-      if (newTotal == oldTotal) {
-        break;
+      bool updated = false;
+
+      if (oldTotal != newTotal) {
+        totalResources -= oldTotal;
+        totalResources += newTotal;
+
+        updated = true;
+      }
+
+      // Update offer operation state.
+      //
+      // We only update offer operations which are not contained in both the
+      // known and just received sets. All other offer operations will be
+      // updated via relayed offer operation status updates.
+      auto isForResourceProvider = [resourceProviderId](
+                                      const OfferOperation& operation) {
+        Result<ResourceProviderID> id = getResourceProviderId(operation.info());
+        return id.isSome() && resourceProviderId == id.get();
+      };
+
+      hashmap<UUID, OfferOperation*> knownOfferOperations;
+      foreachpair(auto&& uuid, auto&& operation, offerOperations) {
+        if (isForResourceProvider(*operation)) {
+          knownOfferOperations.put(uuid, operation);
+        }
+      }
+
+      hashmap<UUID, OfferOperation> receivedOfferOperations;
+      foreach (
+          const OfferOperation& operation,
+          message->updateState->operations) {
+        CHECK(isForResourceProvider(operation))
+          << "Received operation on unexpected resource provider "
+          << "from resource provider " << resourceProviderId;
+
+        Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+        CHECK_SOME(operationUuid);
+
+        receivedOfferOperations.put(operationUuid.get(), operation);
       }
 
-      totalResources -= oldTotal;
-      totalResources += newTotal;
+      const hashset<UUID> knownUuids = knownOfferOperations.keys();
+      const hashset<UUID> receivedUuids = receivedOfferOperations.keys();
+
+      if (knownUuids != receivedUuids) {
+        // Handle offer operations known to the agent but not reported by the
+        // resource provider. These could be operations where the agent has
+        // started tracking an offer operation, but the resource provider failed
+        // over before it could bookkeep the operation.
+        //
+        // NOTE: We do not mutate offer operations statuses here; this
+        // would be the responsibility of an offer operation status
+        // update handler.
+        hashset<UUID> disappearedOperations;
+        std::set_difference(
+            knownUuids.begin(),
+            knownUuids.end(),
+            receivedUuids.begin(),
+            receivedUuids.end(),
+            std::inserter(
+                disappearedOperations, disappearedOperations.begin()));
+
+        foreach (const UUID& uuid, disappearedOperations) {
+          // TODO(bbannier): Instead of simply dropping an operation with
+          // `removeOfferOperation` here we should instead send a `Reconcile`
+          // message with a failed state to the resource provider so its status
+          // update manager can reliably deliver the operation status to the
+          // framework.
+          CHECK(offerOperations.contains(uuid));
+          removeOfferOperation(offerOperations.at(uuid));
+        }
+
+        // Handle offer operations known to the resource provider but
+        // not the agent. This can happen if the agent failed over and
+        // the resource provider reregistered.
+        hashset<UUID> reappearedOperations;
+        std::set_difference(
+            receivedUuids.begin(),
+            receivedUuids.end(),
+            knownUuids.begin(),
+            knownUuids.end(),
+            std::inserter(reappearedOperations, reappearedOperations.begin()));
+
+        foreach (const UUID& uuid, reappearedOperations) {
+          // Start tracking this offer operation.
+          //
+          // NOTE: We do not need to update total resources here as its
+          // state was synced explicitly with the received total above.
+          CHECK(receivedOfferOperations.contains(uuid));
+          addOfferOperation(
+              new OfferOperation(receivedOfferOperations.at(uuid)));
+        }
+
+        updated = true;
+      }
 
+      // Update resource version of this resource provider.
       const UUID& resourceVersionUuid =
         message->updateState->resourceVersionUuid;
 
-      if (resourceVersions.contains(resourceProviderId)) {
-        resourceVersions.at(resourceProviderId) = resourceVersionUuid;
-      } else {
-        resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+      if (!resourceVersions.contains(resourceProviderId) ||
+          resourceVersions.at(resourceProviderId) != resourceVersionUuid) {
+        if (resourceVersions.contains(resourceProviderId)) {
+          resourceVersions.at(resourceProviderId) = resourceVersionUuid;
+        } else {
+          resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+        }
+
+        updated = true;
       }
 
       // Send the updated resources to the master if the agent is running. Note
@@ -6827,23 +6935,32 @@ void Slave::handleResourceProviderMessage(
           break;
         }
         case RUNNING: {
-          LOG(INFO) << "Forwarding new total resources " << totalResources;
+          if (updated) {
+            LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-          // Inform the master that the total capacity of this agent has
-          // changed.
-          UpdateSlaveMessage updateSlaveMessage;
-          updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
-          updateSlaveMessage.mutable_resource_categories()->set_total(true);
+            // Inform the master that the total capacity of this agent has
+            // changed.
+            UpdateSlaveMessage updateSlaveMessage;
+            updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
+            updateSlaveMessage.mutable_resource_categories()->set_total(true);
 
-          updateSlaveMessage.mutable_total_resources()->CopyFrom(
-              totalResources);
+            updateSlaveMessage.mutable_total_resources()->CopyFrom(
+                totalResources);
 
-          updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
-              protobuf::createResourceVersions(resourceVersions));
+            updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
+                protobuf::createResourceVersions(resourceVersions));
 
-          send(master.get(), updateSlaveMessage);
+            UpdateSlaveMessage::OfferOperations* operations =
+              updateSlaveMessage.mutable_offer_operations();
 
-          break;
+            foreachvalue (const OfferOperation* operation, offerOperations) {
+              operations->add_operations()->CopyFrom(*operation);
+            }
+
+            send(master.get(), updateSlaveMessage);
+
+            break;
+          }
         }
       }
       break;