You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/10/31 00:36:57 UTC

[mesos] 01/02: Fixed allocator performance issue in updateAllocation().

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1d850597a3ecd671ab93ee8dba1be7f428247d0b
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Oct 30 19:53:36 2019 -0400

    Fixed allocator performance issue in updateAllocation().
    
    This patch addresses the poor performance of
    `HierarchicalAllocatorProcess::updateAllocation()` for agents with
    a huge number of non-addable resources in clusters with many
    frameworks (see MESOS-10015).
    
    The Sorter methods for tracking totals, which modified the `Resources`
    of an agent in the Sorter, are replaced with methods that add/remove
    the resource quantities of an agent as a whole (which was actually the
    only use case of the old methods). Thus, updating the resources of an
    agent in a Sorter no longer subtracts/adds the `Resources` of the
    whole agent.
    
    Further, this patch completely removes the agent resource tracking
    logic from the random sorter (which makes no use of it itself) by
    implementing cluster totals tracking in the allocator.
    
    Results of `*BENCHMARK_WithReservationParam.UpdateAllocation*`
    (for the DRF sorter):
    
    1.7.x branch:
    Agent resources size: 200 (50 frameworks)
    Made 20 reserve and unreserve operations in 2.014081646secs
    Agent resources size: 400 (100 frameworks)
    Made 20 reserve and unreserve operations in 13.623513239secs
    Agent resources size: 800 (200 frameworks)
    Made 20 reserve and unreserve operations in 2.14100063438333mins
    Agent resources size: 1600 (400 frameworks)
    (killed after several minutes)
    
    1.7.x branch + this patch:
    Agent resources size: 200 (50 frameworks)
    Made 20 reserve and unreserve operations in 236.706615ms
    Agent resources size: 400 (100 frameworks)
    Made 20 reserve and unreserve operations in 483.544585ms
    Agent resources size: 800 (200 frameworks)
    Made 20 reserve and unreserve operations in 1.095224322secs
    ...
    Agent resources size: 6400 (1600 frameworks)
    Made 20 reserve and unreserve operations in 50.369691741secs
    
    This is a backport of https://reviews.apache.org/r/71646
    
    Review: https://reviews.apache.org/r/71698/
---
 src/master/allocator/mesos/hierarchical.cpp   |  69 +++++--
 src/master/allocator/mesos/hierarchical.hpp   |   3 +
 src/master/allocator/sorter/drf/sorter.cpp    |  75 ++-----
 src/master/allocator/sorter/drf/sorter.hpp    |  41 +---
 src/master/allocator/sorter/random/sorter.cpp |  58 ------
 src/master/allocator/sorter/random/sorter.hpp |  42 +---
 src/master/allocator/sorter/sorter.hpp        |  22 +-
 src/tests/sorter_tests.cpp                    | 278 +++++++++++---------------
 8 files changed, 220 insertions(+), 368 deletions(-)

diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 3e8a8ce..116d3c4 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -537,14 +537,21 @@ void HierarchicalAllocatorProcess::addSlave(
 
   trackReservations(total.reservations());
 
-  roleSorter->add(slaveId, total);
+  const Resources strippedScalars = total.createStrippedScalarQuantity();
+  const ResourceQuantities agentScalarQuantities =
+    ResourceQuantities::fromScalarResources(strippedScalars);
+
+  totalStrippedScalars += strippedScalars;
+  roleSorter->addSlave(slaveId, agentScalarQuantities);
 
   foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
-    sorter->add(slaveId, total);
+    sorter->addSlave(slaveId, agentScalarQuantities);
   }
 
   // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-  quotaRoleSorter->add(slaveId, total.nonRevocable());
+  quotaRoleSorter->addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(total.nonRevocable().scalars()));
 
   foreachpair (const FrameworkID& frameworkId,
                const Resources& allocation,
@@ -608,18 +615,22 @@ void HierarchicalAllocatorProcess::removeSlave(
   // all the resources. Fixing this would require more information
   // than what we currently track in the allocator.
 
-  roleSorter->remove(slaveId, slaves.at(slaveId).getTotal());
+  roleSorter->removeSlave(slaveId);
 
   foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
-    sorter->remove(slaveId, slaves.at(slaveId).getTotal());
+    sorter->removeSlave(slaveId);
   }
 
-  // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-  quotaRoleSorter->remove(
-      slaveId, slaves.at(slaveId).getTotal().nonRevocable());
+  quotaRoleSorter->removeSlave(slaveId);
 
   untrackReservations(slaves.at(slaveId).getTotal().reservations());
 
+  const Resources strippedScalars =
+      slaves.at(slaveId).getTotal().createStrippedScalarQuantity();
+
+  CHECK(totalStrippedScalars.contains(strippedScalars));
+  totalStrippedScalars -= strippedScalars;
+
   slaves.erase(slaveId);
   allocationCandidates.erase(slaveId);
 
@@ -1727,7 +1738,11 @@ void HierarchicalAllocatorProcess::__allocate()
   //                        allocated resources -
   //                        unallocated reservations -
   //                        unallocated revocable resources
-  Resources availableHeadroom = roleSorter->totalScalarQuantities();
+  Resources availableHeadroom = totalStrippedScalars;
+
+  // NOTE: The role sorter does not return aggregated allocation
+  // information whereas `reservationScalarQuantities` does, so
+  // we need to loop over only top level roles for the latter.
 
   // Subtract allocated resources from the total.
   foreachkey (const string& role, roles) {
@@ -2473,8 +2488,7 @@ double HierarchicalAllocatorProcess::_resources_total(
     const string& resource)
 {
   Option<Value::Scalar> total =
-    roleSorter->totalScalarQuantities()
-      .get<Value::Scalar>(resource);
+    totalStrippedScalars.get<Value::Scalar>(resource);
 
   return total.isSome() ? total->value() : 0;
 }
@@ -2545,7 +2559,9 @@ void HierarchicalAllocatorProcess::trackFrameworkUnderRole(
     frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
 
     foreachvalue (const Slave& slave, slaves) {
-      frameworkSorters.at(role)->add(slave.info.id(), slave.getTotal());
+      frameworkSorters.at(role)->addSlave(
+        slave.info.id(),
+        ResourceQuantities::fromScalarResources(slave.getTotal().scalars()));
     }
 
     metrics.addRole(role);
@@ -2662,18 +2678,33 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal(
     trackReservations(newReservations);
   }
 
-  // Update the totals in the sorters.
-  roleSorter->remove(slaveId, oldTotal);
-  roleSorter->add(slaveId, total);
+  // Update the total in the allocator and totals in the sorters.
+  const Resources oldStrippedScalars = oldTotal.createStrippedScalarQuantity();
+  const Resources strippedScalars = total.createStrippedScalarQuantity();
+
+  const ResourceQuantities oldAgentScalarQuantities =
+    ResourceQuantities::fromScalarResources(oldStrippedScalars);
+
+  const ResourceQuantities agentScalarQuantities =
+    ResourceQuantities::fromScalarResources(strippedScalars);
+
+  CHECK(totalStrippedScalars.contains(oldStrippedScalars));
+  totalStrippedScalars -= oldStrippedScalars;
+  totalStrippedScalars += strippedScalars;
+
+  roleSorter->removeSlave(slaveId);
+  roleSorter->addSlave(slaveId, agentScalarQuantities);
 
   foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
-    sorter->remove(slaveId, oldTotal);
-    sorter->add(slaveId, total);
+    sorter->removeSlave(slaveId);
+    sorter->addSlave(slaveId, agentScalarQuantities);
   }
 
   // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-  quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable());
-  quotaRoleSorter->add(slaveId, total.nonRevocable());
+  quotaRoleSorter->removeSlave(slaveId);
+  quotaRoleSorter->addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(total.nonRevocable().scalars()));
 
   return true;
 }
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 1fce68f..97d4a71 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -523,6 +523,9 @@ protected:
 
   hashmap<SlaveID, Slave> slaves;
 
+  // Total stripped scalar resources on all agents.
+  Resources totalStrippedScalars;
+
   // A set of agents that are kept as allocation candidates. Events
   // may add or remove candidates to the set. When an allocation is
   // processed, the set of candidates is cleared.
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 43c9767..fb1baec 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -493,69 +493,36 @@ Resources DRFSorter::allocation(
 }
 
 
-const Resources& DRFSorter::totalScalarQuantities() const
+void DRFSorter::addSlave(
+    const SlaveID& slaveId,
+    const ResourceQuantities& scalarQuantities)
 {
-  return total_.scalarQuantities;
-}
+  bool inserted = total_.agentResourceQuantities.emplace(
+      slaveId, scalarQuantities).second;
 
+  CHECK(inserted) << "Attempted to add already added agent " << slaveId;
 
-void DRFSorter::add(const SlaveID& slaveId, const Resources& resources)
-{
-  if (!resources.empty()) {
-    // Add shared resources to the total quantities when the same
-    // resources don't already exist in the total.
-    const Resources newShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
-
-    total_.resources[slaveId] += resources;
-
-    const Resources scalarQuantities =
-      (resources.nonShared() + newShared).createStrippedScalarQuantity();
-
-    total_.scalarQuantities += scalarQuantities;
-    total_.totals += ResourceQuantities::fromScalarResources(scalarQuantities);
-
-    // We have to recalculate all shares when the total resources
-    // change, but we put it off until `sort` is called so that if
-    // something else changes before the next allocation we don't
-    // recalculate everything twice.
-    dirty = true;
-  }
+  total_.totals += scalarQuantities;
+
+  // We have to recalculate all shares when the total resources
+  // change, but we put it off until `sort` is called so that if
+  // something else changes before the next allocation we don't
+  // recalculate everything twice.
+  dirty = true;
 }
 
 
-void DRFSorter::remove(const SlaveID& slaveId, const Resources& resources)
+void DRFSorter::removeSlave(const SlaveID& slaveId)
 {
-  if (!resources.empty()) {
-    CHECK(total_.resources.contains(slaveId));
-    CHECK(total_.resources[slaveId].contains(resources))
-      << total_.resources[slaveId] << " does not contain " << resources;
-
-    total_.resources[slaveId] -= resources;
+  const auto agent = total_.agentResourceQuantities.find(slaveId);
+  CHECK(agent != total_.agentResourceQuantities.end())
+    << "Attempted to remove unknown agent " << slaveId;
 
-    // Remove shared resources from the total quantities when there
-    // are no instances of same resources left in the total.
-    const Resources absentShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
+  // CHECK(total_.totals.contains(agent->second));
+  total_.totals -= agent->second;
 
-    const Resources scalarQuantities =
-      (resources.nonShared() + absentShared).createStrippedScalarQuantity();
-
-    CHECK(total_.scalarQuantities.contains(scalarQuantities));
-    total_.scalarQuantities -= scalarQuantities;
-
-    total_.totals -= ResourceQuantities::fromScalarResources(scalarQuantities);
-
-    if (total_.resources[slaveId].empty()) {
-      total_.resources.erase(slaveId);
-    }
-
-    dirty = true;
-  }
+  total_.agentResourceQuantities.erase(agent);
+  dirty = true;
 }
 
 
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 75f90f3..bfbb9a3 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -96,11 +96,11 @@ public:
       const std::string& clientPath,
       const SlaveID& slaveId) const override;
 
-  const Resources& totalScalarQuantities() const override;
-
-  void add(const SlaveID& slaveId, const Resources& resources) override;
+  void addSlave(
+      const SlaveID& slaveId,
+      const ResourceQuantities& scalarQuantities) override;
 
-  void remove(const SlaveID& slaveId, const Resources& resources) override;
+  void removeSlave(const SlaveID& slaveId) override;
 
   std::vector<std::string> sort() override;
 
@@ -150,34 +150,15 @@ private:
   // Total resources.
   struct Total
   {
-    // We need to keep track of the resources (and not just scalar
-    // quantities) to account for multiple copies of the same shared
-    // resources. We need to ensure that we do not update the scalar
-    // quantities for shared resources when the change is only in the
-    // number of copies in the sorter.
-    hashmap<SlaveID, Resources> resources;
-
-    // NOTE: Scalars can be safely aggregated across slaves. We keep
-    // that to speed up the calculation of shares. See MESOS-2891 for
-    // the reasons why we want to do that.
-    //
-    // NOTE: We omit information about dynamic reservations and
-    // persistent volumes here to enable resources to be aggregated
-    // across slaves more effectively. See MESOS-4833 for more
-    // information.
-    //
-    // Sharedness info is also stripped out when resource identities
-    // are omitted because sharedness inherently refers to the
-    // identities of resources and not quantities.
-    Resources scalarQuantities;
+    ResourceQuantities totals;
 
-    // To improve the performance of calculating shares, we store
-    // a redundant but more efficient version of `scalarQuantities`.
-    // See MESOS-4694.
+    // We keep track of per-agent resource quantities to handle agent removal.
     //
-    // TODO(bmahler): Can we remove `scalarQuantities` in favor of
-    // using this type whenever scalar quantities are needed?
-    ResourceQuantities totals;
+    // Note that the only way to change the stored resource quantities
+    // is to remove the agent from the sorter and add it with new resources.
+    // Thus, when a resource shared count on an agent changes, multiple copies
+    // of the same shared resource are still accounted for exactly once.
+    hashmap<SlaveID, const ResourceQuantities> agentResourceQuantities;
   } total_;
 
   // Metrics are optionally exposed by the sorter.
diff --git a/src/master/allocator/sorter/random/sorter.cpp b/src/master/allocator/sorter/random/sorter.cpp
index 6fcfc41..95070cc 100644
--- a/src/master/allocator/sorter/random/sorter.cpp
+++ b/src/master/allocator/sorter/random/sorter.cpp
@@ -418,64 +418,6 @@ Resources RandomSorter::allocation(
 }
 
 
-const Resources& RandomSorter::totalScalarQuantities() const
-{
-  return total_.scalarQuantities;
-}
-
-
-void RandomSorter::add(const SlaveID& slaveId, const Resources& resources)
-{
-  if (!resources.empty()) {
-    // Add shared resources to the total quantities when the same
-    // resources don't already exist in the total.
-    const Resources newShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
-
-    total_.resources[slaveId] += resources;
-
-    const Resources scalarQuantities =
-      (resources.nonShared() + newShared).createStrippedScalarQuantity();
-
-    total_.scalarQuantities += scalarQuantities;
-    total_.totals += ResourceQuantities::fromScalarResources(scalarQuantities);
-  }
-}
-
-
-void RandomSorter::remove(const SlaveID& slaveId, const Resources& resources)
-{
-  if (!resources.empty()) {
-    CHECK(total_.resources.contains(slaveId));
-    CHECK(total_.resources[slaveId].contains(resources))
-      << total_.resources[slaveId] << " does not contain " << resources;
-
-    total_.resources[slaveId] -= resources;
-
-    // Remove shared resources from the total quantities when there
-    // are no instances of same resources left in the total.
-    const Resources absentShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
-
-    const Resources scalarQuantities =
-      (resources.nonShared() + absentShared).createStrippedScalarQuantity();
-
-    CHECK(total_.scalarQuantities.contains(scalarQuantities));
-    total_.scalarQuantities -= scalarQuantities;
-
-    total_.totals -= ResourceQuantities::fromScalarResources(scalarQuantities);
-
-    if (total_.resources[slaveId].empty()) {
-      total_.resources.erase(slaveId);
-    }
-  }
-}
-
-
 vector<string> RandomSorter::sort()
 {
   std::function<void (Node*)> shuffleTree = [this, &shuffleTree](Node* node) {
diff --git a/src/master/allocator/sorter/random/sorter.hpp b/src/master/allocator/sorter/random/sorter.hpp
index 2031cb2..2aa4ee8 100644
--- a/src/master/allocator/sorter/random/sorter.hpp
+++ b/src/master/allocator/sorter/random/sorter.hpp
@@ -95,11 +95,12 @@ public:
       const std::string& clientPath,
       const SlaveID& slaveId) const override;
 
-  const Resources& totalScalarQuantities() const override;
-
-  void add(const SlaveID& slaveId, const Resources& resources) override;
+  // NOTE: addSlave/removeSlave is a no-op for this sorter.
+  void addSlave(
+      const SlaveID& slaveId,
+      const ResourceQuantities& scalarQuantities) override {};
 
-  void remove(const SlaveID& slaveId, const Resources& resources) override;
+  void removeSlave(const SlaveID& slaveId) override {};
 
   // This will perform a weighted random shuffle on each call.
   //
@@ -144,39 +145,6 @@ private:
   // rooted at that path. This hashmap might include weights for paths
   // that are not currently in the sorter tree.
   hashmap<std::string, double> weights;
-
-  // Total resources.
-  struct Total
-  {
-    // We need to keep track of the resources (and not just scalar
-    // quantities) to account for multiple copies of the same shared
-    // resources. We need to ensure that we do not update the scalar
-    // quantities for shared resources when the change is only in the
-    // number of copies in the sorter.
-    hashmap<SlaveID, Resources> resources;
-
-    // NOTE: Scalars can be safely aggregated across slaves. We keep
-    // that to speed up the calculation of shares. See MESOS-2891 for
-    // the reasons why we want to do that.
-    //
-    // NOTE: We omit information about dynamic reservations and
-    // persistent volumes here to enable resources to be aggregated
-    // across slaves more effectively. See MESOS-4833 for more
-    // information.
-    //
-    // Sharedness info is also stripped out when resource identities
-    // are omitted because sharedness inherently refers to the
-    // identities of resources and not quantities.
-    Resources scalarQuantities;
-
-    // To improve the performance of calculating shares, we store
-    // a redundant but more efficient version of `scalarQuantities`.
-    // See MESOS-4694.
-    //
-    // TODO(bmahler): Can we remove `scalarQuantities` in favor of
-    // using this type whenever scalar quantities are needed?
-    ResourceQuantities totals;
-  } total_;
 };
 
 
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index 25ad48d..20f877a 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -124,17 +124,19 @@ public:
       const std::string& client,
       const SlaveID& slaveId) const = 0;
 
-  // Returns the total scalar resource quantities in this sorter. This
-  // omits metadata about dynamic reservations and persistent volumes; see
-  // `Resources::createStrippedScalarQuantity`.
-  virtual const Resources& totalScalarQuantities() const = 0;
-
-  // Add resources to the total pool of resources this
-  // Sorter should consider.
-  virtual void add(const SlaveID& slaveId, const Resources& resources) = 0;
+  // Add/remove total scalar resource quantities of an agent to/from the
+  // total pool of resources this Sorter should consider.
+  //
+  // NOTE: Updating resources of an agent in the Sorter is done by first calling
+  // `removeSlave()` and then `addSlave()` with new resource quantities.
+  //
+  // NOTE: Attempting to add the same agent twice or to remove an agent that
+  // was not added to the Sorter may crash the program.
+  virtual void addSlave(
+      const SlaveID& slaveId,
+      const ResourceQuantities& scalarQuantities) = 0;
 
-  // Remove resources from the total pool.
-  virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0;
+  virtual void removeSlave(const SlaveID& slaveId) = 0;
 
   // Returns all of the clients in the order that they should
   // be allocated to, according to this Sorter's policy.
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 1e2791f..40b4e38 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -109,8 +109,8 @@ TEST(DRFSorterTest, DRF)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   EXPECT_EQ(vector<string>({}), sorter.sort());
 
@@ -153,16 +153,16 @@ TEST(DRFSorterTest, DRF)
   sorter.activate("e");
   sorter.allocated("e", slaveId, eResources);
 
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-  // total resources is now cpus = 50, mem = 100
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:50;mem:100").get());
 
   // shares: b = .04, c = .02, d = .06, e = .05
   EXPECT_EQ(vector<string>({"c", "b", "e", "d"}), sorter.sort());
 
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-  // total resources is now cpus = 50, mem = 200
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:50;mem:200").get());
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f");
@@ -202,7 +202,8 @@ TEST(DRFSorterTest, WDRF)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.activate("a");
@@ -262,9 +263,8 @@ TEST(DRFSorterTest, UpdateWeight)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.activate("a");
@@ -294,9 +294,8 @@ TEST(DRFSorterTest, AllocationCountTieBreak)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.add("b");
@@ -377,8 +376,8 @@ TEST(DRFSorterTest, ShallowHierarchy)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a/a");
   sorter.activate("a/a");
@@ -420,16 +419,16 @@ TEST(DRFSorterTest, ShallowHierarchy)
   sorter.activate("e/e");
   sorter.allocated("e/e", slaveId, eResources);
 
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-  // total resources is now cpus = 50, mem = 100
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:50;mem:100").get());
 
   // shares: b/b = .04, c/c = .02, d/d = .06, e/e = .05
   EXPECT_EQ(vector<string>({"c/c", "b/b", "e/e", "d/d"}), sorter.sort());
 
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-  // total resources is now cpus = 50, mem = 200
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:50;mem:200").get());
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f/f");
@@ -472,8 +471,8 @@ TEST(DRFSorterTest, DeepHierarchy)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a/a/a/a/a");
   sorter.activate("a/a/a/a/a");
@@ -515,17 +514,17 @@ TEST(DRFSorterTest, DeepHierarchy)
   sorter.activate("e/e/e/e/e/e");
   sorter.allocated("e/e/e/e/e/e", slaveId, eResources);
 
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-  // total resources is now cpus = 50, mem = 100
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:50;mem:100").get());
 
   // shares: b/b/b/b = .04, c/c/c = .02, d/d = .06, e/e/e/e/e/e = .05
   EXPECT_EQ(vector<string>({"c/c/c", "b/b/b/b", "e/e/e/e/e/e", "d/d"}),
             sorter.sort());
 
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-  // total resources is now cpus = 50, mem = 200
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:50;mem:200").get());
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f/f");
@@ -568,8 +567,8 @@ TEST(DRFSorterTest, HierarchicalAllocation)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.add("b/c");
@@ -675,8 +674,8 @@ TEST(DRFSorterTest, HierarchicalIterationOrder)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a/b");
   sorter.add("c");
@@ -723,7 +722,8 @@ TEST(DRFSorterTest, AddChildToLeaf)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.activate("a");
@@ -794,7 +794,8 @@ TEST(DRFSorterTest, AddChildToInternal)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("x/a");
   sorter.activate("x/a");
@@ -837,7 +838,8 @@ TEST(DRFSorterTest, AddChildToInactiveLeaf)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.activate("a");
@@ -872,7 +874,8 @@ TEST(DRFSorterTest, RemoveLeafCollapseParent)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.activate("a");
@@ -907,7 +910,8 @@ TEST(DRFSorterTest, RemoveLeafCollapseParentInactive)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.add("a");
   sorter.activate("a");
@@ -943,7 +947,8 @@ TEST(DRFSorterTest, ChangeWeightOnSubtree)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get());
 
   sorter.updateWeight("b", 3);
   sorter.updateWeight("a", 2);
@@ -1009,9 +1014,9 @@ TEST(DRFSorterTest, SplitResourceShares)
   disk2.mutable_disk()->mutable_persistence()->set_id("ID2");
   disk2.mutable_disk()->mutable_volume()->set_container_path("data");
 
-  sorter.add(
+  sorter.addSlave(
       slaveId,
-      Resources::parse("cpus:100;mem:100;disk:95").get() + disk1 + disk2);
+      ResourceQuantities::fromString("cpus:100;mem:100;disk:100").get());
 
   // Now, allocate resources to "a" and "b". Note that "b" will have
   // more disk if the shares are accounted correctly!
@@ -1036,7 +1041,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocation)
   sorter.activate("a");
   sorter.activate("b");
 
-  sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:10;mem:10;disk:10").get());
 
   sorter.allocated(
       "a", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
@@ -1073,7 +1079,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocationNestedClient)
   sorter.activate("a/x");
   sorter.activate("b/y");
 
-  sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:10;mem:10;disk:10").get());
 
   sorter.allocated(
       "a/x", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
@@ -1107,7 +1114,8 @@ TYPED_TEST(CommonSorterTest, AllocationForInactiveClient)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:10;mem:10").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:10;mem:10").get());
 
   sorter.add("a");
   sorter.add("b");
@@ -1153,11 +1161,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlaves)
   sorter.add("framework");
   sorter.activate("framework");
 
-  Resources slaveResources =
+  const Resources slaveResources =
     Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();
 
-  sorter.add(slaveA, slaveResources);
-  sorter.add(slaveB, slaveResources);
+  const ResourceQuantities slaveResourceQuantities =
+    ResourceQuantities::fromScalarResources(slaveResources.scalars());
+
+  sorter.addSlave(slaveA, slaveResourceQuantities);
+  sorter.addSlave(slaveB, slaveResourceQuantities);
 
   sorter.allocated("framework", slaveA, slaveResources);
   sorter.allocated("framework", slaveB, slaveResources);
@@ -1186,11 +1197,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlavesUpdateAllocation)
   sorter.add("framework");
   sorter.activate("framework");
 
-  Resources slaveResources =
+  const Resources slaveResources =
     Resources::parse("cpus:2;mem:512;disk:10;ports:[31000-32000]").get();
 
-  sorter.add(slaveA, slaveResources);
-  sorter.add(slaveB, slaveResources);
+  const ResourceQuantities slaveResourceQuantities =
+    ResourceQuantities::fromScalarResources(slaveResources.scalars());
+
+  sorter.addSlave(slaveA, slaveResourceQuantities);
+  sorter.addSlave(slaveB, slaveResourceQuantities);
 
   sorter.allocated("framework", slaveA, slaveResources);
   sorter.allocated("framework", slaveB, slaveResources);
@@ -1228,7 +1242,8 @@ TEST(DRFSorterTest, UpdateTotal)
   sorter.activate("a");
   sorter.activate("b");
 
-  sorter.add(slaveId, Resources::parse("cpus:10;mem:100").get());
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:10;mem:100").get());
 
   // Dominant share of "a" is 0.2 (cpus).
   sorter.allocated(
@@ -1242,8 +1257,9 @@ TEST(DRFSorterTest, UpdateTotal)
 
   // Update the total resources by removing the previous total and
   // adding back the new total.
-  sorter.remove(slaveId, Resources::parse("cpus:10;mem:100").get());
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:10").get());
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromString("cpus:100;mem:10").get());
 
   // Now the dominant share of "a" is 0.1 (mem) and "b" is 0.2 (mem),
   // which should change the sort order.
@@ -1268,8 +1284,11 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal)
   sorter.activate("a");
   sorter.activate("b");
 
-  sorter.add(slaveA, Resources::parse("cpus:5;mem:50").get());
-  sorter.add(slaveB, Resources::parse("cpus:5;mem:50").get());
+  sorter.addSlave(
+      slaveA, ResourceQuantities::fromString("cpus:5;mem:50").get());
+
+  sorter.addSlave(
+      slaveB, ResourceQuantities::fromString("cpus:5;mem:50").get());
 
   // Dominant share of "a" is 0.2 (cpus).
   sorter.allocated(
@@ -1281,10 +1300,10 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal)
 
   EXPECT_EQ(vector<string>({"b", "a"}), sorter.sort());
 
-  // Update the total resources of slaveA by removing the previous
-  // total and adding the new total.
-  sorter.remove(slaveA, Resources::parse("cpus:5;mem:50").get());
-  sorter.add(slaveA, Resources::parse("cpus:95;mem:50").get());
+  // Update the total resources of slaveA.
+  sorter.removeSlave(slaveA);
+  sorter.addSlave(
+      slaveA, ResourceQuantities::fromString("cpus:95;mem:50").get());
 
   // Now the dominant share of "a" is 0.02 (cpus) and "b" is 0.03
   // (mem), which should change the sort order.
@@ -1308,7 +1327,7 @@ TEST(DRFSorterTest, RemoveResources)
   sorter.activate("b");
 
   Resources slaveTotal = Resources::parse("cpus", "10", "*").get();
-  sorter.add(slaveId, slaveTotal);
+  sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(slaveTotal));
 
   // Dominant share of "a" is 0.6 (cpus).
   Resources allocatedForA = Resources::parse("cpus", "6", "*").get();
@@ -1321,8 +1340,10 @@ TEST(DRFSorterTest, RemoveResources)
 
   // Remove cpus from the total resources as well as the allocation of "a".
   Resources removed = Resources::parse("cpus", "5", "*").get();
-  sorter.remove(slaveId, slaveTotal);
-  sorter.add(slaveId, slaveTotal - removed);
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromScalarResources(slaveTotal - removed));
+
   sorter.update("a", slaveId, allocatedForA, allocatedForA - removed);
 
   // Now the dominant share of "a" is 0.2 (cpus) and that of "b" is 0.8 (cpus),
@@ -1351,7 +1372,7 @@ TEST(DRFSorterTest, RevocableResources)
   revocable.mutable_revocable();
   Resources total = Resources::parse("cpus:10;mem:100").get() + revocable;
 
-  sorter.add(slaveId, total);
+  sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(total));
 
   // Dominant share of "a" is 0.1 (cpus).
   Resources a = Resources::parse("cpus:2;mem:1").get();
@@ -1381,15 +1402,14 @@ TEST(DRFSorterTest, SharedResources)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
       "100", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk 100 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):900").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          Resources::parse("cpus:100;mem:100;disk(role1):900").get() +
+            sharedDisk));
 
   // Verify sort() works when shared resources are in the allocations.
   sorter.add("a");
@@ -1435,28 +1455,6 @@ TEST(DRFSorterTest, SharedResources)
   //         d = .1 (dominant: disk).
   EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort());
 
-  // Verify other basic allocator methods work when shared resources
-  // are in the allocations.
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-
-  // Total resources is now:
-  // cpus:50;mem:100;disk(role1):900;disk(role1)[id1]:100
-
-  // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk),
-  //         d = .1 (dominant: disk).
-  EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort());
-
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-
-  // Total resources is now:
-  // cpus:50;mem:200;disk(role1):900;disk(role1)[id1]:100
-
-  // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk),
-  //         d = .1 (dominant: disk).
-  EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort());
-
   EXPECT_TRUE(sorter.contains("b"));
 
   EXPECT_FALSE(sorter.contains("a"));
@@ -1475,15 +1473,14 @@ TEST(DRFSorterTest, SameDominantSharedResourcesAcrossClients)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
       "900", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk 900 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):100").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          Resources::parse("cpus:100;mem:100;disk(role1):100").get()
+            + sharedDisk));
 
   // Add 2 clients each with the same shared disk, but with varying
   // cpus and mem.
@@ -1524,15 +1521,14 @@ TEST(DRFSorterTest, SameSharedResourcesSameClient)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
       "50", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk of 50 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):950").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          Resources::parse("cpus:100;mem:100;disk(role1):950").get()
+            + sharedDisk));
 
   // Verify sort() works when shared resources are in the allocations.
   sorter.add("a");
@@ -1569,15 +1565,14 @@ TEST(DRFSorterTest, SharedResourcesUnallocated)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
       "100", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk 100 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):900").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          Resources::parse("cpus:100;mem:100;disk(role1):900").get()
+            + sharedDisk));
 
   // Allocate 3 copies of shared resources to client 'a', but allocate no
   // shared resource to client 'b'.
@@ -1613,43 +1608,6 @@ TEST(DRFSorterTest, SharedResourcesUnallocated)
 }
 
 
-// This test verifies that shared resources are removed from the sorter
-// only when all instances of the the same shared resource are removed.
-TYPED_TEST(CommonSorterTest, RemoveSharedResources)
-{
-  TypeParam sorter;
-
-  SlaveID slaveId;
-  slaveId.set_value("agentId");
-
-  Resource sharedDisk = createDiskResource(
-      "100", "role1", "id1", "path1", None(), true);
-
-  sorter.add(
-      slaveId, Resources::parse("cpus:100;mem:100;disk(role1):900").get());
-
-  Resources quantity1 = sorter.totalScalarQuantities();
-
-  sorter.add(slaveId, sharedDisk);
-  Resources quantity2 = sorter.totalScalarQuantities();
-
-  EXPECT_EQ(Resources::parse("disk:100").get(), quantity2 - quantity1);
-
-  sorter.add(slaveId, sharedDisk);
-  Resources quantity3 = sorter.totalScalarQuantities();
-
-  EXPECT_NE(quantity1, quantity3);
-  EXPECT_EQ(quantity2, quantity3);
-
-  // The quantity of the shared disk is removed  when the last copy is removed.
-  sorter.remove(slaveId, sharedDisk);
-  EXPECT_EQ(sorter.totalScalarQuantities(), quantity3);
-
-  sorter.remove(slaveId, sharedDisk);
-  EXPECT_EQ(sorter.totalScalarQuantities(), quantity1);
-}
-
-
 // This benchmark simulates sorting a number of clients that have
 // different amount of allocations.
 //
@@ -1690,8 +1648,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort)
       cout << "Added " << clientCount << " clients in "
            << watch.elapsed() << endl;
 
-      Resources agentResources = Resources::parse(
-          "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get();
+      const ResourceQuantities agentScalarQuantities =
+        ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096").get();
 
       watch.start();
       {
@@ -1701,7 +1659,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort)
 
           agents.push_back(slaveId);
 
-          sorter.add(slaveId, agentResources);
+          sorter.addSlave(slaveId, agentScalarQuantities);
         }
       }
       watch.stop();
@@ -1769,7 +1727,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort)
       watch.start();
       {
         foreach (const SlaveID& slaveId, agents) {
-          sorter.remove(slaveId, agentResources);
+          sorter.removeSlave(slaveId);
         }
       }
       watch.stop();
@@ -1870,8 +1828,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort)
       cout << "Added " << clientCount << " clients in "
            << watch.elapsed() << endl;
 
-      Resources agentResources = Resources::parse(
-          "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get();
+      const ResourceQuantities agentScalarQuantities =
+        ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096").get();
 
       watch.start();
       {
@@ -1881,7 +1839,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort)
 
           agents.push_back(slaveId);
 
-          sorter.add(slaveId, agentResources);
+          sorter.addSlave(slaveId, agentScalarQuantities);
         }
       }
       watch.stop();
@@ -1949,7 +1907,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort)
       watch.start();
       {
         foreach (const SlaveID& slaveId, agents) {
-          sorter.remove(slaveId, agentResources);
+          sorter.removeSlave(slaveId);
         }
       }
       watch.stop();