You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ne...@apache.org on 2017/04/26 18:20:52 UTC

[06/11] mesos git commit: Removed `allocations` map from DRFSorter.

Removed `allocations` map from DRFSorter.

The sorter previously managed three collections: a set of active
clients, a map from client names to allocations, and a map from client
names to weights. Because the set contained only active clients, each
client's allocation had to be stored separately so that the sorter
could remember the allocations made to inactive clients.

Now that the set of clients includes both active and inactive clients,
we can dispense with the additional `allocations` map and store a
client's allocation as a nested struct in the `Client` struct. This
avoids the need to keep the two collections synchronized; the logic for
manipulating allocations is also more properly written as a member
function of the Client::Allocation struct, rather than inline in a
member function of DRFSorter.

Note that this change significantly increases the size of the `Client`
struct. Since we copy clients by value in several places (e.g., when
erasing and re-inserting a `Client` into `std::set`), this change
regresses sorter performance. This will be fixed in the next review in
this chain: we'll move from `std::set<Client>` to `std::set<Client*>`,
which will avoid the need to copy the `Client` on updates.

Review: https://reviews.apache.org/r/58110


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d8faf59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d8faf59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d8faf59

Branch: refs/heads/master
Commit: 3d8faf5910843843e107f81639811e7720eadf0c
Parents: 9321646
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Mar 30 14:28:26 2017 -0700
Committer: Neil Conway <ne...@gmail.com>
Committed: Wed Apr 26 14:02:09 2017 -0400

----------------------------------------------------------------------
 src/master/allocator/sorter/drf/metrics.cpp |   6 +-
 src/master/allocator/sorter/drf/sorter.cpp  | 168 +++++++----------------
 src/master/allocator/sorter/drf/sorter.hpp  | 154 +++++++++++++++------
 3 files changed, 169 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3d8faf59/src/master/allocator/sorter/drf/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/metrics.cpp b/src/master/allocator/sorter/drf/metrics.cpp
index 15aab32..94acb86 100644
--- a/src/master/allocator/sorter/drf/metrics.cpp
+++ b/src/master/allocator/sorter/drf/metrics.cpp
@@ -16,6 +16,8 @@
 
 #include "master/allocator/sorter/drf/metrics.hpp"
 
+#include <set>
+
 #include <process/defer.hpp>
 
 #include <process/metrics/metrics.hpp>
@@ -25,6 +27,7 @@
 
 #include "master/allocator/sorter/drf/sorter.hpp"
 
+using std::set;
 using std::string;
 
 using process::UPID;
@@ -65,7 +68,8 @@ void Metrics::add(const string& client)
         // occurs after the client is removed but before the
         // metric is removed.
         if (sorter->contains(client)) {
-          return sorter->calculateShare(client);
+          set<Client, DRFComparator>::iterator it = sorter->find(client);
+          return sorter->calculateShare(*it);
         }
 
         return 0.0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d8faf59/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 9563f58..ee5b8c0 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -45,10 +45,10 @@ namespace allocator {
 bool DRFComparator::operator()(const Client& client1, const Client& client2)
 {
   if (client1.share == client2.share) {
-    if (client1.allocations == client2.allocations) {
+    if (client1.allocation.count == client2.allocation.count) {
       return client1.name < client2.name;
     }
-    return client1.allocations < client2.allocations;
+    return client1.allocation.count < client2.allocation.count;
   }
   return client1.share < client2.share;
 }
@@ -71,11 +71,9 @@ void DRFSorter::add(const string& name)
 {
   CHECK(!contains(name));
 
-  Client client(name, 0, 0);
+  Client client(name);
   clients.insert(client);
 
-  allocations[name] = Allocation();
-
   if (metrics.isSome()) {
     metrics->add(name);
   }
@@ -84,13 +82,10 @@ void DRFSorter::add(const string& name)
 
 void DRFSorter::remove(const string& name)
 {
-  CHECK(contains(name));
-
   set<Client, DRFComparator>::iterator it = find(name);
   CHECK(it != clients.end());
-  clients.erase(it);
 
-  allocations.erase(name);
+  clients.erase(it);
 
   if (metrics.isSome()) {
     metrics->remove(name);
@@ -100,8 +95,6 @@ void DRFSorter::remove(const string& name)
 
 void DRFSorter::activate(const string& name)
 {
-  CHECK(contains(name));
-
   set<Client, DRFComparator>::iterator it = find(name);
   CHECK(it != clients.end());
 
@@ -117,8 +110,6 @@ void DRFSorter::activate(const string& name)
 
 void DRFSorter::deactivate(const string& name)
 {
-  CHECK(contains(name));
-
   set<Client, DRFComparator>::iterator it = find(name);
   CHECK(it != clients.end());
 
@@ -147,47 +138,19 @@ void DRFSorter::allocated(
     const SlaveID& slaveId,
     const Resources& resources)
 {
-  CHECK(contains(name));
-
-  // Update the number of allocations that have been made to this
-  // client. Note that the client might currently be inactive.
-  //
-  // TODO(benh): Refactor 'updateShare' to be able to reuse it here.
-  {
-    set<Client, DRFComparator>::iterator it = find(name);
-    CHECK(it != clients.end());
-
-    Client client(*it);
-
-    // Update the 'allocations' to reflect the allocator decision.
-    client.allocations++;
-
-    // Remove and reinsert it to update the ordering appropriately.
-    clients.erase(it);
-    clients.insert(client);
-  }
-
-  // Add shared resources to the allocated quantities when the same
-  // resources don't already exist in the allocation.
-  const Resources newShared = resources.shared()
-    .filter([this, name, slaveId](const Resource& resource) {
-      return !allocations[name].resources[slaveId].contains(resource);
-    });
-
-  const Resources scalarQuantities =
-    (resources.nonShared() + newShared).createStrippedScalarQuantity();
+  set<Client, DRFComparator>::iterator it = find(name);
+  CHECK(it != clients.end());
 
-  allocations[name].resources[slaveId] += resources;
-  allocations[name].scalarQuantities += scalarQuantities;
+  Client client(*it);
+  client.allocation.add(slaveId, resources);
 
-  foreach (const Resource& resource, scalarQuantities) {
-    allocations[name].totals[resource.name()] += resource.scalar();
-  }
+  clients.erase(it);
+  clients.insert(client);
 
   // If the total resources have changed, we're going to recalculate
   // all the shares, so don't bother just updating this client.
   if (!dirty) {
-    updateShare(name);
+    updateShare(client.name);
   }
 }
 
@@ -198,34 +161,19 @@ void DRFSorter::update(
     const Resources& oldAllocation,
     const Resources& newAllocation)
 {
-  CHECK(contains(name));
-
   // TODO(bmahler): Check invariants between old and new allocations.
   // Namely, the roles and quantities of resources should be the same!
   // Otherwise, we need to ensure we re-calculate the shares, as
   // is being currently done, for safety.
 
-  const Resources oldAllocationQuantity =
-    oldAllocation.createStrippedScalarQuantity();
-  const Resources newAllocationQuantity =
-    newAllocation.createStrippedScalarQuantity();
-
-  CHECK(allocations[name].resources[slaveId].contains(oldAllocation));
-  CHECK(allocations[name].scalarQuantities.contains(oldAllocationQuantity));
-
-  allocations[name].resources[slaveId] -= oldAllocation;
-  allocations[name].resources[slaveId] += newAllocation;
-
-  allocations[name].scalarQuantities -= oldAllocationQuantity;
-  allocations[name].scalarQuantities += newAllocationQuantity;
+  set<Client, DRFComparator>::iterator it = find(name);
+  CHECK(it != clients.end());
 
-  foreach (const Resource& resource, oldAllocationQuantity) {
-    allocations[name].totals[resource.name()] -= resource.scalar();
-  }
+  Client client(*it);
+  client.allocation.update(slaveId, oldAllocation, newAllocation);
 
-  foreach (const Resource& resource, newAllocationQuantity) {
-    allocations[name].totals[resource.name()] += resource.scalar();
-  }
+  clients.erase(it);
+  clients.insert(client);
 
   // Just assume the total has changed, per the TODO above.
   dirty = true;
@@ -237,35 +185,19 @@ void DRFSorter::unallocated(
     const SlaveID& slaveId,
     const Resources& resources)
 {
-  CHECK(contains(name));
-  CHECK(allocations.at(name).resources.contains(slaveId));
-  CHECK(allocations.at(name).resources.at(slaveId).contains(resources));
-
-  allocations[name].resources[slaveId] -= resources;
-
-  // Remove shared resources from the allocated quantities when there
-  // are no instances of same resources left in the allocation.
-  const Resources absentShared = resources.shared()
-    .filter([this, name, slaveId](const Resource& resource) {
-      return !allocations[name].resources[slaveId].contains(resource);
-    });
-
-  const Resources scalarQuantities =
-    (resources.nonShared() + absentShared).createStrippedScalarQuantity();
-
-  foreach (const Resource& resource, scalarQuantities) {
-    allocations[name].totals[resource.name()] -= resource.scalar();
-  }
+  set<Client, DRFComparator>::iterator it = find(name);
+  CHECK(it != clients.end());
 
-  CHECK(allocations[name].scalarQuantities.contains(scalarQuantities));
-  allocations[name].scalarQuantities -= scalarQuantities;
+  Client client(*it);
+  client.allocation.subtract(slaveId, resources);
 
-  if (allocations[name].resources[slaveId].empty()) {
-    allocations[name].resources.erase(slaveId);
-  }
+  clients.erase(it);
+  clients.insert(client);
 
+  // If the total resources have changed, we're going to recalculate
+  // all the shares, so don't bother just updating this client.
   if (!dirty) {
-    updateShare(name);
+    updateShare(client.name);
   }
 }
 
@@ -273,18 +205,20 @@ void DRFSorter::unallocated(
 const hashmap<SlaveID, Resources>& DRFSorter::allocation(
     const string& name) const
 {
-  CHECK(contains(name));
+  set<Client, DRFComparator>::iterator it = find(name);
+  CHECK(it != clients.end());
 
-  return allocations.at(name).resources;
+  return it->allocation.resources;
 }
 
 
 const Resources& DRFSorter::allocationScalarQuantities(
     const string& name) const
 {
-  CHECK(contains(name));
+  set<Client, DRFComparator>::iterator it = find(name);
+  CHECK(it != clients.end());
 
-  return allocations.at(name).scalarQuantities;
+  return it->allocation.scalarQuantities;
 }
 
 
@@ -296,11 +230,11 @@ hashmap<string, Resources> DRFSorter::allocation(const SlaveID& slaveId) const
 
   hashmap<string, Resources> result;
 
-  foreachpair (const string& name, const Allocation& allocation, allocations) {
-    if (allocation.resources.contains(slaveId)) {
-      // It is safe to use `at()` here because we've just checked the existence
-      // of the key. This avoid un-necessary copies.
-      result.emplace(name, allocation.resources.at(slaveId));
+  foreach (const Client& client, clients) {
+    if (client.allocation.resources.contains(slaveId)) {
+      // It is safe to use `at()` here because we've just checked the
+      // existence of the key. This avoids unnecessary copies.
+      result.emplace(client.name, client.allocation.resources.at(slaveId));
     }
   }
 
@@ -312,10 +246,11 @@ Resources DRFSorter::allocation(
     const string& name,
     const SlaveID& slaveId) const
 {
-  CHECK(contains(name));
+  set<Client, DRFComparator>::iterator it = find(name);
+  CHECK(it != clients.end());
 
-  if (allocations.at(name).resources.contains(slaveId)) {
-    return allocations.at(name).resources.at(slaveId);
+  if (it->allocation.resources.contains(slaveId)) {
+    return it->allocation.resources.at(slaveId);
   }
 
   return Resources();
@@ -400,7 +335,7 @@ vector<string> DRFSorter::sort()
 
     foreach (Client client, clients) {
       // Update the 'share' to get proper sorting.
-      client.share = calculateShare(client.name);
+      client.share = calculateShare(client);
 
       temp.insert(client);
     }
@@ -426,13 +361,14 @@ vector<string> DRFSorter::sort()
 
 bool DRFSorter::contains(const string& name) const
 {
-  return allocations.contains(name);
+  set<Client, DRFComparator>::iterator it = find(name);
+  return it != clients.end();
 }
 
 
 int DRFSorter::count() const
 {
-  return allocations.size();
+  return clients.size();
 }
 
 
@@ -444,7 +380,7 @@ void DRFSorter::updateShare(const string& name)
   Client client(*it);
 
   // Update the 'share' to get proper sorting.
-  client.share = calculateShare(client.name);
+  client.share = calculateShare(client);
 
   // Remove and reinsert it to update the ordering appropriately.
   clients.erase(it);
@@ -452,10 +388,8 @@ void DRFSorter::updateShare(const string& name)
 }
 
 
-double DRFSorter::calculateShare(const string& name) const
+double DRFSorter::calculateShare(const Client& client) const
 {
-  CHECK(contains(name));
-
   double share = 0.0;
 
   // TODO(benh): This implementation of "dominant resource fairness"
@@ -472,15 +406,15 @@ double DRFSorter::calculateShare(const string& name) const
     }
 
     if (scalar.value() > 0.0 &&
-        allocations.at(name).totals.contains(resourceName)) {
+        client.allocation.totals.contains(resourceName)) {
       const double allocation =
-        allocations.at(name).totals.at(resourceName).value();
+        client.allocation.totals.at(resourceName).value();
 
       share = std::max(share, allocation / scalar.value());
     }
   }
 
-  return share / clientWeight(name);
+  return share / clientWeight(client.name);
 }
 
 
@@ -496,7 +430,7 @@ double DRFSorter::clientWeight(const string& name) const
 }
 
 
-set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
+set<Client, DRFComparator>::iterator DRFSorter::find(const string& name) const
 {
   set<Client, DRFComparator>::iterator it;
   for (it = clients.begin(); it != clients.end(); it++) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d8faf59/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 665eeb0..2ef2eb8 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -40,21 +40,122 @@ namespace allocator {
 
 struct Client
 {
-  Client(const std::string& _name, double _share, uint64_t _allocations)
-    : name(_name), share(_share), active(true), allocations(_allocations) {}
+  explicit Client(const std::string& _name)
+    : name(_name), share(0), active(true) {}
 
   std::string name;
   double share;
   bool active;
 
-  // We store the number of times this client has been chosen for
-  // allocation so that we can fairly share the resources across
-  // clients that have the same share. Note that this information is
-  // not persisted across master failovers, but since the point is to
-  // equalize the 'allocations' across clients of the same 'share'
-  // having allocations restart at 0 after a master failover should be
-  // sufficient (famous last words.)
-  uint64_t allocations;
+  // Allocation for a client.
+  struct Allocation {
+    Allocation() : count(0) {}
+
+    void add(const SlaveID& slaveId, const Resources& toAdd) {
+      // Add shared resources to the allocated quantities when the same
+      // resources don't already exist in the allocation.
+      const Resources sharedToAdd = toAdd.shared()
+        .filter([this, slaveId](const Resource& resource) {
+            return !resources[slaveId].contains(resource);
+        });
+
+      const Resources quantitiesToAdd =
+        (toAdd.nonShared() + sharedToAdd).createStrippedScalarQuantity();
+
+      resources[slaveId] += toAdd;
+      scalarQuantities += quantitiesToAdd;
+
+      foreach (const Resource& resource, quantitiesToAdd) {
+        totals[resource.name()] += resource.scalar();
+      }
+
+      count++;
+    }
+
+    void subtract(const SlaveID& slaveId, const Resources& toRemove) {
+      CHECK(resources.contains(slaveId));
+      CHECK(resources.at(slaveId).contains(toRemove));
+
+      resources[slaveId] -= toRemove;
+
+      // Remove shared resources from the allocated quantities when there
+      // are no instances of same resources left in the allocation.
+      const Resources sharedToRemove = toRemove.shared()
+        .filter([this, slaveId](const Resource& resource) {
+            return !resources[slaveId].contains(resource);
+        });
+
+      const Resources quantitiesToRemove =
+        (toRemove.nonShared() + sharedToRemove).createStrippedScalarQuantity();
+
+      foreach (const Resource& resource, quantitiesToRemove) {
+        totals[resource.name()] -= resource.scalar();
+      }
+
+      CHECK(scalarQuantities.contains(quantitiesToRemove));
+      scalarQuantities -= quantitiesToRemove;
+
+      if (resources[slaveId].empty()) {
+        resources.erase(slaveId);
+      }
+    }
+
+    void update(
+        const SlaveID& slaveId,
+        const Resources& oldAllocation,
+        const Resources& newAllocation) {
+      const Resources oldAllocationQuantity =
+        oldAllocation.createStrippedScalarQuantity();
+      const Resources newAllocationQuantity =
+        newAllocation.createStrippedScalarQuantity();
+
+      CHECK(resources[slaveId].contains(oldAllocation));
+      CHECK(scalarQuantities.contains(oldAllocationQuantity));
+
+      resources[slaveId] -= oldAllocation;
+      resources[slaveId] += newAllocation;
+
+      scalarQuantities -= oldAllocationQuantity;
+      scalarQuantities += newAllocationQuantity;
+
+      foreach (const Resource& resource, oldAllocationQuantity) {
+        totals[resource.name()] -= resource.scalar();
+      }
+
+      foreach (const Resource& resource, newAllocationQuantity) {
+        totals[resource.name()] += resource.scalar();
+      }
+    }
+
+    // We store the number of times this client has been chosen for
+    // allocation so that we can fairly share the resources across
+    // clients that have the same share. Note that this information is
+    // not persisted across master failovers, but since the point is
+    // to equalize the `count` across clients of the same `share`
+    // having allocations restart at 0 after a master failover should
+    // be sufficient (famous last words.)
+    uint64_t count;
+
+    // We maintain multiple copies of each shared resource allocated
+    // to a client, where the number of copies represents the number
+    // of times this shared resource has been allocated to (and has
+    // not been recovered from) a specific client.
+    hashmap<SlaveID, Resources> resources;
+
+    // Similarly, we aggregate scalars across slaves and omit information
+    // about dynamic reservations, persistent volumes and sharedness of
+    // the corresponding resource. See notes above.
+    Resources scalarQuantities;
+
+    // We also store a map version of `scalarQuantities`, mapping
+    // the `Resource::name` to aggregated scalar. This improves the
+    // performance of calculating shares. See MESOS-4694.
+    //
+    // TODO(bmahler): Ideally we do not store `scalarQuantities`
+    // redundantly here, investigate performance improvements to
+    // `Resources` to make this unnecessary.
+    hashmap<std::string, Value::Scalar> totals;
+  } allocation;
 };
 
 
@@ -136,7 +237,7 @@ private:
   void updateShare(const std::string& name);
 
   // Returns the dominant resource share for the client.
-  double calculateShare(const std::string& name) const;
+  double calculateShare(const Client& client) const;
 
   // Resources (by name) that will be excluded from fair sharing.
   Option<std::set<std::string>> fairnessExcludeResourceNames;
@@ -147,7 +248,7 @@ private:
 
   // Returns an iterator to the specified client, if
   // it exists in this Sorter.
-  std::set<Client, DRFComparator>::iterator find(const std::string& name);
+  std::set<Client, DRFComparator>::iterator find(const std::string& name) const;
 
   // If true, sort() will recalculate all shares.
   bool dirty = false;
@@ -189,35 +290,6 @@ private:
     hashmap<std::string, Value::Scalar> totals;
   } total_;
 
-  // Allocation for a client.
-  struct Allocation {
-    // We maintain multiple copies of each shared resource allocated
-    // to a client, where the number of copies represents the number
-    // of times this shared resource has been allocated to (and has
-    // not been recovered from) a specific client.
-    hashmap<SlaveID, Resources> resources;
-
-    // Similarly, we aggregate scalars across slaves and omit information
-    // about dynamic reservations, persistent volumes and sharedness of
-    // the corresponding resource. See notes above.
-    Resources scalarQuantities;
-
-    // We also store a map version of `scalarQuantities`, mapping
-    // the `Resource::name` to aggregated scalar. This improves the
-    // performance of calculating shares. See MESOS-4694.
-    //
-    // TODO(bmahler): Ideally we do not store `scalarQuantities`
-    // redundantly here, investigate performance improvements to
-    // `Resources` to make this unnecessary.
-    hashmap<std::string, Value::Scalar> totals;
-  };
-
-  // Maps client names to the resources they have been allocated.
-  //
-  // TODO(neilc): It would be cleaner to store a client's allocation
-  // in the `Client` struct instead.
-  hashmap<std::string, Allocation> allocations;
-
   // Metrics are optionally exposed by the sorter.
   friend Metrics;
   Option<Metrics> metrics;