You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/11 22:02:24 UTC

[3/3] mesos git commit: Refactored agent to keep track of local resource providers.

Refactored agent to keep track of local resource providers.

Currently, we don't explicitly keep track of local resource providers.
This causes the logic for a few methods to be quite complex because we
need to reconstruct the resource provider information every time.

Review: https://reviews.apache.org/r/64477


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9861e1a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9861e1a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9861e1a

Branch: refs/heads/master
Commit: c9861e1ae5225b4ee2cb160bbb53c3ea9fafd021
Parents: 3f862f3
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Dec 8 17:31:24 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 11 14:02:00 2017 -0800

----------------------------------------------------------------------
 src/master/validation.cpp                     |   3 +
 src/resource_provider/manager.cpp             |  18 +-
 src/resource_provider/message.hpp             |   9 +-
 src/slave/http.cpp                            |  11 +-
 src/slave/slave.cpp                           | 368 +++++++++++----------
 src/slave/slave.hpp                           |  65 +++-
 src/tests/resource_provider_manager_tests.cpp |   2 +-
 7 files changed, 286 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 38d9a3c..585d8bf 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -1929,6 +1929,9 @@ Option<Error> validateInverseOffers(
 
 namespace operation {
 
+// TODO(jieyu): Validate that resources in an operation are not empty.
+
+
 Option<Error> validate(
     const Offer::Operation::Reserve& reserve,
     const Option<Principal>& principal,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index f98611c..bfc917f 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -646,18 +646,26 @@ void ResourceProviderManagerProcess::updateState(
 
   // TODO(chhsiao): Report pending operations.
 
-  Try<UUID> resourceVersionUuid =
+  Try<UUID> resourceVersion =
     UUID::fromBytes(update.resource_version_uuid());
 
-  CHECK_SOME(resourceVersionUuid)
+  CHECK_SOME(resourceVersion)
     << "Could not deserialize version of resource provider "
-    << resourceProvider->info.id() << ": " << resourceVersionUuid.error();
+    << resourceProvider->info.id() << ": " << resourceVersion.error();
+
+  hashmap<UUID, OfferOperation> offerOperations;
+  foreach (const OfferOperation &operation, update.operations()) {
+    Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+    CHECK_SOME(uuid);
+
+    offerOperations.put(uuid.get(), operation);
+  }
 
   ResourceProviderMessage::UpdateState updateState{
       resourceProvider->info,
-      resourceVersionUuid.get(),
+      resourceVersion.get(),
       update.resources(),
-      {update.operations().begin(), update.operations().end()}};
+      std::move(offerOperations)};
 
   ResourceProviderMessage message;
   message.type = ResourceProviderMessage::Type::UPDATE_STATE;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index bbf6bb2..eab90cf 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -24,6 +24,7 @@
 #include <mesos/resources.hpp>
 
 #include <stout/check.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/jsonify.hpp>
 #include <stout/option.hpp>
 #include <stout/protobuf.hpp>
@@ -46,9 +47,9 @@ struct ResourceProviderMessage
   struct UpdateState
   {
     ResourceProviderInfo info;
-    UUID resourceVersionUuid;
-    Resources total;
-    std::vector<OfferOperation> operations;
+    UUID resourceVersion;
+    Resources totalResources;
+    hashmap<UUID, OfferOperation> offerOperations;
   };
 
   struct UpdateOfferOperationStatus
@@ -77,7 +78,7 @@ inline std::ostream& operator<<(
       return stream
           << "UPDATE_STATE: "
           << updateState->info.id() << " "
-          << updateState->total;
+          << updateState->totalResources;
     }
 
     case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 738786f..f71adbc 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -1883,14 +1883,13 @@ Future<Response> Http::getResourceProviders(
   agent::Response::GetResourceProviders* resourceProviders =
     response.mutable_get_resource_providers();
 
-  foreachvalue (
-      const ResourceProviderInfo& resourceProviderInfo,
-      slave->resourceProviderInfos) {
-    agent::Response::GetResourceProviders::ResourceProvider* resourceProvider =
+  foreachvalue (ResourceProvider* resourceProvider,
+                slave->resourceProviders) {
+    agent::Response::GetResourceProviders::ResourceProvider* provider =
       resourceProviders->add_resource_providers();
 
-    resourceProvider->mutable_resource_provider_info()->CopyFrom(
-        resourceProviderInfo);
+    provider->mutable_resource_provider_info()
+      ->CopyFrom(resourceProvider->info);
   }
 
   return OK(serialize(acceptType, evolve(response)), stringify(acceptType));

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 373e393..5d4cd6d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -222,7 +222,7 @@ Slave::Slave(const string& id,
     qosController(_qosController),
     secretGenerator(_secretGenerator),
     authorizer(_authorizer),
-    resourceVersions({{Option<ResourceProviderID>::none(), UUID::random()}}) {}
+    resourceVersion(UUID::random()) {}
 
 
 Slave::~Slave()
@@ -1540,8 +1540,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    message.mutable_resource_version_uuids()->CopyFrom(
-        protobuf::createResourceVersions(resourceVersions));
+    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+    uuid->set_uuid(resourceVersion.toBytes());
+
+    foreachvalue (ResourceProvider* provider, resourceProviders) {
+      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+      CHECK(provider->info.has_id());
+      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
+      uuid->set_uuid(provider->resourceVersion.toBytes());
+    }
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -1555,8 +1562,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    message.mutable_resource_version_uuids()->CopyFrom(
-        protobuf::createResourceVersions(resourceVersions));
+    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+    uuid->set_uuid(resourceVersion.toBytes());
+
+    foreachvalue (ResourceProvider* provider, resourceProviders) {
+      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+      CHECK(provider->info.has_id());
+      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
+      uuid->set_uuid(provider->resourceVersion.toBytes());
+    }
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -2258,35 +2272,37 @@ void Slave::__run(
   // TODO(bbannier): Also check executor resources.
   bool kill = false;
   if (!resourceVersionUuids.empty()) {
-    hashset<Option<ResourceProviderID>> usedResourceProviders;
+    hashset<Option<ResourceProviderID>> usedResourceProviderIds;
     foreach (const TaskInfo& _task, tasks) {
       foreach (const Resource& resource, _task.resources()) {
-        if (resource.has_provider_id()) {
-          usedResourceProviders.insert(resource.provider_id());
-        } else {
-          usedResourceProviders.insert(None());
-        }
+        usedResourceProviderIds.insert(resource.has_provider_id()
+           ? Option<ResourceProviderID>(resource.provider_id())
+           : None());
       }
     }
 
     const hashmap<Option<ResourceProviderID>, UUID> receivedResourceVersions =
-      protobuf::parseResourceVersions(
-          {resourceVersionUuids.begin(), resourceVersionUuids.end()});
+      protobuf::parseResourceVersions({
+          resourceVersionUuids.begin(),
+          resourceVersionUuids.end()});
 
-    foreach (auto&& resourceProvider, usedResourceProviders) {
-      Option<Error> error = None();
+    foreach (const Option<ResourceProviderID>& resourceProviderId,
+             usedResourceProviderIds) {
+      if (resourceProviderId.isNone()) {
+        CHECK(receivedResourceVersions.contains(None()));
 
-      if (!resourceVersions.contains(resourceProvider)) {
-        // We do not expect the agent to forget about itself.
-        CHECK_SOME(resourceProvider);
-        kill = true;
-      }
-
-      CHECK(receivedResourceVersions.contains(resourceProvider));
+        if (resourceVersion != receivedResourceVersions.at(None())) {
+          kill = true;
+        }
+      } else {
+        ResourceProvider* resourceProvider =
+          getResourceProvider(resourceProviderId.get());
 
-      if (resourceVersions.at(resourceProvider) !=
-          receivedResourceVersions.at(resourceProvider)) {
-        kill = true;
+        if (resourceProvider == nullptr ||
+            resourceProvider->resourceVersion !=
+              receivedResourceVersions.at(resourceProviderId.get())) {
+          kill = true;
+        }
       }
     }
   }
@@ -7010,76 +7026,44 @@ UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
 
 UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
 {
-  UpdateSlaveMessage message;
-
-  message.mutable_slave_id()->CopyFrom(info.id());
-
   // Agent information (total resources, offer operations, resource
   // versions) is not passed as part of some `ResourceProvider`, but
   // globally in `UpdateStateMessage`.
   //
   // TODO(bbannier): Pass agent information as a resource provider.
-
-  // Process total resources.
-  hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
-    resourceProviders;
-
-  foreach (const Resource& resource, totalResources) {
-    if (resource.has_provider_id()) {
-      resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
-          resource);
-    }
-  }
-
-  // Process offer operations.
-  UpdateSlaveMessage::OfferOperations* operations =
-    message.mutable_offer_operations();
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.set_resource_version_uuid(resourceVersion.toBytes());
+  message.mutable_offer_operations();
 
   foreachvalue (const OfferOperation* operation, offerOperations) {
     Result<ResourceProviderID> resourceProviderId =
       getResourceProviderId(operation->info());
 
-    if (resourceProviderId.isSome()) {
-      resourceProviders[resourceProviderId.get()]
-        .mutable_operations()
-        ->add_operations()
-        ->CopyFrom(*operation);
-    } else if (resourceProviderId.isNone()) {
-      operations->add_operations()->CopyFrom(*operation);
+    if (resourceProviderId.isNone()) {
+      message.mutable_offer_operations()
+        ->add_operations()->CopyFrom(*operation);
     }
   }
 
-  // Make sure 'offer_operations' is always set for resource providers.
-  foreachkey (
-      const ResourceProviderID& resourceProviderId,
-      resourceProviderInfos) {
-    resourceProviders[resourceProviderId].mutable_operations();
-  }
-
-  // Process resource versions.
-  CHECK(resourceVersions.contains(None()));
-  message.set_resource_version_uuid(resourceVersions.at(None()).toBytes());
-
-  foreachpair (
-      const ResourceProviderID& providerId,
-      UpdateSlaveMessage::ResourceProvider& provider,
-      resourceProviders) {
-    CHECK(resourceVersions.contains(providerId));
-    provider.set_resource_version_uuid(
-        resourceVersions.at(providerId).toBytes());
+  foreachvalue (ResourceProvider* resourceProvider, resourceProviders) {
+    UpdateSlaveMessage::ResourceProvider* provider =
+      message.mutable_resource_providers()->add_providers();
 
-    CHECK(resourceProviderInfos.contains(providerId));
-    provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
-  }
+    provider->mutable_info()->CopyFrom(
+        resourceProvider->info);
+    provider->mutable_total_resources()->CopyFrom(
+        resourceProvider->totalResources);
+    provider->set_resource_version_uuid(
+        resourceProvider->resourceVersion.toBytes());
 
-  // We only actually surface resource-provider related information if
-  // this agent is resource provider-capable.
-  if (capabilities.resourceProvider) {
-    list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
-      resourceProviders.values();
+    provider->mutable_operations();
 
-    message.mutable_resource_providers()->mutable_providers()->CopyFrom(
-        {resourceProviders_.begin(), resourceProviders_.end()});
+    foreachvalue (const OfferOperation* operation,
+                  resourceProvider->offerOperations) {
+      provider->mutable_operations()
+        ->add_operations()->CopyFrom(*operation);
+    }
   }
 
   return message;
@@ -7120,75 +7104,61 @@ void Slave::handleResourceProviderMessage(
     case ResourceProviderMessage::Type::UPDATE_STATE: {
       CHECK_SOME(message->updateState);
 
-      const Resources& newTotal = message->updateState->total;
+      const ResourceProviderMessage::UpdateState& updateState =
+        message->updateState.get();
 
-      CHECK(message->updateState->info.has_id());
+      CHECK(updateState.info.has_id());
+      const ResourceProviderID& resourceProviderId = updateState.info.id();
 
-      const ResourceProviderID& resourceProviderId =
-        message->updateState->info.id();
+      ResourceProvider* resourceProvider =
+        getResourceProvider(resourceProviderId);
 
-      if (resourceProviderInfos.contains(resourceProviderId)) {
-        resourceProviderInfos[resourceProviderId] = message->updateState->info;
-      } else {
-        resourceProviderInfos.put(
-            resourceProviderId,
-            message->updateState->info);
-      }
+      if (resourceProvider == nullptr) {
+        resourceProvider = new ResourceProvider(
+            updateState.info,
+            updateState.totalResources,
+            updateState.resourceVersion);
 
-      const Resources oldTotal =
-        totalResources.filter([&resourceProviderId](const Resource& resource) {
-          return resource.provider_id() == resourceProviderId;
-        });
+        addResourceProvider(resourceProvider);
 
-      bool updated = false;
-
-      if (oldTotal != newTotal) {
-        totalResources -= oldTotal;
-        totalResources += newTotal;
-
-        updated = true;
-      }
-
-      // Update offer operation state.
-      //
-      // We only update offer operations which are not contained in both the
-      // known and just received sets. All other offer operations will be
-      // updated via relayed offer operation status updates.
-      auto isForResourceProvider = [resourceProviderId](
-                                      const OfferOperation& operation) {
-        Result<ResourceProviderID> id = getResourceProviderId(operation.info());
-        return id.isSome() && resourceProviderId == id.get();
-      };
-
-      hashmap<UUID, OfferOperation*> knownOfferOperations;
-      foreachpair(auto&& uuid, auto&& operation, offerOperations) {
-        if (isForResourceProvider(*operation)) {
-          knownOfferOperations.put(uuid, operation);
+        foreachvalue (const OfferOperation& operation,
+                      updateState.offerOperations) {
+          addOfferOperation(new OfferOperation(operation));
         }
-      }
 
-      hashmap<UUID, OfferOperation> receivedOfferOperations;
-      foreach (
-          const OfferOperation& operation,
-          message->updateState->operations) {
-        CHECK(isForResourceProvider(operation))
-          << "Received operation on unexpected resource provider "
-          << "from resource provider " << resourceProviderId;
-
-        Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
-        CHECK_SOME(operationUuid);
+        // Update the 'total' in the Slave.
+        totalResources += updateState.totalResources;
+      } else {
+        // Always update the resource provider info.
+        resourceProvider->info = updateState.info;
 
-        receivedOfferOperations.put(operationUuid.get(), operation);
-      }
+        if (resourceProvider->totalResources != updateState.totalResources) {
+          // Update the 'total' in the Slave.
+          CHECK(totalResources.contains(resourceProvider->totalResources));
+          totalResources -= resourceProvider->totalResources;
+          totalResources += updateState.totalResources;
 
-      const hashset<UUID> knownUuids = knownOfferOperations.keys();
-      const hashset<UUID> receivedUuids = receivedOfferOperations.keys();
+          // Update the 'total' in the resource provider.
+          resourceProvider->totalResources = updateState.totalResources;
+        }
 
-      if (knownUuids != receivedUuids) {
-        // Handle offer operations known to the agent but not reported by the
-        // resource provider. These could be operations where the agent has
-        // started tracking an offer operation, but the resource provider failed
-        // over before it could bookkeep the operation.
+        // Update offer operation state.
+        //
+        // We only update offer operations which are not contained in
+        // both the known and just received sets. All other offer
+        // operations will be updated via relayed offer operation
+        // status updates.
+        const hashset<UUID> knownUuids =
+          resourceProvider->offerOperations.keys();
+
+        const hashset<UUID> receivedUuids =
+          updateState.offerOperations.keys();
+
+        // Handle offer operations known to the agent but not reported
+        // by the resource provider. These could be operations where
+        // the agent has started tracking an offer operation, but the
+        // resource provider failed over before it could bookkeep the
+        // operation.
         //
         // NOTE: We do not mutate offer operations statuses here; this
         // would be the responsibility of a offer operation status
@@ -7203,13 +7173,13 @@ void Slave::handleResourceProviderMessage(
                 disappearedOperations, disappearedOperations.begin()));
 
         foreach (const UUID& uuid, disappearedOperations) {
-          // TODO(bbannier): Instead of simply dropping an operation with
-          // `removeOfferOperation` here we should instead send a `Reconcile`
-          // message with a failed state to the resource provider so its status
-          // update manager can reliably deliver the operation status to the
-          // framework.
-          CHECK(offerOperations.contains(uuid));
-          removeOfferOperation(offerOperations.at(uuid));
+          // TODO(bbannier): Instead of simply dropping an operation
+          // with `removeOfferOperation` here we should instead send a
+          // `Reconcile` message with a failed state to the resource
+          // provider so its status update manager can reliably
+          // deliver the operation status to the framework.
+          CHECK(resourceProvider->offerOperations.contains(uuid));
+          removeOfferOperation(resourceProvider->offerOperations.at(uuid));
         }
 
         // Handle offer operations known to the resource provider but
@@ -7228,27 +7198,13 @@ void Slave::handleResourceProviderMessage(
           //
           // NOTE: We do not need to update total resources here as its
           // state was sync explicitly with the received total above.
-          CHECK(receivedOfferOperations.contains(uuid));
+          CHECK(updateState.offerOperations.contains(uuid));
           addOfferOperation(
-              new OfferOperation(receivedOfferOperations.at(uuid)));
-        }
-
-        updated = true;
-      }
-
-      // Update resource version of this resource provider.
-      const UUID& resourceVersionUuid =
-        message->updateState->resourceVersionUuid;
-
-      if (!resourceVersions.contains(resourceProviderId) ||
-          resourceVersions.at(resourceProviderId) != resourceVersionUuid) {
-        if (resourceVersions.contains(resourceProviderId)) {
-          resourceVersions.at(resourceProviderId) = resourceVersionUuid;
-        } else {
-          resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+              new OfferOperation(updateState.offerOperations.at(uuid)));
         }
 
-        updated = true;
+        // Update resource version of this resource provider.
+        resourceProvider->resourceVersion = updateState.resourceVersion;
       }
 
       // Send the updated resources to the master if the agent is running. Note
@@ -7263,14 +7219,12 @@ void Slave::handleResourceProviderMessage(
           break;
         }
         case RUNNING: {
-          if (updated) {
-            LOG(INFO) << "Forwarding new total resources " << totalResources;
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-            // Inform the master about the update from the resource provider.
-            send(master.get(), generateResourceProviderUpdate());
+          // Inform the master about the update from the resource provider.
+          send(master.get(), generateResourceProviderUpdate());
 
-            break;
-          }
+          break;
         }
       }
       break;
@@ -7338,6 +7292,22 @@ void Slave::addOfferOperation(OfferOperation* operation)
   CHECK_SOME(uuid);
 
   offerOperations.put(uuid.get(), operation);
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError())
+    << "Failed to get resource provider ID: "
+    << resourceProviderId.error();
+
+  if (resourceProviderId.isSome()) {
+    ResourceProvider* resourceProvider =
+      getResourceProvider(resourceProviderId.get());
+
+    CHECK_NOTNULL(resourceProvider);
+
+    resourceProvider->addOfferOperation(operation);
+  }
 }
 
 
@@ -7441,6 +7411,22 @@ void Slave::removeOfferOperation(OfferOperation* operation)
   Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
   CHECK_SOME(uuid);
 
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError())
+    << "Failed to get resource provider ID: "
+    << resourceProviderId.error();
+
+  if (resourceProviderId.isSome()) {
+    ResourceProvider* resourceProvider =
+      getResourceProvider(resourceProviderId.get());
+
+    CHECK_NOTNULL(resourceProvider);
+
+    resourceProvider->removeOfferOperation(operation);
+  }
+
   CHECK(offerOperations.contains(uuid.get()))
     << "Unknown offer operation (uuid: " << uuid->toString() << ")";
 
@@ -7458,6 +7444,26 @@ OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
 }
 
 
+void Slave::addResourceProvider(ResourceProvider* resourceProvider)
+{
+  CHECK(resourceProvider->info.has_id());
+  CHECK(!resourceProviders.contains(resourceProvider->info.id()));
+
+  resourceProviders.put(
+      resourceProvider->info.id(),
+      resourceProvider);
+}
+
+
+ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const
+{
+  if (resourceProviders.contains(id)) {
+    return resourceProviders.at(id);
+  }
+  return nullptr;
+}
+
+
 void Slave::apply(const vector<ResourceConversion>& conversions)
 {
   Try<Resources> resources = totalResources.apply(conversions);
@@ -9076,6 +9082,30 @@ Resources Executor::allocatedResources() const
 }
 
 
+void ResourceProvider::addOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(!offerOperations.contains(uuid.get()))
+    << "Offer operation (uuid: " << uuid->toString() << ") already exists";
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+void ResourceProvider::removeOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(offerOperations.contains(uuid.get()))
+    << "Unknown offer operation (uuid: " << uuid->toString() << ")";
+
+  offerOperations.erase(uuid.get());
+}
+
+
 map<string, string> executorEnvironment(
     const Flags& flags,
     const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b3a1e70..7c40fc7 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -108,6 +108,7 @@ class Executor;
 class Framework;
 
 struct HttpConnection;
+struct ResourceProvider;
 
 
 class Slave : public ProtobufProcess<Slave>
@@ -577,6 +578,9 @@ private:
 
   OfferOperation* getOfferOperation(const UUID& uuid) const;
 
+  void addResourceProvider(ResourceProvider* resourceProvider);
+  ResourceProvider* getResourceProvider(const ResourceProviderID& id) const;
+
   void apply(const std::vector<ResourceConversion>& conversions);
 
   // Publish all resources that are needed to run the current set of
@@ -730,12 +734,28 @@ private:
 
   ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
-  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-
-  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviderInfos;
 
-  // Pending operations or terminal operations that have
-  // unacknowledged status updates.
+  // Local resource providers known by the agent.
+  hashmap<ResourceProviderID, ResourceProvider*> resourceProviders;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  UUID resourceVersion;
+
+  // Keeps track of the following:
+  // (1) Pending operations for resources from the agent.
+  // (2) Pending operations or terminal operations that have
+  //     unacknowledged status updates for resource provider
+  //     provided resources.
   hashmap<UUID, OfferOperation*> offerOperations;
 };
 
@@ -1034,6 +1054,41 @@ private:
 };
 
 
+struct ResourceProvider
+{
+  ResourceProvider(
+      const ResourceProviderInfo& _info,
+      const Resources& _totalResources,
+      const UUID& _resourceVersion)
+    : info(_info),
+      totalResources(_totalResources),
+      resourceVersion(_resourceVersion) {}
+
+  void addOfferOperation(OfferOperation* operation);
+  void removeOfferOperation(OfferOperation* operation);
+
+  ResourceProviderInfo info;
+  Resources totalResources;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  UUID resourceVersion;
+
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates.
+  hashmap<UUID, OfferOperation*> offerOperations;
+};
+
+
 /**
  * Returns a map of environment variables necessary in order to launch
  * an executor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index a6eb4c9..e37a53a 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -343,7 +343,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
     EXPECT_EQ(
         devolve(resourceProviderId.get()),
         message->updateState->info.id());
-    EXPECT_EQ(devolve(resources), message->updateState->total);
+    EXPECT_EQ(devolve(resources), message->updateState->totalResources);
   }
 }