You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/02/04 03:02:03 UTC

[1/9] mesos git commit: Update the allocator to handle frameworks with multiple roles.

Repository: mesos
Updated Branches:
  refs/heads/master f686f04f5 -> c20744a99


Update the allocator to handle frameworks with multiple roles.

The allocator now sets `Resource.AllocationInfo` when performing
an allocation, and the interface is updated to expose allocations
for each role within a framework.

The allocator also assumes that `Resource.AllocationInfo` is set
for inbound resources that are allocated (e.g. when adding an agent,
when adding a framework, when recovering resources). Note however,
that the necessary changes to the master and agent to enforce this
will be done via separate patches.

Review: https://reviews.apache.org/r/55870


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1b86051
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1b86051
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1b86051

Branch: refs/heads/master
Commit: a1b86051d3d9120b8ef00a3e3ac18786a630dd76
Parents: f686f04
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Jan 23 16:12:55 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:06 2017 -0800

----------------------------------------------------------------------
 include/mesos/allocator/allocator.hpp       |  20 +-
 src/master/allocator/mesos/allocator.hpp    |   9 +-
 src/master/allocator/mesos/hierarchical.cpp | 361 +++++++++++++++--------
 src/master/allocator/mesos/hierarchical.hpp |  33 ++-
 src/master/master.cpp                       | 220 +++++++-------
 src/master/master.hpp                       |   2 +-
 src/tests/allocator.hpp                     |   2 +-
 7 files changed, 396 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/include/mesos/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp
index 5585949..71a4053 100644
--- a/include/mesos/allocator/allocator.hpp
+++ b/include/mesos/allocator/allocator.hpp
@@ -88,7 +88,8 @@ public:
       const Duration& allocationInterval,
       const lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>& offerCallback,
+               const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&
+                   offerCallback,
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, UnavailableResources>&)>&
@@ -259,7 +260,8 @@ public:
    * (dynamic reservation and persistent volumes). The allocator may react
    * differently for certain offer operations. The allocator should use this
    * call to update bookkeeping information related to the framework. The
-   * `offeredResources` are the resources that the operations are applied to.
+   * `offeredResources` are the resources that the operations are applied to
+   * and must be allocated to a single role.
    */
   virtual void updateAllocation(
       const FrameworkID& frameworkId,
@@ -328,7 +330,13 @@ public:
    *
    * Used to update the set of available resources for a specific agent. This
    * method is invoked to inform the allocator about allocated resources that
-   * have been refused or are no longer in use.
+   * have been refused or are no longer in use. Allocated resources will have
+   * an `allocation_info.role` assigned and callers are expected to only call
+   * this with resources allocated to a single role.
+   *
+   * TODO(bmahler): We could allow resources allocated to multiple roles
+   * within a single call here, but filtering them in the same way does
+   * not seem desirable.
    */
   virtual void recoverResources(
       const FrameworkID& frameworkId,
@@ -340,6 +348,9 @@ public:
    * Suppresses offers.
    *
    * Informs the allocator to stop sending offers to the framework.
+   *
+   * TODO(bmahler): Take an optional role to allow frameworks with
+   * multiple roles to do fine-grained suppression.
    */
   virtual void suppressOffers(
       const FrameworkID& frameworkId) = 0;
@@ -347,6 +358,9 @@ public:
   /**
    * Revives offers for a framework. This is invoked by a framework when
    * it wishes to receive filtered resources or offers immediately.
+   *
+   * TODO(bmahler): Take an optional role to allow frameworks with
+   * multiple roles to do fine-grained revival.
    */
   virtual void reviveOffers(
       const FrameworkID& frameworkId) = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 8e0f37a..e3c8618 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -50,7 +50,8 @@ public:
       const Duration& allocationInterval,
       const lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>& offerCallback,
+               const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&
+                   offerCallback,
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, UnavailableResources>&)>&
@@ -182,7 +183,8 @@ public:
       const Duration& allocationInterval,
       const lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>& offerCallback,
+               const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&
+                   offerCallback,
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, UnavailableResources>&)>&
@@ -322,7 +324,8 @@ inline void MesosAllocator<AllocatorProcess>::initialize(
     const Duration& allocationInterval,
     const lambda::function<
         void(const FrameworkID&,
-             const hashmap<SlaveID, Resources>&)>& offerCallback,
+             const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&
+                 offerCallback,
     const lambda::function<
         void(const FrameworkID&,
               const hashmap<SlaveID, UnavailableResources>&)>&

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index e2cd82b..fc93ade 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -19,6 +19,7 @@
 #include <algorithm>
 #include <set>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <mesos/resources.hpp>
@@ -125,7 +126,7 @@ private:
 
 HierarchicalAllocatorProcess::Framework::Framework(
     const FrameworkInfo& frameworkInfo)
-  : role(frameworkInfo.role()),
+  : roles(protobuf::framework::getRoles(frameworkInfo)),
     suppressed(false),
     capabilities(frameworkInfo.capabilities()) {}
 
@@ -134,7 +135,8 @@ void HierarchicalAllocatorProcess::initialize(
     const Duration& _allocationInterval,
     const lambda::function<
         void(const FrameworkID&,
-             const hashmap<SlaveID, Resources>&)>& _offerCallback,
+             const hashmap<string, hashmap<SlaveID, Resources>>&)>&
+      _offerCallback,
     const lambda::function<
         void(const FrameworkID&,
              const hashmap<SlaveID, UnavailableResources>&)>&
@@ -239,37 +241,44 @@ void HierarchicalAllocatorProcess::addFramework(
 
   const Framework& framework = frameworks.at(frameworkId);
 
-  const string& role = framework.role;
+  foreach (const string& role, framework.roles) {
+    // If this is the first framework to register as this role,
+    // initialize state as necessary.
+    if (!activeRoles.contains(role)) {
+      activeRoles[role] = 1;
+      roleSorter->add(role, roleWeight(role));
+      frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});
+      frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
+      metrics.addRole(role);
+    } else {
+      activeRoles[role]++;
+    }
 
-  // If this is the first framework to register as this role,
-  // initialize state as necessary.
-  if (!activeRoles.contains(role)) {
-    activeRoles[role] = 1;
-    roleSorter->add(role, roleWeight(role));
-    frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});
-    frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
-    metrics.addRole(role);
-  } else {
-    activeRoles[role]++;
+    CHECK(!frameworkSorters.at(role)->contains(frameworkId.value()));
+    frameworkSorters.at(role)->add(frameworkId.value());
   }
 
-  CHECK(!frameworkSorters.at(role)->contains(frameworkId.value()));
-  frameworkSorters.at(role)->add(frameworkId.value());
-
   // TODO(bmahler): Validate that the reserved resources have the
   // framework's role.
 
   // Update the allocation for this framework.
-  foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
-    if (slaves.contains(slaveId)) {
-      roleSorter->allocated(role, slaveId, allocated);
-      frameworkSorters.at(role)->add(slaveId, allocated);
+  foreachpair (const SlaveID& slaveId, const Resources& resources, used) {
+    if (!slaves.contains(slaveId)) {
+      continue;
+    }
+
+    hashmap<string, Resources> allocations = resources.allocations();
+
+    foreachpair (const string& role, const Resources& allocation, allocations) {
+      roleSorter->allocated(role, slaveId, allocation);
+      frameworkSorters.at(role)->add(slaveId, allocation);
       frameworkSorters.at(role)->allocated(
-          frameworkId.value(), slaveId, allocated);
+          frameworkId.value(), slaveId, allocation);
 
       if (quotas.contains(role)) {
-        // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-        quotaRoleSorter->allocated(role, slaveId, allocated.nonRevocable());
+        // See comment at `quotaRoleSorter` declaration
+        // regarding non-revocable.
+        quotaRoleSorter->allocated(role, slaveId, allocation.nonRevocable());
       }
     }
   }
@@ -290,23 +299,29 @@ void HierarchicalAllocatorProcess::removeFramework(
   CHECK(initialized);
   CHECK(frameworks.contains(frameworkId));
 
-  const string& role = frameworks.at(frameworkId).role;
-  CHECK(activeRoles.contains(role));
+  const Framework& framework = frameworks.at(frameworkId);
+
+  foreach (const string& role, framework.roles) {
+    // Might not be in 'frameworkSorters[role]' because it
+    // was previously deactivated and never re-added.
+    if (!frameworkSorters.contains(role) ||
+        !frameworkSorters.at(role)->contains(frameworkId.value())) {
+      continue;
+    }
 
-  // Might not be in 'frameworkSorters[role]' because it was previously
-  // deactivated and never re-added.
-  if (frameworkSorters.at(role)->contains(frameworkId.value())) {
     hashmap<SlaveID, Resources> allocation =
       frameworkSorters.at(role)->allocation(frameworkId.value());
 
     // Update the allocation for this framework.
-    foreachpair (
-        const SlaveID& slaveId, const Resources& allocated, allocation) {
+    foreachpair (const SlaveID& slaveId,
+                 const Resources& allocated,
+                 allocation) {
       roleSorter->unallocated(role, slaveId, allocated);
       frameworkSorters.at(role)->remove(slaveId, allocated);
 
       if (quotas.contains(role)) {
-        // See comment at `quotaRoleSorter` declaration regarding non-revocable.
+        // See comment at `quotaRoleSorter` declaration
+        // regarding non-revocable.
         quotaRoleSorter->unallocated(role, slaveId, allocated.nonRevocable());
       }
     }
@@ -314,24 +329,28 @@ void HierarchicalAllocatorProcess::removeFramework(
     frameworkSorters.at(role)->remove(frameworkId.value());
   }
 
-  // If this is the last framework that was registered for this role,
-  // cleanup associated state. This is not necessary for correctness
-  // (roles with no registered frameworks will not be offered any
-  // resources), but since many different role names might be used
-  // over time, we want to avoid leaking resources for no-longer-used
-  // role names. Note that we don't remove the role from
-  // `quotaRoleSorter` if it exists there, since roles with a quota
-  // set still influence allocation even if they don't have any
-  // registered frameworks.
-  activeRoles[role]--;
-  if (activeRoles[role] == 0) {
-    activeRoles.erase(role);
-    roleSorter->remove(role);
+  foreach (const string& role, framework.roles) {
+    CHECK(activeRoles.contains(role));
+
+    // If this is the last framework that was registered for this role,
+    // cleanup associated state. This is not necessary for correctness
+    // (roles with no registered frameworks will not be offered any
+    // resources), but since many different role names might be used
+    // over time, we want to avoid leaking resources for no-longer-used
+    // role names. Note that we don't remove the role from
+    // `quotaRoleSorter` if it exists there, since roles with a quota
+    // set still influence allocation even if they don't have any
+    // registered frameworks.
+    activeRoles[role]--;
+    if (activeRoles[role] == 0) {
+      activeRoles.erase(role);
+      roleSorter->remove(role);
 
-    CHECK(frameworkSorters.contains(role));
-    frameworkSorters.erase(role);
+      CHECK(frameworkSorters.contains(role));
+      frameworkSorters.erase(role);
 
-    metrics.removeRole(role);
+      metrics.removeRole(role);
+    }
   }
 
   // Do not delete the filters contained in this
@@ -350,10 +369,12 @@ void HierarchicalAllocatorProcess::activateFramework(
   CHECK(initialized);
   CHECK(frameworks.contains(frameworkId));
 
-  const string& role = frameworks.at(frameworkId).role;
+  const Framework& framework = frameworks.at(frameworkId);
 
-  CHECK(frameworkSorters.contains(role));
-  frameworkSorters.at(role)->activate(frameworkId.value());
+  foreach (const string& role, framework.roles) {
+    CHECK(frameworkSorters.contains(role));
+    frameworkSorters.at(role)->activate(frameworkId.value());
+  }
 
   LOG(INFO) << "Activated framework " << frameworkId;
 
@@ -368,16 +389,17 @@ void HierarchicalAllocatorProcess::deactivateFramework(
   CHECK(frameworks.contains(frameworkId));
 
   Framework& framework = frameworks.at(frameworkId);
-  const string& role = framework.role;
 
-  CHECK(frameworkSorters.contains(role));
-  frameworkSorters.at(role)->deactivate(frameworkId.value());
+  foreach (const string& role, framework.roles) {
+    CHECK(frameworkSorters.contains(role));
+    frameworkSorters.at(role)->deactivate(frameworkId.value());
 
-  // Note that the Sorter *does not* remove the resources allocated
-  // to this framework. For now, this is important because if the
-  // framework fails over and is activated, we still want a record
-  // of the resources that it is using. We might be able to collapse
-  // the added/removed and activated/deactivated in the future.
+    // Note that the Sorter *does not* remove the resources allocated
+    // to this framework. For now, this is important because if the
+    // framework fails over and is activated, we still want a record
+    // of the resources that it is using. We might be able to collapse
+    // the added/removed and activated/deactivated in the future.
+  }
 
   // Do not delete the filters contained in this
   // framework's `offerFilters` hashset yet, see comments in
@@ -402,11 +424,12 @@ void HierarchicalAllocatorProcess::updateFramework(
   CHECK(frameworks.contains(frameworkId));
 
   Framework& framework = frameworks.at(frameworkId);
+  set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
 
-  // TODO(jmlvanre): Once we allow frameworks to re-register with a new 'role',
-  // we need to update our internal 'frameworks' structure. See MESOS-703 for
-  // progress on allowing these fields to be updated.
-  CHECK_EQ(framework.role, frameworkInfo.role());
+  // TODO(bmahler): Allow frameworks to update their roles, see MESOS-6627.
+  CHECK(framework.roles == newRoles)
+    << "Expected: " << stringify(framework.roles)
+    << " vs Actual: " << stringify(newRoles);
 
   framework.capabilities = Capabilities(frameworkInfo.capabilities());
 }
@@ -430,11 +453,15 @@ void HierarchicalAllocatorProcess::addSlave(
 
   // Update the allocation for each framework.
   foreachpair (const FrameworkID& frameworkId,
-               const Resources& allocated,
+               const Resources& used_,
                used) {
-    if (frameworks.contains(frameworkId)) {
-      const string& role = frameworks.at(frameworkId).role;
+    if (!frameworks.contains(frameworkId) ) {
+      continue;
+    }
 
+    foreachpair (const string& role,
+                 const Resources& allocated,
+                 used_.allocations()) {
       // TODO(bmahler): Validate that the reserved resources have the
       // framework's role.
       CHECK(roleSorter->contains(role));
@@ -624,24 +651,47 @@ void HierarchicalAllocatorProcess::updateAllocation(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     const Resources& offeredResources,
-    const vector<Offer::Operation>& operations)
+    const vector<Offer::Operation>& operations_)
 {
   CHECK(initialized);
   CHECK(slaves.contains(slaveId));
   CHECK(frameworks.contains(frameworkId));
 
   Slave& slave = slaves.at(slaveId);
-  const Framework& framework = frameworks.at(frameworkId);
 
-  CHECK(frameworkSorters.contains(framework.role));
+  // We require that an allocation is tied to a single role.
+  //
+  // TODO(bmahler): The use of `Resources::allocations()` induces
+  // unnecessary copying of `Resources` objects (which is expensive
+  // at the time this was written).
+  hashmap<string, Resources> allocations = offeredResources.allocations();
+
+  CHECK_EQ(1u, allocations.size());
+
+  string role = allocations.begin()->first;
 
-  const Owned<Sorter>& frameworkSorter = frameworkSorters.at(framework.role);
+  CHECK(frameworkSorters.contains(role));
+
+  const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
 
   // We keep a copy of the offered resources here and it is updated
   // by the operations.
   Resources _offeredResources = offeredResources;
 
-  foreach (const Offer::Operation& operation, operations) {
+  // Take a non-const copy to perform the adjustment below.
+  vector<Offer::Operation> operations = operations_;
+
+  foreach (Offer::Operation& operation, operations) {
+    // The operations should have been normalized by the master via
+    // `protobuf::adjustOfferOperation()`.
+    //
+    // TODO(bmahler): Check that the operations have the allocation
+    // info set. The master should enforce this. E.g.
+    //
+    //  foreach (const Offer::Operation& operation, operations) {
+    //    CHECK_NONE(validateOperationOnAllocatedResources(operation));
+    //  }
+
     Try<Resources> updatedOfferedResources = _offeredResources.apply(operation);
     CHECK_SOME(updatedOfferedResources);
     _offeredResources = updatedOfferedResources.get();
@@ -704,11 +754,12 @@ void HierarchicalAllocatorProcess::updateAllocation(
 
         frameworkSorter->add(slaveId, additional);
         frameworkSorter->allocated(frameworkId.value(), slaveId, additional);
-        roleSorter->allocated(framework.role, slaveId, additional);
 
-        if (quotas.contains(framework.role)) {
+        roleSorter->allocated(role, slaveId, additional);
+
+        if (quotas.contains(role)) {
           quotaRoleSorter->allocated(
-              framework.role, slaveId, additional.nonRevocable());
+              role, slaveId, additional.nonRevocable());
         }
       }
 
@@ -751,19 +802,19 @@ void HierarchicalAllocatorProcess::updateAllocation(
         frameworkAllocation,
         updatedFrameworkAllocation.get());
 
-    // Update the allocation in the role sorter.
+    // Update the allocated resources in the role sorter.
+    // We only update the allocated resources if this role
+    // has quota set.
     roleSorter->update(
-        framework.role,
+        role,
         slaveId,
         frameworkAllocation,
         updatedFrameworkAllocation.get());
 
-    // Update the allocated resources in the quota sorter. We only update
-    // the allocated resources if this role has quota set.
-    if (quotas.contains(framework.role)) {
+    if (quotas.contains(role)) {
       // See comment at `quotaRoleSorter` declaration regarding non-revocable.
       quotaRoleSorter->update(
-          framework.role,
+          role,
           slaveId,
           frameworkAllocation.nonRevocable(),
           updatedFrameworkAllocation.get().nonRevocable());
@@ -782,6 +833,11 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable(
     const SlaveID& slaveId,
     const vector<Offer::Operation>& operations)
 {
+  // Note that the operations may contain allocated resources,
+  // however such operations can be applied to unallocated
+  // resources unambiguously, so we don't have a strict CHECK
+  // for the operations to contain only unallocated resources.
+
   CHECK(initialized);
   CHECK(slaves.contains(slaveId));
 
@@ -975,6 +1031,19 @@ void HierarchicalAllocatorProcess::recoverResources(
     return;
   }
 
+  // For now, we require that resources are recovered within a single
+  // allocation role (since filtering in the same manner across roles
+  // seems undesirable).
+  //
+  // TODO(bmahler): The use of `Resources::allocations()` induces
+  // unnecessary copying of `Resources` objects (which is expensive
+  // at the time this was written).
+  hashmap<string, Resources> allocations = resources.allocations();
+
+  CHECK_EQ(1u, allocations.size());
+
+  string role = allocations.begin()->first;
+
   // Updated resources allocated to framework (if framework still
   // exists, which it might not in the event that we dispatched
   // Master::offer before we received
@@ -982,21 +1051,20 @@ void HierarchicalAllocatorProcess::recoverResources(
   // MesosAllocatorProcess::deactivateFramework, in which case we will
   // have already recovered all of its resources).
   if (frameworks.contains(frameworkId)) {
-    const Framework& framework = frameworks.at(frameworkId);
-
-    CHECK(frameworkSorters.contains(framework.role));
+    CHECK(frameworkSorters.contains(role));
 
-    const Owned<Sorter>& frameworkSorter = frameworkSorters.at(framework.role);
+    const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
 
     if (frameworkSorter->contains(frameworkId.value())) {
       frameworkSorter->unallocated(frameworkId.value(), slaveId, resources);
       frameworkSorter->remove(slaveId, resources);
-      roleSorter->unallocated(framework.role, slaveId, resources);
+      roleSorter->unallocated(role, slaveId, resources);
 
-      if (quotas.contains(framework.role)) {
-        // See comment at `quotaRoleSorter` declaration regarding non-revocable.
+      if (quotas.contains(role)) {
+        // See comment at `quotaRoleSorter` declaration
+        // regarding non-revocable
         quotaRoleSorter->unallocated(
-            framework.role, slaveId, resources.nonRevocable());
+            role, slaveId, resources.nonRevocable());
       }
     }
   }
@@ -1052,9 +1120,14 @@ void HierarchicalAllocatorProcess::recoverResources(
             << " filtered agent " << slaveId
             << " for " << timeout.get();
 
-    // Create a new filter.
-    OfferFilter* offerFilter = new RefusedOfferFilter(resources);
-    frameworks.at(frameworkId).offerFilters[slaveId].insert(offerFilter);
+    // Create a new filter. Note that we unallocate the resources
+    // since filters are applied per-role already.
+    Resources unallocated = resources;
+    unallocated.unallocate();
+
+    OfferFilter* offerFilter = new RefusedOfferFilter(unallocated);
+    frameworks.at(frameworkId)
+      .offerFilters[role][slaveId].insert(offerFilter);
 
     // Expire the filter after both an `allocationInterval` and the
     // `timeout` have elapsed. This ensures that the filter does not
@@ -1072,14 +1145,16 @@ void HierarchicalAllocatorProcess::recoverResources(
     // We need to disambiguate the function call to pick the correct
     // `expire()` overload.
     void (Self::*expireOffer)(
-              const FrameworkID&,
-              const SlaveID&,
-              OfferFilter*) = &Self::expire;
+        const FrameworkID&,
+        const string&,
+        const SlaveID&,
+        OfferFilter*) = &Self::expire;
 
     delay(timeout.get(),
           self(),
           expireOffer,
           frameworkId,
+          role,
           slaveId,
           offerFilter);
   }
@@ -1093,17 +1168,18 @@ void HierarchicalAllocatorProcess::suppressOffers(
   CHECK(frameworks.contains(frameworkId));
 
   Framework& framework = frameworks.at(frameworkId);
-
   framework.suppressed = true;
 
-  CHECK(frameworkSorters.contains(framework.role));
-
   // Deactivating the framework in the sorter is fine as long as
   // SUPPRESS is not parameterized. When parameterization is added,
   // we have to differentiate between the cases here.
-  frameworkSorters.at(framework.role)->deactivate(frameworkId.value());
+  foreach (const string& role, framework.roles) {
+    CHECK(frameworkSorters.contains(role));
+    frameworkSorters.at(role)->deactivate(frameworkId.value());
+  }
 
-  LOG(INFO) << "Suppressed offers for framework " << frameworkId;
+  LOG(INFO) << "Suppressed offers for roles " << stringify(framework.roles)
+            << " of framework " << frameworkId;
 }
 
 
@@ -1114,19 +1190,19 @@ void HierarchicalAllocatorProcess::reviveOffers(
   CHECK(frameworks.contains(frameworkId));
 
   Framework& framework = frameworks.at(frameworkId);
-
   framework.offerFilters.clear();
   framework.inverseOfferFilters.clear();
 
   if (framework.suppressed) {
     framework.suppressed = false;
 
-    CHECK(frameworkSorters.contains(framework.role));
-
     // Activating the framework in the sorter on REVIVE is fine as long as
     // SUPPRESS is not parameterized. When parameterization is added,
     // we may need to differentiate between the cases here.
-    frameworkSorters.at(framework.role)->activate(frameworkId.value());
+    foreach (const string& role, framework.roles) {
+      CHECK(frameworkSorters.contains(role));
+      frameworkSorters.at(role)->activate(frameworkId.value());
+    }
   }
 
   // We delete each actual `OfferFilter` when
@@ -1136,7 +1212,8 @@ void HierarchicalAllocatorProcess::reviveOffers(
   // would expire that filter too soon. Note that this only works
   // right now because ALL Filter types "expire".
 
-  LOG(INFO) << "Removed offer filters for framework " << frameworkId;
+  LOG(INFO) << "Revived offers for roles " << stringify(framework.roles)
+            << " of framework " << frameworkId;
 
   allocate();
 }
@@ -1342,7 +1419,7 @@ void HierarchicalAllocatorProcess::__allocate()
   //       framework having the corresponding role.
   //   (2) For unreserved resources on the slave, allocate these
   //       to a framework of any role.
-  hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;
+  hashmap<FrameworkID, hashmap<string, hashmap<SlaveID, Resources>>> offerable;
 
   // NOTE: This function can operate on a small subset of
   // `allocationCandidates`, we have to make sure that we don't
@@ -1377,9 +1454,9 @@ void HierarchicalAllocatorProcess::__allocate()
   auto getQuotaRoleAllocatedResources = [this](const string& role) {
     CHECK(quotas.contains(role));
 
-    // NOTE: `allocationScalarQuantities` omits dynamic reservation
-    // and persistent volume info, but we additionally strip `role`
-    // here via `flatten()`.
+    // NOTE: `allocationScalarQuantities` omits dynamic reservation,
+    // persistent volume info, and allocation info. We additionally
+    // strip the `Resource.role` here via `flatten()`.
     return quotaRoleSorter->allocationScalarQuantities(role).flatten();
   };
 
@@ -1493,18 +1570,20 @@ void HierarchicalAllocatorProcess::__allocate()
 
         // If the framework filters these resources, ignore. The unallocated
         // part of the quota will not be allocated to other roles.
-        if (isFiltered(frameworkId, slaveId, resources)) {
+        if (isFiltered(frameworkId, role, slaveId, resources)) {
           continue;
         }
 
         VLOG(2) << "Allocating " << resources << " on agent " << slaveId
-                << " to framework " << frameworkId
+                << " to role " << role << " of framework " << frameworkId
                 << " as part of its role quota";
 
+        resources.allocate(role);
+
         // NOTE: We perform "coarse-grained" allocation for quota'ed
         // resources, which may lead to overcommitment of resources beyond
         // quota. This is fine since quota currently represents a guarantee.
-        offerable[frameworkId][slaveId] += resources;
+        offerable[frameworkId][role][slaveId] += resources;
         offeredSharedResources[slaveId] += resources.shared();
 
         slave.allocated += resources;
@@ -1671,7 +1750,7 @@ void HierarchicalAllocatorProcess::__allocate()
         }
 
         // If the framework filters these resources, ignore.
-        if (isFiltered(frameworkId, slaveId, resources)) {
+        if (isFiltered(frameworkId, role, slaveId, resources)) {
           continue;
         }
 
@@ -1691,14 +1770,16 @@ void HierarchicalAllocatorProcess::__allocate()
         }
 
         VLOG(2) << "Allocating " << resources << " on agent " << slaveId
-                << " to framework " << frameworkId;
+                << " to role " << role << " of framework " << frameworkId;
+
+        resources.allocate(role);
 
         // NOTE: We perform "coarse-grained" allocation, meaning that we always
         // allocate the entire remaining slave resources to a single framework.
         //
         // NOTE: We may have already allocated some resources on the current
         // agent as part of quota.
-        offerable[frameworkId][slaveId] += resources;
+        offerable[frameworkId][role][slaveId] += resources;
         offeredSharedResources[slaveId] += resources.shared();
         allocatedStage2 += scalarQuantity;
 
@@ -1722,7 +1803,7 @@ void HierarchicalAllocatorProcess::__allocate()
   } else {
     // Now offer the resources to each framework.
     foreachkey (const FrameworkID& frameworkId, offerable) {
-      offerCallback(frameworkId, offerable[frameworkId]);
+      offerCallback(frameworkId, offerable.at(frameworkId));
     }
   }
 }
@@ -1820,6 +1901,7 @@ void HierarchicalAllocatorProcess::deallocate()
 
 void HierarchicalAllocatorProcess::_expire(
     const FrameworkID& frameworkId,
+    const string& role,
     const SlaveID& slaveId,
     OfferFilter* offerFilter)
 {
@@ -1835,12 +1917,17 @@ void HierarchicalAllocatorProcess::_expire(
   if (frameworkIterator != frameworks.end()) {
     Framework& framework = frameworkIterator->second;
 
-    auto filters = framework.offerFilters.find(slaveId);
-    if (filters != framework.offerFilters.end()) {
-      filters->second.erase(offerFilter);
+    auto roleFilters = framework.offerFilters.find(role);
+    if (roleFilters != framework.offerFilters.end()) {
+      auto agentFilters = roleFilters->second.find(slaveId);
 
-      if (filters->second.empty()) {
-        framework.offerFilters.erase(slaveId);
+      if (agentFilters != roleFilters->second.end()) {
+        // Erase the filter (may be a no-op per the comment above).
+        agentFilters->second.erase(offerFilter);
+
+        if (agentFilters->second.empty()) {
+          roleFilters->second.erase(slaveId);
+        }
       }
     }
   }
@@ -1851,6 +1938,7 @@ void HierarchicalAllocatorProcess::_expire(
 
 void HierarchicalAllocatorProcess::expire(
     const FrameworkID& frameworkId,
+    const string& role,
     const SlaveID& slaveId,
     OfferFilter* offerFilter)
 {
@@ -1858,6 +1946,7 @@ void HierarchicalAllocatorProcess::expire(
       self(),
       &Self::_expire,
       frameworkId,
+      role,
       slaveId,
       offerFilter);
 }
@@ -1918,6 +2007,7 @@ bool HierarchicalAllocatorProcess::isWhitelisted(
 
 bool HierarchicalAllocatorProcess::isFiltered(
     const FrameworkID& frameworkId,
+    const string& role,
     const SlaveID& slaveId,
     const Resources& resources) const
 {
@@ -1926,15 +2016,26 @@ bool HierarchicalAllocatorProcess::isFiltered(
 
   const Framework& framework = frameworks.at(frameworkId);
 
-  if (framework.offerFilters.contains(slaveId)) {
-    foreach (OfferFilter* offerFilter, framework.offerFilters.at(slaveId)) {
-      if (offerFilter->filter(resources)) {
-        VLOG(1) << "Filtered offer with " << resources
-                << " on agent " << slaveId
-                << " for framework " << frameworkId;
+  // Since this is a performance-sensitive piece of code,
+  // we use find to avoid the doing any redundant lookups.
+  auto roleFilters = framework.offerFilters.find(role);
+  if (roleFilters == framework.offerFilters.end()) {
+    return false;
+  }
 
-        return true;
-      }
+  auto agentFilters = roleFilters->second.find(slaveId);
+  if (agentFilters == roleFilters->second.end()) {
+    return false;
+  }
+
+  foreach (OfferFilter* offerFilter, agentFilters->second) {
+    if (offerFilter->filter(resources)) {
+      VLOG(1) << "Filtered offer with " << resources
+              << " on agent " << slaveId
+              << " for role " << role
+              << " of framework " << frameworkId;
+
+      return true;
     }
   }
 
@@ -2025,12 +2126,12 @@ double HierarchicalAllocatorProcess::_offer_filters_active(
   double result = 0;
 
   foreachvalue (const Framework& framework, frameworks) {
-    if (framework.role != role) {
+    if (!framework.offerFilters.contains(role)) {
       continue;
     }
 
-    foreachkey (const SlaveID& slaveId, framework.offerFilters) {
-      result += framework.offerFilters.at(slaveId).size();
+    foreachkey (const SlaveID& slaveId, framework.offerFilters.at(role)) {
+      result += framework.offerFilters.at(role).at(slaveId).size();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 331b85b..896abcd 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -95,7 +95,8 @@ public:
       const Duration& allocationInterval,
       const lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>& offerCallback,
+               const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&
+        offerCallback,
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, UnavailableResources>&)>&
@@ -186,9 +187,11 @@ public:
       const Resources& resources,
       const Option<Filters>& filters);
 
+  // TODO(bmahler): Update to take optional Suppress.role.
   void suppressOffers(
       const FrameworkID& frameworkId);
 
+  // TODO(bmahler): Update to take optional Revive.role.
   void reviveOffers(
       const FrameworkID& frameworkId);
 
@@ -233,14 +236,16 @@ protected:
   // Helper for `_allocate()` that deallocates resources for inverse offers.
   void deallocate();
 
-  // Remove an offer filter for the specified framework.
+  // Remove an offer filter for the specified role of the framework.
   void expire(
       const FrameworkID& frameworkId,
+      const std::string& role,
       const SlaveID& slaveId,
       OfferFilter* offerFilter);
 
   void _expire(
       const FrameworkID& frameworkId,
+      const std::string& role,
       const SlaveID& slaveId,
       OfferFilter* offerFilter);
 
@@ -256,10 +261,11 @@ protected:
   // Checks whether the slave is whitelisted.
   bool isWhitelisted(const SlaveID& slaveId) const;
 
-  // Returns true if there is a resource offer filter for this framework
-  // on this slave.
+  // Returns true if there is a resource offer filter for the
+  // specified role of this framework on this slave.
   bool isFiltered(
       const FrameworkID& frameworkId,
+      const std::string& role,
       const SlaveID& slaveId,
       const Resources& resources) const;
 
@@ -281,11 +287,13 @@ protected:
 
   lambda::function<
       void(const FrameworkID&,
-           const hashmap<SlaveID, Resources>&)> offerCallback;
+           const hashmap<std::string, hashmap<SlaveID, Resources>>&)>
+    offerCallback;
 
   lambda::function<
       void(const FrameworkID&,
-           const hashmap<SlaveID, UnavailableResources>&)> inverseOfferCallback;
+           const hashmap<SlaveID, UnavailableResources>&)>
+    inverseOfferCallback;
 
   friend Metrics;
   Metrics metrics;
@@ -294,7 +302,7 @@ protected:
   {
     explicit Framework(const FrameworkInfo& frameworkInfo);
 
-    std::string role;
+    std::set<std::string> roles;
 
     // Whether the framework suppresses offers.
     bool suppressed;
@@ -302,7 +310,9 @@ protected:
     protobuf::framework::Capabilities capabilities;
 
     // Active offer and inverse offer filters for the framework.
-    hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
+    // Offer filters are tied to the role the filtered resources
+    // were allocated to.
+    hashmap<std::string, hashmap<SlaveID, hashset<OfferFilter*>>> offerFilters;
     hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
   };
 
@@ -353,7 +363,12 @@ protected:
     // In this case, allocated > total.
     Resources available() const
     {
-      return total - allocated;
+      // In order to subtract from the total,
+      // we strip the allocation information.
+      Resources allocated_ = allocated;
+      allocated_.unallocate();
+
+      return total - allocated_;
     }
 
     bool activated;  // Whether to offer resources.

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 284566c..2edcab7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6704,8 +6704,9 @@ void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
 }
 
 
-void Master::offer(const FrameworkID& frameworkId,
-                   const hashmap<SlaveID, Resources>& resources)
+void Master::offer(
+    const FrameworkID& frameworkId,
+    const hashmap<string, hashmap<SlaveID, Resources>>& resources)
 {
   if (!frameworks.registered.contains(frameworkId) ||
       !frameworks.registered[frameworkId]->active()) {
@@ -6713,131 +6714,142 @@ void Master::offer(const FrameworkID& frameworkId,
                  << frameworkId << " because the framework"
                  << " has terminated or is inactive";
 
-    foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
-      allocator->recoverResources(frameworkId, slaveId, offered, None());
+    foreachkey (const string& role, resources) {
+      foreachpair (const SlaveID& slaveId,
+                   const Resources& offered,
+                   resources.at(role)) {
+        allocator->recoverResources(frameworkId, slaveId, offered, None());
+      }
     }
     return;
   }
 
-  // Create an offer for each slave and add it to the message.
-  ResourceOffersMessage message;
+  Framework* framework = CHECK_NOTNULL(frameworks.registered.at(frameworkId));
 
-  Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]);
-  foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
-    if (!slaves.registered.contains(slaveId)) {
-      LOG(WARNING)
-        << "Master returning resources offered to framework " << *framework
-        << " because agent " << slaveId << " is not valid";
-
-      allocator->recoverResources(frameworkId, slaveId, offered, None());
-      continue;
-    }
+  // Each offer we create is tied to a single agent
+  // and a single allocation role.
+  ResourceOffersMessage message;
 
-    Slave* slave = slaves.registered.get(slaveId);
-    CHECK_NOTNULL(slave);
+  foreachkey (const string& role, resources) {
+    foreachpair (const SlaveID& slaveId,
+                 const Resources& offered,
+                 resources.at(role)) {
+      if (!slaves.registered.contains(slaveId)) {
+        LOG(WARNING)
+          << "Master returning resources offered to framework " << *framework
+          << " because agent " << slaveId << " is not valid";
 
-    // This could happen if the allocator dispatched 'Master::offer' before
-    // the slave was deactivated in the allocator.
-    if (!slave->active) {
-      LOG(WARNING)
-        << "Master returning resources offered because agent " << *slave
-        << " is " << (slave->connected ? "deactivated" : "disconnected");
+        allocator->recoverResources(frameworkId, slaveId, offered, None());
+        continue;
+      }
 
-      allocator->recoverResources(frameworkId, slaveId, offered, None());
-      continue;
-    }
+      Slave* slave = slaves.registered.get(slaveId);
+      CHECK_NOTNULL(slave);
 
-#ifdef WITH_NETWORK_ISOLATOR
-    // TODO(dhamon): This flag is required as the static allocation of
-    // ephemeral ports leads to a maximum number of containers that can
-    // be created on each slave. Once MESOS-1654 is fixed and ephemeral
-    // ports are a first class resource, this can be removed.
-    if (flags.max_executors_per_agent.isSome()) {
-      // Check that we haven't hit the executor limit.
-      size_t numExecutors = 0;
-      foreachkey (const FrameworkID& frameworkId, slave->executors) {
-        numExecutors += slave->executors[frameworkId].keys().size();
-      }
+      // This could happen if the allocator dispatched 'Master::offer' before
+      // the slave was deactivated in the allocator.
+      if (!slave->active) {
+        LOG(WARNING)
+          << "Master returning resources offered because agent " << *slave
+          << " is " << (slave->connected ? "deactivated" : "disconnected");
 
-      if (numExecutors >= flags.max_executors_per_agent.get()) {
-        LOG(WARNING) << "Master returning resources offered because agent "
-                     << *slave << " has reached the maximum number of "
-                     << "executors";
-        // Pass a default filter to avoid getting this same offer immediately
-        // from the allocator.
-        allocator->recoverResources(frameworkId, slaveId, offered, Filters());
+        allocator->recoverResources(frameworkId, slaveId, offered, None());
         continue;
       }
-    }
-#endif // WITH_NETWORK_ISOLATOR
 
-    // TODO(vinod): Split regular and revocable resources into
-    // separate offers, so that rescinding offers with revocable
-    // resources does not affect offers with regular resources.
-
-    // TODO(bmahler): Set "https" if only "https" is supported.
-    mesos::URL url;
-    url.set_scheme("http");
-    url.mutable_address()->set_hostname(slave->info.hostname());
-    url.mutable_address()->set_ip(stringify(slave->pid.address.ip));
-    url.mutable_address()->set_port(slave->pid.address.port);
-    url.set_path("/" + slave->pid.id);
-
-    Offer* offer = new Offer();
-    offer->mutable_id()->MergeFrom(newOfferId());
-    offer->mutable_framework_id()->MergeFrom(framework->id());
-    offer->mutable_slave_id()->MergeFrom(slave->id);
-    offer->set_hostname(slave->info.hostname());
-    offer->mutable_url()->MergeFrom(url);
-    offer->mutable_resources()->MergeFrom(offered);
-    offer->mutable_attributes()->MergeFrom(slave->info.attributes());
+  #ifdef WITH_NETWORK_ISOLATOR
+      // TODO(dhamon): This flag is required as the static allocation of
+      // ephemeral ports leads to a maximum number of containers that can
+      // be created on each slave. Once MESOS-1654 is fixed and ephemeral
+      // ports are a first class resource, this can be removed.
+      if (flags.max_executors_per_agent.isSome()) {
+        // Check that we haven't hit the executor limit.
+        size_t numExecutors = 0;
+        foreachkey (const FrameworkID& frameworkId, slave->executors) {
+          numExecutors += slave->executors[frameworkId].keys().size();
+        }
 
-    // Add all framework's executors running on this slave.
-    if (slave->executors.contains(framework->id())) {
-      const hashmap<ExecutorID, ExecutorInfo>& executors =
-        slave->executors[framework->id()];
-      foreachkey (const ExecutorID& executorId, executors) {
-        offer->add_executor_ids()->MergeFrom(executorId);
+        if (numExecutors >= flags.max_executors_per_agent.get()) {
+          LOG(WARNING) << "Master returning resources offered because agent "
+                       << *slave << " has reached the maximum number of "
+                       << "executors";
+          // Pass a default filter to avoid getting this same offer immediately
+          // from the allocator.
+          allocator->recoverResources(frameworkId, slaveId, offered, Filters());
+          continue;
+        }
+      }
+  #endif // WITH_NETWORK_ISOLATOR
+
+      // TODO(vinod): Split regular and revocable resources into
+      // separate offers, so that rescinding offers with revocable
+      // resources does not affect offers with regular resources.
+
+      // TODO(bmahler): Set "https" if only "https" is supported.
+      mesos::URL url;
+      url.set_scheme("http");
+      url.mutable_address()->set_hostname(slave->info.hostname());
+      url.mutable_address()->set_ip(stringify(slave->pid.address.ip));
+      url.mutable_address()->set_port(slave->pid.address.port);
+      url.set_path("/" + slave->pid.id);
+
+      Offer* offer = new Offer();
+      offer->mutable_id()->MergeFrom(newOfferId());
+      offer->mutable_framework_id()->MergeFrom(framework->id());
+      offer->mutable_slave_id()->MergeFrom(slave->id);
+      offer->set_hostname(slave->info.hostname());
+      offer->mutable_url()->MergeFrom(url);
+      offer->mutable_resources()->MergeFrom(offered);
+      offer->mutable_attributes()->MergeFrom(slave->info.attributes());
+      offer->mutable_allocation_info()->set_role(role);
+
+      // Add all framework's executors running on this slave.
+      if (slave->executors.contains(framework->id())) {
+        const hashmap<ExecutorID, ExecutorInfo>& executors =
+          slave->executors[framework->id()];
+        foreachkey (const ExecutorID& executorId, executors) {
+          offer->add_executor_ids()->MergeFrom(executorId);
+        }
       }
-    }
 
-    // If the slave in this offer is planned to be unavailable due to
-    // maintenance in the future, then set the Unavailability.
-    CHECK(machines.contains(slave->machineId));
-    if (machines[slave->machineId].info.has_unavailability()) {
-      offer->mutable_unavailability()->CopyFrom(
-          machines[slave->machineId].info.unavailability());
-    }
+      // If the slave in this offer is planned to be unavailable due to
+      // maintenance in the future, then set the Unavailability.
+      CHECK(machines.contains(slave->machineId));
+      if (machines[slave->machineId].info.has_unavailability()) {
+        offer->mutable_unavailability()->CopyFrom(
+            machines[slave->machineId].info.unavailability());
+      }
 
-    offers[offer->id()] = offer;
+      offers[offer->id()] = offer;
 
-    framework->addOffer(offer);
-    slave->addOffer(offer);
+      framework->addOffer(offer);
+      slave->addOffer(offer);
 
-    if (flags.offer_timeout.isSome()) {
-      // Rescind the offer after the timeout elapses.
-      offerTimers[offer->id()] =
-        delay(flags.offer_timeout.get(),
-              self(),
-              &Self::offerTimeout,
-              offer->id());
-    }
+      if (flags.offer_timeout.isSome()) {
+        // Rescind the offer after the timeout elapses.
+        offerTimers[offer->id()] =
+          delay(flags.offer_timeout.get(),
+                self(),
+                &Self::offerTimeout,
+                offer->id());
+      }
 
-    // TODO(jieyu): For now, we strip 'ephemeral_ports' resource from
-    // offers so that frameworks do not see this resource. This is a
-    // short term workaround. Revisit this once we resolve MESOS-1654.
-    Offer offer_ = *offer;
-    offer_.clear_resources();
+      // TODO(jieyu): For now, we strip 'ephemeral_ports' resource from
+      // offers so that frameworks do not see this resource. This is a
+      // short term workaround. Revisit this once we resolve MESOS-1654.
+      Offer offer_ = *offer;
+      offer_.clear_resources();
 
-    foreach (const Resource& resource, offered) {
-      if (resource.name() != "ephemeral_ports") {
-        offer_.add_resources()->CopyFrom(resource);
+      foreach (const Resource& resource, offered) {
+        if (resource.name() != "ephemeral_ports") {
+          offer_.add_resources()->CopyFrom(resource);
+        }
       }
-    }
 
-    // Add the offer *AND* the corresponding slave's PID.
-    message.add_offers()->MergeFrom(offer_);
-    message.add_pids(slave->pid);
+      // Add the offer *AND* the corresponding slave's PID.
+      message.add_offers()->MergeFrom(offer_);
+      message.add_pids(slave->pid);
+    }
   }
 
   if (message.offers().size() == 0) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 812c740..511773e 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -447,7 +447,7 @@ public:
 
   void offer(
       const FrameworkID& frameworkId,
-      const hashmap<SlaveID, Resources>& resources);
+      const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
 
   void inverseOffer(
       const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1b86051/src/tests/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp
index 1f9261d..32c2912 100644
--- a/src/tests/allocator.hpp
+++ b/src/tests/allocator.hpp
@@ -361,7 +361,7 @@ public:
       const Duration&,
       const lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>&,
+               const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&,
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, UnavailableResources>&)>&,


[7/9] mesos git commit: Fixed MULTI_ROLE related bugs when updating framework info.

Posted by bm...@apache.org.
Fixed MULTI_ROLE related bugs when updating framework info.

The first issue is that we need to update the capabilities member
to reflect the new capabilities.

The second issue is that when we allow an upgrade or downgrade
to or from MULTI_ROLE, we need to update the `role` and `roles`
fields of `FrameworkInfo`.

Review: https://reviews.apache.org/r/56004


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7e976534
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7e976534
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7e976534

Branch: refs/heads/master
Commit: 7e976534062e221356bce96ef88718a413b734f8
Parents: 9f4dab1
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Jan 26 14:53:50 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:12 2017 -0800

----------------------------------------------------------------------
 src/master/master.hpp | 33 +++++++++++++++++----------------
 1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7e976534/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 616687f..a7e3c73 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2409,34 +2409,35 @@ struct Framework
     // do however allow frameworks to opt in and out of `MULTI_ROLE`
     // capability, given that the `role` and `roles` field contain the
     // same number of roles.
-    if (protobuf::frameworkHasCapability(
-            source, FrameworkInfo::Capability::MULTI_ROLE) ||
-        protobuf::frameworkHasCapability(
-            info, FrameworkInfo::Capability::MULTI_ROLE)) {
+    if (capabilities.multiRole || protobuf::frameworkHasCapability(
+            source, FrameworkInfo::Capability::MULTI_ROLE)) {
       // Two `roles` sets are equivalent if they contain the same
       // elements. A `role` `*` is not equivalent to an empty `roles`
       // set, but to the set `{*}`. Since we might be dealing with a
       // framework upgrading to `MULTI_ROLE` capability or dropping
       // it, we need to examine either `role` or `roles` in order to
       // determine the roles a framework is subscribed to.
-      const std::set<std::string> newRoles =
-        protobuf::frameworkHasCapability(
-            source, FrameworkInfo::Capability::MULTI_ROLE)
-          ? std::set<std::string>(
-                {source.roles().begin(), source.roles().end()})
-          : std::set<std::string>({source.role()});
-
       const std::set<std::string> oldRoles =
-        protobuf::frameworkHasCapability(
-            info, FrameworkInfo::Capability::MULTI_ROLE)
-          ? std::set<std::string>({info.roles().begin(), info.roles().end()})
-          : std::set<std::string>({info.role()});
+        protobuf::framework::getRoles(info);
+      const std::set<std::string> newRoles =
+        protobuf::framework::getRoles(source);
 
       if (oldRoles != newRoles) {
         return Error(
             "Frameworks cannot change their roles: expected '" +
             stringify(oldRoles) + "', but got '" + stringify(newRoles) + "'");
       }
+
+      info.clear_role();
+      info.clear_roles();
+
+      if (source.has_role()) {
+        info.set_role(source.role());
+      }
+
+      if (source.roles_size() > 0) {
+        info.mutable_roles()->CopyFrom(source.roles());
+      }
     } else {
       if (source.role() != info.role()) {
         LOG(WARNING) << "Cannot update FrameworkInfo.role to '" << source.role()
@@ -2444,7 +2445,6 @@ struct Framework
       }
     }
 
-
     if (source.user() != info.user()) {
       LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << source.user()
                    << "' for framework " << id() << ". Check MESOS-703";
@@ -2487,6 +2487,7 @@ struct Framework
     } else {
       info.clear_capabilities();
     }
+    capabilities = protobuf::framework::Capabilities(info.capabilities());
 
     if (source.has_labels()) {
       info.mutable_labels()->CopyFrom(source.labels());


[6/9] mesos git commit: Updated the agent to be MULTI_ROLE capable.

Posted by bm...@apache.org.
Updated the agent to be MULTI_ROLE capable.

With the addition of MULTI_ROLE support, the agent needs to ensure
that allocated resources reported to the master have the
`Resource.AllocationInfo` set. The approach taken here is to set
it only in memory upon recovering tasks and executors. Note that
once we allow a framework to modify its roles, we need to update
the agent to re-persist the resources with injected allocation
info (rather than just setting it in memory).

Review: https://reviews.apache.org/r/55971


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7497541f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7497541f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7497541f

Branch: refs/heads/master
Commit: 7497541f1ae0520f225aa6633df99ff9457c8cf6
Parents: 45786b6
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 25 17:12:34 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:11 2017 -0800

----------------------------------------------------------------------
 src/slave/resource_estimators/fixed.cpp |  8 ++-
 src/slave/slave.cpp                     | 87 +++++++++++++++++++++++++++-
 2 files changed, 92 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7497541f/src/slave/resource_estimators/fixed.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/fixed.cpp b/src/slave/resource_estimators/fixed.cpp
index 2c1268c..b43a3ac 100644
--- a/src/slave/resource_estimators/fixed.cpp
+++ b/src/slave/resource_estimators/fixed.cpp
@@ -57,7 +57,13 @@ public:
       allocatedRevocable += Resources(executor.allocated()).revocable();
     }
 
-    return totalRevocable - allocatedRevocable;
+    auto unallocated = [](const Resources& resources) {
+      Resources result = resources;
+      result.unallocate();
+      return result;
+    };
+
+    return totalRevocable - unallocated(allocatedRevocable);
   }
 
 protected:

http://git-wip-us.apache.org/repos/asf/mesos/blob/7497541f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 629e741..92564ff 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -101,6 +101,8 @@
 #include <slave/posix_signalhandler.hpp>
 #endif // __WINDOWS__
 
+using google::protobuf::RepeatedPtrField;
+
 using mesos::executor::Call;
 
 using mesos::master::detector::MasterDetector;
@@ -1386,6 +1388,9 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     // Registering for the first time.
     RegisterSlaveMessage message;
     message.set_version(MESOS_VERSION);
+    message.add_agent_capabilities()->set_type(
+        SlaveInfo::Capability::MULTI_ROLE);
+
     message.mutable_slave()->CopyFrom(info);
 
     // Include checkpointed resources.
@@ -1396,6 +1401,8 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
     message.set_version(MESOS_VERSION);
+    message.add_agent_capabilities()->set_type(
+        SlaveInfo::Capability::MULTI_ROLE);
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
@@ -1864,8 +1871,16 @@ void Slave::_run(
   // out of order.
   bool kill = false;
   foreach (const TaskInfo& _task, tasks) {
+    auto unallocated = [](const Resources& resources) {
+      Resources result = resources;
+      result.unallocate();
+      return result;
+    };
+
+    // We must unallocate the resources to check whether they are
+    // contained in the unallocated total checkpointed resources.
     Resources checkpointedTaskResources =
-      Resources(_task.resources()).filter(needCheckpointing);
+      unallocated(_task.resources()).filter(needCheckpointing);
 
     foreach (const Resource& resource, checkpointedTaskResources) {
       if (!checkpointedResources.contains(resource)) {
@@ -5209,6 +5224,68 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
   Option<ResourcesState> resourcesState = state->resources;
   Option<SlaveState> slaveState = state->slave;
 
+  // With the addition of frameworks with multiple roles, we
+  // need to inject the allocated role into each allocated
+  // `Resource` object that we've persisted. Note that we
+  // also must do this for MULTI_ROLE frameworks since they
+  // may have tasks that were present before the framework
+  // upgraded into MULTI_ROLE.
+  //
+  // TODO(bmahler): When can the `info` fields be None?
+  auto injectAllocationInfo = [](
+      RepeatedPtrField<Resource>* resources,
+      const FrameworkInfo& frameworkInfo) {
+    set<string> roles = protobuf::framework::getRoles(frameworkInfo);
+
+    for (int i = 0; i < resources->size(); ++i) {
+      Resource* resource = resources->Mutable(i);
+
+      if (!resource->has_allocation_info()) {
+        if (roles.size() != 1) {
+          LOG(FATAL) << "Missing 'Resource.AllocationInfo' for resources"
+                     << " allocated to MULTI_ROLE framework"
+                     << " '" << frameworkInfo.name() << "'";
+        }
+
+        resource->mutable_allocation_info()->set_role(*roles.begin());
+      }
+    }
+  };
+
+  // TODO(bmahler): We currently don't allow frameworks to
+  // change their roles so we do not need to re-persist the
+  // resources with `AllocationInfo` injected for existing
+  // tasks and executors.
+  if (slaveState.isSome()) {
+    foreachvalue (FrameworkState& frameworkState, slaveState->frameworks) {
+      if (!frameworkState.info.isSome()) {
+        continue;
+      }
+
+      foreachvalue (ExecutorState& executorState, frameworkState.executors) {
+        if (!executorState.info.isSome()) {
+          continue;
+        }
+
+        injectAllocationInfo(
+            executorState.info->mutable_resources(),
+            frameworkState.info.get());
+
+        foreachvalue (RunState& runState, executorState.runs) {
+          foreachvalue (TaskState& taskState, runState.tasks) {
+            if (!taskState.info.isSome()) {
+              continue;
+            }
+
+            injectAllocationInfo(
+                taskState.info->mutable_resources(),
+                frameworkState.info.get());
+          }
+        }
+      }
+    }
+  }
+
   // Recover checkpointed resources.
   // NOTE: 'resourcesState' is None if the slave rootDir does not
   // exist or the resources checkpoint file cannot be found.
@@ -5616,6 +5693,12 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     // rather than rejecting and crashing here.
     CHECK_EQ(oversubscribable.get(), oversubscribable->revocable());
 
+    auto unallocated = [](const Resources& resources) {
+      Resources result = resources;
+      result.unallocate();
+      return result;
+    };
+
     // Calculate the latest allocation of oversubscribed resources.
     // Note that this allocation value might be different from the
     // master's view because new task/executor might be in flight from
@@ -5625,7 +5708,7 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     Resources oversubscribed;
     foreachvalue (Framework* framework, frameworks) {
       foreachvalue (Executor* executor, framework->executors) {
-        oversubscribed += executor->resources.revocable();
+        oversubscribed += unallocated(executor->resources.revocable());
       }
     }
 


[9/9] mesos git commit: Update the tests to handle MULTI_ROLE support.

Posted by bm...@apache.org.
Update the tests to handle MULTI_ROLE support.

A number of tests and example frameworks assume that allocated
resources did not look different from unallocated resources.
This updates the tests to reflect the presence of
`Resource.AllocationInfo`.

Review: https://reviews.apache.org/r/55973


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c20744a9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c20744a9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c20744a9

Branch: refs/heads/master
Commit: c20744a9976b5e83698e9c6062218abb4d2e6b25
Parents: 7e97653
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 25 17:14:36 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 19:00:39 2017 -0800

----------------------------------------------------------------------
 src/examples/disk_full_framework.cpp            |  22 ++-
 src/examples/dynamic_reservation_framework.cpp  |  27 +--
 src/examples/no_executor_framework.cpp          |  30 +--
 src/examples/persistent_volume_framework.cpp    |   8 +-
 src/examples/test_framework.cpp                 |   7 +-
 src/examples/test_http_framework.cpp            |   7 +-
 src/tests/api_tests.cpp                         |  23 ++-
 .../containerizer/cgroups_isolator_tests.cpp    |   4 +-
 src/tests/hook_tests.cpp                        |  22 ++-
 src/tests/master_allocator_tests.cpp            |  44 +++--
 src/tests/master_tests.cpp                      |  14 +-
 src/tests/mesos.hpp                             |   7 +-
 src/tests/oversubscription_tests.cpp            |  35 ++--
 src/tests/partition_tests.cpp                   |   1 +
 src/tests/persistent_volume_endpoints_tests.cpp |  79 ++++++--
 src/tests/persistent_volume_tests.cpp           |  86 ++++++---
 src/tests/reservation_endpoints_tests.cpp       |  76 +++++---
 src/tests/reservation_tests.cpp                 | 186 ++++++++++++-------
 src/tests/resource_offers_tests.cpp             |   3 +-
 src/tests/role_tests.cpp                        |  25 ++-
 src/tests/slave_recovery_tests.cpp              |  11 +-
 21 files changed, 476 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/examples/disk_full_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/disk_full_framework.cpp b/src/examples/disk_full_framework.cpp
index e13d4c8..a73e6cf 100644
--- a/src/examples/disk_full_framework.cpp
+++ b/src/examples/disk_full_framework.cpp
@@ -111,8 +111,11 @@ class DiskFullSchedulerProcess
   : public process::Process<DiskFullSchedulerProcess>
 {
 public:
-  DiskFullSchedulerProcess (const Flags& _flags)
+  DiskFullSchedulerProcess (
+      const Flags& _flags,
+      const FrameworkInfo& _frameworkInfo)
     : flags(_flags),
+      frameworkInfo(_frameworkInfo),
       tasksLaunched(0),
       taskActive(false),
       isRegistered(false),
@@ -135,10 +138,11 @@ public:
       SchedulerDriver* driver,
       const std::vector<Offer>& offers)
   {
-    static const Resources TASK_RESOURCES = Resources::parse(
+    Resources taskResources = Resources::parse(
         "cpus:" + stringify(CPUS_PER_TASK) +
         ";mem:" + stringify(MEMORY_PER_TASK) +
         ";disk:" + stringify(DISK_PER_TASK.megabytes())).get();
+    taskResources.allocate(frameworkInfo.role());
 
     foreach (const Offer& offer, offers) {
       LOG(INFO) << "Received offer " << offer.id() << " from agent "
@@ -149,7 +153,7 @@ public:
 
       // If we've already launched the task, or if the offer is not
       // big enough, reject the offer.
-      if (taskActive || !resources.flatten().contains(TASK_RESOURCES)) {
+      if (taskActive || !resources.flatten().contains(taskResources)) {
         Filters filters;
         filters.set_refuse_seconds(600);
 
@@ -173,7 +177,7 @@ public:
       task.set_name("Disk full framework task");
       task.mutable_task_id()->set_value(stringify(taskId));
       task.mutable_slave_id()->MergeFrom(offer.slave_id());
-      task.mutable_resources()->CopyFrom(TASK_RESOURCES);
+      task.mutable_resources()->CopyFrom(taskResources);
       task.mutable_command()->set_shell(true);
       task.mutable_command()->set_value(command);
 
@@ -252,6 +256,8 @@ public:
 
 private:
   const Flags flags;
+  const FrameworkInfo frameworkInfo;
+
   int tasksLaunched;
   bool taskActive;
 
@@ -313,8 +319,8 @@ private:
 class DiskFullScheduler : public Scheduler
 {
 public:
-  DiskFullScheduler(const Flags& _flags)
-    : process(_flags)
+  DiskFullScheduler(const Flags& _flags, const FrameworkInfo& _frameworkInfo)
+    : process(_flags, _frameworkInfo)
   {
     process::spawn(process);
   }
@@ -426,13 +432,13 @@ int main(int argc, char** argv)
     EXIT(EXIT_FAILURE) << flags.usage(load.error());
   }
 
-  DiskFullScheduler scheduler(flags);
-
   FrameworkInfo framework;
   framework.set_user(""); // Have Mesos fill the current user.
   framework.set_name("Disk Full Framework (C++)");
   framework.set_checkpoint(true);
 
+  DiskFullScheduler scheduler(flags, framework);
+
   MesosSchedulerDriver* driver;
 
   // TODO(hartem): Refactor these into a common set of flags.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/examples/dynamic_reservation_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/dynamic_reservation_framework.cpp b/src/examples/dynamic_reservation_framework.cpp
index 4d3db96..c1650dc 100644
--- a/src/examples/dynamic_reservation_framework.cpp
+++ b/src/examples/dynamic_reservation_framework.cpp
@@ -62,7 +62,14 @@ public:
   {
     reservationInfo.set_principal(principal);
 
-    Try<Resources> flattened = TASK_RESOURCES.flatten(role, reservationInfo);
+    taskResources = Resources::parse(
+        "cpus:" + stringify(CPUS_PER_TASK) +
+        ";mem:" + stringify(MEM_PER_TASK)).get();
+
+    taskResources.allocate(role);
+
+    // The task will run on reserved resources.
+    Try<Resources> flattened = taskResources.flatten(role, reservationInfo);
     CHECK_SOME(flattened);
     taskResources = flattened.get();
   }
@@ -111,13 +118,16 @@ public:
           // the task'll be dispatched when reserved resources are re-offered
           // to this framework.
           Resources resources = offer.resources();
-          Resources unreserved = resources.unreserved();
-          if (!unreserved.contains(TASK_RESOURCES)) {
+          Offer::Operation reserve = RESERVE(taskResources);
+
+          Try<Resources> apply = resources.apply(reserve);
+          if (apply.isError()) {
             LOG(INFO) << "Failed to reserve resources for task in offer "
-                      << stringify(offer.id());
+                      << stringify(offer.id()) << ": " << apply.error();
             break;
           }
-          driver->acceptOffers({offer.id()}, {RESERVE(taskResources)}, filters);
+
+          driver->acceptOffers({offer.id()}, {reserve}, filters);
           states[offer.slave_id()] = State::RESERVING;
           break;
         }
@@ -293,8 +303,6 @@ private:
   Resource::ReservationInfo reservationInfo;
   Resources taskResources;
 
-  static const Resources TASK_RESOURCES;
-
   Offer::Operation RESERVE(Resources resources)
   {
     Offer::Operation operation;
@@ -313,11 +321,6 @@ private:
 };
 
 
-const Resources DynamicReservationScheduler::TASK_RESOURCES = Resources::parse(
-    "cpus:" + stringify(CPUS_PER_TASK) +
-    ";mem:" + stringify(MEM_PER_TASK)).get();
-
-
 class Flags : public virtual flags::FlagsBase
 {
 public:

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/examples/no_executor_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/no_executor_framework.cpp b/src/examples/no_executor_framework.cpp
index e82ae9a..77b7408 100644
--- a/src/examples/no_executor_framework.cpp
+++ b/src/examples/no_executor_framework.cpp
@@ -325,6 +325,20 @@ int main(int argc, char** argv)
                      " to enable authentication");
   }
 
+  FrameworkInfo framework;
+  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_name("No Executor Framework");
+  framework.set_checkpoint(flags.checkpoint);
+
+  if (flags.task_revocable_resources.isSome()) {
+    framework.add_capabilities()->set_type(
+        FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+  }
+
+  if (flags.principal.isSome()) {
+    framework.set_principal(flags.principal.get());
+  }
+
   Try<Resources> resources =
     Resources::parse(flags.task_resources);
 
@@ -352,21 +366,9 @@ int main(int argc, char** argv)
     }
   }
 
-  logging::initialize(argv[0], flags, true); // Catch signals.
+  taskResources.allocate(framework.role());
 
-  FrameworkInfo framework;
-  framework.set_user(""); // Have Mesos fill in the current user.
-  framework.set_name("No Executor Framework");
-  framework.set_checkpoint(flags.checkpoint);
-
-  if (flags.task_revocable_resources.isSome()) {
-    framework.add_capabilities()->set_type(
-        FrameworkInfo::Capability::REVOCABLE_RESOURCES);
-  }
-
-  if (flags.principal.isSome()) {
-    framework.set_principal(flags.principal.get());
-  }
+  logging::initialize(argv[0], flags, true); // Catch signals.
 
   NoExecutorScheduler scheduler(
       framework,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/examples/persistent_volume_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/persistent_volume_framework.cpp b/src/examples/persistent_volume_framework.cpp
index 222018e..702dba3 100644
--- a/src/examples/persistent_volume_framework.cpp
+++ b/src/examples/persistent_volume_framework.cpp
@@ -57,7 +57,12 @@ using std::vector;
 // reserved resources.
 static Resources SHARD_INITIAL_RESOURCES(const string& role)
 {
-  return Resources::parse("cpus:0.1;mem:32;disk:16", role).get();
+  Resources allocation =
+    Resources::parse("cpus:0.1;mem:32;disk:16", role).get();
+
+  allocation.allocate(role);
+
+  return allocation;
 }
 
 
@@ -79,6 +84,7 @@ static Resource SHARD_PERSISTENT_VOLUME(
 
   Resource resource = Resources::parse("disk", "8", role).get();
   resource.mutable_disk()->CopyFrom(info);
+  resource.mutable_allocation_info()->set_role(role);
 
   if (isShared) {
     resource.mutable_shared();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/examples/test_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp
index 068b85d..05ddc89 100644
--- a/src/examples/test_framework.cpp
+++ b/src/examples/test_framework.cpp
@@ -85,16 +85,17 @@ public:
       cout << "Received offer " << offer.id() << " with " << offer.resources()
            << endl;
 
-      static const Resources TASK_RESOURCES = Resources::parse(
+      Resources taskResources = Resources::parse(
           "cpus:" + stringify(CPUS_PER_TASK) +
           ";mem:" + stringify(MEM_PER_TASK)).get();
+      taskResources.allocate(role);
 
       Resources remaining = offer.resources();
 
       // Launch tasks.
       vector<TaskInfo> tasks;
       while (tasksLaunched < totalTasks &&
-             remaining.flatten().contains(TASK_RESOURCES)) {
+             remaining.flatten().contains(taskResources)) {
         int taskId = tasksLaunched++;
 
         cout << "Launching task " << taskId << " using offer "
@@ -106,7 +107,7 @@ public:
         task.mutable_slave_id()->MergeFrom(offer.slave_id());
         task.mutable_executor()->MergeFrom(executor);
 
-        Try<Resources> flattened = TASK_RESOURCES.flatten(role);
+        Try<Resources> flattened = taskResources.flatten(role);
         CHECK_SOME(flattened);
         Option<Resources> resources = remaining.find(flattened.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/examples/test_http_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_framework.cpp b/src/examples/test_http_framework.cpp
index 258cb51..471835c 100644
--- a/src/examples/test_http_framework.cpp
+++ b/src/examples/test_http_framework.cpp
@@ -222,16 +222,17 @@ private:
            << Resources(offer.resources())
            << endl;
 
-      static const Resources TASK_RESOURCES = Resources::parse(
+      Resources taskResources = Resources::parse(
           "cpus:" + stringify(CPUS_PER_TASK) +
           ";mem:" + stringify(MEM_PER_TASK)).get();
+      taskResources.allocate(framework.role());
 
       Resources remaining = offer.resources();
 
       // Launch tasks.
       vector<TaskInfo> tasks;
       while (tasksLaunched < totalTasks &&
-             remaining.flatten().contains(TASK_RESOURCES)) {
+             remaining.flatten().contains(taskResources)) {
         int taskId = tasksLaunched++;
 
         cout << "Launching task " << taskId << " using offer "
@@ -244,7 +245,7 @@ private:
         task.mutable_agent_id()->MergeFrom(offer.agent_id());
         task.mutable_executor()->MergeFrom(executor);
 
-        Try<Resources> flattened = TASK_RESOURCES.flatten(framework.role());
+        Try<Resources> flattened = taskResources.flatten(framework.role());
         CHECK_SOME(flattened);
         Option<Resources> resources = remaining.find(flattened.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 4f43194..277fbe3 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -57,6 +57,7 @@
 #include "tests/allocator.hpp"
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
 
 #include "tests/containerizer/mock_containerizer.hpp"
 
@@ -894,8 +895,11 @@ TEST_P(MasterAPITest, GetRoles)
   ASSERT_EQ(2, v1Response->get_roles().roles().size());
   EXPECT_EQ("role1", v1Response->get_roles().roles(1).name());
   EXPECT_EQ(2.5, v1Response->get_roles().roles(1).weight());
-  ASSERT_EQ(v1::Resources::parse(slaveFlags.resources.get()).get(),
-            v1Response->get_roles().roles(1).resources());
+  ASSERT_EQ(
+      allocatedResources(
+          devolve(v1::Resources::parse(slaveFlags.resources.get()).get()),
+          "role1"),
+      devolve(v1Response->get_roles().roles(1).resources()));
 
   driver.stop();
   driver.join();
@@ -973,7 +977,8 @@ TEST_P(MasterAPITest, ReserveResources)
   ASSERT_EQ(1u, offers->size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -1006,7 +1011,8 @@ TEST_P(MasterAPITest, ReserveResources)
   ASSERT_EQ(1u, offers->size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1083,7 +1089,8 @@ TEST_P(MasterAPITest, UnreserveResources)
   ASSERT_EQ(1u, offers->size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -1116,7 +1123,8 @@ TEST_P(MasterAPITest, UnreserveResources)
   offer = offers.get()[0];
 
   // Verifies if the resources are unreserved.
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -2022,7 +2030,8 @@ TEST_P(MasterAPITest, CreateAndDestroyVolumes)
   EXPECT_NE(0u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   Resources taskResources = Resources::parse(
       "disk:256",

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/containerizer/cgroups_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/cgroups_isolator_tests.cpp b/src/tests/containerizer/cgroups_isolator_tests.cpp
index ba268b6..b86716d 100644
--- a/src/tests/containerizer/cgroups_isolator_tests.cpp
+++ b/src/tests/containerizer/cgroups_isolator_tests.cpp
@@ -28,6 +28,7 @@
 
 #include "tests/mesos.hpp"
 #include "tests/mock_slave.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/script.hpp"
 
 #include "tests/containerizer/docker_archive.hpp"
@@ -304,7 +305,8 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_RevocableCpu)
   // Now the framework will get revocable resources.
   AWAIT_READY(offers2);
   EXPECT_NE(0u, offers2.get().size());
-  EXPECT_EQ(cpus, Resources(offers2.get()[0].resources()));
+  EXPECT_EQ(allocatedResources(cpus, frameworkInfo.role()),
+            Resources(offers2.get()[0].resources()));
 
   TaskInfo task = createTask(
       offers2.get()[0].slave_id(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/hook_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hook_tests.cpp b/src/tests/hook_tests.cpp
index f4ef629..237df81 100644
--- a/src/tests/hook_tests.cpp
+++ b/src/tests/hook_tests.cpp
@@ -51,6 +51,7 @@
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 #include "tests/mock_docker.hpp"
+#include "tests/resources_utils.hpp"
 
 using namespace mesos::modules;
 
@@ -1106,12 +1107,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
 
   Resources resources = offers.get()[0].resources();
 
-  // The test hook sets "cpus" to 4.
-  EXPECT_EQ(4, resources.cpus().get());
-
-  // The test hook adds a resource named "foo" of type set with values "bar"
-  // and "baz".
-  EXPECT_EQ(Resources::parse("foo:{bar,baz}").get(), resources.get("foo"));
+  // The test hook sets "cpus" to 4 and adds a resource named
+  // "foo" of type set with values "bar" and "baz".
+  //
+  // TODO(bmahler): Avoid the need for non-local reasoning here
+  // about the test hook. E.g. Expose the resources from the test
+  // hook and use them here.
+  const size_t TEST_HOOK_CPUS = 4;
+  const Resources TEST_HOOK_ADDITIONAL_RESOURCES =
+    Resources::parse("foo:{bar,baz}").get();
+
+  EXPECT_EQ(TEST_HOOK_CPUS, resources.cpus().get());
+
+  const string allocationRole = DEFAULT_FRAMEWORK_INFO.role();
+  EXPECT_TRUE(resources.contains(
+      allocatedResources(TEST_HOOK_ADDITIONAL_RESOURCES, allocationRole)));
 
   // The test hook does not modify "mem", the default value must still be
   // present.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index d22862d..25c67d3 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -50,6 +50,7 @@
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 #include "tests/module.hpp"
+#include "tests/resources_utils.hpp"
 
 using google::protobuf::RepeatedPtrField;
 
@@ -817,7 +818,9 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost)
   AWAIT_READY(resourceOffers);
 
   EXPECT_EQ(Resources(resourceOffers.get()[0].resources()),
-            Resources::parse(flags2.resources.get()).get());
+            allocatedResources(
+                Resources::parse(flags2.resources.get()).get(),
+                DEFAULT_FRAMEWORK_INFO.role()));
 
   // Shut everything down.
   EXPECT_CALL(allocator, recoverResources(_, _, _, _))
@@ -1604,8 +1607,9 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   vector<Owned<cluster::Slave>> slaves;
 
   // Register three agents with the same resources.
-  string agentResources = "cpus:2;gpus:0;mem:1024;"
-                          "disk:4096;ports:[31000-32000]";
+  const Resources agentResources = Resources::parse(
+      "cpus:2;gpus:0;mem:1024;disk:4096;ports:[31000-32000]").get();
+
   for (int i = 0; i < 3; i++) {
     Future<Nothing> addSlave;
     EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
@@ -1613,7 +1617,7 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
                       FutureSatisfy(&addSlave)));
 
     slave::Flags flags = this->CreateSlaveFlags();
-    flags.resources = agentResources;
+    flags.resources = stringify(agentResources);
 
     Try<Owned<cluster::Slave>> slave = this->StartSlave(detector.get(), flags);
     ASSERT_SOME(slave);
@@ -1656,7 +1660,7 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   ASSERT_EQ(3u, framework1offers1.get().size());
   for (int i = 0; i < 3; i++) {
     EXPECT_EQ(Resources(framework1offers1.get()[i].resources()),
-              Resources::parse(agentResources).get());
+              allocatedResources(agentResources, "role1"));
   }
 
   // Framework2 registers with 'role2' which also uses the default weight.
@@ -1718,15 +1722,29 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   // 'updateWeights' will rescind all outstanding offers and the rescinded
   // offer resources will only be available to the updated weights once
   // another allocation is invoked.
+  //
+  // TODO(bmahler): This lambda is copied in several places
+  // in the code, consider how best to pull this out.
+  auto unallocated = [](const Resources& resources) {
+    Resources result = resources;
+    result.unallocate();
+    return result;
+  };
+
   AWAIT_READY(recoverResources1);
-  EXPECT_EQ(recoverResources1.get(),
-            Resources::parse(agentResources).get());
+  EXPECT_EQ(agentResources, unallocated(recoverResources1.get()));
+
   AWAIT_READY(recoverResources2);
-  EXPECT_EQ(recoverResources2.get(),
-            Resources::parse(agentResources).get());
+  EXPECT_EQ(agentResources, unallocated(recoverResources2.get()));
+
   AWAIT_READY(recoverResources3);
-  EXPECT_EQ(recoverResources3.get(),
-            Resources::parse(agentResources).get());
+  EXPECT_EQ(agentResources, unallocated(recoverResources3.get()));
+
+  Resources totalRecovered =
+    recoverResources1.get() + recoverResources2.get() + recoverResources3.get();
+  totalRecovered.unallocate();
+
+  EXPECT_EQ(agentResources + agentResources + agentResources, totalRecovered);
 
   // Trigger a batch allocation to make sure all resources are
   // offered out again.
@@ -1743,7 +1761,7 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   AWAIT_READY(framework1offers2);
   ASSERT_EQ(1u, framework1offers2.get().size());
   EXPECT_EQ(Resources(framework1offers2.get()[0].resources()),
-            Resources::parse(agentResources).get());
+            allocatedResources(agentResources, "role1"));
 
   ASSERT_EQ(2u, framework2offers.size());
   for (int i = 0; i < 2; i++) {
@@ -1752,7 +1770,7 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
     // All offers for framework2 are enqueued by now.
     AWAIT_READY(offer);
     EXPECT_EQ(Resources(offer->resources()),
-              Resources::parse(agentResources).get());
+              allocatedResources(agentResources, "role2"));
   }
 
   driver1.stop();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index da7094d..3b4123b 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -70,6 +70,7 @@
 #include "tests/containerizer.hpp"
 #include "tests/limiter.hpp"
 #include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 using mesos::internal::master::Master;
@@ -842,8 +843,9 @@ TEST_F(MasterTest, RecoverResources)
   ExecutorInfo executorInfo;
   executorInfo.MergeFrom(DEFAULT_EXECUTOR_INFO);
 
-  Resources executorResources =
-    Resources::parse("cpus:0.3;mem:200;ports:[5-8, 23-25]").get();
+  Resources executorResources = allocatedResources(
+      Resources::parse("cpus:0.3;mem:200;ports:[5-8, 23-25]").get(),
+      DEFAULT_FRAMEWORK_INFO.role());
   executorInfo.mutable_resources()->MergeFrom(executorResources);
 
   TaskID taskId;
@@ -924,8 +926,10 @@ TEST_F(MasterTest, RecoverResources)
 
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
+
   Resources slaveResources = Resources::parse(flags.resources.get()).get();
-  EXPECT_EQ(slaveResources, offers.get()[0].resources());
+  EXPECT_EQ(allocatedResources(slaveResources, DEFAULT_FRAMEWORK_INFO.role()),
+            offers.get()[0].resources());
 
   driver.stop();
   driver.join();
@@ -3371,7 +3375,9 @@ TEST_F(MasterTest, IgnoreEphemeralPortsResource)
 
   EXPECT_EQ(
       Resources(offers.get()[0].resources()),
-      Resources::parse(resourcesWithoutEphemeralPorts).get());
+      allocatedResources(
+          Resources::parse(resourcesWithoutEphemeralPorts).get(),
+          DEFAULT_FRAMEWORK_INFO.role()));
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ff83a9c..b450a04 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1558,14 +1558,15 @@ ACTION_P5(LaunchTasks, executor, tasks, cpus, mem, role)
   for (size_t i = 0; i < offers.size(); i++) {
     const Offer& offer = offers[i];
 
-    const Resources TASK_RESOURCES = Resources::parse(
+    Resources taskResources = Resources::parse(
         "cpus:" + stringify(cpus) + ";mem:" + stringify(mem)).get();
+    taskResources.allocate(role);
 
     int nextTaskId = 0;
     std::vector<TaskInfo> tasks;
     Resources remaining = offer.resources();
 
-    while (remaining.flatten().contains(TASK_RESOURCES) &&
+    while (remaining.flatten().contains(taskResources) &&
            launched < numTasks) {
       TaskInfo task;
       task.set_name("TestTask");
@@ -1574,7 +1575,7 @@ ACTION_P5(LaunchTasks, executor, tasks, cpus, mem, role)
       task.mutable_executor()->MergeFrom(executor);
 
       Option<Resources> resources =
-        remaining.find(TASK_RESOURCES.flatten(role).get());
+        remaining.find(taskResources.flatten(role).get());
 
       CHECK_SOME(resources);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 167beaf..e57fcc6 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -50,6 +50,7 @@
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 #include "tests/mock_slave.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 using namespace process;
@@ -398,9 +399,13 @@ TEST_F(OversubscriptionTest, RevocableOffer)
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   // Inject an estimation of oversubscribable cpu resources.
+  estimations.put(createRevocableResources("cpus", "2"));
+
   Resources taskResources = createRevocableResources("cpus", "1");
+  taskResources.allocate(framework.role());
+
   Resources executorResources = createRevocableResources("cpus", "1");
-  estimations.put(taskResources + executorResources);
+  executorResources.allocate(framework.role());
 
   // Now the framework will get revocable resources.
   AWAIT_READY(offers2);
@@ -506,7 +511,8 @@ TEST_F(OversubscriptionTest, RescindRevocableOfferWithIncreasedRevocable)
   EXPECT_EQ(1u, offers.size());
   Future<Offer> offer = offers.get();
   AWAIT_READY(offer);
-  EXPECT_EQ(resources1, Resources(offer->resources()));
+  EXPECT_EQ(allocatedResources(resources1, framework.role()),
+            Resources(offer->resources()));
 
   Future<OfferID> offerId;
   EXPECT_CALL(sched, offerRescinded(&driver, _))
@@ -543,7 +549,7 @@ TEST_F(OversubscriptionTest, RescindRevocableOfferWithIncreasedRevocable)
   }
 
   // The offered resources should match the resource estimate.
-  EXPECT_EQ(resources2, resources3);
+  EXPECT_EQ(allocatedResources(resources2, framework.role()), resources3);
 
   driver.stop();
   driver.join();
@@ -619,7 +625,8 @@ TEST_F(OversubscriptionTest, RescindRevocableOfferWithDecreasedRevocable)
   // Now the framework will get revocable resources.
   AWAIT_READY(offers2);
   EXPECT_NE(0u, offers2->size());
-  EXPECT_EQ(resources1, Resources(offers2.get()[0].resources()));
+  EXPECT_EQ(allocatedResources(resources1, framework.role()),
+            Resources(offers2.get()[0].resources()));
 
   Future<OfferID> offerId;
   EXPECT_CALL(sched, offerRescinded(&driver, _))
@@ -649,7 +656,8 @@ TEST_F(OversubscriptionTest, RescindRevocableOfferWithDecreasedRevocable)
   // The new offer should include the latest oversubscribed resources.
   AWAIT_READY(offers3);
   EXPECT_NE(0u, offers3->size());
-  EXPECT_EQ(resources2, Resources(offers3.get()[0].resources()));
+  EXPECT_EQ(allocatedResources(resources2, framework.role()),
+            Resources(offers3.get()[0].resources()));
 
   driver.stop();
   driver.join();
@@ -1274,13 +1282,12 @@ TEST_F(OversubscriptionTest, UpdateAllocatorOnSchedulerFailover)
   EXPECT_CALL(sched2, resourceOffers(&driver2, _))
     .WillOnce(FutureArg<1>(&offers2));
 
-  Resources taskResources = createRevocableResources("cpus", "1");
-  Resources executorResources = createRevocableResources("cpus", "1");
-  estimations.put(taskResources + executorResources);
+  Resources revocable = createRevocableResources("cpus", "2");
+  estimations.put(revocable);
 
   AWAIT_READY(offers2);
   EXPECT_NE(0u, offers2.get().size());
-  EXPECT_EQ(taskResources + executorResources,
+  EXPECT_EQ(allocatedResources(revocable, framework2.role()),
             Resources(offers2.get()[0].resources()));
 
   EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
@@ -1347,16 +1354,14 @@ TEST_F(OversubscriptionTest, RemoveCapabilitiesOnSchedulerFailover)
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   // Inject an estimation of oversubscribable cpu resources.
-  Resources taskResources = createRevocableResources("cpus", "1");
-  Resources executorResources = createRevocableResources("cpus", "1");
-  estimations.put(taskResources + executorResources);
+  Resources revocable = createRevocableResources("cpus", "2");
+  estimations.put(revocable);
 
   // Now the framework will get revocable resources.
   AWAIT_READY(offers2);
   EXPECT_NE(0u, offers2.get().size());
-  EXPECT_EQ(
-      taskResources + executorResources,
-      Resources(offers2.get()[0].resources()));
+  EXPECT_EQ(allocatedResources(revocable, framework1.role()),
+            Resources(offers2.get()[0].resources()));
 
   // Reregister the framework with removal of revocable resources capability.
   FrameworkInfo framework2 = DEFAULT_FRAMEWORK_INFO;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index f03c5bd..105157d 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -869,6 +869,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   Offer offer = offers.get()[0];
 
   Resources taskResources = Resources::parse("cpus:1;mem:512").get();
+  taskResources.allocate(DEFAULT_FRAMEWORK_INFO.role());
 
   EXPECT_TRUE(Resources(offer.resources()).contains(taskResources));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 695029f..ec8df33 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -38,6 +38,7 @@
 
 #include "tests/allocator.hpp"
 #include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 using std::string;
@@ -169,7 +170,8 @@ TEST_F(PersistentVolumeEndpointsTest, StaticReservation)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   Future<OfferID> rescindedOfferId;
 
@@ -196,7 +198,8 @@ TEST_F(PersistentVolumeEndpointsTest, StaticReservation)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_FALSE(Resources(offer.resources()).contains(volume));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -262,7 +265,8 @@ TEST_F(PersistentVolumeEndpointsTest, DynamicReservation)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   Future<OfferID> rescindedOfferId;
 
@@ -298,7 +302,8 @@ TEST_F(PersistentVolumeEndpointsTest, DynamicReservation)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .WillOnce(FutureArg<1>(&rescindedOfferId));
@@ -379,7 +384,8 @@ TEST_F(PersistentVolumeEndpointsTest, DynamicReservationRoleMismatch)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   ASSERT_NE(frameworkInfo.role(), "role2");
   Resources volume = createPersistentVolume(
@@ -896,7 +902,8 @@ TEST_F(PersistentVolumeEndpointsTest, GoodCreateAndDestroyACL)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   Future<OfferID> rescindedOfferId;
 
@@ -926,7 +933,8 @@ TEST_F(PersistentVolumeEndpointsTest, GoodCreateAndDestroyACL)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_FALSE(Resources(offer.resources()).contains(volume));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1124,7 +1132,8 @@ TEST_F(PersistentVolumeEndpointsTest, BadCreateAndDestroyACL)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   // The failed destruction attempt.
   Future<Response> destroyResponse = process::http::post(
@@ -1331,7 +1340,8 @@ TEST_F(PersistentVolumeEndpointsTest, GoodCreateAndDestroyACLBadCredential)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   // The failed destruction attempt.
   Future<Response> destroyResponse = process::http::post(
@@ -1603,7 +1613,8 @@ TEST_F(PersistentVolumeEndpointsTest, OfferCreateThenEndpointRemove)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // Create a 1MB persistent volume.
   Resources volume = createPersistentVolume(
@@ -1627,7 +1638,8 @@ TEST_F(PersistentVolumeEndpointsTest, OfferCreateThenEndpointRemove)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   Future<OfferID> rescindedOfferId;
 
@@ -1655,7 +1667,8 @@ TEST_F(PersistentVolumeEndpointsTest, OfferCreateThenEndpointRemove)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .WillOnce(FutureArg<1>(&rescindedOfferId));
@@ -1681,7 +1694,8 @@ TEST_F(PersistentVolumeEndpointsTest, OfferCreateThenEndpointRemove)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1765,7 +1779,8 @@ TEST_F(PersistentVolumeEndpointsTest, EndpointCreateThenOfferRemove)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   // We use the filter explicitly here so that the resources will not
   // be filtered for 5 seconds (the default).
@@ -1786,7 +1801,8 @@ TEST_F(PersistentVolumeEndpointsTest, EndpointCreateThenOfferRemove)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -1800,7 +1816,8 @@ TEST_F(PersistentVolumeEndpointsTest, EndpointCreateThenOfferRemove)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -2018,7 +2035,8 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   Resources taskUnreserved = Resources::parse("cpus:1;mem:256").get();
   Resources taskResources = taskUnreserved.flatten(
@@ -2140,6 +2158,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
       R"~(
       [
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "cpus",
           "reservation": {
             "principal": "test-principal"
@@ -2151,6 +2172,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "mem",
           "reservation": {
             "principal": "test-principal"
@@ -2169,6 +2193,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
       R"~(
       [
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "disk": {
             "persistence": {
               "id": "id1",
@@ -2190,6 +2217,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "mem",
           "reservation": {
             "principal": "test-principal"
@@ -2201,6 +2231,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "disk",
           "reservation": {
             "principal": "test-principal"
@@ -2212,6 +2245,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "cpus",
           "role": "*",
           "scalar": {
@@ -2220,6 +2256,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "mem",
           "role": "*",
           "scalar": {
@@ -2228,6 +2267,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "disk",
           "role": "*",
           "scalar": {
@@ -2236,6 +2278,9 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
           "type": "SCALAR"
         },
         {
+          "allocation_info": {
+            "role": "role1"
+          },
           "name": "ports",
           "ranges": {
             "range": [

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 842e113..7ac8286 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -54,6 +54,7 @@
 #include "tests/environment.hpp"
 #include "tests/mesos.hpp"
 #include "tests/mock_slave.hpp"
+#include "tests/resources_utils.hpp"
 
 using namespace process;
 
@@ -373,8 +374,10 @@ TEST_P(PersistentVolumeTest, CreateAndDestroyPersistentVolumes)
   offer = offers.get()[0];
 
   // Expect that the offer contains the persistent volumes we created.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume1));
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume2));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume1, frameworkInfo.role())));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume2, frameworkInfo.role())));
 
   // Destroy `volume1`.
   driver.acceptOffers(
@@ -639,7 +642,8 @@ TEST_P(PersistentVolumeTest, MasterFailover)
 
   Offer offer2 = offers2.get()[0];
 
-  EXPECT_TRUE(Resources(offer2.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer2.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -864,7 +868,8 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
 
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   Future<CheckpointResourcesMessage> checkpointMessage =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
@@ -1119,7 +1124,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
 
   Offer offer2 = offers2.get()[0];
 
-  EXPECT_TRUE(Resources(offer2.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer2.resources()).contains(
+      allocatedResources(volume, frameworkInfo2.role())));
 
   // 4. framework1 kills the task which results in an offer to framework1
   //    with the shared volume. At this point, both frameworks will have
@@ -1141,7 +1147,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
 
   offer1 = offers1.get()[0];
 
-  EXPECT_TRUE(Resources(offer1.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer1.resources()).contains(
+      allocatedResources(volume, frameworkInfo1.role())));
 
   // 5. DESTROY the shared volume via framework2 which would result in
   //    framework1 being rescinded the offer.
@@ -1282,8 +1289,11 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
 
   // Verify the offer from the failed over master.
   EXPECT_TRUE(Resources(offer2.resources()).contains(
-      Resources::parse("cpus:1;mem:1024").get()));
-  EXPECT_TRUE(Resources(offer2.resources()).contains(volume));
+      allocatedResources(
+          Resources::parse("cpus:1;mem:1024").get(),
+          frameworkInfo.role())));
+  EXPECT_TRUE(Resources(offer2.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1434,11 +1444,19 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
 
   offer = offers.get()[0];
 
+  // TODO(bmahler): This lambda is copied in several places
+  // in the code, consider how best to pull this out.
+  auto unallocated = [](const Resources& resources) {
+    Resources result = resources;
+    result.unallocate();
+    return result;
+  };
+
   // Check that the persistent volumes are offered back. The shared volume
   // is offered since it can be used in multiple tasks; the non-shared
   // volume is offered since there are no tasks using it.
-  EXPECT_TRUE(Resources(offer.resources()).contains(sharedVolume));
-  EXPECT_TRUE(Resources(offer.resources()).contains(nonSharedVolume));
+  EXPECT_TRUE(unallocated(offer.resources()).contains(sharedVolume));
+  EXPECT_TRUE(unallocated(offer.resources()).contains(nonSharedVolume));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -1459,8 +1477,8 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
 
   // Check that the shared persistent volume is in the offer, but the
   // non-shared volume is not in the offer.
-  EXPECT_TRUE(Resources(offer.resources()).contains(sharedVolume));
-  EXPECT_FALSE(Resources(offer.resources()).contains(nonSharedVolume));
+  EXPECT_TRUE(unallocated(offer.resources()).contains(sharedVolume));
+  EXPECT_FALSE(unallocated(offer.resources()).contains(nonSharedVolume));
 
   // We kill the long-lived task and wait for TASK_KILLED, so we can
   // DESTROY the persistent volume once the task terminates.
@@ -1586,7 +1604,15 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleIterations)
 
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume1));
+  // TODO(bmahler): This lambda is copied in several places
+  // in the code, consider how best to pull this out.
+  auto unallocated = [](const Resources& resources) {
+    Resources result = resources;
+    result.unallocate();
+    return result;
+  };
+
+  EXPECT_TRUE(unallocated(offer.resources()).contains(volume1));
 
   // 3. The framework CREATEs the 2nd shared volume, and LAUNCHes a task
   //    using this shared volume.
@@ -1622,8 +1648,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleIterations)
 
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume1));
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume2));
+  EXPECT_TRUE(unallocated(offer.resources()).contains(volume1));
+  EXPECT_TRUE(unallocated(offer.resources()).contains(volume2));
 
   driver.stop();
   driver.join();
@@ -1863,7 +1889,8 @@ TEST_P(PersistentVolumeTest, GoodACLCreateThenDestroy)
   offer = offers.get()[0];
 
   // Check that the persistent volume was created successfully.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
   EXPECT_TRUE(os::exists(slave::paths::getPersistentVolumePath(
       slaveFlags.work_dir,
       volume)));
@@ -1894,7 +1921,8 @@ TEST_P(PersistentVolumeTest, GoodACLCreateThenDestroy)
   offer = offers.get()[0];
 
   // Check that the persistent volume is not in the offer.
-  EXPECT_FALSE(Resources(offer.resources()).contains(volume));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -2011,7 +2039,8 @@ TEST_P(PersistentVolumeTest, GoodACLNoPrincipal)
   offer = offers.get()[0];
 
   // Check that the persistent volume was successfully created.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
   EXPECT_TRUE(os::exists(slave::paths::getPersistentVolumePath(
       slaveFlags.work_dir,
       volume)));
@@ -2040,7 +2069,8 @@ TEST_P(PersistentVolumeTest, GoodACLNoPrincipal)
   offer = offers.get()[0];
 
   // Check that the persistent volume was not created
-  EXPECT_FALSE(Resources(offer.resources()).contains(volume));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
   EXPECT_FALSE(
       Resources(checkpointResources2.get().resources()).contains(volume));
 
@@ -2159,7 +2189,8 @@ TEST_P(PersistentVolumeTest, BadACLNoPrincipal)
     offer = offers.get()[0];
 
     // Check that the persistent volume is not contained in this offer.
-    EXPECT_FALSE(Resources(offer.resources()).contains(volume));
+    EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo1.role())));
   }
 
   // Decline the offer and suppress so the second
@@ -2214,7 +2245,8 @@ TEST_P(PersistentVolumeTest, BadACLNoPrincipal)
   offer = offers.get()[0];
 
   // Check that the persistent volume is contained in this offer.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo2.role())));
 
   // Decline and suppress offers to `driver2` so that
   // `driver1` can receive an offer.
@@ -2259,7 +2291,8 @@ TEST_P(PersistentVolumeTest, BadACLNoPrincipal)
   // TODO(greggomann): In addition to checking that the volume is contained in
   // the offer, we should also confirm that the Destroy operation failed for the
   // correct reason. See MESOS-5470.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo1.role())));
 
   driver1.stop();
   driver1.join();
@@ -2378,7 +2411,8 @@ TEST_P(PersistentVolumeTest, BadACLDropCreateAndDestroy)
     offer = offers.get()[0];
 
     // Check that the persistent volume is not contained in this offer.
-    EXPECT_FALSE(Resources(offer.resources()).contains(volume));
+    EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo1.role())));
   }
 
   // Decline the offer and suppress so the second
@@ -2433,7 +2467,8 @@ TEST_P(PersistentVolumeTest, BadACLDropCreateAndDestroy)
   offer = offers.get()[0];
 
   // Check that the persistent volume is contained in this offer.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo2.role())));
 
   // Decline and suppress offers to `driver2` so that
   // `driver1` can receive an offer.
@@ -2478,7 +2513,8 @@ TEST_P(PersistentVolumeTest, BadACLDropCreateAndDestroy)
   // TODO(greggomann): In addition to checking that the volume is contained in
   // the offer, we should also confirm that the Destroy operation failed for the
   // correct reason. See MESOS-5470.
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo1.role())));
 
   driver1.stop();
   driver1.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index 7bc59c2..7432d75 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -36,6 +36,7 @@
 
 #include "tests/allocator.hpp"
 #include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 using std::string;
@@ -154,7 +155,8 @@ TEST_F(ReservationEndpointsTest, AvailableResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   Future<Nothing> recoverResources;
   EXPECT_CALL(allocator, recoverResources(_, _, _, _))
@@ -186,7 +188,8 @@ TEST_F(ReservationEndpointsTest, AvailableResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // Ignore subsequent `recoverResources` calls triggered from recovering the
   // resources that this framework is currently holding onto.
@@ -243,7 +246,8 @@ TEST_F(ReservationEndpointsTest, ReserveOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -264,7 +268,8 @@ TEST_F(ReservationEndpointsTest, ReserveOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -324,7 +329,8 @@ TEST_F(ReservationEndpointsTest, UnreserveOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -345,7 +351,8 @@ TEST_F(ReservationEndpointsTest, UnreserveOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -412,7 +419,8 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(available + offered));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(available + offered, frameworkInfo.role())));
 
   // Launch a task on the 'available' resources portion of the offer, which
   // recovers 'offered' resources portion.
@@ -446,7 +454,8 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(offered));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(offered, frameworkInfo.role())));
 
   // Kill the task running on 'available' resources to make it available.
   EXPECT_CALL(sched, statusUpdate(_, _));
@@ -461,7 +470,8 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   driver.killTask(taskInfo.task_id());
 
   AWAIT_READY(availableResources);
-  EXPECT_TRUE(availableResources.get().contains(available));
+  EXPECT_TRUE(availableResources.get().contains(
+      allocatedResources(available, frameworkInfo.role())));
 
   // At this point, we have 'available' resources in the allocator, and
   // 'offered' resources offered to the framework.
@@ -490,7 +500,8 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // Ignore subsequent `recoverResources` calls triggered from recovering the
   // resources that this framework is currently holding onto.
@@ -575,7 +586,8 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(available + offered));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(available + offered, frameworkInfo.role())));
 
   // Launch a task on the 'available' resources portion of the offer, which
   // recovers 'offered' resources portion.
@@ -609,7 +621,8 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(offered));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(offered, frameworkInfo.role())));
 
   // Kill the task running on 'available' resources to make it available.
   EXPECT_CALL(sched, statusUpdate(_, _));
@@ -624,7 +637,8 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   driver.killTask(taskInfo.task_id());
 
   AWAIT_READY(availableResources);
-  EXPECT_TRUE(availableResources.get().contains(available));
+  EXPECT_TRUE(availableResources.get().contains(
+      allocatedResources(available, frameworkInfo.role())));
 
   // At this point, we have 'available' resources in the allocator, and
   // 'offered' resources offered to the framework.
@@ -653,7 +667,8 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // Ignore subsequent `recoverResources` calls triggered from recovering the
   // resources that this framework is currently holding onto.
@@ -742,8 +757,10 @@ TEST_F(ReservationEndpointsTest, LabeledResources)
   Offer offer = offers.get()[0];
 
   Resources offeredResources = Resources(offer.resources());
-  EXPECT_TRUE(offeredResources.contains(labeledResources1));
-  EXPECT_TRUE(offeredResources.contains(labeledResources2));
+  EXPECT_TRUE(offeredResources.contains(
+      allocatedResources(labeledResources1, frameworkInfo.role())));
+  EXPECT_TRUE(offeredResources.contains(
+      allocatedResources(labeledResources2, frameworkInfo.role())));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -766,10 +783,14 @@ TEST_F(ReservationEndpointsTest, LabeledResources)
   offer = offers.get()[0];
 
   offeredResources = Resources(offer.resources());
-  EXPECT_FALSE(offeredResources.contains(totalSlaveResources));
-  EXPECT_TRUE(offeredResources.contains(unreserved));
-  EXPECT_FALSE(offeredResources.contains(labeledResources1));
-  EXPECT_TRUE(offeredResources.contains(labeledResources2));
+  EXPECT_FALSE(offeredResources.contains(
+      allocatedResources(totalSlaveResources, frameworkInfo.role())));
+  EXPECT_TRUE(offeredResources.contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
+  EXPECT_FALSE(offeredResources.contains(
+      allocatedResources(labeledResources1, frameworkInfo.role())));
+  EXPECT_TRUE(offeredResources.contains(
+      allocatedResources(labeledResources2, frameworkInfo.role())));
 
   // Now that the first labeled reservation has been unreserved,
   // attempting to unreserve it again should fail.
@@ -803,9 +824,12 @@ TEST_F(ReservationEndpointsTest, LabeledResources)
 
   offeredResources = Resources(offer.resources());
 
-  EXPECT_TRUE(offeredResources.contains(totalSlaveResources));
-  EXPECT_FALSE(offeredResources.contains(labeledResources1));
-  EXPECT_FALSE(offeredResources.contains(labeledResources2));
+  EXPECT_TRUE(offeredResources.contains(
+      allocatedResources(totalSlaveResources, frameworkInfo.role())));
+  EXPECT_FALSE(offeredResources.contains(
+      allocatedResources(labeledResources1, frameworkInfo.role())));
+  EXPECT_FALSE(offeredResources.contains(
+      allocatedResources(labeledResources2, frameworkInfo.role())));
 
   // Ignore subsequent `recoverResources` calls triggered from recovering the
   // resources that this framework is currently holding onto.
@@ -1598,8 +1622,10 @@ TEST_F(ReservationEndpointsTest, DifferentPrincipalsSameRole)
   Offer offer = offers.get()[0];
   Resources resources = Resources(offer.resources());
 
-  EXPECT_TRUE(resources.contains(dynamicallyReserved1));
-  EXPECT_TRUE(resources.contains(dynamicallyReserved2));
+  EXPECT_TRUE(resources.contains(
+      allocatedResources(dynamicallyReserved1, frameworkInfo.role())));
+  EXPECT_TRUE(resources.contains(
+      allocatedResources(dynamicallyReserved2, frameworkInfo.role())));
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index b7061de..309ce8b 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -48,6 +48,7 @@
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 #include "tests/mock_slave.hpp"
+#include "tests/resources_utils.hpp"
 
 using mesos::internal::master::allocator::HierarchicalDRFAllocator;
 
@@ -135,7 +136,8 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -150,7 +152,8 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -165,7 +168,8 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -226,7 +230,8 @@ TEST_F(ReservationTest, ReserveTwiceWithDoubleValue)
   Offer offer = offers.get()[0];
 
   // In the first offer, expect an offer with unreserved resources.
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -241,7 +246,8 @@ TEST_F(ReservationTest, ReserveTwiceWithDoubleValue)
   offer = offers.get()[0];
 
   // In the second offer, expect an offer with reserved resources.
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -265,7 +271,8 @@ TEST_F(ReservationTest, ReserveTwiceWithDoubleValue)
         frameworkInfo.role(),
         createReservationInfo(frameworkInfo.principal())).get();
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(finalReservation));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(finalReservation, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -330,7 +337,8 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // Create a task.
   TaskInfo taskInfo =
@@ -360,7 +368,8 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -380,7 +389,8 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
@@ -451,7 +461,8 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo1.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
@@ -471,7 +482,8 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo1.role())));
 
   // The filter to decline the offer "forever".
   Filters filtersForever;
@@ -496,7 +508,8 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo1.role())));
 
   driver1.stop();
   driver1.join();
@@ -563,7 +576,8 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -591,7 +605,8 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -655,7 +670,8 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(staticallyReserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -680,7 +696,8 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(staticallyReserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -743,7 +760,8 @@ TEST_F(ReservationTest, SendingCheckpointResourcesMessage)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved2));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved1 + unreserved2, frameworkInfo.role())));
 
   Future<CheckpointResourcesMessage> message3 =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
@@ -839,7 +857,8 @@ TEST_F(ReservationTest, ResourcesCheckpointing)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   Future<CheckpointResourcesMessage> checkpointResources =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid);
@@ -985,7 +1004,8 @@ TEST_F(ReservationTest, MasterFailover)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(reserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(reserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1054,7 +1074,8 @@ TEST_F(ReservationTest, CompatibleCheckpointedResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   Future<CheckpointResourcesMessage> checkpointResources =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
@@ -1173,8 +1194,8 @@ TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(
-      Resources(offer.resources()).contains(unreserved + unreservedDisk));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved + unreservedDisk, frameworkInfo.role())));
 
   Future<CheckpointResourcesMessage> message2 =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
@@ -1211,7 +1232,8 @@ TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(reserved + volume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(reserved + volume, frameworkInfo.role())));
 
   Future<OfferID> rescindedOfferId;
 
@@ -1310,7 +1332,8 @@ TEST_F(ReservationTest, IncompatibleCheckpointedResources)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   Future<CheckpointResourcesMessage> checkpointResources =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
@@ -1433,7 +1456,8 @@ TEST_F(ReservationTest, GoodACLReserveThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1448,7 +1472,8 @@ TEST_F(ReservationTest, GoodACLReserveThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1463,7 +1488,8 @@ TEST_F(ReservationTest, GoodACLReserveThenUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1533,7 +1559,8 @@ TEST_F(ReservationTest, BadACLDropReserve)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1549,7 +1576,8 @@ TEST_F(ReservationTest, BadACLDropReserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1631,7 +1659,8 @@ TEST_F(ReservationTest, BadACLDropUnreserve)
   Offer offer = offers.get()[0];
 
   // The slave's total resources are twice those defined by `unreserved1`.
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved1));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved1 + unreserved1, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1648,10 +1677,10 @@ TEST_F(ReservationTest, BadACLDropUnreserve)
 
   // The reserved resources and an equal portion of
   // unreserved resources should be present.
-  EXPECT_TRUE(
-      Resources(offer.resources()).contains(
-          dynamicallyReserved1 +
-          unreserved1));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(
+          dynamicallyReserved1 + unreserved1,
+          frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1670,11 +1699,10 @@ TEST_F(ReservationTest, BadACLDropUnreserve)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(
-      Resources(offer.resources()).contains(
-          dynamicallyReserved1 +
-          dynamicallyReserved2 +
-          unreserved2));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(
+          dynamicallyReserved1 + dynamicallyReserved2 + unreserved2,
+          frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -1771,7 +1799,8 @@ TEST_F(ReservationTest, ACLMultipleOperations)
   Offer offer = offers.get()[0];
 
   // The slave's total resources are twice those defined by `unreserved1`.
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved1));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved1 + unreserved1, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1791,10 +1820,10 @@ TEST_F(ReservationTest, ACLMultipleOperations)
 
   // The reserved resources and an equal portion of
   // unreserved resources should be present.
-  EXPECT_TRUE(
-      Resources(offer.resources()).contains(
-          dynamicallyReserved1 +
-          unreserved1));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(
+          dynamicallyReserved1 + unreserved1,
+          frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1836,11 +1865,10 @@ TEST_F(ReservationTest, ACLMultipleOperations)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(
-      Resources(offer.resources()).contains(
-          dynamicallyReserved1 +
-          dynamicallyReserved2 +
-          unreserved2));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(
+          dynamicallyReserved1 + dynamicallyReserved2 + unreserved2,
+          frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1874,11 +1902,10 @@ TEST_F(ReservationTest, ACLMultipleOperations)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(
-      Resources(offer.resources()).contains(
-          dynamicallyReserved1 +
-          dynamicallyReserved2 +
-          unreserved2));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(
+          dynamicallyReserved1 + dynamicallyReserved2 + unreserved2,
+          frameworkInfo.role())));
 
   // Check that the task launched as expected.
   EXPECT_EQ(TASK_FINISHED, failedTaskStatus.get().state());
@@ -1952,7 +1979,8 @@ TEST_F(ReservationTest, WithoutAuthenticationWithoutPrincipal)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the offer with reserved resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1970,7 +1998,8 @@ TEST_F(ReservationTest, WithoutAuthenticationWithoutPrincipal)
   offer = offers.get()[0];
 
   // Make sure that the reservation succeeded.
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // An expectation for an offer with unreserved resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1989,7 +2018,8 @@ TEST_F(ReservationTest, WithoutAuthenticationWithoutPrincipal)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -2057,7 +2087,8 @@ TEST_F(ReservationTest, WithoutAuthenticationWithPrincipal)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the offer with reserved resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -2075,7 +2106,8 @@ TEST_F(ReservationTest, WithoutAuthenticationWithPrincipal)
   offer = offers.get()[0];
 
   // Make sure that the reservation succeeded.
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // An expectation for an offer with unreserved resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -2094,7 +2126,8 @@ TEST_F(ReservationTest, WithoutAuthenticationWithPrincipal)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -2153,7 +2186,8 @@ TEST_F(ReservationTest, DropReserveWithInvalidRole)
   Offer offer = offers->front();
 
   Resources unreserved = Resources::parse("cpus:1;mem:512").get();
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -2186,7 +2220,8 @@ TEST_F(ReservationTest, DropReserveWithInvalidRole)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers->front();
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -2250,7 +2285,8 @@ TEST_F(ReservationTest, PreventUnreservingAlienResources)
 
   const Resources unreserved = Resources::parse("cpus:1;mem:512").get();
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo1.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
@@ -2277,8 +2313,10 @@ TEST_F(ReservationTest, PreventUnreservingAlienResources)
   ASSERT_EQ(1u, offers->size());
   offer = offers->front();
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
-  EXPECT_TRUE(Resources(offer.resources()).contains(halfMemory));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo1.role())));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(halfMemory, frameworkInfo1.role())));
 
   // The filter to decline the offer "forever".
   Filters filtersForever;
@@ -2302,8 +2340,10 @@ TEST_F(ReservationTest, PreventUnreservingAlienResources)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers->front();
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(halfMemory));
-  EXPECT_FALSE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(halfMemory, frameworkInfo2.role())));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo2.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched2, resourceOffers(&driver2, _))
@@ -2320,8 +2360,10 @@ TEST_F(ReservationTest, PreventUnreservingAlienResources)
   // Expect another offer without the resources reserved by `framework1`.
   AWAIT_READY(offers);
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(halfMemory));
-  EXPECT_FALSE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(halfMemory, frameworkInfo2.role())));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo2.role())));
 
   // Decline the offer "forever" in order to force `framework1` to
   // receive the remaining resources.
@@ -2344,8 +2386,10 @@ TEST_F(ReservationTest, PreventUnreservingAlienResources)
   offer = offers->front();
 
   // Make sure that the reservation is still in place.
-  EXPECT_TRUE(Resources(offer.resources()).contains(halfMemory));
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(halfMemory, frameworkInfo1.role())));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo1.role())));
 
   driver1.stop();
   driver1.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 72ceb86..9d6fa92 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -262,7 +262,8 @@ TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_TRUE(strings::startsWith(
-        status.get().message(), "Task uses invalid resources"));
+        status.get().message(), "Task uses invalid resources"))
+    << status->message();
 
   MockScheduler sched2;
   MesosSchedulerDriver driver2(

http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/role_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp
index 0d0f1a9..6ef7af7 100644
--- a/src/tests/role_tests.cpp
+++ b/src/tests/role_tests.cpp
@@ -26,6 +26,7 @@
 
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
 
 using mesos::internal::master::Master;
 using mesos::internal::slave::Slave;
@@ -138,8 +139,10 @@ TEST_F(RoleTest, ImplicitRoleRegister)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
-  EXPECT_FALSE(Resources(offer.resources()).contains(dynamicallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
 
   // The expectation for the next offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -154,8 +157,10 @@ TEST_F(RoleTest, ImplicitRoleRegister)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
-  EXPECT_FALSE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   Resources volume = createPersistentVolume(
       Megabytes(64),
@@ -178,9 +183,12 @@ TEST_F(RoleTest, ImplicitRoleRegister)
   ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(volume));
-  EXPECT_FALSE(Resources(offer.resources()).contains(dynamicallyReserved));
-  EXPECT_FALSE(Resources(offer.resources()).contains(unreserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(volume, frameworkInfo.role())));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(dynamicallyReserved, frameworkInfo.role())));
+  EXPECT_FALSE(Resources(offer.resources()).contains(
+      allocatedResources(unreserved, frameworkInfo.role())));
 
   driver.stop();
   driver.join();
@@ -240,7 +248,8 @@ TEST_F(RoleTest, ImplicitRoleStaticReservation)
   ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
-  EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
+  EXPECT_TRUE(Resources(offer.resources()).contains(
+      allocatedResources(staticallyReserved, frameworkInfo.role())));
 
   // Create a task to launch with the resources of `staticallyReserved`.
   TaskInfo taskInfo =


[2/9] mesos git commit: Updated the master to handle non-MULTI_ROLE schedulers.

Posted by bm...@apache.org.
Updated the master to handle non-MULTI_ROLE schedulers.

When communicating with non-MULTI_ROLE schedulers, the master
must inject the allocation info from the offer in order so that
the operations can be treated as consistently having allocation
info set. This is necessary when applying operations to allocated
resources that contain allocations to multiple roles (e.g. the
sum of allocated resources on an agent).

Review: https://reviews.apache.org/r/55969


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0a1f825
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0a1f825
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0a1f825

Branch: refs/heads/master
Commit: a0a1f825d2cbcd94820ca72222945f5668f99f1e
Parents: 4d47129
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 25 16:20:55 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:11 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a0a1f825/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2edcab7..ef075e4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3621,11 +3621,15 @@ void Master::accept(
     // TODO(jieyu): Add metrics for non launch operations.
   }
 
+  // TODO(bmahler): MULTI_ROLE: Validate that the accepted offers
+  // do not mix allocation roles, see MESOS-6637.
+
   // TODO(bmahler): We currently only support using multiple offers
   // for a single slave.
   Resources offeredResources;
   Option<SlaveID> slaveId = None();
   Option<Error> error = None();
+  Option<Resource::AllocationInfo> allocationInfo = None();
 
   if (accept.offer_ids().size() == 0) {
     error = Error("No offers specified");
@@ -3648,6 +3652,7 @@ void Master::accept(
               None());
         } else {
           slaveId = offer->slave_id();
+          allocationInfo = offer->allocation_info();
           offeredResources += offer->resources();
         }
 
@@ -3723,6 +3728,16 @@ void Master::accept(
     return;
   }
 
+  CHECK_SOME(allocationInfo);
+
+  // With the addition of the MULTI_ROLE capability, the resources
+  // within an offer now contain an `AllocationInfo`. We therefore
+  // inject the offer's allocation info into the operation's
+  // resources if the scheduler has not done so already.
+  foreach (Offer::Operation& operation, *accept.mutable_operations()) {
+    protobuf::adjustOfferOperation(&operation, allocationInfo.get());
+  }
+
   CHECK_SOME(slaveId);
   Slave* slave = slaves.registered.get(slaveId.get());
   CHECK_NOTNULL(slave);


[5/9] mesos git commit: Updated the master's HTTP operations to handle MULTI_ROLE changes.

Posted by bm...@apache.org.
Updated the master's HTTP operations to handle MULTI_ROLE changes.

With the addition of MULTI_ROLE framework support, allocated
resources in the master have `Resource.AllocationInfo` set. This
means that the master's http endpoints that apply operations or
perform checks between unallocated and allocated resources must
be updated to continue to function correctly.

Review: https://reviews.apache.org/r/55970


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/45786b69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/45786b69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/45786b69

Branch: refs/heads/master
Commit: 45786b69cd08ae8527204193de2747ba3a5b575e
Parents: a0a1f82
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 25 16:26:04 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:11 2017 -0800

----------------------------------------------------------------------
 src/master/http.cpp          | 13 ++++++++-----
 src/master/quota_handler.cpp |  8 +++++++-
 src/master/validation.cpp    | 30 ++++++++++++++++++++++++------
 3 files changed, 39 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/45786b69/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index a455f03..d881ad6 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -4649,7 +4649,7 @@ Future<Response> Master::Http::_operation(
   }
 
   // The resources recovered by rescinding outstanding offers.
-  Resources recovered;
+  Resources totalRecovered;
 
   // We pessimistically assume that what seems like "available"
   // resources in the allocator will be gone. This can happen due to
@@ -4660,12 +4660,15 @@ Future<Response> Master::Http::_operation(
   foreach (Offer* offer, utils::copy(slave->offers)) {
     // If rescinding the offer would not contribute to satisfying
     // the required resources, skip it.
-    if (required == required - offer->resources()) {
+    Resources recovered = offer->resources();
+    recovered.unallocate();
+
+    if (required == required - recovered) {
       continue;
     }
 
-    recovered += offer->resources();
-    required -= offer->resources();
+    totalRecovered += recovered;
+    required -= recovered;
 
     // We explicitly pass 'Filters()' which has a default 'refuse_sec'
     // of 5 seconds rather than 'None()' here, so that we can
@@ -4679,7 +4682,7 @@ Future<Response> Master::Http::_operation(
     master->removeOffer(offer, true); // Rescind!
 
     // If we've rescinded enough offers to cover 'operation', we're done.
-    Try<Resources> updatedRecovered = recovered.apply(operation);
+    Try<Resources> updatedRecovered = totalRecovered.apply(operation);
     if (updatedRecovered.isSome()) {
       break;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/45786b69/src/master/quota_handler.cpp
----------------------------------------------------------------------
diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp
index f4a27ea..3ad28e4 100644
--- a/src/master/quota_handler.cpp
+++ b/src/master/quota_handler.cpp
@@ -193,7 +193,13 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
       master->allocator->recoverResources(
           offer->framework_id(), offer->slave_id(), offer->resources(), None());
 
-      rescinded += offer->resources();
+      auto unallocated = [](const Resources& resources) {
+        Resources result = resources;
+        result.unallocate();
+        return result;
+      };
+
+      rescinded += unallocated(offer->resources());
       master->removeOffer(offer, true);
       agentVisited = true;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/45786b69/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index f45b644..2beee16 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -1714,17 +1714,35 @@ Option<Error> validate(
     const hashmap<FrameworkID, Resources>& usedResources,
     const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks)
 {
-  Option<Error> error = resource::validate(destroy.volumes());
+  // The operation can either contain allocated resources
+  // (in the case of a framework accepting offers), or
+  // unallocated resources (in the case of the operator
+  // endpoints). To ensure we can check for the presence
+  // of the volume in the resources in use by tasks and
+  // executors, we unallocate both the volume and the
+  // used resources before performing the contains check.
+  //
+  // TODO(bmahler): This lambda is copied in several places
+  // in the code, consider how best to pull this out.
+  auto unallocated = [](const Resources& resources) {
+    Resources result = resources;
+    result.unallocate();
+    return result;
+  };
+
+  Resources volumes = unallocated(destroy.volumes());
+
+  Option<Error> error = resource::validate(volumes);
   if (error.isSome()) {
     return Error("Invalid resources: " + error.get().message);
   }
 
-  error = resource::validatePersistentVolume(destroy.volumes());
+  error = resource::validatePersistentVolume(volumes);
   if (error.isSome()) {
     return Error("Not a persistent volume: " + error.get().message);
   }
 
-  if (!checkpointedResources.contains(destroy.volumes())) {
+  if (!checkpointedResources.contains(volumes)) {
     return Error("Persistent volumes not found");
   }
 
@@ -1733,8 +1751,8 @@ Option<Error> validate(
   // it is not possible for a non-shared resource to appear in an offer
   // if it is already in use.
   foreachvalue (const Resources& resources, usedResources) {
-    foreach (const Resource& volume, destroy.volumes()) {
-      if (resources.contains(volume)) {
+    foreach (const Resource& volume, volumes) {
+      if (unallocated(resources).contains(volume)) {
         return Error("Persistent volumes in use");
       }
     }
@@ -1754,7 +1772,7 @@ Option<Error> validate(
       }
 
       foreach (const Resource& volume, destroy.volumes()) {
-        if (resources.contains(volume)) {
+        if (unallocated(resources).contains(volume)) {
           return Error("Persistent volume in pending tasks");
         }
       }


[3/9] mesos git commit: Update the allocator unit tests to reflect MULTI_ROLE support.

Posted by bm...@apache.org.
Update the allocator unit tests to reflect MULTI_ROLE support.

Review: https://reviews.apache.org/r/55967


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d471291
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d471291
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d471291

Branch: refs/heads/master
Commit: 4d471291f4e7feca704a70d33542cc1d8ba9223b
Parents: a1b8605
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 25 16:38:05 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:11 2017 -0800

----------------------------------------------------------------------
 src/tests/hierarchical_allocator_tests.cpp | 434 ++++++++++++++----------
 1 file changed, 249 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4d471291/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 1e0b945..65fed37 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -84,12 +84,20 @@ struct Allocation
 
   Allocation(
       const FrameworkID& frameworkId_,
-      const hashmap<SlaveID, Resources>& resources_)
+      const hashmap<string, hashmap<SlaveID, Resources>>& resources_)
     : frameworkId(frameworkId_),
-      resources(resources_) {}
+      resources(resources_)
+  {
+    // Ensure the resources have the allocation info set.
+    foreachkey (const string& role, resources) {
+      foreachvalue (Resources& r, resources.at(role)) {
+        r.allocate(role);
+      }
+    }
+  }
 
   FrameworkID frameworkId;
-  hashmap<SlaveID, Resources> resources;
+  hashmap<string, hashmap<SlaveID, Resources>> resources;
 };
 
 
@@ -132,7 +140,8 @@ protected:
       const master::Flags& _flags = master::Flags(),
       Option<lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>> offerCallback = None(),
+               const hashmap<string, hashmap<SlaveID, Resources>>&)>>
+                 offerCallback = None(),
       Option<lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, UnavailableResources>&)>>
@@ -143,7 +152,7 @@ protected:
     if (offerCallback.isNone()) {
       offerCallback =
         [this](const FrameworkID& frameworkId,
-               const hashmap<SlaveID, Resources>& resources) {
+               const hashmap<string, hashmap<SlaveID, Resources>>& resources) {
           Allocation allocation;
           allocation.frameworkId = frameworkId;
           allocation.resources = resources;
@@ -287,7 +296,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
 
   Allocation expected = Allocation(
       framework1.id(),
-      {{slave1.id(), slave1.resources()}});
+      {{"role1", {{slave1.id(), slave1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -309,7 +318,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // has the lowest user share, and framework2 is its only framework.
   expected = Allocation(
       framework2.id(),
-      {{slave2.id(), slave2.resources()}});
+      {{"role2", {{slave2.id(), slave2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -330,7 +339,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // has the lowest share.
   expected = Allocation(
       framework2.id(),
-      {{slave3.id(), slave3.resources()}});
+      {{"role2", {{slave3.id(), slave3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -356,7 +365,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // role1's frameworks.
   expected = Allocation(
       framework3.id(),
-      {{slave4.id(), slave4.resources()}});
+      {{"role1", {{slave4.id(), slave4.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -383,7 +392,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // lower share than role1, so framework2 receives slave5's resources.
   expected = Allocation(
       framework2.id(),
-      {{slave5.id(), slave5.resources()}});
+      {{"role2", {{slave5.id(), slave5.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -410,7 +419,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
 
   Allocation expected = Allocation(
       framework1.id(),
-      {{slave1.id(), slave1.resources()}});
+      {{"role1", {{slave1.id(), slave1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -423,7 +432,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
 
   expected = Allocation(
       framework2.id(),
-      {{slave2.id(), slave2.resources()}});
+      {{"role2", {{slave2.id(), slave2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -434,7 +443,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
 
   expected = Allocation(
       framework2.id(),
-      {{slave3.id(), slave3.resources()}});
+      {{"role2", {{slave3.id(), slave3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -451,7 +460,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
 
   expected = Allocation(
       framework3.id(),
-      {{slave4.id(), slave4.resources()}});
+      {{"role1", {{slave4.id(), slave4.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -490,7 +499,7 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
 
   Allocation expected = Allocation(
       framework1.id(),
-      {{agent1.id(), agent1.resources()}});
+      {{"role1", {{agent1.id(), agent1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -512,7 +521,7 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
   // has the lowest user share, and framework2 is its only framework.
   expected = Allocation(
       framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{"role2", {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -533,7 +542,7 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
   // has the lowest share.
   expected = Allocation(
       framework2.id(),
-      {{agent3.id(), agent3.resources()}});
+      {{"role2", {{agent3.id(), agent3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -559,7 +568,7 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
   // role1's frameworks.
   expected = Allocation(
       framework3.id(),
-      {{agent4.id(), agent4.resources()}});
+      {{"role1", {{agent4.id(), agent4.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -586,7 +595,7 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
   // lower share than role1, so framework2 receives agent5's resources.
   expected = Allocation(
       framework2.id(),
-      {{agent5.id(), agent5.resources()}});
+      {{"role2", {{agent5.id(), agent5.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -617,12 +626,11 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
   // because it is the only framework in the cluster.
   Allocation expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{ROLE, {{agent.id(), agent.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
 
-
   // Now `framework` declines the offer and sets a filter
   // with the duration greater than the allocation interval.
   Duration filterTimeout = flags.allocation_interval * 2;
@@ -632,7 +640,7 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
   allocator->recoverResources(
       framework.id(),
       agent.id(),
-      allocation->resources.at(agent.id()),
+      allocation->resources.at(ROLE).at(agent.id()),
       offerFilter);
 
   // Ensure the offer filter timeout is set before advancing the clock.
@@ -660,7 +668,7 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
   // The next batch allocation should offer resources to `framework1`.
   expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{ROLE, {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocation);
 
@@ -707,7 +715,7 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
       agent1,
       None(),
       agent1.resources(),
-      {{framework1.id(), agent1.resources()}});
+      {{framework1.id(), allocatedResources(agent1.resources(), ROLE)}});
 
   // Process all triggered allocation events.
   //
@@ -730,12 +738,11 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
   // because its share (0) is smaller than `framework1`.
   Allocation expected = Allocation(
       framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{ROLE, {{agent2.id(), agent2.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
 
-
   // Total cluster resources (2 agents): cpus=2, mem=1024.
   // ROLE1 share = 1 (cpus=2, mem=1024)
   //   framework1 share = 0.5 (cpus=1, mem=512)
@@ -752,7 +759,7 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
   allocator->recoverResources(
       framework2.id(),
       agent2.id(),
-      allocation->resources.at(agent2.id()),
+      allocation->resources.at(ROLE).at(agent2.id()),
       offerFilter);
 
   // Total cluster resources (2 agents): cpus=2, mem=1024.
@@ -775,8 +782,8 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
   // Since the filter is applied, resources are offered to `framework1`
   // even though its share is greater than `framework2`.
   expected = Allocation(
-      framework1.id(),
-      {{agent2.id(), agent2.resources()}});
+       framework1.id(),
+       {{ROLE, {{agent2.id(), agent2.resources()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -793,7 +800,7 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
   allocator->recoverResources(
       framework1.id(),
       agent2.id(),
-      allocation->resources.at(agent2.id()),
+      allocation->resources.at(ROLE).at(agent2.id()),
       None());
 
   // Total cluster resources (2 agents): cpus=2, mem=1024.
@@ -806,8 +813,8 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
 
   // Since the filter is removed, resources are offered to `framework2`.
   expected = Allocation(
-      framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+       framework2.id(),
+       {{ROLE, {{agent2.id(), agent2.resources()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -841,7 +848,7 @@ TEST_F(HierarchicalAllocatorTest, MaintenanceInverseOffers)
   // Check that the resources go to the framework.
   Allocation expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{"*", {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -898,10 +905,10 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
 
   Allocation expected = Allocation(
       framework1.id(),
-      {
+      {{"role1", {
           {slave1.id(), slave1.resources()},
-          {slave2.id(), slave2.resources()}
-      });
+          {slave2.id(), slave2.resources()}}
+      }});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -909,12 +916,12 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
   allocator->recoverResources(
       framework1.id(),
       slave1.id(),
-      allocation->resources.at(slave1.id()),
+      allocation->resources.at("role1").at(slave1.id()),
       None());
   allocator->recoverResources(
       framework1.id(),
       slave2.id(),
-      allocation->resources.at(slave2.id()),
+      allocation->resources.at("role1").at(slave2.id()),
       None());
 
   // Now add the second framework, we expect there to be 2 subsequent
@@ -937,12 +944,19 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
   ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
 
   allocation = frameworkAllocations.at(framework1.id());
-  EXPECT_EQ(slave1.resources(), Resources::sum(allocation->resources));
 
-  ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
+  ASSERT_EQ(1u, allocation->resources.size());
+  ASSERT_TRUE(allocation->resources.contains("role1"));
+  EXPECT_EQ(allocatedResources(slave1.resources(), "role1"),
+            Resources::sum(allocation->resources.at("role1")));
 
+  ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
   allocation = frameworkAllocations.at(framework2.id());
-  EXPECT_EQ(slave2.resources(), Resources::sum(allocation->resources));
+
+  ASSERT_EQ(1u, allocation->resources.size());
+  ASSERT_TRUE(allocation->resources.contains("role2"));
+  EXPECT_EQ(allocatedResources(slave2.resources(), "role2"),
+            Resources::sum(allocation->resources.at("role2")));
 }
 
 
@@ -975,7 +989,7 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness)
 
     Allocation expected = Allocation(
         allocation->frameworkId,
-        {{slave.id(), slave.resources()}});
+        {{"*", {{slave.id(), slave.resources()}}}});
 
     EXPECT_EQ(expected, allocation.get());
 
@@ -984,7 +998,7 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness)
     allocator->recoverResources(
         allocation->frameworkId,
         slave.id(),
-        allocation->resources.at(slave.id()),
+        allocation->resources.at("*").at(slave.id()),
         None());
 
     Clock::advance(flags.allocation_interval);
@@ -1024,10 +1038,10 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
 
   Allocation expected = Allocation(
       framework1.id(),
-      {
-          {slave1.id(), slave1.resources()},
-          {slave2.id(), Resources(slave2.resources()).unreserved()}
-      });
+      {{"role1", {
+        {slave1.id(), slave1.resources()},
+        {slave2.id(), Resources(slave2.resources()).unreserved()}
+      }}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1037,7 +1051,9 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
 
   expected = Allocation(
       framework2.id(),
-      {{slave2.id(), Resources(slave2.resources()).reserved("role2")}});
+      {{"role2", {
+        {slave2.id(), Resources(slave2.resources()).reserved("role2")}
+      }}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -1061,14 +1077,16 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
 
   // Recover the reserved resources, expect them to be re-offered.
-  Resources reserved = allocation->resources.at(slave.id()).reserved("role1");
-  Resources unreserved = allocation->resources.at(slave.id()).unreserved();
+  Resources reserved = allocation->resources.at("role1").at(slave.id())
+    .reserved("role1");
+  Resources unreserved = allocation->resources.at("role1").at(slave.id())
+    .unreserved();
 
   allocator->recoverResources(
       allocation->frameworkId,
@@ -1080,7 +1098,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
 
   expected = Allocation(
       framework.id(),
-      {{slave.id(), reserved}});
+      {{"role1", {{slave.id(), reserved}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1096,7 +1114,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
 
   expected = Allocation(
       framework.id(),
-      {{slave.id(), unreserved}});
+      {{"role1", {{slave.id(), unreserved}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1131,7 +1149,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{slave2.id(), slave2.resources()}});
+      {{"role1", {{slave2.id(), slave2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1144,7 +1162,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
 
   expected = Allocation(
       framework.id(),
-      {{slave3.id(), slave3.resources()}});
+      {{"role1", {{slave3.id(), slave3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1160,7 +1178,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
 
   expected = Allocation(
       framework.id(),
-      {{slave4.id(), slave4.resources()}});
+      {{"role1", {{slave4.id(), slave4.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -1183,7 +1201,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1192,6 +1210,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
   Resource volume = Resources::parse("disk", "5", "*").get();
   volume.mutable_disk()->mutable_persistence()->set_id("ID");
   volume.mutable_disk()->mutable_volume()->set_container_path("data");
+  volume.mutable_allocation_info()->set_role("role1");
 
   Offer::Operation create;
   create.set_type(Offer::Operation::CREATE);
@@ -1199,7 +1218,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
 
   // Ensure the offer operation can be applied.
   Try<Resources> updated =
-    allocation->resources.at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(create);
 
   ASSERT_SOME(updated);
 
@@ -1207,7 +1226,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
   allocator->updateAllocation(
       framework.id(),
       slave.id(),
-      allocation->resources.at(slave.id()),
+      allocation->resources.at("role1").at(slave.id()),
       {create});
 
   // Now recover the resources, and expect the next allocation to
@@ -1220,12 +1239,11 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
 
   Clock::advance(flags.allocation_interval);
 
-
   // The allocation should be the slave's resources with the offer
   // operation applied.
   expected = Allocation(
       framework.id(),
-      {{slave.id(), updated.get()}});
+      {{"role1", {{slave.id(), updated.get()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1252,7 +1270,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1261,11 +1279,13 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
   // Create a shared volume.
   Resource volume = createDiskResource(
       "5", "role1", "id1", None(), None(), true);
+  volume.mutable_allocation_info()->set_role("role1");
+
   Offer::Operation create = CREATE(volume);
 
   // Ensure the offer operation can be applied.
   Try<Resources> update =
-    allocation->resources.at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(create);
 
   ASSERT_SOME(update);
 
@@ -1273,7 +1293,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
   allocator->updateAllocation(
       framework.id(),
       slave.id(),
-      allocation->resources.at(slave.id()),
+      allocation->resources.at("role1").at(slave.id()),
       {create});
 
   // Now recover the resources, and expect the next allocation to
@@ -1290,7 +1310,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
   // operation applied.
   expected = Allocation(
       framework.id(),
-      {{slave.id(), update.get()}});
+      {{"role1", {{slave.id(), update.get()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1303,13 +1323,13 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
   allocator->updateAllocation(
       framework.id(),
       slave.id(),
-      allocation->resources.at(slave.id()),
+      allocation->resources.at("role1").at(slave.id()),
       {destroy});
 
   // The resources to recover should be equal to the agent's original
   // resources now that the shared volume is created and then destroyed.
   update = update->apply(destroy);
-  ASSERT_SOME_EQ(slave.resources(), update);
+  ASSERT_SOME_EQ(allocatedResources(slave.resources(), "role1"), update);
 
   allocator->recoverResources(
       framework.id(),
@@ -1321,7 +1341,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
 
   expected = Allocation(
       framework.id(),
-      {{slave.id(), update.get()}});
+      {{"role1", {{slave.id(), update.get()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1346,7 +1366,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
   // Initially, all the resources are allocated to `framework1`.
   Allocation expected = Allocation(
       framework1.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1354,11 +1374,13 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
   // Create a shared volume.
   Resource volume = createDiskResource(
       "5", "role1", "id1", None(), None(), true);
+  volume.mutable_allocation_info()->set_role("role1");
+
   Offer::Operation create = CREATE(volume);
 
   // Ensure the offer operation can be applied.
   Try<Resources> update =
-    allocation->resources.at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(create);
 
   ASSERT_SOME(update);
 
@@ -1366,7 +1388,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
   allocator->updateAllocation(
       framework1.id(),
       slave.id(),
-      allocation->resources.at(slave.id()),
+      allocation->resources.at("role1").at(slave.id()),
       {create});
 
   // Now recover the resources, and expect the next allocation to
@@ -1383,7 +1405,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
 
   expected = Allocation(
       framework1.id(),
-      {{slave.id(), update.get() - volume}});
+      {{"role1", {{slave.id(), update.get() - volume}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1392,7 +1414,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
   allocator->recoverResources(
       framework1.id(),
       slave.id(),
-      allocation->resources.at(slave.id()),
+      allocation->resources.at("role1").at(slave.id()),
       None());
 
   // Create `framework2` with opting in for SHARED_RESOURCES.
@@ -1407,7 +1429,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
 
   expected = Allocation(
       framework2.id(),
-      {{slave.id(), update.get()}});
+      {{"role1", {{slave.id(), update.get()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1445,7 +1467,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
   // operation applied.
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), update.get()}});
+      {{"role1", {{slave.id(), update.get()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -1466,7 +1488,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1503,7 +1525,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   // Initially, all the resources are allocated.
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1514,7 +1536,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   // The next allocation should be for 10 oversubscribed resources.
   expected = Allocation(
       framework.id(),
-      {{slave.id(), oversubscribed}});
+      {{"role1", {{slave.id(), oversubscribed}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1525,7 +1547,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   // The next allocation should be for 2 oversubscribed cpus.
   expected = Allocation(
       framework.id(),
-      {{slave.id(), oversubscribed2 - oversubscribed}});
+      {{"role1", {{slave.id(), oversubscribed2 - oversubscribed}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1561,7 +1583,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
   // Initially, all the resources are allocated.
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1599,7 +1621,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
   // Initially, all the resources are allocated.
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1610,13 +1632,14 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
   // The next allocation should be for 10 oversubscribed cpus.
   expected = Allocation(
       framework.id(),
-      {{slave.id(), oversubscribed}});
+      {{"role1", {{slave.id(), oversubscribed}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
   // Recover 6 oversubscribed cpus and 2 regular cpus.
   Resources recovered = createRevocableResources("cpus", "6");
   recovered += Resources::parse("cpus:2").get();
+  recovered.allocate("role1");
 
   allocator->recoverResources(framework.id(), slave.id(), recovered, None());
 
@@ -1626,7 +1649,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
   // cpus.
   expected = Allocation(
       framework.id(),
-      {{slave.id(), recovered}});
+      {{"role1", {{slave.id(), recovered}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -1670,7 +1693,7 @@ TEST_F(HierarchicalAllocatorTest, Whitelist)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{slave.id(), slave.resources()}});
+      {{"*", {{slave.id(), slave.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocation);
 }
@@ -1704,14 +1727,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, NoDoubleAccounting)
   FrameworkInfo framework2 = createFrameworkInfo(ROLE2);
 
   hashmap<FrameworkID, Resources> agent1Allocation =
-    {{framework1.id(), agent1.resources()}};
+    {{framework1.id(), allocatedResources(agent1.resources(), ROLE1)}};
   hashmap<FrameworkID, Resources> agent2Allocation =
-    {{framework2.id(), agent2.resources()}};
+    {{framework2.id(), allocatedResources(agent2.resources(), ROLE2)}};
 
   hashmap<SlaveID, Resources> framework1Allocation =
-    {{agent1.id(), agent1.resources()}};
+    {{agent1.id(), allocatedResources(agent1.resources(), ROLE1)}};
   hashmap<SlaveID, Resources> framework2Allocation =
-    {{agent2.id(), agent2.resources()}};
+    {{agent2.id(), allocatedResources(agent2.resources(), ROLE2)}};
 
   // Call `addFramework()` and `addSlave()` in different order for
   // `framework1` and `framework2`
@@ -1806,7 +1829,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
   // the only framework in the only role with unsatisfied quota.
   Allocation expected = Allocation(
       framework1.id(),
-      {{agent1.id(), agent1.resources()}});
+      {{QUOTA_ROLE, {{agent1.id(), agent1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -1824,7 +1847,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
   // quota. `framework2` has to wait.
   expected = Allocation(
       framework1.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
@@ -1846,7 +1869,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
   allocator->recoverResources(
       framework1.id(),
       agent2.id(),
-      allocation->resources.at(agent2.id()),
+      allocation->resources.at(QUOTA_ROLE).at(agent2.id()),
       offerFilter);
 
   // Total cluster resources: cpus=2, mem=1024.
@@ -1873,7 +1896,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
   // Previously declined resources should be offered to the quota'ed role.
   expected = Allocation(
       framework1.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocation);
 
@@ -1914,7 +1937,7 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
       agent1,
       None(),
       agent1.resources(),
-      {{framework1.id(), agent1.resources()}});
+      {{framework1.id(), allocatedResources(agent1.resources(), QUOTA_ROLE)}});
 
   SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
   allocator->addSlave(
@@ -1922,7 +1945,7 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
       agent2,
       None(),
       agent2.resources(),
-      {{framework1.id(), agent2.resources()}});
+      {{framework1.id(), allocatedResources(agent2.resources(), QUOTA_ROLE)}});
 
   // Total cluster resources (2 identical agents): cpus=2, mem=1024.
   // QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
@@ -1945,7 +1968,7 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
   allocator->recoverResources(
       framework1.id(),
       agent1.id(),
-      agent1.resources(),
+      allocatedResources(agent1.resources(), QUOTA_ROLE),
       None());
 
   // Trigger the next batch allocation.
@@ -1953,7 +1976,7 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
 
   Allocation expected = Allocation(
       framework2.id(),
-      {{agent1.id(), agent1.resources()}});
+      {{NO_QUOTA_ROLE, {{agent1.id(), agent1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2020,7 +2043,7 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
   // it is the only framework in the only role with unsatisfied quota.
   Allocation expected = Allocation(
       framework1a.id(),
-      {{agent1.id(), agent1.resources()}});
+      {{QUOTA_ROLE, {{agent1.id(), agent1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2042,7 +2065,7 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
   // to a role with unsatisfied quota.
   expected = Allocation(
       framework1b.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2061,7 +2084,7 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
   // has unsatisfied quota.
   expected = Allocation(
       framework1a.id(),
-      {{agent3.id(), agent3.resources()}});
+      {{QUOTA_ROLE, {{agent3.id(), agent3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2079,7 +2102,7 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
   allocator->recoverResources(
       framework1a.id(),
       agent3.id(),
-      agent3.resources(),
+      allocatedResources(agent3.resources(), QUOTA_ROLE),
       filter5s);
 
   // Trigger the next batch allocation.
@@ -2087,11 +2110,10 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
 
   expected = Allocation(
       framework1b.id(),
-      {{agent3.id(), agent3.resources()}});
+      {{QUOTA_ROLE, {{agent3.id(), agent3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
-
   // Total cluster resources: cpus=4, mem=2048.
   // QUOTA_ROLE share = 1 (cpus=4, mem=2048) [quota: cpus=4, mem=2048]
   //   framework1a share = 0.25 (cpus=1, mem=512)
@@ -2142,7 +2164,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAllocationGranularity)
   // and the allocator performs coarse-grained allocation.
   Allocation expected = Allocation(
       framework1.id(),
-      {{agent.id(), agent.resources()}});
+      {{QUOTA_ROLE, {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2206,7 +2228,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DRFWithQuota)
       agent1,
       None(),
       agent1.resources(),
-      {{framework1.id(), Resources(quota.info.guarantee())}});
+      {{framework1.id(),
+        allocatedResources(quota.info.guarantee(), QUOTA_ROLE)}});
 
   // Total cluster resources (1 agent): cpus=1, mem=512.
   // QUOTA_ROLE share = 0.25 (cpus=0.25, mem=128) [quota: cpus=0.25, mem=128]
@@ -2222,7 +2245,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DRFWithQuota)
   // share is 0.
   Allocation expected = Allocation(
       framework2.id(),
-      {{agent1.id(), Resources(agent1.resources()) - quota.info.guarantee()}});
+      {{NO_QUOTA_ROLE, {{agent1.id(),
+          Resources(agent1.resources()) - quota.info.guarantee()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2264,7 +2288,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DRFWithQuota)
 
   expected = Allocation(
       framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{NO_QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -2304,7 +2328,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
       agent1,
       None(),
       agent1.resources(),
-      {{framework1.id(), agent1.resources()}});
+      {{framework1.id(), allocatedResources(agent1.resources(), QUOTA_ROLE)}});
 
   // Total cluster resources (1 agent): cpus=1, mem=512.
   // QUOTA_ROLE share = 1 (cpus=1, mem=512)
@@ -2320,7 +2344,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
 
   Allocation expected = Allocation(
       framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{NO_QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2338,7 +2362,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
   allocator->recoverResources(
       framework2.id(),
       agent2.id(),
-      agent2.resources(),
+      allocatedResources(agent2.resources(), NO_QUOTA_ROLE),
       filter0s);
 
   // Total cluster resources (2 identical agents): cpus=2, mem=1024.
@@ -2352,7 +2376,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
 
   expected = Allocation(
       framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{NO_QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2360,7 +2384,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
   allocator->recoverResources(
       framework2.id(),
       agent2.id(),
-      agent2.resources(),
+      allocatedResources(agent2.resources(), NO_QUOTA_ROLE),
       filter0s);
 
   // We set quota for the "starving" `QUOTA_ROLE` role.
@@ -2372,7 +2396,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
 
   expected = Allocation(
       framework1.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2444,7 +2468,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAbsentFramework)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{NO_QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2495,7 +2519,7 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaAbsentFrameworks)
   // get all `agent`'s resources.
   Allocation expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{QUOTA_ROLE2, {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -2545,7 +2569,7 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
       agent1,
       None(),
       agent1.resources(),
-      {{framework1.id(), agent1.resources()}});
+      {{framework1.id(), allocatedResources(agent1.resources(), QUOTA_ROLE1)}});
 
   SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:1024;disk:0");
   allocator->addSlave(
@@ -2553,7 +2577,7 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
       agent2,
       None(),
       agent2.resources(),
-      {{framework2.id(), agent2.resources()}});
+      {{framework2.id(), allocatedResources(agent2.resources(), QUOTA_ROLE2)}});
 
   // TODO(bmahler): Add assertions to test this is accurate!
   //
@@ -2574,7 +2598,7 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
   // quota, while other roles' quotas are satisfied.
   Allocation expected = Allocation(
       framework2.id(),
-      {{agent3.id(), agent3.resources()}});
+      {{QUOTA_ROLE2, {{agent3.id(), agent3.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2623,17 +2647,17 @@ TEST_F(HierarchicalAllocatorTest, ReservationWithinQuota)
       agent1,
       None(),
       agent1.resources(),
-      {{
-          framework1.id(),
-          // The `mem` portion is used to test that reserved resources are
-          // accounted for, and the `cpus` portion is allocated to show that
-          // the result of DRF would be different if `mem` was not accounted.
-          Resources::parse("cpus:2;mem(" + QUOTA_ROLE + "):256").get()
-      }});
+      {{framework1.id(),
+        // The `mem` portion is used to test that reserved resources are
+        // accounted for, and the `cpus` portion is allocated to show that
+        // the result of DRF would be different if `mem` was not accounted.
+        allocatedResources(
+            Resources::parse("cpus:2;mem(" + QUOTA_ROLE + "):256").get(),
+            QUOTA_ROLE)}});
 
   Allocation expected = Allocation(
       framework2.id(),
-      {{agent1.id(), Resources::parse("cpus:6").get()}});
+      {{NON_QUOTA_ROLE, {{agent1.id(), Resources::parse("cpus:6").get()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2645,7 +2669,7 @@ TEST_F(HierarchicalAllocatorTest, ReservationWithinQuota)
 
   expected = Allocation(
       framework2.id(),
-      {{agent2.id(), agent2.resources()}});
+      {{NON_QUOTA_ROLE, {{agent2.id(), agent2.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -2697,7 +2721,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
   // resources at `agent2` are reserved for a different role.
   Allocation expected = Allocation(
       framework1.id(),
-      {{agent1.id(), agent1.resources()}});
+      {{QUOTA_ROLE, {{agent1.id(), agent1.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2709,7 +2733,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
   allocator->recoverResources(
       framework1.id(),
       agent1.id(),
-      agent1.resources(),
+      allocatedResources(agent1.resources(), QUOTA_ROLE),
       longFilter);
 
   // Trigger a batch allocation for good measure, but don't expect any
@@ -2728,7 +2752,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
   // because those resources are reserved for its role.
   expected = Allocation(
       framework2.id(),
-      {{agent2.id(), dynamicallyReserved}});
+      {{NO_QUOTA_ROLE, {{agent2.id(), dynamicallyReserved}}}});
 
   AWAIT_EXPECT_EQ(expected, allocation);
 
@@ -2737,7 +2761,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
   allocator->recoverResources(
       framework2.id(),
       agent2.id(),
-      dynamicallyReserved,
+      allocatedResources(dynamicallyReserved, NO_QUOTA_ROLE),
       longFilter);
 
   // No more resource offers should be made until the filters expire:
@@ -2778,14 +2802,14 @@ TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{"role1", {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
   allocator->recoverResources(
       framework.id(),
       agent.id(),
-      agent.resources(),
+      allocatedResources(agent.resources(), "role1"),
       None());
 
   // Suppress offers and disconnect framework.
@@ -2809,7 +2833,7 @@ TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework)
   // after getting activated.
   expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{"role1", {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocation);
 }
@@ -2836,7 +2860,7 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{"role1", {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -2854,7 +2878,7 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
   allocator->recoverResources(
       framework.id(),
       agent.id(),
-      agent.resources(),
+      allocatedResources(agent.resources(), "role1"),
       None());
 
   allocator->suppressOffers(framework.id());
@@ -2873,7 +2897,7 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
   // reviving offers.
   expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{"role1", {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocation);
 }
@@ -3128,7 +3152,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
 
   Allocation expectedAllocation = Allocation(
       framework1.id(),
-      {{agent.id(), agent.resources()}});
+      {{"roleA", {{agent.id(), agent.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expectedAllocation, allocation);
@@ -3136,7 +3160,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   allocator->recoverResources(
       allocation->frameworkId,
       agent.id(),
-      allocation->resources.at(agent.id()),
+      allocation->resources.at("roleA").at(agent.id()),
       offerFilter);
 
   JSON::Object expected;
@@ -3153,7 +3177,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
 
   expectedAllocation = Allocation(
       framework2.id(),
-      {{agent.id(), agent.resources()}});
+      {{"roleB", {{agent.id(), agent.resources()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expectedAllocation, allocation);
@@ -3161,7 +3185,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   allocator->recoverResources(
       allocation->frameworkId,
       agent.id(),
-      allocation->resources.at(agent.id()),
+      allocation->resources.at("roleB").at(agent.id()),
       offerFilter);
 
   expected.values = {
@@ -3178,7 +3202,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
 
   expectedAllocation = Allocation(
       framework3.id(),
-      {{agent.id(), agent.resources()}});
+      {{"roleA", {{agent.id(), agent.resources()}}}});
 
   allocation = allocations.get();
   AWAIT_EXPECT_EQ(expectedAllocation, allocation);
@@ -3186,7 +3210,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   allocator->recoverResources(
       allocation->frameworkId,
       agent.id(),
-      allocation->resources.at(agent.id()),
+      allocation->resources.at("roleA").at(agent.id()),
       offerFilter);
 
   expected.values = {
@@ -3235,7 +3259,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DominantShareMetrics)
   allocator->recoverResources(
       allocation->frameworkId,
       agent1.id(),
-      allocation->resources.at(agent1.id()),
+      allocation->resources.at("roleA").at(agent1.id()),
       None());
   Clock::settle();
 
@@ -3384,21 +3408,25 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
     for (int i = 0; i < allocationsCount; i++) {
       Future<Allocation> allocation = allocations.get();
       AWAIT_READY(allocation);
+      ASSERT_EQ(1u, allocation->resources.size());
 
       (*frameworkAllocations)[allocation->frameworkId] = allocation.get();
-      (*totalAllocatedResources) += Resources::sum(allocation->resources);
+      *totalAllocatedResources +=
+        Resources::sum(allocation->resources.begin()->second);
 
       if (recoverResources) {
         // Recover the allocated resources so they can be offered
         // again next time.
-        foreachpair (const SlaveID& slaveId,
-                     const Resources& resources,
-                     allocation->resources) {
+        foreachkey (const string& role, allocation->resources) {
+          foreachpair (const SlaveID& slaveId,
+                       const Resources& resources,
+                       allocation->resources.at(role)) {
           allocator->recoverResources(
               allocation->frameworkId,
               slaveId,
               resources,
               None());
+          }
         }
       }
     }
@@ -3438,8 +3466,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
   awaitAllocationsAndRecoverResources(
       &totalAllocatedResources, &frameworkAllocations, 1, true);
 
-  // Tests whether `framework1` and `framework2` each get half of the resources
-  // when their roles' weights are 1:1.
+  // Total cluster resources (6 agents): cpus=12, mem=6144.
   {
     // Advance the clock and trigger a batch allocation.
     Clock::advance(flags.allocation_interval);
@@ -3460,12 +3487,13 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
     ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
     ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
 
-
     Allocation allocation1 = frameworkAllocations.at(framework1.id());
     Allocation allocation2 = frameworkAllocations.at(framework2.id());
 
-    EXPECT_EQ(TRIPLE_RESOURCES, Resources::sum(allocation1.resources));
-    EXPECT_EQ(TRIPLE_RESOURCES, Resources::sum(allocation2.resources));
+    EXPECT_EQ(allocatedResources(TRIPLE_RESOURCES, "role1"),
+              Resources::sum(allocation1.resources.at("role1")));
+    EXPECT_EQ(allocatedResources(TRIPLE_RESOURCES, "role2"),
+              Resources::sum(allocation2.resources.at("role2")));
 
     // Check to ensure that these two allocations sum to the total resources;
     // this check can ensure there are only two allocations in this case.
@@ -3502,8 +3530,10 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
     Allocation allocation1 = frameworkAllocations.at(framework1.id());
     Allocation allocation2 = frameworkAllocations.at(framework2.id());
 
-    EXPECT_EQ(DOUBLE_RESOURCES, Resources::sum(allocation1.resources));
-    EXPECT_EQ(FOURFOLD_RESOURCES, Resources::sum(allocation2.resources));
+    EXPECT_EQ(allocatedResources(DOUBLE_RESOURCES, "role1"),
+              Resources::sum(allocation1.resources.at("role1")));
+    EXPECT_EQ(allocatedResources(FOURFOLD_RESOURCES, "role2"),
+              Resources::sum(allocation2.resources.at("role2")));
 
     // Check to ensure that these two allocations sum to the total resources;
     // this check can ensure there are only two allocations in this case.
@@ -3550,19 +3580,21 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
     ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
     ASSERT_TRUE(frameworkAllocations.contains(framework3.id()));
 
-
     Allocation allocation1 = frameworkAllocations.at(framework1.id());
     Allocation allocation2 = frameworkAllocations.at(framework2.id());
     Allocation allocation3 = frameworkAllocations.at(framework3.id());
 
-    EXPECT_EQ(SINGLE_RESOURCES, Resources::sum(allocation1.resources));
-    EXPECT_EQ(DOUBLE_RESOURCES, Resources::sum(allocation2.resources));
-    EXPECT_EQ(TRIPLE_RESOURCES, Resources::sum(allocation3.resources));
+    EXPECT_EQ(allocatedResources(SINGLE_RESOURCES, "role1"),
+              Resources::sum(allocation1.resources.at("role1")));
+    EXPECT_EQ(allocatedResources(DOUBLE_RESOURCES, "role2"),
+              Resources::sum(allocation2.resources.at("role2")));
+    EXPECT_EQ(allocatedResources(TRIPLE_RESOURCES, "role3"),
+              Resources::sum(allocation3.resources.at("role3")));
 
     // Check to ensure that these two allocations sum to the total resources;
     // this check can ensure there are only three allocations in this case.
     EXPECT_EQ(TOTAL_RESOURCES,
-        totalAllocatedResources.createStrippedScalarQuantity());
+              totalAllocatedResources.createStrippedScalarQuantity());
   }
 }
 
@@ -3590,7 +3622,7 @@ TEST_F(HierarchicalAllocatorTest, ReviveOffers)
 
   Allocation expected = Allocation(
       framework.id(),
-      {{agent.id(), agent.resources()}});
+      {{"role1", {{agent.id(), agent.resources()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 
@@ -3599,7 +3631,7 @@ TEST_F(HierarchicalAllocatorTest, ReviveOffers)
   allocator->recoverResources(
       framework.id(),
       agent.id(),
-      agent.resources(),
+      allocatedResources(agent.resources(), "role1"),
       filter1000s);
 
   // Advance the clock to trigger a batch allocation.
@@ -3664,16 +3696,21 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
   // Initially, all the resources are allocated to `framework1`.
   Allocation expected = Allocation(
       framework1.id(),
-      {{slave.id(), slave.resources()}});
+      {{"role1", {{slave.id(), slave.resources()}}}});
 
   Future<Allocation> allocation = allocations.get();
   AWAIT_EXPECT_EQ(expected, allocation);
 
+  Resource::AllocationInfo allocationInfo;
+  allocationInfo.set_role("role1");
+
   // Create a shared volume.
   Resource volume = createDiskResource(
       "5", "role1", "id1", None(), None(), true);
   Offer::Operation create = CREATE(volume);
 
+  protobuf::adjustOfferOperation(&create, allocationInfo);
+
   // Launch a task using the shared volume.
   TaskInfo task = createTask(
       slave.id(),
@@ -3681,9 +3718,11 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
       "echo abc > path1/file");
   Offer::Operation launch = LAUNCH({task});
 
+  protobuf::adjustOfferOperation(&launch, allocationInfo);
+
   // Ensure the CREATE operation can be applied.
   Try<Resources> updated =
-    allocation->resources.at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(create);
 
   ASSERT_SOME(updated);
 
@@ -3692,7 +3731,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
   allocator->updateAllocation(
       framework1.id(),
       slave.id(),
-      allocation->resources.at(slave.id()),
+      allocation->resources.at("role1").at(slave.id()),
       {create, launch});
 
   // Now recover the resources, and expect the next allocation to contain
@@ -3701,7 +3740,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
   allocator->recoverResources(
       framework1.id(),
       slave.id(),
-      updated.get() - task.resources(),
+      updated.get() - allocatedResources(task.resources(), "role1"),
       None());
 
   // The offer to 'framework2` should contain the shared volume.
@@ -3709,7 +3748,10 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
 
   expected = Allocation(
       framework2.id(),
-      {{slave.id(), updated.get() - task.resources() + volume}});
+      {{"role1", {{slave.id(),
+          updated.get() -
+          launch.launch().task_infos(0).resources() +
+          create.create().volumes()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());
 }
@@ -3766,7 +3808,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
 
   auto offerCallback = [&offerCallbacks](
       const FrameworkID& frameworkId,
-      const hashmap<SlaveID, Resources>& resources) {
+      const hashmap<string, hashmap<SlaveID, Resources>>& resources) {
     offerCallbacks++;
   };
 
@@ -3789,9 +3831,11 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
 
   // Each agent has a portion of its resources allocated to a single
   // framework. We round-robin through the frameworks when allocating.
-  const Resources allocation = Resources::parse(
-      "cpus:1;mem:128;disk:1024;"
-      "ports:[31126-31510,31512-31623,31810-31852,31854-31964]").get();
+  const Resources allocation = allocatedResources(
+      Resources::parse(
+          "cpus:1;mem:128;disk:1024;"
+          "ports:[31126-31510,31512-31623,31810-31852,31854-31964]").get(),
+      "*");
 
   watch.start();
 
@@ -3853,7 +3897,8 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers)
   // Pause the clock because we want to manually drive the allocations.
   Clock::pause();
 
-  struct OfferedResources {
+  struct OfferedResources
+  {
     FrameworkID   frameworkId;
     SlaveID       slaveId;
     Resources     resources;
@@ -3863,11 +3908,14 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers)
 
   auto offerCallback = [&offers](
       const FrameworkID& frameworkId,
-      const hashmap<SlaveID, Resources>& resources_)
+      const hashmap<string, hashmap<SlaveID, Resources>>& resources_)
   {
-    foreach (auto resources, resources_) {
-      offers.push_back(
-          OfferedResources{frameworkId, resources.first, resources.second});
+    foreachkey (const string& role, resources_) {
+      foreachpair (const SlaveID& slaveId,
+                   const Resources& resources,
+                   resources_.at(role)) {
+        offers.push_back(OfferedResources{frameworkId, slaveId, resources});
+      }
     }
   };
 
@@ -3911,6 +3959,8 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers)
 
   allocation += createPorts(ranges.get());
 
+  allocation.allocate("*");
+
   watch.start();
 
   for (size_t i = 0; i < slaveCount; i++) {
@@ -3937,7 +3987,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers)
   // Loop enough times for all the frameworks to get offered all the resources.
   for (size_t i = 0; i < frameworkCount * 2; i++) {
     // Permanently decline any offered resources.
-    foreach (auto offer, offers) {
+    foreach (const OfferedResources& offer, offers) {
       Filters filters;
 
       filters.set_refuse_seconds(INT_MAX);
@@ -3996,7 +4046,8 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels)
   // Pause the clock because we want to manually drive the allocations.
   Clock::pause();
 
-  struct OfferedResources {
+  struct OfferedResources
+  {
     FrameworkID   frameworkId;
     SlaveID       slaveId;
     Resources     resources;
@@ -4006,11 +4057,14 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels)
 
   auto offerCallback = [&offers](
       const FrameworkID& frameworkId,
-      const hashmap<SlaveID, Resources>& resources_)
+      const hashmap<string, hashmap<SlaveID, Resources>>& resources_)
   {
-    foreach (auto resources, resources_) {
-      offers.push_back(
-          OfferedResources{frameworkId, resources.first, resources.second});
+    foreachkey (const string& role, resources_) {
+      foreachpair (const SlaveID& slaveId,
+                   const Resources& resources,
+                   resources_.at(role)) {
+        offers.push_back(OfferedResources{frameworkId, slaveId, resources});
+      }
     }
   };
 
@@ -4059,6 +4113,8 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels)
 
   allocation += createPorts(ranges.get());
 
+  allocation.allocate("role1");
+
   watch.start();
 
   for (size_t i = 0; i < slaveCount; i++) {
@@ -4087,9 +4143,12 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels)
     Resources reserved1 =
       createReservedResource("cpus", "8", "role1",
                              createReservationInfo("principal1", labels1));
+    reserved1.allocate("role1");
+
     Resources reserved2 =
       createReservedResource("cpus", "8", "role1",
                              createReservationInfo("principal1", labels2));
+    reserved2.allocate("role1");
 
     Resources _allocation = allocation + reserved1 + reserved2;
 
@@ -4114,7 +4173,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels)
   // Loop enough times for all the frameworks to get offered all the resources.
   for (size_t i = 0; i < frameworkCount * 2; i++) {
     // Permanently decline any offered resources.
-    foreach (auto offer, offers) {
+    foreach (const OfferedResources& offer, offers) {
       Filters filters;
 
       filters.set_refuse_seconds(INT_MAX);
@@ -4167,10 +4226,14 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers)
 
   auto offerCallback = [&offers](
       const FrameworkID& frameworkId,
-      const hashmap<SlaveID, Resources>& resources)
+      const hashmap<string, hashmap<SlaveID, Resources>>& resources_)
   {
-    foreachpair (const SlaveID& slaveId, const Resources& r, resources) {
-      offers.push_back(OfferedResources{frameworkId, slaveId, r});
+    foreachkey (const string& role, resources_) {
+      foreachpair (const SlaveID& slaveId,
+                   const Resources& resources,
+                   resources_.at(role)) {
+        offers.push_back(OfferedResources{frameworkId, slaveId, resources});
+      }
     }
   };
 
@@ -4214,6 +4277,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers)
   ASSERT_EQ(16, ranges->range_size());
 
   allocation += createPorts(ranges.get());
+  allocation.allocate("*");
 
   watch.start();
 


[4/9] mesos git commit: Updated master to handle non-MULTI_ROLE agents.

Posted by bm...@apache.org.
Updated master to handle non-MULTI_ROLE agents.

With the addition of MULTI_ROLE framweork support, allocated
resources have a `Resource.AllocationInfo` set. While the new
MULTI_ROLE capable agents will be sending allocated resources,
the old agents may not be since they may not preserve the
unknown fields.

To cope with this, the master ensures the `Resource.AllocationInfo`
is set for non-MULTI_ROLE capable agents, by injecting it. Note
that we can only do this so long as it is unambiguous! This will
be prevented by not allowing frameworks with multiple to use
agents without the MULTI_ROLE support, see: MESOS-6940.

Review: https://reviews.apache.org/r/55972


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f4dab1e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f4dab1e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f4dab1e

Branch: refs/heads/master
Commit: 9f4dab1e358493609d0a54deb2fe85fc7cd3ce8b
Parents: 7497541
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Jan 25 16:28:52 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Feb 3 18:47:11 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 82 ++++++++++++++++++++++++++++++++++++++++++----
 src/master/master.hpp | 12 +++++++
 2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f4dab1e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ef075e4..a3038b6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5575,8 +5575,8 @@ void Master::_reregisterSlave(
     const SlaveInfo& slaveInfo,
     const UPID& pid,
     const vector<Resource>& checkpointedResources,
-    const vector<ExecutorInfo>& executorInfos,
-    const vector<Task>& tasks,
+    const vector<ExecutorInfo>& executorInfos_,
+    const vector<Task>& tasks_,
     const vector<FrameworkInfo>& frameworks,
     const vector<Archive::Framework>& completedFrameworks,
     const string& version,
@@ -5602,6 +5602,64 @@ void Master::_reregisterSlave(
   // we've recovered it from the registry.
   slaves.recovered.erase(slaveInfo.id());
 
+  // For agents without the MULTI_ROLE capability,
+  // we need to inject the allocation role inside
+  // the task and executor resources;
+  auto injectAllocationInfo = [](
+      RepeatedPtrField<Resource>* resources,
+      const FrameworkInfo& frameworkInfo)
+  {
+    set<string> roles = protobuf::framework::getRoles(frameworkInfo);
+
+    foreach (Resource& resource, *resources) {
+      if (!resource.has_allocation_info()) {
+        if (roles.size() != 1) {
+          LOG(FATAL) << "Missing 'Resource.AllocationInfo' for resources"
+                     << " allocated to MULTI_ROLE framework"
+                     << " '" << frameworkInfo.name() << "'";
+        }
+
+        resource.mutable_allocation_info()->set_role(*roles.begin());
+      }
+    }
+  };
+
+  protobuf::slave::Capabilities slaveCapabilities(agentCapabilities);
+  vector<Task> adjustedTasks;
+  vector<ExecutorInfo> adjustedExecutorInfos;
+
+  if (!slaveCapabilities.multiRole) {
+    hashmap<FrameworkID, FrameworkInfo> frameworks_;
+    foreach (const FrameworkInfo& framework, frameworks) {
+      frameworks_[framework.id()] = framework;
+    }
+
+    adjustedTasks = tasks_;
+    adjustedExecutorInfos = executorInfos_;
+
+    foreach (Task& task, adjustedTasks) {
+      CHECK(frameworks_.contains(task.framework_id()));
+
+      injectAllocationInfo(
+          task.mutable_resources(),
+          frameworks_.at(task.framework_id()));
+    }
+
+    foreach (ExecutorInfo& executor, adjustedExecutorInfos) {
+      CHECK(frameworks_.contains(executor.framework_id()));
+
+      injectAllocationInfo(
+          executor.mutable_resources(),
+          frameworks_.at(executor.framework_id()));
+    }
+  }
+
+  const vector<Task>& tasks =
+    slaveCapabilities.multiRole ? tasks_ : adjustedTasks;
+
+  const vector<ExecutorInfo>& executorInfos =
+    slaveCapabilities.multiRole ? executorInfos_ : adjustedExecutorInfos;
+
   MachineID machineId;
   machineId.set_hostname(slaveInfo.hostname());
   machineId.set_ip(stringify(pid.address.ip));
@@ -5663,7 +5721,7 @@ void Master::_reregisterSlave(
   // (b) If the master has failed over, all tasks are re-added to the
   // master. The master shouldn't have any record of the tasks running
   // on the agent, so no further cleanup is required.
-  vector<Task> tasks_;
+  vector<Task> recoveredTasks;
   foreach (const Task& task, tasks) {
     const FrameworkID& frameworkId = task.framework_id();
     Framework* framework = getFramework(frameworkId);
@@ -5676,7 +5734,7 @@ void Master::_reregisterSlave(
 
     // Always re-add partition-aware tasks.
     if (partitionAwareFrameworks.contains(frameworkId)) {
-      tasks_.push_back(task);
+      recoveredTasks.push_back(task);
 
       if (framework != nullptr) {
         framework->unreachableTasks.erase(task.task_id());
@@ -5684,7 +5742,7 @@ void Master::_reregisterSlave(
     } else if (!slaveWasRemoved) {
       // Only re-add non-partition-aware tasks if the master has
       // failed over since the agent was marked unreachable.
-      tasks_.push_back(task);
+      recoveredTasks.push_back(task);
     }
   }
 
@@ -5698,7 +5756,7 @@ void Master::_reregisterSlave(
       Clock::now(),
       checkpointedResources,
       executorInfos,
-      tasks_);
+      recoveredTasks);
 
   slave->reregisteredTime = Clock::now();
 
@@ -8916,6 +8974,12 @@ void Slave::addTask(Task* task)
   CHECK(!tasks[frameworkId].contains(taskId))
     << "Duplicate task " << taskId << " of framework " << frameworkId;
 
+  // Verify that Resource.AllocationInfo is set,
+  // this should be guaranteed by the master.
+  foreach (const Resource& resource, task->resources()) {
+    CHECK(resource.has_allocation_info());
+  }
+
   tasks[frameworkId][taskId] = task;
 
   if (!Master::isRemovable(task->state())) {
@@ -9023,6 +9087,12 @@ void Slave::addExecutor(const FrameworkID& frameworkId,
     << "Duplicate executor '" << executorInfo.executor_id()
     << "' of framework " << frameworkId;
 
+  // Verify that Resource.AllocationInfo is set,
+  // this should be guaranteed by the master.
+  foreach (const Resource& resource, executorInfo.resources()) {
+    CHECK(resource.has_allocation_info());
+  }
+
   executors[frameworkId][executorInfo.executor_id()] = executorInfo;
   usedResources[frameworkId] += executorInfo.resources();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f4dab1e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 511773e..616687f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2214,6 +2214,12 @@ struct Framework
       << "Duplicate task " << task->task_id()
       << " of framework " << task->framework_id();
 
+    // Verify that Resource.AllocationInfo is set,
+    // this should be guaranteed by the master.
+    foreach (const Resource& resource, task->resources()) {
+      CHECK(resource.has_allocation_info());
+    }
+
     tasks[task->task_id()] = task;
 
     if (!Master::isRemovable(task->state())) {
@@ -2355,6 +2361,12 @@ struct Framework
       << "Duplicate executor '" << executorInfo.executor_id()
       << "' on agent " << slaveId;
 
+    // Verify that Resource.AllocationInfo is set,
+    // this should be guaranteed by the master.
+    foreach (const Resource& resource, executorInfo.resources()) {
+      CHECK(resource.has_allocation_info());
+    }
+
     executors[slaveId][executorInfo.executor_id()] = executorInfo;
     totalUsedResources += executorInfo.resources();
     usedResources[slaveId] += executorInfo.resources();


[8/9] mesos git commit: Update the tests to handle MULTI_ROLE support.

Posted by bm...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/c20744a9/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 9cf1d46..946a7bc 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -68,6 +68,7 @@
 #include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 using namespace mesos::internal::slave;
@@ -1849,13 +1850,15 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
   Offer offer = offers.get()[0];
 
   Offer offer1 = offer;
-  offer1.mutable_resources()->CopyFrom(
-      Resources::parse("cpus:1;mem:512").get());
+  Resources resources1 = allocatedResources(
+      Resources::parse("cpus:1;mem:512").get(), frameworkInfo.role());
+  offer1.mutable_resources()->CopyFrom(resources1);
   tasks.push_back(createTask(offer1, "sleep 1000")); // Long-running task.
 
   Offer offer2 = offer;
-  offer2.mutable_resources()->CopyFrom(
-      Resources::parse("cpus:1;mem:512").get());
+  Resources resources2 = allocatedResources(
+      Resources::parse("cpus:1;mem:512").get(), frameworkInfo.role());
+  offer2.mutable_resources()->CopyFrom(resources2);
   tasks.push_back(createTask(offer2, "sleep 1000")); // Long-running task,
 
   ASSERT_TRUE(Resources(offer.resources()).contains(