You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/03/14 12:44:55 UTC

[3/3] mesos git commit: Explicitly tracked resource providers in master.

Explicitly tracked resource providers in master.

This patch adds explicit tracking of resource providers to the master
process. While we already explicitly sent resource provider
information in, e.g., `UpdateSlaveMessage`, until now the master only
stored that information aggregated over the full agent.

Review: https://reviews.apache.org/r/65591/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6fe66ce0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6fe66ce0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6fe66ce0

Branch: refs/heads/master
Commit: 6fe66ce0625afa8aa4ab9815a0bb881ccf066e31
Parents: 729cb5b
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:14 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:14 2018 +0100

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp |   9 +-
 src/master/http.cpp           |   8 +
 src/master/master.cpp         | 685 ++++++++++++++++++-------------------
 src/master/master.hpp         |  40 ++-
 4 files changed, 385 insertions(+), 357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 9c5fb97..d2ada35 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -1272,13 +1272,12 @@ mesos::master::Response::GetAgents::Agent createAgentResponse(
       slave.capabilities.toRepeatedPtrField());
 
   foreachvalue (
-      const ResourceProviderInfo& resourceProviderInfo,
+      const mesos::internal::master::Slave::ResourceProvider& resourceProvider,
       slave.resourceProviders) {
-    mesos::master::Response::GetAgents::Agent::ResourceProvider*
-      resourceProvider = agent.add_resource_providers();
+    mesos::master::Response::GetAgents::Agent::ResourceProvider* provider =
+      agent.add_resource_providers();
 
-    resourceProvider->mutable_resource_provider_info()->CopyFrom(
-        resourceProviderInfo);
+    provider->mutable_resource_provider_info()->CopyFrom(resourceProvider.info);
   }
 
   return agent;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index cf03d8b..05fd7ce 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3907,6 +3907,14 @@ Future<Response> Master::Http::getOperations(
     foreachvalue (Operation* operation, slave->operations) {
       operations->add_operations()->CopyFrom(*operation);
     }
+
+    foreachvalue (
+        const Slave::ResourceProvider resourceProvider,
+        slave->resourceProviders) {
+      foreachvalue (Operation* operation, resourceProvider.operations) {
+        operations->add_operations()->CopyFrom(*operation);
+      }
+    }
   }
 
   return OK(serialize(contentType, evolve(response)), stringify(contentType));

http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 223ebf2..8df7ad5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4942,8 +4942,21 @@ void Master::_accept(
             RunTaskMessage message;
             message.mutable_framework()->MergeFrom(framework->info);
 
+            hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+            if (slave->resourceVersion.isSome()) {
+              resourceVersions.put(None(), slave->resourceVersion.get());
+            }
+
+            foreachpair (
+                const ResourceProviderID& resourceProviderId,
+                const Slave::ResourceProvider& resourceProvider,
+                slave->resourceProviders) {
+              resourceVersions.put(
+                  resourceProviderId, resourceProvider.resourceVersion);
+            }
+
             message.mutable_resource_version_uuids()->CopyFrom(
-                protobuf::createResourceVersions(slave->resourceVersions));
+                protobuf::createResourceVersions(resourceVersions));
 
             // TODO(anand): We set 'pid' to UPID() for http frameworks
             // as 'pid' was made optional in 0.24.0. In 0.25.0, we
@@ -5129,8 +5142,21 @@ void Master::_accept(
         message.mutable_executor()->CopyFrom(executor);
         message.mutable_task_group()->CopyFrom(taskGroup);
 
+        hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+        if (slave->resourceVersion.isSome()) {
+          resourceVersions.put(None(), slave->resourceVersion.get());
+        }
+
+        foreachpair (
+            const ResourceProviderID& resourceProviderId,
+            const Slave::ResourceProvider& resourceProvider,
+            slave->resourceProviders) {
+          resourceVersions.put(
+              resourceProviderId, resourceProvider.resourceVersion);
+        }
+
         message.mutable_resource_version_uuids()->CopyFrom(
-            protobuf::createResourceVersions(slave->resourceVersions));
+            protobuf::createResourceVersions(resourceVersions));
 
         set<TaskID> taskIds;
         Resources totalResources;
@@ -7292,178 +7318,93 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
 
   // TODO(bbannier): We only need to update if any changes from
   // resource providers are reported.
-  bool updated =
-    slave->totalResources != newSlaveResources ||
-    message.has_resource_providers();
-
-  if (message.has_resource_version_uuid()) {
-    hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-
-    resourceVersions.insert({None(), message.resource_version_uuid()});
-
-    foreach (
-        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
-        message.resource_providers().providers()) {
-      if (!resourceProvider.has_info()) {
-        continue;
-      }
-
-      CHECK(resourceProvider.info().has_id());
-
-      const ResourceProviderID& resourceProviderId =
-        resourceProvider.info().id();
-
-      CHECK(!resourceVersions.contains(resourceProviderId));
-      resourceVersions.insert(
-          {resourceProviderId, resourceProvider.resource_version_uuid()});
-    }
+  bool updated = slave->totalResources != newSlaveResources;
 
-    updated = updated || slave->resourceVersions != resourceVersions;
-    slave->resourceVersions = resourceVersions;
+  // Check if the agent's resource version changed.
+  if (!updated && message.has_resource_version_uuid() &&
+      (slave->resourceVersion.isNone() ||
+       (slave->resourceVersion.isSome() &&
+        message.resource_version_uuid() != slave->resourceVersion.get()))) {
+    updated = true;
   }
 
   // Check if the known operations for this agent changed.
-  const hashset<UUID> knownOperations = slave->operations.keys();
-  hashset<UUID> receivedOperations;
-
-  foreach (const Operation& operation, message.operations().operations()) {
-    receivedOperations.insert(operation.uuid());
-  }
+  if (!updated) {
+    foreach (const Operation& operation, message.operations().operations()) {
+      if (!slave->operations.contains(operation.uuid())) {
+        updated = true;
+        break;
+      }
 
-  if (message.has_resource_providers()) {
-    foreach (
-        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
-        message.resource_providers().providers()) {
-      foreach (const Operation& operation,
-               resourceProvider.operations().operations()) {
-        receivedOperations.insert(operation.uuid());
+      if (*slave->operations.at(operation.uuid()) != operation) {
+        updated = true;
+        break;
       }
     }
   }
 
-  updated = updated || knownOperations != receivedOperations;
-
-  if (!updated) {
-    LOG(INFO) << "Ignoring update on agent " << *slave
-              << " as it reports no changes";
-    return;
-  }
-
-  struct ResourceProvider
-  {
-    Option<Resources> oldTotal;
-    Option<Resources> newTotal;
-    Option<hashmap<UUID, Operation>> oldOperations;
-    Option<hashmap<UUID, Operation>> newOperations;
-    Option<ResourceProviderInfo> info;
-  };
-
-  // We store information on the different `ResourceProvider`s on this agent in
-  // a map, indexed by an optional provider id. Since the provider ID field for
-  // resources is only set for resources from true resource providers and is not
-  // set for agent default resources, the value for the key `None` points to
-  // information about the agent itself, not its resource providers.
-  hashmap<Option<ResourceProviderID>, ResourceProvider> resourceProviders;
-
-  // Group the resources and operation updates by resource provider.
-  {
-    // Process known resources.
-    auto groupResourcesByProviderId = [](const Resources& resources) {
-      hashmap<Option<ResourceProviderID>, Resources> result;
+  // Check if resource provider information changed.
+  if (!updated && message.has_resource_providers()) {
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& receivedProvider,
+        message.resource_providers().providers()) {
+      CHECK(receivedProvider.has_info());
+      CHECK(receivedProvider.info().has_id());
 
-      foreach (const Resource& resource, resources) {
-        Option<ResourceProviderID> providerId =
-          Resources::hasResourceProvider(resource)
-            ? resource.provider_id()
-            : Option<ResourceProviderID>::none();
+      const ResourceProviderID& resourceProviderId =
+        receivedProvider.info().id();
 
-        result[std::move(providerId)] += resource;
+      if (!slave->resourceProviders.contains(resourceProviderId)) {
+        updated = true;
+        break;
       }
 
-      return result;
-    };
+      const Slave::ResourceProvider& storedProvider =
+        slave->resourceProviders.at(resourceProviderId);
 
-    foreachpair (
-        const Option<ResourceProviderID>& providerId,
-        const Resources& resources,
-        groupResourcesByProviderId(slave->totalResources)) {
-      resourceProviders[providerId].oldTotal = resources;
-    }
-
-    // Process known operations.
-    foreachpair (const UUID& uuid,
-                 Operation* operation,
-                 slave->operations) {
-      Result<ResourceProviderID> providerId_ =
-        getResourceProviderId(operation->info());
-
-      CHECK(!providerId_.isError())
-        << "Failed to extract resource provider id from known operation: "
-        << providerId_.error();
-
-      Option<ResourceProviderID> providerId =
-        providerId_.isSome()
-          ? providerId_.get()
-          : Option<ResourceProviderID>::none();
-
-      // Set up an init empty list of existing operations. We might
-      // create a record for this resource provider if needed.
-      if (resourceProviders[providerId].oldOperations.isNone()) {
-        resourceProviders.at(providerId).oldOperations =
-          hashmap<UUID, Operation>();
+      if (storedProvider.info != receivedProvider.info() ||
+          storedProvider.totalResources != receivedProvider.total_resources() ||
+          storedProvider.resourceVersion !=
+            receivedProvider.resource_version_uuid()) {
+        updated = true;
+        break;
       }
 
-      resourceProviders.at(providerId)
-        .oldOperations->emplace(uuid, *operation);
-    }
-
-    // Explicitly add an entry for received agent resources.
-    resourceProviders[None()].newTotal =
-      newSlaveResources.filter(agentResources);
-
-    // Process received agent operations.
-    resourceProviders[None()].newOperations =
-      hashmap<UUID, Operation>();
-
-    foreach (const Operation& operation, message.operations().operations()) {
-      resourceProviders.at(None())
-        .newOperations->emplace(operation.uuid(), operation);
-    }
-
-    // Process explicitly received resource provider information.
-    if (message.has_resource_providers()) {
       foreach (
-          const UpdateSlaveMessage::ResourceProvider& resourceProvider,
-          message.resource_providers().providers()) {
-        CHECK(resourceProvider.has_info());
-        CHECK(resourceProvider.info().has_id());
-
-        ResourceProvider& provider =
-          resourceProviders[resourceProvider.info().id()];
-
-        provider.info = resourceProvider.info();
-
-        provider.newTotal = resourceProvider.total_resources();
-        if (provider.newOperations.isNone()) {
-          provider.newOperations = hashmap<UUID, Operation>();
+          const Operation& operation,
+          receivedProvider.operations().operations()) {
+        if (!storedProvider.operations.contains(operation.uuid())) {
+          updated = true;
+          break;
         }
 
-        foreach (const Operation& operation,
-                 resourceProvider.operations().operations()) {
-          provider.newOperations->emplace(operation.uuid(), operation);
+        if (*storedProvider.operations.at(operation.uuid()) != operation) {
+          updated = true;
+          break;
         }
       }
     }
   }
 
+  if (!updated) {
+    LOG(INFO) << "Ignoring update on agent " << *slave
+              << " as it reports no changes";
+    return;
+  }
+
   // Check invariants of the received update.
   {
-    foreachpair (
-        const Option<ResourceProviderID>& providerId,
-        const ResourceProvider& provider,
-        resourceProviders) {
-      if (providerId.isSome() &&
-          slave->resourceProviders.contains(providerId.get())) {
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      CHECK(resourceProvider.has_info());
+      CHECK(resourceProvider.info().has_id());
+      const ResourceProviderID& providerId = resourceProvider.info().id();
+
+      const Option<Slave::ResourceProvider>& oldProvider =
+        slave->resourceProviders.get(providerId);
+
+      if (oldProvider.isSome()) {
         // For known resource providers the master should always know at least
         // as many non-terminal operations as the agent. While an
         // operation might get lost on the way to the agent or resource
@@ -7477,75 +7418,58 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
         // to the agent, but the agent fails over before it can process the
         // acknowledgement, or the agent initiates an unrelated
         // `UpdateSlaveMessage`.
-        auto extractPendingOperations =
-          [](const hashmap<UUID, Operation>& source,
-             hashset<UUID>* target) {
-            foreachpair (const UUID& uuid,
-                         const Operation& operation,
-                         source) {
-              if (!protobuf::isTerminalState(
-                      operation.latest_status().state())) {
-                target->insert(uuid);
-              }
-            }
-          };
+        foreach (
+            const Operation& operation,
+            resourceProvider.operations().operations()) {
+          if (!protobuf::isTerminalState(operation.latest_status().state())) {
+            CHECK(oldProvider->operations.contains(operation.uuid()))
+              << "Agent tried to reconcile unknown non-terminal operation "
+              << operation.uuid();
+          }
+        }
+      }
+    }
+  }
 
-        hashset<UUID> oldPendingOperations;
-        hashset<UUID> newPendingOperations;
+  // Update master and allocator state.
 
-        if (provider.oldOperations.isSome()) {
-          extractPendingOperations(
-              provider.oldOperations.get(), &oldPendingOperations);
-        }
+  if (hasOversubscribed) {
+    slave->totalResources -= slave->totalResources.revocable();
+    slave->totalResources += message.oversubscribed_resources();
 
-        if (provider.newOperations.isSome()) {
-          extractPendingOperations(
-              provider.newOperations.get(), &newPendingOperations);
-        }
+    // TODO(bbannier): Track oversubscribed resources for resource
+    // providers as well.
+  }
 
-        foreach (const UUID& uuid, newPendingOperations) {
-          CHECK(oldPendingOperations.contains(uuid))
-            << "Agent tried to reconcile unknown non-terminal operation "
-            << uuid;
-        }
-      }
+  ReconcileOperationsMessage reconcile;
 
-      if (providerId.isNone()) {
-        // We do not permit changes to agent (i.e., non-resource
-        // provider) non-revocable resources.
-        CHECK_SOME(provider.oldTotal);
-        CHECK_SOME(provider.newTotal);
-
-        Resources oldNonRevocable =
-          provider.oldTotal->nonRevocable().createStrippedScalarQuantity();
-        Resources newNonRevocable =
-          provider.newTotal->nonRevocable().createStrippedScalarQuantity();
-        CHECK_EQ(
-            provider.oldTotal->nonRevocable(),
-            provider.newTotal->nonRevocable());
-
-        // For agents only speculative operations can be reconciled.
-        //
-        // TODO(bbannier): Reconcile agent operations in
-        // `ReregisterSlaveMessage` in which case we expect agents to
-        // send the already known operations again here (possibly with
-        // changed status).
-        if (provider.newOperations.isSome()) {
-          foreachvalue (const Operation& operation,
-                        provider.newOperations.get()) {
-            CHECK(protobuf::isSpeculativeOperation(operation.info()));
-          }
-        }
-      }
+  // Reconcile operations on agent-default resources.
+  hashset<UUID> newOperations;
+  foreach (const Operation& operation, message.operations().operations()) {
+    newOperations.insert(operation.uuid());
+  }
+
+  foreachkey (const UUID& uuid, slave->operations) {
+    if (!message.has_operations() || !newOperations.contains(uuid)) {
+      LOG(WARNING) << "Performing explicit reconciliation with agent for"
+                   << " known operation " << uuid
+                   << " since it was not present in original"
+                   << " reconciliation message from agent";
+
+      ReconcileOperationsMessage::Operation* reconcileOperation =
+        reconcile.add_operations();
+
+      reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
     }
   }
 
-  ReconcileOperationsMessage reconcile;
+  foreach (
+      const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+      message.resource_providers().providers()) {
+    CHECK(resourceProvider.has_info());
+    CHECK(resourceProvider.info().has_id());
+    const ResourceProviderID& providerId = resourceProvider.info().id();
 
-  // Update master and allocator state.
-  foreachpair (const Option<ResourceProviderID>& providerId,
-               const ResourceProvider& provider,
-               resourceProviders) {
     // Below we only add operations to our state from resource
     // providers which are unknown, or possibly remove them for known
     // resource providers.  This works since the master should always
@@ -7565,142 +7489,140 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
     // TODO(bbannier): We might want to consider to also learn about
     // new (terminal) operations when observing messages from status
     // update managers to frameworks.
-
-    if (providerId.isSome() &&
-        !slave->resourceProviders.contains(providerId.get())) {
+    if (!slave->resourceProviders.contains(providerId)) {
       // If this is a not previously seen resource provider we had a master
       // failover. Add the resources and operations to our state.
-      CHECK_SOME(provider.newTotal);
       CHECK(
-          provider.newTotal->empty() ||
-          !slave->totalResources.contains(provider.newTotal.get()));
-
-      CHECK_SOME(provider.info);
-      slave->resourceProviders.insert({providerId.get(), provider.info.get()});
-
-      slave->totalResources += provider.newTotal.get();
+          resourceProvider.total_resources().empty() ||
+          !slave->totalResources.contains(resourceProvider.total_resources()));
+
+      // We add the resource provider to the master first so
+      // that it can be found when e.g., adding operations.
+      slave->resourceProviders.put(
+          providerId,
+          {resourceProvider.info(),
+           resourceProvider.total_resources(),
+           resourceProvider.resource_version_uuid(),
+           {}});
 
       hashmap<FrameworkID, Resources> usedByOperations;
 
-      if (provider.newOperations.isSome()) {
-        foreachpair (const UUID& uuid,
-                     const Operation& operation,
-                     provider.newOperations.get()) {
-          // Update to bookkeeping of operations.
-          CHECK(!slave->operations.contains(uuid))
-            << "New operation " << uuid << " is already known";
-
-          Framework* framework = nullptr;
-          if (operation.has_framework_id()) {
-            framework = getFramework(operation.framework_id());
-          }
+      foreach (
+          const Operation& operation,
+          resourceProvider.operations().operations()) {
+        // Update to bookkeeping of operations.
+        Framework* framework = nullptr;
+        if (operation.has_framework_id()) {
+          framework = getFramework(operation.framework_id());
+        }
 
-          addOperation(framework, slave, new Operation(operation));
+        addOperation(framework, slave, new Operation(operation));
 
-          if (!protobuf::isTerminalState(operation.latest_status().state())) {
-            // If we do not yet know the `FrameworkInfo` of the framework the
-            // operation originated from, we cannot properly track the operation
-            // at this point.
-            //
-            // TODO(bbannier): Consider introducing ways of making
-            // sure an agent always knows the `FrameworkInfo` of
-            // operations triggered on its resources, e.g., by adding
-            // an explicit `FrameworkInfo` to operations like is
-            // already done for `RunTaskMessage`, see MESOS-8582.
-            if (framework == nullptr) {
-              LOG(WARNING)
-                << "Cannot properly account for operation " << operation.uuid()
-                << " learnt in reconciliation of agent " << slaveId
-                << " since framework " << operation.framework_id()
-                << " is unknown; this can lead to assertion failures after the"
-                   " operation terminates, see MESOS-8536";
-              continue;
-            }
+        if (!protobuf::isTerminalState(operation.latest_status().state())) {
+          // If we do not yet know the `FrameworkInfo` of the framework the
+          // operation originated from, we cannot properly track the operation
+          // at this point.
+          //
+          // TODO(bbannier): Consider introducing ways of making sure an agent
+          // always knows the `FrameworkInfo` of operations triggered on its
+          // resources, e.g., by adding an explicit `FrameworkInfo` to
+          // operations like is already done for `RunTaskMessage`, see
+          // MESOS-8582.
+          if (framework == nullptr) {
+            LOG(WARNING)
+              << "Cannot properly account for operation " << operation.uuid()
+              << " learnt in reconciliation of agent " << slaveId
+              << " since framework " << operation.framework_id()
+              << " is unknown; this can lead to assertion failures after the"
+              " operation terminates, see MESOS-8536";
+            continue;
+          }
 
-            Try<Resources> consumedResources =
-              protobuf::getConsumedResources(operation.info());
+          Try<Resources> consumedResources =
+            protobuf::getConsumedResources(operation.info());
 
-            CHECK_SOME(consumedResources)
-              << "Could not determine resources consumed by operation "
-              << operation.uuid();
+          CHECK_SOME(consumedResources)
+            << "Could not determine resources consumed by operation "
+            << operation.uuid();
 
-            usedByOperations[operation.framework_id()] +=
-              consumedResources.get();
-          }
+          usedByOperations[operation.framework_id()] +=
+            consumedResources.get();
         }
       }
 
+      slave->totalResources += resourceProvider.total_resources();
+
       allocator->addResourceProvider(
-          slaveId,
-          provider.newTotal.get(),
-          usedByOperations);
+          slaveId, resourceProvider.total_resources(), usedByOperations);
     } else {
-      // If this is a known resource provider or agent its total
-      // capacity cannot have changed, and it would not know about any
-      // non-terminal operations not already known to the master.
-      // However, it might not have received an operation for a couple
-      // different reasons:
+      // If this is a known resource provider its total capacity cannot have
+      // changed, and it would not know about any non-terminal operations not
+      // already known to the master.  However, it might not have received an
+      // operation for a couple different reasons:
+      //
       //   - The resource provider or agent could have failed over
       //     before the operation's `ApplyOperationMessage` could be
       //     received.
       //   - The operation's `ApplyOperationMessage` could have raced
       //     with this `UpdateSlaveMessage`.
       //
-      // In both of these cases, we need to reconcile such operations
-      // explicitly with the agent. For operations which the agent or
-      // resource provider does not recognize, an OPERATION_DROPPED
-      // status update will be generated and the master will remove
-      // the operation from its state upon receipt of that update.
-      if (provider.oldOperations.isSome()) {
-        foreachkey (const UUID& uuid, provider.oldOperations.get()) {
-          if (provider.newOperations.isNone() ||
-              !provider.newOperations->contains(uuid)) {
-            LOG(WARNING) << "Performing explicit reconciliation with agent for"
-                         << " known operation " << uuid
-                         << " since it was not present in original"
-                         << " reconciliation message from agent";
-
-            ReconcileOperationsMessage::Operation* reconcileOperation =
-              reconcile.add_operations();
-            reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
-
-            if (providerId.isSome()) {
-              reconcileOperation->mutable_resource_provider_id()
-                ->CopyFrom(providerId.get());
-            }
-          }
+      // In both of these cases, we need to reconcile such operations explicitly
+      // with the agent. For operations which the agent or resource provider
+      // does not recognize, an OPERATION_DROPPED status update will be
+      // generated and the master will remove the operation from its state upon
+      // receipt of that update.
+      CHECK(slave->resourceProviders.contains(providerId));
+
+      Slave::ResourceProvider& oldProvider =
+        slave->resourceProviders.at(providerId);
+
+      hashmap<UUID, const Operation*> newOperations;
+      foreach (
+          const Operation& operation,
+          resourceProvider.operations().operations()) {
+        newOperations.put(operation.uuid(), &operation);
+      }
+
+      foreachpair (
+          const UUID& uuid, Operation* oldOperation, oldProvider.operations) {
+        if (!newOperations.contains(uuid)) {
+          LOG(WARNING) << "Performing explicit reconciliation with agent for"
+                       << " known operation " << uuid
+                       << " since it was not present in original"
+                       << " reconciliation message from agent";
 
+          ReconcileOperationsMessage::Operation* reconcileOperation =
+            reconcile.add_operations();
+
+          reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
+          reconcileOperation->mutable_resource_provider_id()->CopyFrom(
+              providerId);
+        } else {
           // If a known operation became terminal between any previous offer
           // operation status update and this `UpdateSlaveMessage`, the total
           // resources we were sent already had the operation applied. We need
           // to update the state of the operation to terminal here so that any
           // update sent by the agent later does not cause us to apply the
           // operation again.
-          if (provider.newOperations.isSome() &&
-              provider.newOperations->contains(uuid)) {
-            Option<Operation> oldOperation = provider.oldOperations->get(uuid);
-            Option<Operation> newOperation = provider.newOperations->get(uuid);
-
-            CHECK_SOME(oldOperation);
-            CHECK_SOME(newOperation);
-
-            if (!protobuf::isTerminalState(
-                    oldOperation->latest_status().state()) &&
-                protobuf::isTerminalState(
-                    newOperation->latest_status().state())) {
-              Operation* operation = CHECK_NOTNULL(slave->getOperation(uuid));
-
-              UpdateOperationStatusMessage update =
-                protobuf::createUpdateOperationStatusMessage(
-                    uuid,
-                    newOperation->latest_status(),
-                    newOperation->latest_status(),
-                    operation->framework_id(),
-                    operation->slave_id());
-
-              updateOperation(
-                  operation, update, false); // Do not update resources.
-            }
+
+          const Operation* newOperation = newOperations.at(uuid);
+
+          if (!protobuf::isTerminalState(
+                  oldOperation->latest_status().state()) &&
+              protobuf::isTerminalState(
+                  newOperation->latest_status().state())) {
+            Operation* operation = CHECK_NOTNULL(slave->getOperation(uuid));
+
+            UpdateOperationStatusMessage update =
+              protobuf::createUpdateOperationStatusMessage(
+                  uuid,
+                  newOperation->latest_status(),
+                  newOperation->latest_status(),
+                  operation->framework_id(),
+                  operation->slave_id());
+
+            updateOperation(
+                operation, update, false); // Do not update resources.
           }
         }
       }
@@ -7709,14 +7631,18 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
       // speculated operations which are only visible in the total,
       // but never in the used resources. We explicitly allow for
       // resource providers to change from or to zero capacity.
-      if (provider.oldTotal.isSome()) {
-        CHECK(slave->totalResources.contains(provider.oldTotal.get()));
-        slave->totalResources -= provider.oldTotal.get();
-      }
+      const Resources oldResources =
+        slave->totalResources.filter([&providerId](const Resource& resource) {
+          return resource.provider_id() == providerId;
+        });
 
-      if (provider.newTotal.isSome()) {
-        slave->totalResources += provider.newTotal.get();
-      }
+      slave->totalResources -= oldResources;
+      slave->totalResources += resourceProvider.total_resources();
+
+      oldProvider.totalResources = resourceProvider.total_resources();
+
+      // Reconcile resource versions.
+      oldProvider.resourceVersion = resourceProvider.resource_version_uuid();
     }
   }
 
@@ -7749,9 +7675,13 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
     //
     // TODO(bbannier): Only rescind offers possibly containing
     // affected resources.
-    if (message.has_resource_providers()) {
-      LOG(INFO) << "Removing offer " << offer->id() << " with resources "
-                << offered << " on agent " << *slave;
+    const Resources offeredResourceProviderResources = offered.filter(
+        [](const Resource& resource) { return resource.has_provider_id(); });
+    if (message.has_resource_providers() &&
+        !offeredResourceProviderResources.empty()) {
+      LOG(INFO)
+        << "Removing offer " << offer->id()
+        << " with resources " << offered << " on agent " << *slave;
 
       rescind = true;
     }
@@ -10604,16 +10534,17 @@ void Master::_apply(
     // This must have been validated by the caller.
     CHECK(!resourceProviderId.isError());
 
-    Option<UUID> resourceVersion = resourceProviderId.isSome()
-      ? slave->resourceVersions.get(resourceProviderId.get())
-      : slave->resourceVersions.get(None());
+    CHECK(
+        resourceProviderId.isNone() ||
+        slave->resourceProviders.contains(resourceProviderId.get()))
+      << "Resource provider " + stringify(resourceProviderId.get()) +
+           " is unknown";
+
+    CHECK_SOME(slave->resourceVersion);
 
-    CHECK_SOME(resourceVersion)
-      << "Resource version of "
-      << (resourceProviderId.isSome()
-           ? "resource provider " + stringify(resourceProviderId.get())
-           : "agent " + stringify(*slave))
-      << " is unknown";
+    const UUID resourceVersion = resourceProviderId.isNone()
+      ? slave->resourceVersion.get()
+      : slave->resourceProviders.get(resourceProviderId.get())->resourceVersion;
 
     Operation* operation = new Operation(
         protobuf::createOperation(
@@ -10651,7 +10582,7 @@ void Master::_apply(
     }
 
     message.mutable_resource_version_uuid()->mutable_uuid()->CopyFrom(
-        resourceVersion.get());
+        resourceVersion);
 
     LOG(INFO) << "Sending operation '" << operation->info().id()
               << "' (uuid: " << operation->uuid() << ") "
@@ -11427,7 +11358,7 @@ Slave::Slave(
     vector<SlaveInfo::Capability> _capabilites,
     const Time& _registeredTime,
     vector<Resource> _checkpointedResources,
-    const Option<UUID>& resourceVersion,
+    const Option<UUID>& _resourceVersion,
     vector<ExecutorInfo> executorInfos,
     vector<Task> tasks)
   : master(_master),
@@ -11441,6 +11372,7 @@ Slave::Slave(
     connected(true),
     active(true),
     checkpointedResources(std::move(_checkpointedResources)),
+    resourceVersion(_resourceVersion),
     observer(nullptr)
 {
   CHECK(info.has_id());
@@ -11453,10 +11385,6 @@ Slave::Slave(
   CHECK_SOME(resources);
   totalResources = resources.get();
 
-  if (resourceVersion.isSome()) {
-    resourceVersions.put(None(), resourceVersion.get());
-  }
-
   foreach (ExecutorInfo& executorInfo, executorInfos) {
     CHECK(executorInfo.has_framework_id());
     addExecutor(executorInfo.framework_id(), std::move(executorInfo));
@@ -11579,7 +11507,21 @@ void Slave::removeTask(Task* task)
 
 void Slave::addOperation(Operation* operation)
 {
-  operations.put(operation->uuid(), operation);
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError()) << resourceProviderId.error();
+
+  if (resourceProviderId.isNone()) {
+    operations.put(operation->uuid(), operation);
+  } else {
+    CHECK(resourceProviders.contains(resourceProviderId.get()));
+
+    ResourceProvider& resourceProvider =
+      resourceProviders.at(resourceProviderId.get());
+
+    resourceProvider.operations.put(operation->uuid(), operation);
+  }
 
   if (!protobuf::isSpeculativeOperation(operation->info()) &&
       !protobuf::isTerminalState(operation->latest_status().state())) {
@@ -11629,16 +11571,38 @@ void Slave::removeOperation(Operation* operation)
 {
   const UUID& uuid = operation->uuid();
 
-  CHECK(operations.contains(uuid))
-    << "Unknown operation (uuid: " << uuid << ")"
-    << " to agent " << *this;
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError()) << resourceProviderId.error();
 
+  // Recover the resources used by this operation.
   if (!protobuf::isSpeculativeOperation(operation->info()) &&
       !protobuf::isTerminalState(operation->latest_status().state())) {
     recoverResources(operation);
   }
 
-  operations.erase(uuid);
+  // Remove the operation.
+  if (resourceProviderId.isNone()) {
+    CHECK(operations.contains(uuid))
+      << "Unknown operation (uuid: " << uuid << ")"
+      << " to agent " << *this;
+
+    operations.erase(operation->uuid());
+  } else {
+    CHECK(resourceProviders.contains(resourceProviderId.get()))
+      << "resource provider " << resourceProviderId.get() << " is unknown";
+
+    ResourceProvider& resourceProvider =
+      resourceProviders.at(resourceProviderId.get());
+
+    CHECK(resourceProvider.operations.contains(uuid))
+      << "Unknown operation (uuid: " << uuid << ")"
+      << " to resource provider " << resourceProviderId.get()
+      << " on agent " << *this;
+
+    resourceProvider.operations.erase(operation->uuid());
+  }
 }
 
 
@@ -11647,6 +11611,13 @@ Operation* Slave::getOperation(const UUID& uuid) const
   if (operations.contains(uuid)) {
     return operations.at(uuid);
   }
+
+  foreachvalue (const ResourceProvider& resourceProvider, resourceProviders) {
+    if (resourceProvider.operations.contains(uuid)) {
+      return resourceProvider.operations.at(uuid);
+    }
+  }
+
   return nullptr;
 }
 
@@ -11740,6 +11711,24 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
   totalResources = resources.get();
 
   checkpointedResources = totalResources.filter(needCheckpointing);
+
+  // Also apply the conversion to the explicitly maintained resource
+  // provider resources.
+  foreach (const ResourceConversion& conversion, conversions) {
+    Result<ResourceProviderID> providerId = getResourceProviderId(conversion);
+
+    if (providerId.isNone()) {
+      continue;
+    }
+
+    CHECK_SOME(providerId);
+    CHECK(resourceProviders.contains(providerId.get()));
+    ResourceProvider& provider = resourceProviders.at(providerId.get());
+
+    CHECK(provider.totalResources.contains(conversion.consumed));
+    provider.totalResources -= conversion.consumed;
+    provider.totalResources += conversion.converted;
+  }
 }
 
 
@@ -11748,7 +11737,7 @@ Try<Nothing> Slave::update(
     const string& _version,
     const vector<SlaveInfo::Capability>& _capabilities,
     const Resources& _checkpointedResources,
-    const Option<UUID>& resourceVersion)
+    const Option<UUID>& _resourceVersion)
 {
   Try<Resources> resources = applyCheckpointedResources(
       _info.resources(),
@@ -11770,9 +11759,7 @@ Try<Nothing> Slave::update(
   // reregistering in this case.
   totalResources = resources.get();
 
-  if (resourceVersion.isSome()) {
-    resourceVersions.put(None(), resourceVersion.get());
-  }
+  resourceVersion = _resourceVersion;
 
   return Nothing();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8bf2c76..896995f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -124,7 +124,7 @@ Slave(Master* const _master,
         std::vector<SlaveInfo::Capability> _capabilites,
         const process::Time& _registeredTime,
         std::vector<Resource> _checkpointedResources,
-        const Option<UUID>& resourceVersion,
+        const Option<UUID>& _resourceVersion,
         std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
         std::vector<Task> tasks = std::vector<Task>());
 
@@ -276,10 +276,44 @@ Slave(Master* const _master,
   // providers as well.
   Resources totalResources;
 
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than the one it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  Option<UUID> resourceVersion;
+
   SlaveObserver* observer;
 
-  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
+  struct ResourceProvider {
+    ResourceProviderInfo info;
+    Resources totalResources;
+
+    // Used to establish the relationship between the operation and the
+    // resources that the operation is operating on. Each resource
+    // provider will keep a resource version UUID, and change it when it
+    // believes that the resources from this resource provider are out
+    // of sync from the master's view.  The master will keep track of
+    // the last known resource version UUID for each resource provider,
+    // and attach the resource version UUID in each operation it sends
+    // out. The resource provider should reject operations that have a
+    // different resource version UUID than the one it maintains, because
+    // this means the operation is operating on resources that might
+    // have already been invalidated.
+    UUID resourceVersion;
+
+    // Pending operations or terminal operations that have
+    // unacknowledged status updates.
+    hashmap<UUID, Operation*> operations;
+  };
+
+  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
 
 private:
   Slave(const Slave&);              // No copying.