Posted to commits@mesos.apache.org by bm...@apache.org on 2015/05/14 01:02:56 UTC

mesos git commit: Updated sorter to take resources by SlaveID.

Repository: mesos
Updated Branches:
  refs/heads/master 8e6df2a28 -> 2c6d487a8


Updated sorter to take resources by SlaveID.

Review: https://reviews.apache.org/r/31667
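
In effect, every resource-facing sorter operation now takes the SlaveID
the resources belong to, and 'allocation' returns a per-slave map.
Signatures excerpted from the sorter interface diff below:

  // Before: totals and allocations were aggregated across slaves,
  // which is only correct for scalar resources.
  virtual void add(const Resources& resources) = 0;
  virtual Resources allocation(const std::string& client) = 0;

  // After: resources are keyed by the slave they come from.
  virtual void add(const SlaveID& slaveId, const Resources& resources) = 0;
  virtual hashmap<SlaveID, Resources> allocation(
      const std::string& client) = 0;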


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c6d487a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c6d487a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c6d487a

Branch: refs/heads/master
Commit: 2c6d487a87cc192bc5a4b6355e76ee61c47ff4e9
Parents: 8e6df2a
Author: Michael Park <mc...@gmail.com>
Authored: Wed May 13 11:53:29 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Wed May 13 15:52:29 2015 -0700

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.hpp |  53 ++++----
 src/master/allocator/sorter/drf/sorter.cpp  |  54 +++++---
 src/master/allocator/sorter/drf/sorter.hpp  |  13 +-
 src/master/allocator/sorter/sorter.hpp      |  16 +--
 src/tests/sorter_tests.cpp                  | 157 ++++++++++++++++++-----
 5 files changed, 207 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 09adced..c22f044 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -296,7 +296,7 @@ void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
     const FrameworkID& frameworkId,
     const FrameworkInfo& frameworkInfo,
-    const hashmap<SlaveID, Resources>& used_)
+    const hashmap<SlaveID, Resources>& used)
 {
   CHECK(initialized);
 
@@ -311,13 +311,11 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
   // framework's role.
 
   // Update the allocation to this framework.
-  // TODO(mpark): Once the sorter API is updated to operate on
-  // 'hashmap<SlaveID, Resources>' rather than 'Resources', update
-  // the sorters for each slave instead.
-  Resources used = Resources::sum(used_);
-  roleSorter->allocated(role, used.unreserved());
-  frameworkSorters[role]->add(used);
-  frameworkSorters[role]->allocated(frameworkId.value(), used);
+  foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
+    roleSorter->allocated(role, slaveId, allocated.unreserved());
+    frameworkSorters[role]->add(slaveId, allocated);
+    frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated);
+  }
 
   frameworks[frameworkId] = Framework();
   frameworks[frameworkId].role = frameworkInfo.role();
@@ -342,11 +340,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
   // Might not be in 'frameworkSorters[role]' because it was previously
   // deactivated and never re-added.
   if (frameworkSorters[role]->contains(frameworkId.value())) {
-    Resources allocation =
+    hashmap<SlaveID, Resources> allocation =
       frameworkSorters[role]->allocation(frameworkId.value());
 
-    roleSorter->unallocated(role, allocation.unreserved());
-    frameworkSorters[role]->remove(allocation);
+    foreachpair (
+        const SlaveID& slaveId, const Resources& allocated, allocation) {
+      roleSorter->unallocated(role, slaveId, allocated.unreserved());
+      frameworkSorters[role]->remove(slaveId, allocated);
+    }
+
     frameworkSorters[role]->remove(frameworkId.value());
   }
 
@@ -417,7 +419,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
   CHECK(initialized);
   CHECK(!slaves.contains(slaveId));
 
-  roleSorter->add(total.unreserved());
+  roleSorter->add(slaveId, total.unreserved());
 
   foreachpair (const FrameworkID& frameworkId,
                const Resources& allocated,
@@ -428,9 +430,10 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
       // TODO(bmahler): Validate that the reserved resources have the
       // framework's role.
 
-      roleSorter->allocated(role, allocated.unreserved());
-      frameworkSorters[role]->add(allocated);
-      frameworkSorters[role]->allocated(frameworkId.value(), allocated);
+      roleSorter->allocated(role, slaveId, allocated.unreserved());
+      frameworkSorters[role]->add(slaveId, allocated);
+      frameworkSorters[role]->allocated(
+          frameworkId.value(), slaveId, allocated);
     }
   }
 
@@ -463,7 +466,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave(
   // all the resources. Fixing this would require more information
   // than what we currently track in the allocator.
 
-  roleSorter->remove(slaves[slaveId].total.unreserved());
+  roleSorter->remove(slaveId, slaves[slaveId].total.unreserved());
 
   slaves.erase(slaveId);
 
@@ -560,7 +563,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
   FrameworkSorter* frameworkSorter =
     frameworkSorters[frameworks[frameworkId].role];
 
-  Resources allocation = frameworkSorter->allocation(frameworkId.value());
+  Resources allocation =
+    frameworkSorter->allocation(frameworkId.value())[slaveId];
 
   // Update the allocated resources.
   Try<Resources> updatedAllocation = allocation.apply(operations);
@@ -568,11 +572,13 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
 
   frameworkSorter->update(
       frameworkId.value(),
+      slaveId,
       allocation,
       updatedAllocation.get());
 
   roleSorter->update(
       frameworks[frameworkId].role,
+      slaveId,
       allocation.unreserved(),
       updatedAllocation.get().unreserved());
 
@@ -622,9 +628,10 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     CHECK(frameworkSorters.contains(role));
 
     if (frameworkSorters[role]->contains(frameworkId.value())) {
-      frameworkSorters[role]->unallocated(frameworkId.value(), resources);
-      frameworkSorters[role]->remove(resources);
-      roleSorter->unallocated(role, resources.unreserved());
+      frameworkSorters[role]->unallocated(
+          frameworkId.value(), slaveId, resources);
+      frameworkSorters[role]->remove(slaveId, resources);
+      roleSorter->unallocated(role, slaveId, resources.unreserved());
     }
   }
 
@@ -812,9 +819,9 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
         // Reserved resources are only accounted for in the framework
         // sorter, since the reserved resources are not shared across
         // roles.
-        frameworkSorters[role]->add(resources);
-        frameworkSorters[role]->allocated(frameworkId_, resources);
-        roleSorter->allocated(role, resources.unreserved());
+        frameworkSorters[role]->add(slaveId, resources);
+        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
+        roleSorter->allocated(role, slaveId, resources.unreserved());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 2f69f38..f53a934 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -47,7 +47,7 @@ void DRFSorter::add(const string& name, double weight)
   Client client(name, 0, 0);
   clients.insert(client);
 
-  allocations[name] = Resources();
+  allocations[name] = hashmap<SlaveID, Resources>();
   weights[name] = weight;
 }
 
@@ -90,6 +90,7 @@ void DRFSorter::deactivate(const string& name)
 
 void DRFSorter::allocated(
     const string& name,
+    const SlaveID& slaveId,
     const Resources& resources)
 {
   set<Client, DRFComparator>::iterator it = find(name);
@@ -106,7 +107,7 @@ void DRFSorter::allocated(
     clients.insert(client);
   }
 
-  allocations[name] += resources;
+  allocations[name][slaveId] += resources;
 
   // If the total resources have changed, we're going to
   // recalculate all the shares, so don't bother just
@@ -119,6 +120,7 @@ void DRFSorter::allocated(
 
 void DRFSorter::update(
     const string& name,
+    const SlaveID& slaveId,
     const Resources& oldAllocation,
     const Resources& newAllocation)
 {
@@ -129,23 +131,26 @@ void DRFSorter::update(
   // Otherwise, we need to ensure we re-calculate the shares, as
   // is being currently done, for safety.
 
-  CHECK(resources.contains(oldAllocation));
+  CHECK(resources[slaveId].contains(oldAllocation));
 
-  resources -= oldAllocation;
-  resources += newAllocation;
+  resources[slaveId] -= oldAllocation;
+  resources[slaveId] += newAllocation;
 
-  CHECK(allocations[name].contains(oldAllocation));
+  CHECK(allocations[name][slaveId].contains(oldAllocation));
 
-  allocations[name] -= oldAllocation;
-  allocations[name] += newAllocation;
+  allocations[name][slaveId] -= oldAllocation;
+  if (allocations[name][slaveId].empty()) {
+    allocations[name].erase(slaveId);
+  }
+
+  allocations[name][slaveId] += newAllocation;
 
   // Just assume the total has changed, per the TODO above.
   dirty = true;
 }
 
 
-Resources DRFSorter::allocation(
-    const string& name)
+hashmap<SlaveID, Resources> DRFSorter::allocation(const string& name)
 {
   return allocations[name];
 }
@@ -153,9 +158,13 @@ Resources DRFSorter::allocation(
 
 void DRFSorter::unallocated(
     const string& name,
+    const SlaveID& slaveId,
     const Resources& resources)
 {
-  allocations[name] -= resources;
+  allocations[name][slaveId] -= resources;
+  if (allocations[name][slaveId].empty()) {
+    allocations[name].erase(slaveId);
+  }
 
   if (!dirty) {
     update(name);
@@ -163,9 +172,9 @@ void DRFSorter::unallocated(
 }
 
 
-void DRFSorter::add(const Resources& _resources)
+void DRFSorter::add(const SlaveID& slaveId, const Resources& _resources)
 {
-  resources += _resources;
+  resources[slaveId] += _resources;
 
   // We have to recalculate all shares when the total resources
   // change, but we put it off until sort is called
@@ -175,9 +184,13 @@ void DRFSorter::add(const Resources& _resources)
 }
 
 
-void DRFSorter::remove(const Resources& _resources)
+void DRFSorter::remove(const SlaveID& slaveId, const Resources& _resources)
 {
-  resources -= _resources;
+  resources[slaveId] -= _resources;
+  if (resources[slaveId].empty()) {
+    resources.erase(slaveId);
+  }
+
   dirty = true;
 }
 
@@ -248,22 +261,27 @@ double DRFSorter::calculateShare(const string& name)
   // currently does not take into account resources that are not
   // scalars.
 
+  // NOTE: Summation is incorrect for non-scalars, but since we
+  // only care about scalar resources, this is safe.
+  Resources totalResources = Resources::sum(resources);
+  Resources clientAllocation = Resources::sum(allocations[name]);
+
   // Scalar resources may be spread across multiple 'Resource'
   // objects. E.g. persistent volumes. So we first collect the names
   // of the scalar resources, before computing the totals.
   hashset<string> scalars;
-  foreach (const Resource& resource, resources) {
+  foreach (const Resource& resource, totalResources) {
     if (resource.type() == Value::SCALAR) {
       scalars.insert(resource.name());
     }
   }
 
   foreach (const string& scalar, scalars) {
-    Option<Value::Scalar> total = resources.get<Value::Scalar>(scalar);
+    Option<Value::Scalar> total = totalResources.get<Value::Scalar>(scalar);
 
     if (total.isSome() && total.get().value() > 0) {
       Option<Value::Scalar> allocation =
-        allocations[name].get<Value::Scalar>(scalar);
+        clientAllocation.get<Value::Scalar>(scalar);
 
       if (allocation.isNone()) {
         allocation = Value::Scalar();

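For context on why the per-slave keying matters: summing resources across
slaves is only correct for scalars. A minimal sketch of the failure mode
for range resources, assuming standard Resources arithmetic (the variable
names are illustrative, not from the commit):

  // Two slaves each offer the same port range. Summing their Resources
  // coalesces the ranges (range addition is a set union), so the result
  // is a single ports:[31000-32000] rather than two copies: the second
  // slave's ports effectively vanish from the total.
  Resources portsA = Resources::parse("ports:[31000-32000]").get();
  Resources portsB = Resources::parse("ports:[31000-32000]").get();
  Resources summed = portsA + portsB;
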
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 4366710..fd00c8c 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -74,20 +74,23 @@ public:
   virtual void deactivate(const std::string& name);
 
   virtual void allocated(const std::string& name,
+                         const SlaveID& slaveId,
                          const Resources& resources);
 
   virtual void update(const std::string& name,
+                      const SlaveID& slaveId,
                       const Resources& oldAllocation,
                       const Resources& newAllocation);
 
   virtual void unallocated(const std::string& name,
+                           const SlaveID& slaveId,
                            const Resources& resources);
 
-  virtual Resources allocation(const std::string& name);
+  virtual hashmap<SlaveID, Resources> allocation(const std::string& name);
 
-  virtual void add(const Resources& resources);
+  virtual void add(const SlaveID& slaveId, const Resources& resources);
 
-  virtual void remove(const Resources& resources);
+  virtual void remove(const SlaveID& slaveId, const Resources& resources);
 
   virtual std::list<std::string> sort();
 
@@ -114,13 +117,13 @@ private:
   std::set<Client, DRFComparator> clients;
 
   // Maps client names to the resources they have been allocated.
-  hashmap<std::string, Resources> allocations;
+  hashmap<std::string, hashmap<SlaveID, Resources>> allocations;
 
   // Maps client names to the weights that should be applied to their shares.
   hashmap<std::string, double> weights;
 
   // Total resources.
-  Resources resources;
+  hashmap<SlaveID, Resources> resources;
 };
 
 } // namespace allocator {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index e2efb27..f06b946 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -23,6 +23,7 @@
 #include <string>
 
 #include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
 
 namespace mesos {
 namespace internal {
@@ -33,12 +34,6 @@ namespace allocator {
 // order in which users or frameworks should receive
 // resource allocations.
 //
-// TODO(bmahler): The total and allocated resources are currently
-// aggregated across slaves, which only works for scalar resources.
-// Also, persistent disks are a bit tricky because there will be
-// duplicated persistence IDs within the resources. Consider storing
-// maps keyed off of the slave ID to fix these issues.
-//
 // TODO(bmahler): Templatize this on Client, so that callers
 // don't need to do string conversion, e.g. FrameworkID, string role,
 // etc.
@@ -62,6 +57,7 @@ public:
 
   // Specify that resources have been allocated to the given client.
   virtual void allocated(const std::string& client,
+                         const SlaveID& slaveId,
                          const Resources& resources) = 0;
 
   // Updates a portion of the allocation for the client, in order to
@@ -69,22 +65,24 @@ public:
   // This means that the new allocation must not affect the static
   // roles, or the overall quantities of resources!
   virtual void update(const std::string& client,
+                      const SlaveID& slaveId,
                       const Resources& oldAllocation,
                       const Resources& newAllocation) = 0;
 
   // Specify that resources have been unallocated from the given client.
   virtual void unallocated(const std::string& client,
+                           const SlaveID& slaveId,
                            const Resources& resources) = 0;
 
   // Returns the resources that have been allocated to this client.
-  virtual Resources allocation(const std::string& client) = 0;
+  virtual hashmap<SlaveID, Resources> allocation(const std::string& client) = 0;
 
   // Add resources to the total pool of resources this
   // Sorter should consider.
-  virtual void add(const Resources& resources) = 0;
+  virtual void add(const SlaveID& slaveId, const Resources& resources) = 0;
 
   // Remove resources from the total pool.
-  virtual void remove(const Resources& resources) = 0;
+  virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0;
 
   // Returns a list of all clients, in the order that they
   // should be allocated to, according to this Sorter's policy.

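Putting the new interface together, callers now drive the sorter per
slave. A minimal usage sketch modeled on the updated tests below (the
"framework" client name and slave ID value are illustrative):

  #include <stout/hashmap.hpp>

  #include <mesos/mesos.hpp>
  #include <mesos/resources.hpp>

  #include "master/allocator/sorter/drf/sorter.hpp"

  using mesos::Resources;
  using mesos::SlaveID;
  using mesos::internal::master::allocator::DRFSorter;

  void example()
  {
    DRFSorter sorter;

    SlaveID slaveId;
    slaveId.set_value("slave");

    // The total pool is tracked per slave.
    sorter.add(slaveId, Resources::parse("cpus:4;mem:1024").get());

    // Allocations to a client are keyed by slave as well.
    sorter.add("framework");
    sorter.allocated(
        "framework", slaveId, Resources::parse("cpus:1;mem:128").get());

    // 'allocation' returns the per-slave breakdown.
    hashmap<SlaveID, Resources> allocation = sorter.allocation("framework");
  }
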
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 4244235..6b0389b 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -30,6 +30,8 @@
 
 #include "master/allocator/sorter/drf/sorter.hpp"
 
+#include "tests/mesos.hpp"
+
 using mesos::internal::master::allocator::DRFSorter;
 
 using std::list;
@@ -44,59 +46,62 @@ TEST(SorterTest, DRFSorter)
 {
   DRFSorter sorter;
 
+  SlaveID slaveId;
+  slaveId.set_value("slaveId");
+
   Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(totalResources);
+  sorter.add(slaveId, totalResources);
 
   sorter.add("a");
   Resources aResources = Resources::parse("cpus:5;mem:5").get();
-  sorter.allocated("a", aResources);
+  sorter.allocated("a", slaveId, aResources);
 
   Resources bResources = Resources::parse("cpus:6;mem:6").get();
   sorter.add("b");
-  sorter.allocated("b", bResources);
+  sorter.allocated("b", slaveId, bResources);
 
   // shares: a = .05, b = .06
   EXPECT_EQ(list<string>({"a", "b"}), sorter.sort());
 
   Resources cResources = Resources::parse("cpus:1;mem:1").get();
   sorter.add("c");
-  sorter.allocated("c", cResources);
+  sorter.allocated("c", slaveId, cResources);
 
   Resources dResources = Resources::parse("cpus:3;mem:1").get();
   sorter.add("d");
-  sorter.allocated("d", dResources);
+  sorter.allocated("d", slaveId, dResources);
 
   // shares: a = .05, b = .06, c = .01, d = .03
   EXPECT_EQ(list<string>({"c", "d", "a", "b"}), sorter.sort());
 
   sorter.remove("a");
   Resources bUnallocated = Resources::parse("cpus:4;mem:4").get();
-  sorter.unallocated("b", bUnallocated);
+  sorter.unallocated("b", slaveId, bUnallocated);
 
   // shares: b = .02, c = .01, d = .03
   EXPECT_EQ(list<string>({"c", "b", "d"}), sorter.sort());
 
   Resources eResources = Resources::parse("cpus:1;mem:5").get();
   sorter.add("e");
-  sorter.allocated("e", eResources);
+  sorter.allocated("e", slaveId, eResources);
 
   Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(removedResources);
+  sorter.remove(slaveId, removedResources);
   // total resources is now cpus = 50, mem = 100
 
   // shares: b = .04, c = .02, d = .06, e = .05
   EXPECT_EQ(list<string>({"c", "b", "e", "d"}), sorter.sort());
 
   Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(addedResources);
+  sorter.add(slaveId, addedResources);
   // total resources is now cpus = 50, mem = 200
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f");
-  sorter.allocated("f", fResources);
+  sorter.allocated("f", slaveId, fResources);
 
   Resources cResources2 = Resources::parse("cpus:0;mem:15").get();
-  sorter.allocated("c", cResources2);
+  sorter.allocated("c", slaveId, cResources2);
 
   // shares: b = .04, c = .08, d = .06, e = .025, f = .1
   EXPECT_EQ(list<string>({"e", "b", "d", "c", "f"}), sorter.sort());
@@ -125,26 +130,29 @@ TEST(SorterTest, WDRFSorter)
 {
   DRFSorter sorter;
 
-  sorter.add(Resources::parse("cpus:100;mem:100").get());
+  SlaveID slaveId;
+  slaveId.set_value("slaveId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
 
   sorter.add("a");
 
-  sorter.allocated("a", Resources::parse("cpus:5;mem:5").get());
+  sorter.allocated("a", slaveId, Resources::parse("cpus:5;mem:5").get());
 
   sorter.add("b", 2);
-  sorter.allocated("b", Resources::parse("cpus:6;mem:6").get());
+  sorter.allocated("b", slaveId, Resources::parse("cpus:6;mem:6").get());
 
   // shares: a = .05, b = .03
   EXPECT_EQ(list<string>({"b", "a"}), sorter.sort());
 
   sorter.add("c");
-  sorter.allocated("c", Resources::parse("cpus:4;mem:4").get());
+  sorter.allocated("c", slaveId, Resources::parse("cpus:4;mem:4").get());
 
   // shares: a = .05, b = .03, c = .04
   EXPECT_EQ(list<string>({"b", "c", "a"}), sorter.sort());
 
   sorter.add("d", 10);
-  sorter.allocated("d", Resources::parse("cpus:10;mem:20").get());
+  sorter.allocated("d", slaveId, Resources::parse("cpus:10;mem:20").get());
 
   // shares: a = .05, b = .03, c = .04, d = .02
   EXPECT_EQ(list<string>({"d", "b", "c", "a"}), sorter.sort());
@@ -153,13 +161,13 @@ TEST(SorterTest, WDRFSorter)
 
   EXPECT_EQ(list<string>({"d", "c", "a"}), sorter.sort());
 
-  sorter.allocated("d", Resources::parse("cpus:10;mem:25").get());
+  sorter.allocated("d", slaveId, Resources::parse("cpus:10;mem:25").get());
 
   // shares: a = .05, c = .04, d = .045
   EXPECT_EQ(list<string>({"c", "d", "a"}), sorter.sort());
 
   sorter.add("e", .1);
-  sorter.allocated("e", Resources::parse("cpus:1;mem:1").get());
+  sorter.allocated("e", slaveId, Resources::parse("cpus:1;mem:1").get());
 
   // shares: a = .05, c = .04, d = .045, e = .1
   EXPECT_EQ(list<string>({"c", "d", "a", "e"}), sorter.sort());
@@ -177,6 +185,9 @@ TEST(SorterTest, SplitResourceShares)
 {
   DRFSorter sorter;
 
+  SlaveID slaveId;
+  slaveId.set_value("slaveId");
+
   sorter.add("a");
   sorter.add("b");
 
@@ -188,13 +199,16 @@ TEST(SorterTest, SplitResourceShares)
   disk2.mutable_disk()->mutable_persistence()->set_id("ID2");
   disk2.mutable_disk()->mutable_volume()->set_container_path("data");
 
-  sorter.add(Resources::parse("cpus:100;mem:100;disk:95").get()
-             + disk1 + disk2);
+  sorter.add(
+      slaveId,
+      Resources::parse("cpus:100;mem:100;disk:95").get() + disk1 + disk2);
 
   // Now, allocate resources to "a" and "b". Note that "b" will have
   // more disk if the shares are accounted correctly!
-  sorter.allocated("a", Resources::parse("cpus:9;mem:9;disk:9").get());
-  sorter.allocated("b", Resources::parse("cpus:9;mem:9").get() + disk1 + disk2);
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:9;mem:9;disk:9").get());
+  sorter.allocated(
+      "b", slaveId, Resources::parse("cpus:9;mem:9").get() + disk1 + disk2);
 
   EXPECT_EQ(list<string>({"a", "b"}), sorter.sort());
 }
@@ -204,31 +218,112 @@ TEST(SorterTest, Update)
 {
   DRFSorter sorter;
 
+  SlaveID slaveId;
+  slaveId.set_value("slaveId");
+
   sorter.add("a");
   sorter.add("b");
 
-  sorter.add(Resources::parse("cpus:10;mem:10;disk:10").get());
+  sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
 
-  sorter.allocated("a", Resources::parse("cpus:10;mem:10;disk:10").get());
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
 
   // Construct an offer operation.
   Resource volume = Resources::parse("disk", "5", "*").get();
   volume.mutable_disk()->mutable_persistence()->set_id("ID");
   volume.mutable_disk()->mutable_volume()->set_container_path("data");
 
-  Offer::Operation create;
-  create.set_type(Offer::Operation::CREATE);
-  create.mutable_create()->add_volumes()->CopyFrom(volume);
+  // Compute the updated allocation.
+  Resources oldAllocation = sorter.allocation("a")[slaveId];
+  Try<Resources> newAllocation = oldAllocation.apply(CREATE(volume));
+  ASSERT_SOME(newAllocation);
+
+  // Update the resources for the client.
+  sorter.update("a", slaveId, oldAllocation, newAllocation.get());
+
+  hashmap<SlaveID, Resources> allocation = sorter.allocation("a");
+  EXPECT_EQ(1u, allocation.size());
+  EXPECT_EQ(newAllocation.get(), allocation[slaveId]);
+}
+
+
+// We aggregate resources from multiple slaves into the sorter.
+// Since non-scalar resources don't aggregate well across slaves,
+// we need to keep track of the SlaveIDs of the resources. This
+// tests that no resources vanish in the process of aggregation
+// by inspecting the result of 'allocation'.
+TEST(SorterTest, MultipleSlaves)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveA;
+  slaveA.set_value("slaveA");
+
+  SlaveID slaveB;
+  slaveB.set_value("slaveB");
+
+  sorter.add("framework");
+
+  Resources slaveResources =
+    Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();
+
+  sorter.add(slaveA, slaveResources);
+  sorter.add(slaveB, slaveResources);
+
+  sorter.allocated("framework", slaveA, slaveResources);
+  sorter.allocated("framework", slaveB, slaveResources);
+
+  hashmap<SlaveID, Resources> allocation = sorter.allocation("framework");
+  EXPECT_EQ(2u, allocation.size());
+  EXPECT_EQ(slaveResources, allocation[slaveA]);
+  EXPECT_EQ(slaveResources, allocation[slaveB]);
+}
+
+
+// We aggregate resources from multiple slaves into the sorter.
+// Since non-scalar resources don't aggregate well across slaves,
+// we need to keep track of the SlaveIDs of the resources.
+// This tests that no resources vanish in the process of aggregation
+// by performing updates from unreserved to reserved resources.
+TEST(SorterTest, MultipleSlaveUpdates)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveA;
+  slaveA.set_value("slaveA");
+
+  SlaveID slaveB;
+  slaveB.set_value("slaveB");
+
+  sorter.add("framework");
+
+  Resources slaveResources =
+    Resources::parse("cpus:2;mem:512;disk:10;ports:[31000-32000]").get();
+
+  sorter.add(slaveA, slaveResources);
+  sorter.add(slaveB, slaveResources);
+
+  sorter.allocated("framework", slaveA, slaveResources);
+  sorter.allocated("framework", slaveB, slaveResources);
+
+  // Construct an offer operation.
+  Resource volume = Resources::parse("disk", "5", "*").get();
+  volume.mutable_disk()->mutable_persistence()->set_id("ID");
+  volume.mutable_disk()->mutable_volume()->set_container_path("data");
 
   // Compute the updated allocation.
-  Resources allocation = sorter.allocation("a");
-  Try<Resources> newAllocation = allocation.apply(create);
+  Try<Resources> newAllocation = slaveResources.apply(CREATE(volume));
   ASSERT_SOME(newAllocation);
 
   // Update the resources for the client.
-  sorter.update("a", allocation, newAllocation.get());
+  sorter.update("framework", slaveA, slaveResources, newAllocation.get());
+  sorter.update("framework", slaveB, slaveResources, newAllocation.get());
 
-  EXPECT_EQ(newAllocation.get(), sorter.allocation("a"));
+  hashmap<SlaveID, Resources> allocation = sorter.allocation("framework");
+  EXPECT_EQ(2u, allocation.size());
+  EXPECT_EQ(newAllocation.get(), allocation[slaveA]);
+  EXPECT_EQ(newAllocation.get(), allocation[slaveB]);
 }
 
 } // namespace tests {