Posted to commits@mesos.apache.org by ne...@apache.org on 2017/04/26 18:20:54 UTC

[08/11] mesos git commit: Added support for hierarchical roles to DRFSorter.

Added support for hierarchical roles to DRFSorter.

This commit replaces the sorter's flat list of clients with a tree
that represents the hierarchical relationship between sorter clients.
Each node in the tree contains a vector of pointers to its child
nodes. The tree may contain nodes that do not correspond directly to
sorter clients. For example, adding clients "a/b" and "c/d" results
in the following tree:

root
 -> a
   -> b
 -> c
   -> d
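
For reference, the per-node state introduced by this commit looks
like the following (condensed from the `DRFSorter::Node` definition
in sorter.hpp below):

  struct Node
  {
    std::string name;             // Label of the edge from the parent.
    std::string path;             // Full path from the root, e.g., "a/b".
    double share;                 // DRF share; recomputed when sorting.
    bool active;                  // True iff an active sorter client.
    Node* parent;
    std::vector<Node*> children;
  };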

The `sort` member function still returns a single entry for each
active client in the sorter. This is implemented by ensuring that
each sorter client is associated with a leaf node in the tree (i.e.,
internal nodes are never returned by `sort`). Note that it is
possible for a leaf node to be transformed into an internal node by a
subsequent insertion; to handle this case, we "implicitly" create an
extra child node, which maintains the invariant that each client is
associated with a leaf node. For example, if the client "a/b/x" is
added to the tree above, the result is:

root
 -> a
   -> b
     -> .
     -> x
 -> c
   -> d

The "." leaf node holds the allocation that has been made to the "a/b"
client itself; the "a/b" node holds the sum of all the allocations that
have been made to the subtree rooted at "a/b", which also includes
"a/b/x". The "." node is called a "virtual leaf node".

This commit also introduces a new approach to sorting: rather than
keeping a `std::set` of sorter clients, each node in the tree now
stores its children in a `std::vector`, which is sorted explicitly
via `std::sort` when necessary. The previous implementation tried to
optimize the sorting process by updating the sort order incrementally
when a single sorter client was updated; this commit removes that
optimization and instead re-sorts the entire tree whenever a change
is made that might alter the sort order. Re-introducing a version of
this optimization would make sense in the future (MESOS-7390), but
benchmarking suggests that this commit nevertheless results in a net
improvement in sorter performance for non-hierarchical clients. The
improvement is likely due to the introduction of a secondary hashmap
that allows the leaf node associated with a client name to be found
efficiently; the previous implementation required a linear scan of a
`std::set`.
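
When siblings are re-sorted, `std::sort` is invoked with a comparator
that orders nodes by DRF share, breaking ties first by allocation
count and then by path (see `compareDRF` in sorter.hpp below):

  static bool compareDRF(const Node* left, const Node* right)
  {
    if (left->share != right->share) {
      return left->share < right->share;
    }

    if (left->allocation.count != right->allocation.count) {
      return left->allocation.count < right->allocation.count;
    }

    return left->path < right->path;
  }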

Review: https://reviews.apache.org/r/57254


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5ef1992
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5ef1992
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5ef1992

Branch: refs/heads/master
Commit: e5ef1992b2b8e84b5d1487f1578f18f2291cd082
Parents: 5bf32be
Author: Neil Conway <ne...@gmail.com>
Authored: Mon Mar 13 10:18:36 2017 -0700
Committer: Neil Conway <ne...@gmail.com>
Committed: Wed Apr 26 14:02:22 2017 -0400

----------------------------------------------------------------------
 src/master/allocator/sorter/drf/metrics.cpp |  12 +-
 src/master/allocator/sorter/drf/sorter.cpp  | 430 +++++++++++------
 src/master/allocator/sorter/drf/sorter.hpp  | 393 ++++++++++------
 src/master/allocator/sorter/sorter.hpp      |  15 +-
 src/tests/hierarchical_allocator_tests.cpp  | 251 +++++++++-
 src/tests/master_allocator_tests.cpp        | 152 ++++++
 src/tests/sorter_tests.cpp                  | 573 ++++++++++++++++++++++-
 7 files changed, 1497 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/master/allocator/sorter/drf/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/metrics.cpp b/src/master/allocator/sorter/drf/metrics.cpp
index 94acb86..ff63fba 100644
--- a/src/master/allocator/sorter/drf/metrics.cpp
+++ b/src/master/allocator/sorter/drf/metrics.cpp
@@ -16,8 +16,6 @@
 
 #include "master/allocator/sorter/drf/metrics.hpp"
 
-#include <set>
-
 #include <process/defer.hpp>
 
 #include <process/metrics/metrics.hpp>
@@ -27,7 +25,6 @@
 
 #include "master/allocator/sorter/drf/sorter.hpp"
 
-using std::set;
 using std::string;
 
 using process::UPID;
@@ -67,12 +64,13 @@ void Metrics::add(const string& client)
         // The client may have been removed if the dispatch
         // occurs after the client is removed but before the
         // metric is removed.
-        if (sorter->contains(client)) {
-          set<Client, DRFComparator>::iterator it = sorter->find(client);
-          return sorter->calculateShare(*it);
+        DRFSorter::Node* sorterClient = sorter->find(client);
+
+        if (sorterClient == nullptr) {
+          return 0.0;
         }
 
-        return 0.0;
+        return sorter->calculateShare(sorterClient);
       }));
 
   dominantShares.put(client, gauge);

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 7e91c85..73b8e8c 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -30,6 +30,7 @@
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/option.hpp>
+#include <stout/strings.hpp>
 
 using std::set;
 using std::string;
@@ -42,24 +43,22 @@ namespace internal {
 namespace master {
 namespace allocator {
 
-bool DRFComparator::operator()(const Client& client1, const Client& client2)
-{
-  if (client1.share != client2.share) {
-    return client1.share < client2.share;
-  }
 
-  if (client1.allocation.count != client2.allocation.count) {
-    return client1.allocation.count < client2.allocation.count;
-  }
-
-  return client1.name < client2.name;
-}
+DRFSorter::DRFSorter()
+  : root(new Node("", nullptr)) {}
 
 
 DRFSorter::DRFSorter(
     const UPID& allocator,
     const string& metricsPrefix)
-  : metrics(Metrics(allocator, *this, metricsPrefix)) {}
+  : root(new Node("", nullptr)),
+    metrics(Metrics(allocator, *this, metricsPrefix)) {}
+
+
+DRFSorter::~DRFSorter()
+{
+  delete root;
+}
 
 
 void DRFSorter::initialize(
@@ -69,96 +68,231 @@ void DRFSorter::initialize(
 }
 
 
-void DRFSorter::add(const string& name)
+void DRFSorter::add(const string& clientPath)
 {
-  CHECK(!contains(name));
+  vector<string> pathElements = strings::tokenize(clientPath, "/");
+  CHECK(!pathElements.empty());
+
+  Node* current = root;
+  Node* lastCreatedNode = nullptr;
+
+  // Traverse the tree to add new nodes for each element of the path,
+  // if that node doesn't already exist (similar to `mkdir -p`).
+  foreach (const string& element, pathElements) {
+    Node* node = nullptr;
+
+    foreach (Node* child, current->children) {
+      if (child->name == element) {
+        node = child;
+        break;
+      }
+    }
 
-  Client client(name);
-  clients.insert(client);
+    if (node != nullptr) {
+      current = node;
+      continue;
+    }
 
-  if (metrics.isSome()) {
-    metrics->add(name);
+    // We didn't find `element`, so add a new child to `current`.
+    //
+    // If adding this child would result in turning `current` from a
+    // leaf node into an internal node, we need to create an
+    // additional child node: `current` must have been associated with
+    // a client and clients must always be associated with leaf nodes.
+    //
+    // There are two exceptions: if `current` is the root node or it
+    // was just created by the current `add()` call, it does not
+    // correspond to a client, so we don't create an extra child.
+    if (current->children.empty() &&
+        current != root &&
+        current != lastCreatedNode) {
+      Node* parent = CHECK_NOTNULL(current->parent);
+
+      parent->removeChild(current);
+
+      // Create a node under `parent`. This internal node will take
+      // the place of `current` in the tree.
+      Node* internal = new Node(current->name, parent);
+      parent->addChild(internal);
+      internal->allocation = current->allocation;
+
+      CHECK_EQ(current->path, internal->path);
+
+      // Update `current` to become a virtual leaf node and a child of
+      // `internal`.
+      current->name = ".";
+      current->parent = internal;
+      internal->addChild(current);
+      current->path = strings::join("/", parent->path, current->name);
+
+      CHECK_EQ(internal->path, current->clientPath());
+
+      current = internal;
+    }
+
+    // Now actually add a new child to `current`.
+    Node* newChild = new Node(element, current);
+    current->addChild(newChild);
+
+    current = newChild;
+    lastCreatedNode = newChild;
   }
-}
 
+  // `current` is the node associated with the last element of the
+  // path. If we didn't add `current` to the tree above, create a leaf
+  // node now. For example, if the tree contains "a/b" and we add a
+  // new client "a", we want to create a new leaf node "a/." here.
+  if (current != lastCreatedNode) {
+    Node* newChild = new Node(".", current);
+    current->addChild(newChild);
+    current = newChild;
+  }
 
-void DRFSorter::remove(const string& name)
-{
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
+  // `current` is the newly created node associated with the last
+  // element of the path. `current` should be an inactive node with no
+  // children; activate it now.
+  CHECK(current->children.empty());
+  CHECK(!current->active);
+  current->active = true;
+
+  // Add a new entry to the lookup table. The full path of the newly
+  // added client should not already exist in `clients`.
+  CHECK_EQ(clientPath, current->clientPath());
+  CHECK(!clients.contains(clientPath));
+
+  clients[clientPath] = current;
 
-  clients.erase(it);
+  // TODO(neilc): Avoid dirtying the tree in some circumstances.
+  dirty = true;
 
   if (metrics.isSome()) {
-    metrics->remove(name);
+    metrics->add(clientPath);
   }
 }
 
 
-void DRFSorter::activate(const string& name)
+void DRFSorter::remove(const string& clientPath)
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
+  Node* current = CHECK_NOTNULL(find(clientPath));
+
+  // Save a copy of the leaf node's allocated resources, because we
+  // destroy the leaf node below.
+  const hashmap<SlaveID, Resources> leafAllocation =
+    current->allocation.resources;
+
+  // Remove the lookup table entry for the client.
+  CHECK(clients.contains(clientPath));
+  clients.erase(clientPath);
+
+  // To remove a client from the tree, we have to do two things:
+  //
+  //   (1) Update the tree structure to reflect the removal of the
+  //       client. This means removing the client's leaf node, then
+  //       walking back up the tree to remove any internal nodes that
+  //       are now unnecessary.
+  //
+  //   (2) Update allocations of ancestor nodes to reflect the removal
+  //       of the client.
+  //
+  // We do both things at once: find the leaf node, remove it, and
+  // walk up the tree, updating ancestor allocations and removing
+  // ancestors when possible.
+  while (current != root) {
+    Node* parent = CHECK_NOTNULL(current->parent);
+
+    // Update `parent` to reflect the fact that the resources in the
+    // leaf node are no longer allocated to the subtree rooted at
+    // `parent`. We skip `root`, because we never update the
+    // allocation made to the root node.
+    if (parent != root) {
+      foreachpair (const SlaveID& slaveId,
+                   const Resources& resources,
+                   leafAllocation) {
+        parent->allocation.subtract(slaveId, resources);
+      }
+    }
+
+    if (current->children.empty()) {
+      parent->removeChild(current);
+      delete current;
+    } else if (current->children.size() == 1) {
+      // If `current` has only one child that was created to
+      // accommodate inserting `clientPath` (see `DRFSorter::add()`),
+      // we can remove the child node and turn `current` back into a
+      // leaf node.
+      Node* child = *(current->children.begin());
+
+      if (child->name == ".") {
+        CHECK(child->children.empty());
+        CHECK(clients.contains(current->path));
+        CHECK_EQ(child, clients.at(current->path));
 
-  if (!it->active) {
-    Client client(*it);
-    client.active = true;
+        current->active = child->active;
+        current->removeChild(child);
 
-    clients.erase(it);
-    clients.insert(client);
+        clients[current->path] = current;
+
+        delete child;
+      }
+    }
+
+    current = parent;
+  }
+
+  // TODO(neilc): Avoid dirtying the tree in some circumstances.
+  dirty = true;
+
+  if (metrics.isSome()) {
+    metrics->remove(clientPath);
   }
 }
 
 
-void DRFSorter::deactivate(const string& name)
+void DRFSorter::activate(const string& clientPath)
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
+  Node* client = CHECK_NOTNULL(find(clientPath));
+  client->active = true;
+}
 
-  if (it->active) {
-    Client client(*it);
-    client.active = false;
 
-    clients.erase(it);
-    clients.insert(client);
-  }
+void DRFSorter::deactivate(const string& clientPath)
+{
+  Node* client = CHECK_NOTNULL(find(clientPath));
+  client->active = false;
 }
 
 
-void DRFSorter::updateWeight(const string& name, double weight)
+void DRFSorter::updateWeight(const string& path, double weight)
 {
-  weights[name] = weight;
+  weights[path] = weight;
 
-  // It would be possible to avoid dirtying the tree here (in some
-  // cases), but it doesn't seem worth the complexity.
+  // TODO(neilc): Avoid dirtying the tree in some circumstances.
   dirty = true;
 }
 
 
 void DRFSorter::allocated(
-    const string& name,
+    const string& clientPath,
     const SlaveID& slaveId,
     const Resources& resources)
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
-
-  Client client(*it);
-  client.allocation.add(slaveId, resources);
-
-  clients.erase(it);
-  clients.insert(client);
-
-  // If the total resources have changed, we're going to recalculate
-  // all the shares, so don't bother just updating this client.
-  if (!dirty) {
-    updateShare(client.name);
+  Node* current = CHECK_NOTNULL(find(clientPath));
+
+  // NOTE: We don't currently update the `allocation` for the root
+  // node. This is debatable, but the current implementation doesn't
+  // require looking at the allocation of the root node.
+  while (current != root) {
+    current->allocation.add(slaveId, resources);
+    current = CHECK_NOTNULL(current->parent);
   }
+
+  // TODO(neilc): Avoid dirtying the tree in some circumstances.
+  dirty = true;
 }
 
 
 void DRFSorter::update(
-    const string& name,
+    const string& clientPath,
     const SlaveID& slaveId,
     const Resources& oldAllocation,
     const Resources& newAllocation)
@@ -168,14 +302,15 @@ void DRFSorter::update(
   // Otherwise, we need to ensure we re-calculate the shares, as
   // is being currently done, for safety.
 
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
+  Node* current = CHECK_NOTNULL(find(clientPath));
 
-  Client client(*it);
-  client.allocation.update(slaveId, oldAllocation, newAllocation);
-
-  clients.erase(it);
-  clients.insert(client);
+  // NOTE: We don't currently update the `allocation` for the root
+  // node. This is debatable, but the current implementation doesn't
+  // require looking at the allocation of the root node.
+  while (current != root) {
+    current->allocation.update(slaveId, oldAllocation, newAllocation);
+    current = CHECK_NOTNULL(current->parent);
+  }
 
   // Just assume the total has changed, per the TODO above.
   dirty = true;
@@ -183,60 +318,60 @@ void DRFSorter::update(
 
 
 void DRFSorter::unallocated(
-    const string& name,
+    const string& clientPath,
     const SlaveID& slaveId,
     const Resources& resources)
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
-
-  Client client(*it);
-  client.allocation.subtract(slaveId, resources);
-
-  clients.erase(it);
-  clients.insert(client);
-
-  // If the total resources have changed, we're going to recalculate
-  // all the shares, so don't bother just updating this client.
-  if (!dirty) {
-    updateShare(client.name);
+  Node* current = CHECK_NOTNULL(find(clientPath));
+
+  // NOTE: We don't currently update the `allocation` for the root
+  // node. This is debatable, but the current implementation doesn't
+  // require looking at the allocation of the root node.
+  while (current != root) {
+    current->allocation.subtract(slaveId, resources);
+    current = CHECK_NOTNULL(current->parent);
   }
+
+  // TODO(neilc): Avoid dirtying the tree in some circumstances.
+  dirty = true;
 }
 
 
 const hashmap<SlaveID, Resources>& DRFSorter::allocation(
-    const string& name) const
+    const string& clientPath) const
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
-
-  return it->allocation.resources;
+  const Node* client = CHECK_NOTNULL(find(clientPath));
+  return client->allocation.resources;
 }
 
 
 const Resources& DRFSorter::allocationScalarQuantities(
-    const string& name) const
+    const string& clientPath) const
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
-
-  return it->allocation.scalarQuantities;
+  const Node* client = CHECK_NOTNULL(find(clientPath));
+  return client->allocation.scalarQuantities;
 }
 
 
 hashmap<string, Resources> DRFSorter::allocation(const SlaveID& slaveId) const
 {
-  // TODO(jmlvanre): We can index the allocation by slaveId to make this faster.
-  // It is a tradeoff between speed vs. memory. For now we use existing data
-  // structures.
-
   hashmap<string, Resources> result;
 
-  foreach (const Client& client, clients) {
-    if (client.allocation.resources.contains(slaveId)) {
+  // We want to find the allocation that has been made to each client
+  // on a particular `slaveId`. Rather than traversing the tree
+  // looking for leaf nodes (clients), we can instead just iterate
+  // over the `clients` hashmap.
+  //
+  // TODO(jmlvanre): We can index the allocation by slaveId to make
+  // this faster.  It is a tradeoff between speed vs. memory. For now
+  // we use existing data structures.
+  foreachvalue (const Node* client, clients) {
+    if (client->allocation.resources.contains(slaveId)) {
       // It is safe to use `at()` here because we've just checked the
-      // existence of the key. This avoid un-necessary copies.
-      result.emplace(client.name, client.allocation.resources.at(slaveId));
+      // existence of the key. This avoids unnecessary copies.
+      string path = client->clientPath();
+      CHECK(!result.contains(path));
+      result.emplace(path, client->allocation.resources.at(slaveId));
     }
   }
 
@@ -245,14 +380,13 @@ hashmap<string, Resources> DRFSorter::allocation(const SlaveID& slaveId) const
 
 
 Resources DRFSorter::allocation(
-    const string& name,
+    const string& clientPath,
     const SlaveID& slaveId) const
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
+  const Node* client = CHECK_NOTNULL(find(clientPath));
 
-  if (it->allocation.resources.contains(slaveId)) {
-    return it->allocation.resources.at(slaveId);
+  if (client->allocation.resources.contains(slaveId)) {
+    return client->allocation.resources.at(slaveId);
   }
 
   return Resources();
@@ -287,7 +421,7 @@ void DRFSorter::add(const SlaveID& slaveId, const Resources& resources)
     }
 
     // We have to recalculate all shares when the total resources
-    // change, but we put it off until sort is called so that if
+    // change, but we put it off until `sort` is called so that if
     // something else changes before the next allocation we don't
     // recalculate everything twice.
     dirty = true;
@@ -333,38 +467,49 @@ void DRFSorter::remove(const SlaveID& slaveId, const Resources& resources)
 vector<string> DRFSorter::sort()
 {
   if (dirty) {
-    set<Client, DRFComparator> temp;
+    std::function<void (Node*)> sortTree = [this, &sortTree](Node* node) {
+      foreach (Node* child, node->children) {
+        child->share = calculateShare(child);
+      }
 
-    foreach (Client client, clients) {
-      // Update the 'share' to get proper sorting.
-      client.share = calculateShare(client);
+      std::sort(node->children.begin(),
+                node->children.end(),
+                DRFSorter::Node::compareDRF);
 
-      temp.insert(client);
-    }
+      foreach (Node* child, node->children) {
+        sortTree(child);
+      }
+    };
 
-    clients = temp;
+    sortTree(root);
 
-    // Reset dirty to false so as not to re-calculate *all*
-    // shares unless another dirtying operation occurs.
     dirty = false;
   }
 
+  // Return the leaf nodes in the tree. The children of each node are
+  // already sorted in DRF order.
   vector<string> result;
 
-  foreach (const Client& client, clients) {
-    if (client.active) {
-      result.push_back(client.name);
+  std::function<void (const Node*)> listClients =
+      [this, &listClients, &result](const Node* node) {
+    if (node->active) {
+      result.push_back(node->clientPath());
     }
-  }
+
+    foreach (Node* child, node->children) {
+      listClients(child);
+    }
+  };
+
+  listClients(root);
 
   return result;
 }
 
 
-bool DRFSorter::contains(const string& name) const
+bool DRFSorter::contains(const string& clientPath) const
 {
-  set<Client, DRFComparator>::iterator it = find(name);
-  return it != clients.end();
+  return find(clientPath) != nullptr;
 }
 
 
@@ -374,23 +519,7 @@ int DRFSorter::count() const
 }
 
 
-void DRFSorter::updateShare(const string& name)
-{
-  set<Client, DRFComparator>::iterator it = find(name);
-  CHECK(it != clients.end());
-
-  Client client(*it);
-
-  // Update the 'share' to get proper sorting.
-  client.share = calculateShare(client);
-
-  // Remove and reinsert it to update the ordering appropriately.
-  clients.erase(it);
-  clients.insert(client);
-}
-
-
-double DRFSorter::calculateShare(const Client& client) const
+double DRFSorter::calculateShare(const Node* node) const
 {
   double share = 0.0;
 
@@ -408,21 +537,21 @@ double DRFSorter::calculateShare(const Client& client) const
     }
 
     if (scalar.value() > 0.0 &&
-        client.allocation.totals.contains(resourceName)) {
+        node->allocation.totals.contains(resourceName)) {
       const double allocation =
-        client.allocation.totals.at(resourceName).value();
+        node->allocation.totals.at(resourceName).value();
 
       share = std::max(share, allocation / scalar.value());
     }
   }
 
-  return share / clientWeight(client.name);
+  return share / findWeight(node);
 }
 
 
-double DRFSorter::clientWeight(const string& name) const
+double DRFSorter::findWeight(const Node* node) const
 {
-  Option<double> weight = weights.get(name);
+  Option<double> weight = weights.get(node->path);
 
   if (weight.isNone()) {
     return 1.0;
@@ -432,16 +561,15 @@ double DRFSorter::clientWeight(const string& name) const
 }
 
 
-set<Client, DRFComparator>::iterator DRFSorter::find(const string& name) const
+DRFSorter::Node* DRFSorter::find(const string& clientPath) const
 {
-  set<Client, DRFComparator>::iterator it;
-  for (it = clients.begin(); it != clients.end(); it++) {
-    if (name == it->name) {
-      break;
-    }
+  Option<Node*> client = clients.get(clientPath);
+
+  if (client.isNone()) {
+    return nullptr;
   }
 
-  return it;
+  return client.get();
 }
 
 } // namespace allocator {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 2ef2eb8..fee58d6 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -17,6 +17,7 @@
 #ifndef __MASTER_ALLOCATOR_SORTER_DRF_SORTER_HPP__
 #define __MASTER_ALLOCATOR_SORTER_DRF_SORTER_HPP__
 
+#include <algorithm>
 #include <set>
 #include <string>
 #include <vector>
@@ -25,6 +26,7 @@
 #include <mesos/resources.hpp>
 #include <mesos/values.hpp>
 
+#include <stout/check.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/option.hpp>
 
@@ -38,20 +40,248 @@ namespace internal {
 namespace master {
 namespace allocator {
 
-struct Client
+class DRFSorter : public Sorter
 {
-  explicit Client(const std::string& _name)
-    : name(_name), share(0), active(true) {}
+public:
+  DRFSorter();
+
+  explicit DRFSorter(
+      const process::UPID& allocator,
+      const std::string& metricsPrefix);
+
+  virtual ~DRFSorter();
+
+  virtual void initialize(
+      const Option<std::set<std::string>>& fairnessExcludeResourceNames);
+
+  virtual void add(const std::string& clientPath);
+
+  virtual void remove(const std::string& clientPath);
+
+  virtual void activate(const std::string& clientPath);
+
+  virtual void deactivate(const std::string& clientPath);
+
+  virtual void updateWeight(const std::string& path, double weight);
+
+  virtual void allocated(
+      const std::string& clientPath,
+      const SlaveID& slaveId,
+      const Resources& resources);
+
+  virtual void update(
+      const std::string& clientPath,
+      const SlaveID& slaveId,
+      const Resources& oldAllocation,
+      const Resources& newAllocation);
+
+  virtual void unallocated(
+      const std::string& clientPath,
+      const SlaveID& slaveId,
+      const Resources& resources);
+
+  virtual const hashmap<SlaveID, Resources>& allocation(
+      const std::string& clientPath) const;
+
+  virtual const Resources& allocationScalarQuantities(
+      const std::string& clientPath) const;
+
+  virtual hashmap<std::string, Resources> allocation(
+      const SlaveID& slaveId) const;
+
+  virtual Resources allocation(
+      const std::string& clientPath,
+      const SlaveID& slaveId) const;
+
+  virtual const Resources& totalScalarQuantities() const;
+
+  virtual void add(const SlaveID& slaveId, const Resources& resources);
+
+  virtual void remove(const SlaveID& slaveId, const Resources& resources);
+
+  virtual std::vector<std::string> sort();
+
+  virtual bool contains(const std::string& clientPath) const;
+
+  virtual int count() const;
+
+private:
+  // A node in the sorter's tree.
+  struct Node;
 
+  // Returns the dominant resource share for the node.
+  double calculateShare(const Node* node) const;
+
+  // Returns the weight associated with the node. If no weight has
+  // been configured for the node's path, the default weight (1.0) is
+  // returned.
+  double findWeight(const Node* node) const;
+
+  // Returns the client associated with the given path. Returns
+  // nullptr if the path is not found or if the path identifies an
+  // internal node in the tree (not a client).
+  Node* find(const std::string& clientPath) const;
+
+  // Resources (by name) that will be excluded from fair sharing.
+  Option<std::set<std::string>> fairnessExcludeResourceNames;
+
+  // If true, sort() will recalculate all shares.
+  bool dirty = false;
+
+  // The root node in the sorter tree.
+  Node* root;
+
+  // To speed lookups, we keep a map from client paths to the leaf
+  // node associated with that client. There is an entry in this map
+  // for every leaf node in the client tree (except for the root when
+  // the tree is empty). Paths in this map do NOT contain the trailing
+  // "." label we use for leaf nodes.
+  hashmap<std::string, Node*> clients;
+
+  // Weights associated with role paths. Setting the weight for a path
+  // influences the share of all nodes in the subtree rooted at that
+  // path. This hashmap might include weights for paths that are not
+  // currently in the sorter tree.
+  hashmap<std::string, double> weights;
+
+  // Total resources.
+  struct Total
+  {
+    // We need to keep track of the resources (and not just scalar
+    // quantities) to account for multiple copies of the same shared
+    // resources. We need to ensure that we do not update the scalar
+    // quantities for shared resources when the change is only in the
+    // number of copies in the sorter.
+    hashmap<SlaveID, Resources> resources;
+
+    // NOTE: Scalars can be safely aggregated across slaves. We keep
+    // that to speed up the calculation of shares. See MESOS-2891 for
+    // the reasons why we want to do that.
+    //
+    // NOTE: We omit information about dynamic reservations and
+    // persistent volumes here to enable resources to be aggregated
+    // across slaves more effectively. See MESOS-4833 for more
+    // information.
+    //
+    // Sharedness info is also stripped out when resource identities
+    // are omitted because sharedness inherently refers to the
+    // identities of resources and not quantities.
+    Resources scalarQuantities;
+
+    // We also store a map version of `scalarQuantities`, mapping
+    // the `Resource::name` to aggregated scalar. This improves the
+    // performance of calculating shares. See MESOS-4694.
+    //
+    // TODO(bmahler): Ideally we do not store `scalarQuantities`
+    // redundantly here, investigate performance improvements to
+    // `Resources` to make this unnecessary.
+    hashmap<std::string, Value::Scalar> totals;
+  } total_;
+
+  // Metrics are optionally exposed by the sorter.
+  friend Metrics;
+  Option<Metrics> metrics;
+};
+
+
+// Represents a node in the sorter's tree. The structure of the tree
+// reflects the hierarchical relationships between the clients of the
+// sorter. Some (but not all) nodes correspond to sorter clients; some
+// nodes only exist to represent the structure of the sorter
+// tree. Clients are always associated with leaf nodes.
+//
+// For example, if there are two sorter clients "a/b" and "c/d", the
+// tree will contain five nodes: the root node, internal nodes for "a"
+// and "c", and leaf nodes for the clients "a/b" and "c/d".
+struct DRFSorter::Node
+{
+  Node(const std::string& _name, Node* _parent)
+    : name(_name), share(0), active(false), parent(_parent)
+  {
+    // Compute the node's path. Three cases:
+    //
+    //  (1) If the root node, use the empty string
+    //  (2) If a child of the root node, use the child's name
+    //  (3) Otherwise, use the parent's name, "/", and the child's name.
+    if (parent == nullptr) {
+      path = "";
+    } else if (parent->parent == nullptr) {
+      path = name;
+    } else {
+      path = strings::join("/", parent->path, name);
+    }
+  }
+
+  ~Node()
+  {
+    foreach (Node* child, children) {
+      delete child;
+    }
+  }
+
+  // The label of the edge from this node's parent to the
+  // node. "Implicit" leaf nodes are always named ".".
+  //
+  // TODO(neilc): Consider naming implicit leaf nodes in a clearer
+  // way, e.g., by making `name` an Option?
   std::string name;
+
+  // Complete path from root to node. This includes the trailing "."
+  // label for virtual leaf nodes.
+  std::string path;
+
   double share;
+
+  // True if this node represents an active sorter client. False if
+  // this node represents an inactive sorter client or an internal node.
+  //
+  // TODO(neilc): Replace this with a three-valued enum?
   bool active;
 
-  // Allocation for a client.
-  struct Allocation {
+  Node* parent;
+  std::vector<Node*> children;
+
+  // If this node represents a sorter client, this returns the path of
+  // that client. Unlike the `path` field, this does NOT include the
+  // trailing "." label for virtual leaf nodes.
+  //
+  // For example, if the sorter contains two clients "a" and "a/b",
+  // the tree will contain four nodes: the root node, "a", "a/."
+  // (virtual leaf), and "a/b". The `clientPath()` of "a/." is "a",
+  // because that is the name of the client associated with that
+  // virtual leaf node.
+  std::string clientPath() const
+  {
+    if (name == ".") {
+      return CHECK_NOTNULL(parent)->path;
+    }
+
+    return path;
+  }
+
+  void removeChild(const Node* child)
+  {
+    auto it = std::find(children.begin(), children.end(), child);
+    CHECK(it != children.end());
+
+    children.erase(it);
+  }
+
+  void addChild(Node* child)
+  {
+    auto it = std::find(children.begin(), children.end(), child);
+    CHECK(it == children.end());
+
+    children.push_back(child);
+  }
+
+  // Allocation for a node.
+  struct Allocation
+  {
     Allocation() : count(0) {}
 
-    void add(const SlaveID& slaveId, const Resources& toAdd) {
+    void add(const SlaveID& slaveId, const Resources& toAdd)
+    {
       // Add shared resources to the allocated quantities when the same
       // resources don't already exist in the allocation.
       const Resources sharedToAdd = toAdd.shared()
@@ -72,7 +302,8 @@ struct Client
       count++;
     }
 
-    void subtract(const SlaveID& slaveId, const Resources& toRemove) {
+    void subtract(const SlaveID& slaveId, const Resources& toRemove)
+    {
       CHECK(resources.contains(slaveId));
       CHECK(resources.at(slaveId).contains(toRemove));
 
@@ -103,7 +334,8 @@ struct Client
     void update(
         const SlaveID& slaveId,
         const Resources& oldAllocation,
-        const Resources& newAllocation) {
+        const Resources& newAllocation)
+    {
       const Resources oldAllocationQuantity =
         oldAllocation.createStrippedScalarQuantity();
       const Resources newAllocationQuantity =
@@ -156,143 +388,20 @@ struct Client
     // `Resources` to make this unnecessary.
     hashmap<std::string, Value::Scalar> totals;
   } allocation;
-};
-
-
-struct DRFComparator
-{
-  virtual ~DRFComparator() {}
-  virtual bool operator()(const Client& client1, const Client& client2);
-};
-
-
-class DRFSorter : public Sorter
-{
-public:
-  DRFSorter() = default;
-
-  explicit DRFSorter(
-      const process::UPID& allocator,
-      const std::string& metricsPrefix);
-
-  virtual ~DRFSorter() {}
-
-  virtual void initialize(
-      const Option<std::set<std::string>>& fairnessExcludeResourceNames);
-
-  virtual void add(const std::string& name);
-
-  virtual void remove(const std::string& name);
-
-  virtual void activate(const std::string& name);
-
-  virtual void deactivate(const std::string& name);
-
-  virtual void updateWeight(const std::string& name, double weight);
-
-  virtual void allocated(
-      const std::string& name,
-      const SlaveID& slaveId,
-      const Resources& resources);
-
-  virtual void update(
-      const std::string& name,
-      const SlaveID& slaveId,
-      const Resources& oldAllocation,
-      const Resources& newAllocation);
-
-  virtual void unallocated(
-      const std::string& name,
-      const SlaveID& slaveId,
-      const Resources& resources);
-
-  virtual const hashmap<SlaveID, Resources>& allocation(
-      const std::string& name) const;
-
-  virtual const Resources& allocationScalarQuantities(
-      const std::string& name) const;
-
-  virtual hashmap<std::string, Resources> allocation(
-      const SlaveID& slaveId) const;
-
-  virtual Resources allocation(
-      const std::string& name,
-      const SlaveID& slaveId) const;
-
-  virtual const Resources& totalScalarQuantities() const;
-
-  virtual void add(const SlaveID& slaveId, const Resources& resources);
-
-  virtual void remove(const SlaveID& slaveId, const Resources& resources);
-
-  virtual std::vector<std::string> sort();
-
-  virtual bool contains(const std::string& name) const;
-
-  virtual int count() const;
-
-private:
-  // Recalculates the share for the client and moves
-  // it in 'clients' accordingly.
-  void updateShare(const std::string& name);
-
-  // Returns the dominant resource share for the client.
-  double calculateShare(const Client& client) const;
-
-  // Resources (by name) that will be excluded from fair sharing.
-  Option<std::set<std::string>> fairnessExcludeResourceNames;
-
-  // Returns the weight associated with the given path. If no weight
-  // has been configured, the default weight (1.0) is returned.
-  double clientWeight(const std::string& name) const;
-
-  // Returns an iterator to the specified client, if
-  // it exists in this Sorter.
-  std::set<Client, DRFComparator>::iterator find(const std::string& name) const;
-
-  // If true, sort() will recalculate all shares.
-  bool dirty = false;
-
-  // The set of clients, sorted by share.
-  std::set<Client, DRFComparator> clients;
 
-  // Maps client names to the weights that should be applied to their shares.
-  hashmap<std::string, double> weights;
-
-  // Total resources.
-  struct Total {
-    // We need to keep track of the resources (and not just scalar quantities)
-    // to account for multiple copies of the same shared resources. We need to
-    // ensure that we do not update the scalar quantities for shared resources
-    // when the change is only in the number of copies in the sorter.
-    hashmap<SlaveID, Resources> resources;
-
-    // NOTE: Scalars can be safely aggregated across slaves. We keep
-    // that to speed up the calculation of shares. See MESOS-2891 for
-    // the reasons why we want to do that.
-    //
-    // NOTE: We omit information about dynamic reservations and persistent
-    // volumes here to enable resources to be aggregated across slaves
-    // more effectively. See MESOS-4833 for more information.
-    //
-    // Sharedness info is also stripped out when resource identities are
-    // omitted because sharedness inherently refers to the identities of
-    // resources and not quantities.
-    Resources scalarQuantities;
+  // Compares two nodes according to DRF share.
+  static bool compareDRF(const Node* left, const Node* right)
+  {
+    if (left->share != right->share) {
+      return left->share < right->share;
+    }
 
-    // We also store a map version of `scalarQuantities`, mapping
-    // the `Resource::name` to aggregated scalar. This improves the
-    // performance of calculating shares. See MESOS-4694.
-    //
-    // TODO(bmahler): Ideally we do not store `scalarQuantities`
-    // redundantly here, investigate performance improvements to
-    // `Resources` to make this unnecessary.
-    hashmap<std::string, Value::Scalar> totals;
-  } total_;
+    if (left->allocation.count != right->allocation.count) {
+      return left->allocation.count < right->allocation.count;
+    }
 
-  // Metrics are optionally exposed by the sorter.
-  friend Metrics;
-  Option<Metrics> metrics;
+    return left->path < right->path;
+  }
 };
 
 } // namespace allocator {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index 4de249e..0c9f172 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -71,13 +71,14 @@ public:
   // It is a no-op if the client is already not in the sort.
   virtual void deactivate(const std::string& client) = 0;
 
-  // Updates the weight of a client name. The sorter will store this
-  // weight regardless of whether a client with this name has been
-  // added. If a client's weight is not changed, the default weight
-  // (1.0) is used. This interface does not support unsetting
-  // previously set weights; instead, a weight should be reset to the
-  // default value.
-  virtual void updateWeight(const std::string& client, double weight) = 0;
+  // Updates the weight of a client path. This changes the sorter's
+  // behavior for all clients in the subtree identified by this path
+  // (both clients currently in the sorter and any clients that may be
+  // added later). If a client's weight is not explicitly set, the
+  // default weight of 1.0 is used. This interface does not support
+  // unsetting previously set weights; instead, the weight should be
+  // reset to the default value.
+  virtual void updateWeight(const std::string& path, double weight) = 0;
 
   // Specify that resources have been allocated to the given client.
   virtual void allocated(

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 1e2eb96..ad03e17 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -704,6 +704,121 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
 }
 
 
+// This test checks allocator behavior when offering resources to
+// frameworks that register using nested ("hierarchical") roles.
+TEST_F(HierarchicalAllocatorTest, NestedRoleDRF)
+{
+  // Pausing the clock is not necessary, but ensures that the test
+  // doesn't rely on the batch allocation in the allocator, which
+  // would slow down the test.
+  Clock::pause();
+
+  initialize();
+
+  // Total cluster resources will become cpus=2, mem=1024.
+  SlaveInfo slave1 = createSlaveInfo("cpus:2;mem:1024;disk:0");
+  allocator->addSlave(
+      slave1.id(),
+      slave1,
+      AGENT_CAPABILITIES(),
+      None(),
+      slave1.resources(),
+      {});
+
+  // framework1 will be offered all of slave1's resources since it is
+  // the only framework running so far.
+  FrameworkInfo framework1 = createFrameworkInfo({"a/b"});
+  allocator->addFramework(framework1.id(), framework1, {}, true);
+
+  {
+    Allocation expected = Allocation(
+        framework1.id(),
+        {{"a/b", {{slave1.id(), slave1.resources()}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+
+  // a share = 1 (cpus=2, mem=1024)
+  //   a/b share = 1 (cpus=2, mem=1024)
+  //     framework1 share = 1
+
+  // Add a new slave, along with two new frameworks in roles "a/c" and
+  // "d/e". We expect the new slave's resources to be offered to "d/e"
+  // rather than "a/c", since the role subtree under "a" has more
+  // resources than the "d" subtree.
+
+  // Total cluster resources will become cpus=3, mem=1536.
+  SlaveInfo slave2 = createSlaveInfo("cpus:1;mem:512;disk:0");
+  allocator->addSlave(
+      slave2.id(),
+      slave2,
+      AGENT_CAPABILITIES(),
+      None(),
+      slave2.resources(),
+      {});
+
+  FrameworkInfo framework2 = createFrameworkInfo({"a/c"});
+  allocator->addFramework(framework2.id(), framework2, {}, true);
+
+  FrameworkInfo framework3 = createFrameworkInfo({"d/e"});
+  allocator->addFramework(framework3.id(), framework3, {}, true);
+
+  {
+    Allocation expected = Allocation(
+        framework3.id(),
+        {{"d/e", {{slave2.id(), slave2.resources()}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+
+  // a share = 0.666667 (cpus=2, mem=1024)
+  //   a/b share = 0.666667 (cpus=2, mem=1024)
+  //     framework1 share = 1
+  //   a/c share = 0
+  //     framework2 share = 0
+  // d share = 0.333333 (cpus=1, mem=512)
+  //   d/e share = 0.333333 (cpus=1, mem=512)
+  //     framework3 share = 1
+
+  // Add a new slave and a new framework in the role "d/f". The new
+  // slave's resources should be allocated to the new framework (and
+  // not the framework in "a/c"), because the "d" subtree has fewer
+  // allocated resources than the "a" subtree.
+
+  // Total cluster resources will become cpus=5, mem=2560.
+  SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:1024;disk:0");
+  allocator->addSlave(
+      slave3.id(),
+      slave3,
+      AGENT_CAPABILITIES(),
+      None(),
+      slave3.resources(),
+      {});
+
+  FrameworkInfo framework4 = createFrameworkInfo({"d/f"});
+  allocator->addFramework(framework4.id(), framework4, {}, true);
+
+  {
+    Allocation expected = Allocation(
+        framework4.id(),
+        {{"d/f", {{slave3.id(), slave3.resources()}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+
+  // a share = 0.4 (cpus=2, mem=1024)
+  //   a/b share = 0.4 (cpus=2, mem=1024)
+  //     framework1 share = 1
+  //   a/c share = 0
+  //     framework2 share = 0
+  // d share = 0.6 (cpus=3, mem=1536)
+  //   d/e share = 0.2 (cpus=1, mem=512)
+  //     framework3 share = 1
+  //   d/f share = 0.4 (cpus=2, mem=1024)
+  //     framework4 share = 1
+}
+
+
 // This test ensures that an offer filter larger than the
 // allocation interval effectively filters out resources.
 TEST_F(HierarchicalAllocatorTest, OfferFilter)
@@ -4298,8 +4413,8 @@ TEST_F(HierarchicalAllocatorTest, DisproportionateQuotaVsAllocation)
 }
 
 
-// This test checks that quota guarantees work correctly when a nested
-// role is created as a child of an existing role that has quota.
+// This test checks that quota guarantees work as expected when a
+// nested role is created as a child of an existing quota'd role.
 TEST_F(HierarchicalAllocatorTest, NestedRoleQuota)
 {
   // Pausing the clock is not necessary, but ensures that the test
@@ -4313,48 +4428,48 @@ TEST_F(HierarchicalAllocatorTest, NestedRoleQuota)
   const string CHILD_ROLE1 = "a/b/c";
   const string CHILD_ROLE2 = "a/b/d";
 
-  // Create `framework1` and set quota for its role.
+  // Create `framework1` in PARENT_ROLE and set quota for its role.
   FrameworkInfo framework1 = createFrameworkInfo({PARENT_ROLE});
   allocator->addFramework(framework1.id(), framework1, {}, true);
 
   const Quota parentQuota = createQuota(PARENT_ROLE, "cpus:2;mem:1024");
   allocator->setQuota(PARENT_ROLE, parentQuota);
 
-  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
+  SlaveInfo agent = createSlaveInfo("cpus:1;mem:512");
   allocator->addSlave(
-      agent1.id(),
-      agent1,
+      agent.id(),
+      agent,
       AGENT_CAPABILITIES(),
       None(),
-      agent1.resources(),
+      agent.resources(),
       {});
 
-  // `framework1` will be offered all of `agent1`'s resources because
+  // `framework1` will be offered all the resources on `agent` because
   // it is the only framework in the only role with unsatisfied quota.
   {
     Allocation expected = Allocation(
         framework1.id(),
-        {{PARENT_ROLE, {{agent1.id(), agent1.resources()}}}});
+        {{PARENT_ROLE, {{agent.id(), agent.resources()}}}});
 
     AWAIT_EXPECT_EQ(expected, allocations.get());
   }
 
-  // `framework1` declines the resources on `agent1` for the duration
+  // `framework1` declines the resources on `agent` for the duration
   // of the test.
   Filters longFilter;
   longFilter.set_refuse_seconds(flags.allocation_interval.secs() * 10);
 
   allocator->recoverResources(
       framework1.id(),
-      agent1.id(),
-      allocatedResources(agent1.resources(), PARENT_ROLE),
+      agent.id(),
+      allocatedResources(agent.resources(), PARENT_ROLE),
       longFilter);
 
-  // Register a framework in CHILD_ROLE1, which is a child role of
-  // PARENT_ROLE. In the current implementation, because CHILD_ROLE1
-  // does not itself have quota, it will not be offered any of
-  // PARENT_ROLE's quota'd resources. This behavior may change in the
-  // future (MESOS-7150).
+  // Create `framework2` in CHILD_ROLE1, which is a child role of
+  // PARENT_ROLE. CHILD_ROLE1 does not have quota. In the current
+  // implementation, because CHILD_ROLE1 does not itself have quota,
+  // it will not be offered any of PARENT_ROLE's quota'd resources.
+  // This behavior may change in the future (MESOS-7150).
   FrameworkInfo framework2 = createFrameworkInfo({CHILD_ROLE1});
   allocator->addFramework(framework2.id(), framework2, {}, true);
 
@@ -4366,9 +4481,9 @@ TEST_F(HierarchicalAllocatorTest, NestedRoleQuota)
   Future<Allocation> allocation = allocations.get();
   EXPECT_TRUE(allocation.isPending());
 
-  // Register a framework in CHILD_ROLE2, which is a child role of
-  // PARENT_ROLE. Because CHILD_ROLE2 has quota, it will be offered
-  // resources.
+  // Create `framework3` in CHILD_ROLE2, which is a child role of
+  // PARENT_ROLE. CHILD_ROLE2 has quota, so in the current
+  // implementation, it will be offered resources.
   FrameworkInfo framework3 = createFrameworkInfo({CHILD_ROLE2});
   allocator->addFramework(framework3.id(), framework3, {}, true);
 
@@ -4378,13 +4493,107 @@ TEST_F(HierarchicalAllocatorTest, NestedRoleQuota)
   {
     Allocation expected = Allocation(
         framework3.id(),
-        {{CHILD_ROLE2, {{agent1.id(), agent1.resources()}}}});
+        {{CHILD_ROLE2, {{agent.id(), agent.resources()}}}});
 
     AWAIT_EXPECT_EQ(expected, allocation);
   }
 }
 
 
+// This test checks that quota guarantees work as expected when a
+// nested role is created as a child of an existing quota'd role, and
+// the parent role has been allocated resources.
+TEST_F(HierarchicalAllocatorTest, NestedRoleQuotaAllocateToParent)
+{
+  // Pausing the clock is not necessary, but ensures that the test
+  // doesn't rely on the batch allocation in the allocator, which
+  // would slow down the test.
+  Clock::pause();
+
+  initialize();
+
+  const string PARENT_ROLE = "a/b";
+  const string CHILD_ROLE = "a/b/c";
+
+  // Set quota for parent role.
+  const Quota parentQuota = createQuota(PARENT_ROLE, "cpus:4;mem:2048");
+  allocator->setQuota(PARENT_ROLE, parentQuota);
+
+  // Create `framework1` in the parent role.
+  FrameworkInfo framework1 = createFrameworkInfo({PARENT_ROLE});
+  allocator->addFramework(framework1.id(), framework1, {}, true);
+
+  SlaveInfo agent1 = createSlaveInfo("cpus:2;mem:1024");
+  allocator->addSlave(
+      agent1.id(),
+      agent1,
+      AGENT_CAPABILITIES(),
+      None(),
+      agent1.resources(),
+      {});
+
+  // `framework1` will be offered all of the resources on `agent1`.
+  {
+    Allocation expected = Allocation(
+        framework1.id(),
+        {{PARENT_ROLE, {{agent1.id(), agent1.resources()}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+
+  // Create `framework2` in the child role.
+  FrameworkInfo framework2 = createFrameworkInfo({CHILD_ROLE});
+  allocator->addFramework(framework2.id(), framework2, {}, true);
+
+  const Quota childQuota = createQuota(CHILD_ROLE, "cpus:1;mem:512");
+  allocator->setQuota(CHILD_ROLE, childQuota);
+
+  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512");
+  allocator->addSlave(
+      agent2.id(),
+      agent2,
+      AGENT_CAPABILITIES(),
+      None(),
+      agent2.resources(),
+      {});
+
+  // `framework2` will be offered all of the resources on `agent2`.
+  {
+    Allocation expected = Allocation(
+        framework2.id(),
+        {{CHILD_ROLE, {{agent2.id(), agent2.resources()}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+
+  SlaveInfo agent3 = createSlaveInfo("cpus:1;mem:512");
+  allocator->addSlave(
+      agent3.id(),
+      agent3,
+      AGENT_CAPABILITIES(),
+      None(),
+      agent3.resources(),
+      {});
+
+  // `framework1` will be offered all of the resources on `agent3`.
+  //
+  // NOTE: The quota on PARENT_ROLE actually applies to the entire
+  // subtree rooted at PARENT_ROLE, which includes CHILD_ROLE.
+  // Therefore, `framework1` and `framework2` should both be
+  // candidates to receive the resources at `agent3`. In the current
+  // implementation, we don't "delegate" the PARENT_ROLE quota to the
+  // entire subtree; rather, it can only be used by roles in the
+  // subtree that have quota set (MESOS-7150).
+  {
+    Allocation expected = Allocation(
+        framework1.id(),
+        {{PARENT_ROLE, {{agent3.id(), agent3.resources()}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+}
+
+
 class HierarchicalAllocatorTestWithParam
   : public HierarchicalAllocatorTestBase,
     public WithParamInterface<bool> {};

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index 119e318..3b072b2 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -1780,6 +1780,158 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
 }
 #endif // __WINDOWS__
 
+
+// Checks that accepting offers and launching tasks works as expected
+// with nested roles.
+TYPED_TEST(MasterAllocatorTest, NestedRoles)
+{
+  Clock::pause();
+
+  TestAllocator<TypeParam> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
+
+  master::Flags masterFlags = this->CreateMasterFlags();
+  Try<Owned<cluster::Master>> master =
+    this->StartMaster(&allocator, masterFlags);
+  ASSERT_SOME(master);
+
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _, _));
+
+  MockExecutor exec1(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer1(&exec1);
+
+  slave::Flags slaveFlags1 = this->CreateSlaveFlags();
+  slaveFlags1.resources = Some("cpus:2;mem:1024");
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave1 =
+    this->StartSlave(detector.get(), &containerizer1, slaveFlags1);
+  ASSERT_SOME(slave1);
+
+  // Advance clock to force agent to register.
+  Clock::advance(slaveFlags1.authentication_backoff_factor);
+  Clock::advance(slaveFlags1.registration_backoff_factor);
+
+  // Register a framework in the "a/b" role and launch a single task,
+  // consuming all the resources on `slave1`.
+  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_role("a/b");
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(allocator, addFramework(_, _, _, _));
+
+  EXPECT_CALL(sched1, registered(_, _, _));
+
+  EXPECT_CALL(sched1, resourceOffers(&driver1, OfferEq(2, 1024)))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "a/b"));
+
+  EXPECT_CALL(exec1, registered(_, _, _, _));
+
+  EXPECT_CALL(exec1, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> runningStatus1;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&runningStatus1));
+
+  driver1.start();
+
+  AWAIT_READY(runningStatus1);
+  EXPECT_EQ(TASK_RUNNING, runningStatus1->state());
+
+  // Register a framework in the "a/c" role. It should not get any
+  // offers, because there are no unused resources.
+  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_role("a/c");
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(allocator, addFramework(_, _, _, _));
+
+  Future<Nothing> framework2Registered;
+  EXPECT_CALL(sched2, registered(_, _, _))
+    .WillOnce(FutureSatisfy(&framework2Registered));
+
+  driver2.start();
+
+  AWAIT_READY(framework2Registered);
+
+  // Register a framework in the "b/x" role. It should not get any
+  // offers, because there are no unused resources.
+  FrameworkInfo frameworkInfo3 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo3.set_role("b/x");
+
+  MockScheduler sched3;
+  MesosSchedulerDriver driver3(
+      &sched3, frameworkInfo3, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(allocator, addFramework(_, _, _, _));
+
+  Future<Nothing> framework3Registered;
+  EXPECT_CALL(sched3, registered(_, _, _))
+    .WillOnce(FutureSatisfy(&framework3Registered));
+
+  driver3.start();
+
+  AWAIT_READY(framework3Registered);
+
+  // Start a second agent. The resources on this agent should be
+  // offered to `sched3`, because the "b" role subtree is below its
+  // fair-share.
+  MockExecutor exec2(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer2(&exec2);
+
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _, _));
+
+  EXPECT_CALL(sched3, resourceOffers(&driver3, OfferEq(1, 512)))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "b/x"));
+
+  EXPECT_CALL(exec2, registered(_, _, _, _));
+
+  EXPECT_CALL(exec2, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> runningStatus2;
+  EXPECT_CALL(sched3, statusUpdate(&driver3, _))
+    .WillOnce(FutureArg<1>(&runningStatus2));
+
+  slave::Flags slaveFlags2 = this->CreateSlaveFlags();
+  slaveFlags2.resources = Some("cpus:1;mem:512");
+
+  Try<Owned<cluster::Slave>> slave2 =
+    this->StartSlave(detector.get(), &containerizer2, slaveFlags2);
+  ASSERT_SOME(slave2);
+
+  // Advance clock to force agent to register.
+  Clock::advance(slaveFlags2.authentication_backoff_factor);
+  Clock::advance(slaveFlags2.registration_backoff_factor);
+
+  AWAIT_READY(runningStatus2);
+  EXPECT_EQ(TASK_RUNNING, runningStatus2->state());
+
+  EXPECT_CALL(exec1, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(exec2, shutdown(_))
+    .Times(AtMost(1));
+
+  driver1.stop();
+  driver1.join();
+
+  driver2.stop();
+  driver2.join();
+
+  driver3.stop();
+  driver3.join();
+}
+
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5ef1992/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 7ca6fca..2ddf64e 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -51,6 +51,8 @@ TEST(SorterTest, DRFSorter)
   Resources totalResources = Resources::parse("cpus:100;mem:100").get();
   sorter.add(slaveId, totalResources);
 
+  EXPECT_EQ(vector<string>({}), sorter.sort());
+
   sorter.add("a");
   Resources aResources = Resources::parse("cpus:5;mem:5").get();
   sorter.allocated("a", slaveId, aResources);
@@ -74,6 +76,7 @@ TEST(SorterTest, DRFSorter)
   EXPECT_EQ(vector<string>({"c", "d", "a", "b"}), sorter.sort());
 
   sorter.remove("a");
+
   Resources bUnallocated = Resources::parse("cpus:4;mem:4").get();
   sorter.unallocated("b", slaveId, bUnallocated);
 
@@ -284,6 +287,539 @@ TEST(SorterTest, CountAllocations)
 }
 
 
+// This test checks a simple case of hierarchical allocation: the same
+// sequence of operations happens as in the `DRFSorter` test, but all
+// client names are nested into disjoint branches of the tree. In this
+// case, the hierarchy should not change allocation behavior.
+TEST(SorterTest, ShallowHierarchy)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
+  sorter.add(slaveId, totalResources);
+
+  sorter.add("a/a");
+
+  Resources aResources = Resources::parse("cpus:5;mem:5").get();
+  sorter.allocated("a/a", slaveId, aResources);
+
+  Resources bResources = Resources::parse("cpus:6;mem:6").get();
+  sorter.add("b/b");
+  sorter.allocated("b/b", slaveId, bResources);
+
+  // shares: a/a = .05, b/b = .06
+  EXPECT_EQ(vector<string>({"a/a", "b/b"}), sorter.sort());
+
+  Resources cResources = Resources::parse("cpus:1;mem:1").get();
+  sorter.add("c/c");
+  sorter.allocated("c/c", slaveId, cResources);
+
+  Resources dResources = Resources::parse("cpus:3;mem:1").get();
+  sorter.add("d/d");
+  sorter.allocated("d/d", slaveId, dResources);
+
+  // shares: a/a = .05, b/b = .06, c/c = .01, d/d = .03
+  EXPECT_EQ(vector<string>({"c/c", "d/d", "a/a", "b/b"}), sorter.sort());
+
+  sorter.remove("a/a");
+
+  Resources bUnallocated = Resources::parse("cpus:4;mem:4").get();
+  sorter.unallocated("b/b", slaveId, bUnallocated);
+
+  // shares: b/b = .02, c/c = .01, d/d = .03
+  EXPECT_EQ(vector<string>({"c/c", "b/b", "d/d"}), sorter.sort());
+
+  Resources eResources = Resources::parse("cpus:1;mem:5").get();
+  sorter.add("e/e");
+  sorter.allocated("e/e", slaveId, eResources);
+
+  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
+  sorter.remove(slaveId, removedResources);
+  // total resources are now cpus = 50, mem = 100
+
+  // shares: b/b = .04, c/c = .02, d/d = .06, e/e = .05
+  EXPECT_EQ(vector<string>({"c/c", "b/b", "e/e", "d/d"}), sorter.sort());
+
+  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
+  sorter.add(slaveId, addedResources);
+  // total resources are now cpus = 50, mem = 200
+
+  Resources fResources = Resources::parse("cpus:5;mem:1").get();
+  sorter.add("f/f");
+  sorter.allocated("f/f", slaveId, fResources);
+
+  Resources cResources2 = Resources::parse("cpus:0;mem:15").get();
+  sorter.allocated("c/c", slaveId, cResources2);
+
+  // shares: b/b = .04, c/c = .08, d/d = .06, e/e = .025, f/f = .1
+  EXPECT_EQ(vector<string>({"e/e", "b/b", "d/d", "c/c", "f/f"}), sorter.sort());
+
+  EXPECT_TRUE(sorter.contains("b/b"));
+
+  EXPECT_FALSE(sorter.contains("a/a"));
+
+  EXPECT_EQ(5, sorter.count());
+
+  sorter.deactivate("d/d");
+
+  EXPECT_TRUE(sorter.contains("d/d"));
+
+  EXPECT_EQ(vector<string>({"e/e", "b/b", "c/c", "f/f"}), sorter.sort());
+
+  EXPECT_EQ(5, sorter.count());
+
+  sorter.activate("d/d");
+
+  EXPECT_EQ(vector<string>({"e/e", "b/b", "d/d", "c/c", "f/f"}), sorter.sort());
+}
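+
+
+// For reference, the shares asserted in these tests are DRF
+// "dominant shares": the maximum, over all resource kinds, of
+// allocation / total. For example, with a total of cpus:100;mem:100,
+// an allocation of cpus:5;mem:5 has dominant share
+// max(5/100, 5/100) = 0.05. The helper below is only an illustrative
+// sketch of that computation; it is not the sorter's actual
+// implementation.
+double exampleDominantShare(
+    const Resources& allocation, const Resources& total)
+{
+  double share = 0.0;
+
+  for (const Resource& resource : allocation.scalars()) {
+    Option<Value::Scalar> totalScalar =
+      total.get<Value::Scalar>(resource.name());
+
+    // Guard against resource kinds that are absent from the total.
+    if (totalScalar.isSome() && totalScalar->value() > 0) {
+      double ratio = resource.scalar().value() / totalScalar->value();
+      share = ratio > share ? ratio : share;
+    }
+  }
+
+  return share;
+}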
+
+
+// Analogous to `ShallowHierarchy`, except that the client names are
+// nested more deeply and appear at different depths in the tree.
+TEST(SorterTest, DeepHierarchy)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
+  sorter.add(slaveId, totalResources);
+
+  sorter.add("a/a/a/a/a");
+  Resources aResources = Resources::parse("cpus:5;mem:5").get();
+  sorter.allocated("a/a/a/a/a", slaveId, aResources);
+
+  Resources bResources = Resources::parse("cpus:6;mem:6").get();
+  sorter.add("b/b/b/b");
+  sorter.allocated("b/b/b/b", slaveId, bResources);
+
+  // shares: a/a/a/a/a = .05, b/b/b/b = .06
+  EXPECT_EQ(vector<string>({"a/a/a/a/a", "b/b/b/b"}), sorter.sort());
+
+  Resources cResources = Resources::parse("cpus:1;mem:1").get();
+  sorter.add("c/c/c");
+  sorter.allocated("c/c/c", slaveId, cResources);
+
+  Resources dResources = Resources::parse("cpus:3;mem:1").get();
+  sorter.add("d/d");
+  sorter.allocated("d/d", slaveId, dResources);
+
+  // shares: a/a/a/a/a = .05, b/b/b/b = .06, c/c/c = .01, d/d = .03
+  EXPECT_EQ(vector<string>({"c/c/c", "d/d", "a/a/a/a/a", "b/b/b/b"}),
+            sorter.sort());
+
+  sorter.remove("a/a/a/a/a");
+
+  Resources bUnallocated = Resources::parse("cpus:4;mem:4").get();
+  sorter.unallocated("b/b/b/b", slaveId, bUnallocated);
+
+  // shares: b/b/b/b = .02, c/c/c = .01, d/d = .03
+  EXPECT_EQ(vector<string>({"c/c/c", "b/b/b/b", "d/d"}), sorter.sort());
+
+  Resources eResources = Resources::parse("cpus:1;mem:5").get();
+  sorter.add("e/e/e/e/e/e");
+  sorter.allocated("e/e/e/e/e/e", slaveId, eResources);
+
+  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
+  sorter.remove(slaveId, removedResources);
+  // total resources are now cpus = 50, mem = 100
+
+  // shares: b/b/b/b = .04, c/c/c = .02, d/d = .06, e/e/e/e/e/e = .05
+  EXPECT_EQ(vector<string>({"c/c/c", "b/b/b/b", "e/e/e/e/e/e", "d/d"}),
+            sorter.sort());
+
+  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
+  sorter.add(slaveId, addedResources);
+  // total resources are now cpus = 50, mem = 200
+
+  Resources fResources = Resources::parse("cpus:5;mem:1").get();
+  sorter.add("f/f");
+  sorter.allocated("f/f", slaveId, fResources);
+
+  Resources cResources2 = Resources::parse("cpus:0;mem:15").get();
+  sorter.allocated("c/c/c", slaveId, cResources2);
+
+  // shares: b/b/b/b = .04, c/c/c = .08, d/d = .06,
+  // e/e/e/e/e/e = .025, f/f = .1
+  EXPECT_EQ(vector<string>({"e/e/e/e/e/e", "b/b/b/b", "d/d", "c/c/c", "f/f"}),
+            sorter.sort());
+
+  EXPECT_TRUE(sorter.contains("b/b/b/b"));
+
+  EXPECT_FALSE(sorter.contains("a/a/a/a/a"));
+
+  EXPECT_EQ(5, sorter.count());
+
+  sorter.deactivate("d/d");
+
+  EXPECT_TRUE(sorter.contains("d/d"));
+
+  EXPECT_EQ(vector<string>({"e/e/e/e/e/e", "b/b/b/b", "c/c/c", "f/f"}),
+            sorter.sort());
+
+  EXPECT_EQ(5, sorter.count());
+
+  sorter.activate("d/d");
+
+  EXPECT_EQ(vector<string>({"e/e/e/e/e/e", "b/b/b/b", "d/d", "c/c/c", "f/f"}),
+            sorter.sort());
+}
+
+
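+// This test checks a mix of nested and non-nested clients: adding,
+// removing, and updating a nested client should update the
+// fair-share of the subtree that contains it.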
+TEST(SorterTest, HierarchicalAllocation)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
+  sorter.add(slaveId, totalResources);
+
+  sorter.add("a");
+  sorter.add("b/c");
+  sorter.add("b/d");
+
+  EXPECT_EQ(3, sorter.count());
+  EXPECT_TRUE(sorter.contains("a"));
+  EXPECT_FALSE(sorter.contains("b"));
+  EXPECT_TRUE(sorter.contains("b/c"));
+  EXPECT_TRUE(sorter.contains("b/d"));
+
+  // Shares: a = 0, b/c = 0, b/d = 0.
+  EXPECT_EQ(vector<string>({"a", "b/c", "b/d"}), sorter.sort());
+
+  Resources aResources = Resources::parse("cpus:6;mem:6").get();
+  sorter.allocated("a", slaveId, aResources);
+
+  // Shares: a = 0.06, b/c = 0, b/d = 0.
+  EXPECT_EQ(vector<string>({"b/c", "b/d", "a"}), sorter.sort());
+
+  Resources cResources = Resources::parse("cpus:4;mem:4").get();
+  sorter.allocated("b/c", slaveId, cResources);
+
+  Resources dResources = Resources::parse("cpus:3;mem:3").get();
+  sorter.allocated("b/d", slaveId, dResources);
+
+  // Shares: a = 0.06, b/d = 0.03, b/c = 0.04.
+  EXPECT_EQ(vector<string>({"a", "b/d", "b/c"}), sorter.sort());
+
+  {
+    hashmap<string, Resources> agentAllocation =
+      sorter.allocation(slaveId);
+
+    EXPECT_EQ(3u, agentAllocation.size());
+    EXPECT_EQ(aResources, agentAllocation.at("a"));
+    EXPECT_EQ(cResources, agentAllocation.at("b/c"));
+    EXPECT_EQ(dResources, agentAllocation.at("b/d"));
+
+    EXPECT_EQ(aResources, sorter.allocation("a", slaveId));
+    EXPECT_EQ(cResources, sorter.allocation("b/c", slaveId));
+    EXPECT_EQ(dResources, sorter.allocation("b/d", slaveId));
+  }
+
+  Resources aExtraResources = Resources::parse("cpus:2;mem:2").get();
+  sorter.allocated("a", slaveId, aExtraResources);
+
+  // Shares: b/d = 0.03, b/c = 0.04, a = 0.08.
+  EXPECT_EQ(vector<string>({"b/d", "b/c", "a"}), sorter.sort());
+
+  sorter.add("b/e/f");
+
+  EXPECT_FALSE(sorter.contains("b/e"));
+  EXPECT_TRUE(sorter.contains("b/e/f"));
+
+  // Shares: b/e/f = 0, b/d = 0.03, b/c = 0.04, a = 0.08.
+  EXPECT_EQ(vector<string>({"b/e/f", "b/d", "b/c", "a"}), sorter.sort());
+
+  Resources fResources = Resources::parse("cpus:3.5;mem:3.5").get();
+  sorter.allocated("b/e/f", slaveId, fResources);
+
+  // Shares: a = 0.08, b/d = 0.03, b/e/f = 0.035, b/c = 0.04.
+  EXPECT_EQ(vector<string>({"a", "b/d", "b/e/f", "b/c"}), sorter.sort());
+
+  // Removing a client should result in updating the fair-share for
+  // the subtree that contains the removed client.
+  sorter.remove("b/e/f");
+
+  EXPECT_FALSE(sorter.contains("b/e/f"));
+  EXPECT_EQ(3, sorter.count());
+
+  // Shares: b/d = 0.03, b/c = 0.04, a = 0.08.
+  EXPECT_EQ(vector<string>({"b/d", "b/c", "a"}), sorter.sort());
+
+  // Updating a client should result in updating the fair-share for
+  // the subtree that contains the updated client.
+  Resources cNewResources = Resources::parse("cpus:1;mem:1").get();
+  sorter.update("b/c", slaveId, cResources, cNewResources);
+
+  // Shares: b/c = 0.01, b/d = 0.03, a = 0.08.
+  EXPECT_EQ(vector<string>({"b/c", "b/d", "a"}), sorter.sort());
+
+  sorter.add("b/e/f");
+  sorter.allocated("b/e/f", slaveId, fResources);
+
+  // Shares: b/c = 0.01, b/d = 0.03, b/e/f = 0.035, a = 0.08.
+  EXPECT_EQ(vector<string>({"b/c", "b/d", "b/e/f", "a"}), sorter.sort());
+
+  EXPECT_EQ(4, sorter.count());
+}
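+
+
+// As a worked example of the hierarchical comparison above: once "a"
+// holds cpus:8;mem:8 its dominant share is 0.08, while the "b"
+// subtree's share is the sum over its children, 0.04 + 0.03 = 0.07.
+// The "b" subtree therefore sorts first, and within it "b/d" (0.03)
+// precedes "b/c" (0.04), yielding {"b/d", "b/c", "a"}.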
+
+
+// This test checks what happens when a new sorter client is added as
+// a child of what was previously a leaf node.
+TEST(SorterTest, AddChildToLeaf)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+
+  sorter.add("a");
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  sorter.add("b");
+  sorter.allocated(
+      "b", slaveId, Resources::parse("cpus:6;mem:6").get());
+
+  EXPECT_EQ(vector<string>({"b", "a"}), sorter.sort());
+
+  // Add a new client "a/c". The "a" subtree should now compete against
+  // the "b" subtree; within the "a" subtree, "a" should compete (as a
+  // sibling) against "a/c".
+
+  sorter.add("a/c");
+  sorter.allocated(
+      "a/c", slaveId, Resources::parse("cpus:5;mem:5").get());
+
+  EXPECT_EQ(vector<string>({"b", "a/c", "a"}), sorter.sort());
+
+  // Remove the "a" client; the "a/c" client should remain. Note that
+  // "a/c" now appears before "b" in the sort order, because the "a"
+  // subtree is now farther below its fair-share than the "b" subtree.
+
+  sorter.remove("a");
+
+  EXPECT_FALSE(sorter.contains("a"));
+  EXPECT_EQ(vector<string>({"a/c", "b"}), sorter.sort());
+
+  // Re-add the "a" client with the same resources. The client order
+  // should revert to its previous value.
+  sorter.add("a");
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  EXPECT_TRUE(sorter.contains("a"));
+  EXPECT_EQ(vector<string>({"b", "a/c", "a"}), sorter.sort());
+
+  // Check that "a" is considered to have a weight of 1 when it
+  // competes against "a/c".
+  sorter.updateWeight("a/c", 0.2);
+
+  EXPECT_EQ(vector<string>({"b", "a", "a/c"}), sorter.sort());
+
+  // Changing the weight "a" should change how it competes against its
+  // siblings (e.g., "b"), not its children (e.g., "a/c").
+  sorter.updateWeight("a", 3);
+
+  EXPECT_EQ(vector<string>({"a", "a/c", "b"}), sorter.sort());
+
+  sorter.updateWeight("a/c", 1);
+
+  EXPECT_EQ(vector<string>({"a/c", "a", "b"}), sorter.sort());
+}
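+
+
+// Concretely, a node's weighted share is its dominant share divided
+// by its weight. After `updateWeight("a/c", 0.2)` above, the children
+// of "a" compare as 0.10 / 1 = 0.10 for "a" itself versus
+// 0.05 / 0.2 = 0.25 for "a/c", so "a" sorts first; at the top level
+// the "a" subtree (share 0.15, weight 1) still sorts after "b"
+// (share 0.06).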
+
+
+// This test checks what happens when a new sorter client is added as
+// a child of what was previously an internal node.
+TEST(SorterTest, AddChildToInternal)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+
+  sorter.add("x/a");
+  sorter.allocated(
+      "x/a", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  sorter.add("x/b");
+  sorter.allocated(
+      "x/b", slaveId, Resources::parse("cpus:6;mem:6").get());
+
+  EXPECT_EQ(vector<string>({"x/b", "x/a"}), sorter.sort());
+
+  sorter.add("x");
+  sorter.allocated(
+      "x", slaveId, Resources::parse("cpus:7;mem:7").get());
+
+  EXPECT_EQ(vector<string>({"x/b", "x", "x/a"}), sorter.sort());
+
+  sorter.add("z");
+  sorter.allocated(
+      "z", slaveId, Resources::parse("cpus:20;mem:20").get());
+
+  EXPECT_EQ(vector<string>({"z", "x/b", "x", "x/a"}), sorter.sort());
+
+  sorter.remove("x");
+
+  EXPECT_EQ(vector<string>({"x/b", "x/a", "z"}), sorter.sort());
+}
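+
+
+// For the arithmetic here: adding the "x" client gives the "x"
+// subtree a virtual leaf holding cpus:7;mem:7, so the subtree's
+// share is 0.10 + 0.06 + 0.07 = 0.23 and it sorts after "z" (0.20).
+// Removing "x" drops the subtree's share back to 0.16, moving its
+// remaining clients ahead of "z".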
+
+
+// This test checks what happens when a new sorter client is added as
+// a child of what was previously an inactive leaf node.
+TEST(SorterTest, AddChildToInactiveLeaf)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+
+  sorter.add("a");
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  sorter.add("b");
+  sorter.allocated(
+      "b", slaveId, Resources::parse("cpus:6;mem:6").get());
+
+  sorter.deactivate("a");
+
+  EXPECT_EQ(vector<string>({"b"}), sorter.sort());
+
+  sorter.add("a/c");
+  sorter.allocated(
+      "a/c", slaveId, Resources::parse("cpus:5;mem:5").get());
+
+  EXPECT_EQ(vector<string>({"b", "a/c"}), sorter.sort());
+}
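+
+
+// Note that although the deactivated "a" client is excluded from the
+// sort order (it becomes the inactive virtual leaf of the new "a"
+// subtree), its cpus:10;mem:10 allocation still counts toward the
+// subtree's share: 0.15 for "a" versus 0.06 for "b", so "b" sorts
+// before "a/c".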
+
+
+// This test checks what happens when a sorter client is removed,
+// which allows a leaf node to be collapsed into its parent node. This
+// is essentially the inverse of `AddChildToLeaf`.
+TEST(SorterTest, RemoveLeafCollapseParent)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+
+  sorter.add("a");
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  sorter.add("b");
+  sorter.allocated(
+      "b", slaveId, Resources::parse("cpus:6;mem:6").get());
+
+  sorter.add("a/c");
+  sorter.allocated(
+      "a/c", slaveId, Resources::parse("cpus:5;mem:5").get());
+
+  EXPECT_EQ(vector<string>({"b", "a/c", "a"}), sorter.sort());
+
+  sorter.remove("a/c");
+
+  EXPECT_EQ(vector<string>({"b", "a"}), sorter.sort());
+}
+
+
+// This test checks that when a sorter client is removed and a leaf
+// node is collapsed into its parent node, the `inactive` flag is
+// correctly propagated from leaf to parent.
+TEST(SorterTest, RemoveLeafCollapseParentInactive)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+
+  sorter.add("a");
+  sorter.allocated(
+      "a", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  sorter.add("b");
+  sorter.allocated(
+      "b", slaveId, Resources::parse("cpus:6;mem:6").get());
+
+  sorter.add("a/c");
+  sorter.allocated(
+      "a/c", slaveId, Resources::parse("cpus:5;mem:5").get());
+
+  sorter.deactivate("a");
+
+  EXPECT_EQ(vector<string>({"b", "a/c"}), sorter.sort());
+
+  sorter.remove("a/c");
+
+  EXPECT_EQ(vector<string>({"b"}), sorter.sort());
+}
+
+
+// This test checks that setting a weight on an internal node works
+// correctly.
+TEST(SorterTest, ChangeWeightOnSubtree)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+
+  sorter.updateWeight("b", 3);
+  sorter.updateWeight("a", 2);
+
+  sorter.add("a/x");
+  sorter.add("b/y");
+
+  EXPECT_EQ(vector<string>({"a/x", "b/y"}), sorter.sort());
+
+  sorter.allocated(
+      "a/x", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  sorter.allocated(
+      "b/y", slaveId, Resources::parse("cpus:10;mem:10").get());
+
+  EXPECT_EQ(vector<string>({"b/y", "a/x"}), sorter.sort());
+
+  sorter.add("b/z");
+  sorter.allocated(
+      "b/z", slaveId, Resources::parse("cpus:5;mem:5").get());
+
+  EXPECT_EQ(vector<string>({"b/z", "b/y", "a/x"}), sorter.sort());
+
+  sorter.add("b");
+  sorter.allocated(
+      "b", slaveId, Resources::parse("cpus:4;mem:4").get());
+
+  EXPECT_EQ(vector<string>({"a/x", "b", "b/z", "b/y"}), sorter.sort());
+
+  sorter.add("a/zz");
+  sorter.allocated(
+      "a/zz", slaveId, Resources::parse("cpus:2;mem:2").get());
+
+  EXPECT_EQ(vector<string>({"a/zz", "a/x", "b", "b/z", "b/y"}), sorter.sort());
+}
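+
+
+// To spell out the weighted shares above: with weight 2 on "a" and
+// weight 3 on "b", equal allocations of cpus:10;mem:10 give the "a"
+// subtree a weighted share of 0.10 / 2 = 0.05 and the "b" subtree
+// 0.10 / 3 ~= 0.033, so "b/y" sorts ahead of "a/x" once both hold
+// those resources.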
+
+
 // Some resources are split across multiple resource objects (e.g.
 // persistent volumes). This test ensures that the shares for these
 // are accounted correctly.
@@ -350,11 +886,46 @@ TEST(SorterTest, UpdateAllocation)
 
   hashmap<SlaveID, Resources> allocation = sorter.allocation("a");
   EXPECT_EQ(1u, allocation.size());
-  EXPECT_EQ(newAllocation.get(), allocation[slaveId]);
+  EXPECT_EQ(newAllocation.get(), allocation.at(slaveId));
   EXPECT_EQ(newAllocation.get(), sorter.allocation("a", slaveId));
 }
 
 
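+// This test checks that updating the allocation of a nested client
+// works correctly (here, by applying a `CREATE` operation to turn
+// part of the client's disk into a persistent volume).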
+TEST(SorterTest, UpdateAllocationNestedClient)
+{
+  DRFSorter sorter;
+
+  SlaveID slaveId;
+  slaveId.set_value("agentId");
+
+  sorter.add("a/x");
+  sorter.add("b/y");
+
+  sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
+
+  sorter.allocated(
+      "a/x", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
+
+  // Construct an offer operation.
+  Resource volume = Resources::parse("disk", "5", "*").get();
+  volume.mutable_disk()->mutable_persistence()->set_id("ID");
+  volume.mutable_disk()->mutable_volume()->set_container_path("data");
+
+  // Compute the updated allocation.
+  Resources oldAllocation = sorter.allocation("a/x", slaveId);
+  Try<Resources> newAllocation = oldAllocation.apply(CREATE(volume));
+  ASSERT_SOME(newAllocation);
+
+  // Update the resources for the client.
+  sorter.update("a/x", slaveId, oldAllocation, newAllocation.get());
+
+  hashmap<SlaveID, Resources> allocation = sorter.allocation("a/x");
+  EXPECT_EQ(1u, allocation.size());
+  EXPECT_EQ(newAllocation.get(), allocation.at(slaveId));
+  EXPECT_EQ(newAllocation.get(), sorter.allocation("a/x", slaveId));
+}
+
+
 // We aggregate resources from multiple slaves into the sorter.
 // Since non-scalar resources don't aggregate well across slaves,
 // we need to keep track of the SlaveIDs of the resources. This