You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/30 23:53:04 UTC

git commit: Add Slave and Framework structs to HierarchicalAllocatorProcess. This cleans up the proliferation of hashmaps in HierarchicalAllocatorProcess.

Updated Branches:
  refs/heads/master d9dd5c97a -> 11e8a1dba


Add Slave and Framework structs to HierarchicalAllocatorProcess. This
cleans up the proliferation of hashmaps in HierarchicalAllocatorProcess.

From: Thomas Marshall <tw...@gmail.com>
Review: https://reviews.apache.org/r/11541


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/11e8a1db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/11e8a1db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/11e8a1db

Branch: refs/heads/master
Commit: 11e8a1dba355148dbbbe17f7229cca120534b7b4
Parents: d9dd5c9
Author: Benjamin Hindman <be...@twitter.com>
Authored: Thu May 30 14:51:24 2013 -0700
Committer: Benjamin Hindman <be...@twitter.com>
Committed: Thu May 30 14:51:24 2013 -0700

----------------------------------------------------------------------
 .../3rdparty/stout/include/stout/stringify.hpp     |   17 +
 src/local/local.cpp                                |    9 +-
 src/local/local.hpp                                |   10 +-
 src/master/allocator.hpp                           |    2 +
 src/master/drf_sorter.cpp                          |    2 +
 src/master/drf_sorter.hpp                          |    2 +
 src/master/hierarchical_allocator_process.hpp      |  248 ++++++++-------
 src/master/main.cpp                                |    6 +-
 src/master/master.cpp                              |    2 +
 src/master/master.hpp                              |   13 +-
 src/master/sorter.hpp                              |    2 +
 src/tests/allocator_tests.cpp                      |    6 +-
 src/tests/allocator_zookeeper_tests.cpp            |    5 +-
 src/tests/cluster.hpp                              |   14 +-
 src/tests/mesos.cpp                                |    2 +-
 src/tests/mesos.hpp                                |   34 +-
 src/tests/resource_offers_tests.cpp                |    3 +-
 src/tests/sorter_tests.cpp                         |    4 +-
 18 files changed, 224 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
index 136316d..2bb7290 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
@@ -103,6 +103,23 @@ std::string stringify(const std::map<K, V>& map)
 }
 
 
+template <typename T>
+std::string stringify(const hashset<T>& set)
+{
+  std::ostringstream out;
+  out << "{ ";
+  typename hashset<T>::const_iterator iterator = set.begin();
+  while (iterator != set.end()) {
+    out << stringify(*iterator);
+    if (++iterator != set.end()) {
+      out << ", ";
+    }
+  }
+  out << " }";
+  return out.str();
+}
+
+
 template <typename K, typename V>
 std::string stringify(const hashmap<K, V>& map)
 {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index c35c19a..3364c15 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -40,10 +40,11 @@
 
 using namespace mesos::internal;
 
-using mesos::internal::master::Allocator;
-using mesos::internal::master::AllocatorProcess;
-using mesos::internal::master::DRFSorter;
-using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::allocator::Allocator;
+using mesos::internal::master::allocator::AllocatorProcess;
+using mesos::internal::master::allocator::DRFSorter;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/local/local.hpp
----------------------------------------------------------------------
diff --git a/src/local/local.hpp b/src/local/local.hpp
index a165265..0aa50ef 100644
--- a/src/local/local.hpp
+++ b/src/local/local.hpp
@@ -29,9 +29,14 @@ namespace internal {
 // Forward declarations.
 namespace master {
 
-class Allocator;
 class Master;
 
+namespace allocator {
+
+class Allocator;
+
+} // namespace allocator {
+
 } // namespace master {
 
 class Configuration;
@@ -41,8 +46,7 @@ namespace local {
 // Launch a local cluster with the given flags.
 process::PID<master::Master> launch(
     const Flags& flags,
-    master::Allocator* _allocator = NULL);
-
+    master::allocator::Allocator* _allocator = NULL);
 
 void shutdown();
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index b68b67d..78c75bb 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -42,6 +42,7 @@ namespace master {
 
 class Master; // Forward declaration.
 
+namespace allocator {
 
 // Basic model of an allocator: resources are allocated to a framework
 // in the form of offers. A framework can refuse some resources in
@@ -338,6 +339,7 @@ inline void Allocator::offersRevived(
       frameworkId);
 }
 
+} // namespace allocator {
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/drf_sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.cpp b/src/master/drf_sorter.cpp
index fe31a00..7fcd0ca 100644
--- a/src/master/drf_sorter.cpp
+++ b/src/master/drf_sorter.cpp
@@ -28,6 +28,7 @@ using std::string;
 namespace mesos {
 namespace internal {
 namespace master {
+namespace allocator {
 
 bool DRFComparator::operator () (
     const Client& client1,
@@ -226,6 +227,7 @@ set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
   return it;
 }
 
+} // namespace allocator {
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/drf_sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.hpp b/src/master/drf_sorter.hpp
index 79566cc..866515e 100644
--- a/src/master/drf_sorter.hpp
+++ b/src/master/drf_sorter.hpp
@@ -32,6 +32,7 @@
 namespace mesos {
 namespace internal {
 namespace master {
+namespace allocator {
 
 struct Client
 {
@@ -107,6 +108,7 @@ private:
   Resources resources;
 };
 
+} // namespace allocator {
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 28a7879..1048a28 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -27,6 +27,7 @@
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/stopwatch.hpp>
+#include <stout/stringify.hpp>
 
 #include "common/resources.hpp"
 
@@ -38,6 +39,7 @@
 namespace mesos {
 namespace internal {
 namespace master {
+namespace allocator {
 
 // Forward declarations.
 class Filter;
@@ -52,6 +54,68 @@ typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
 HierarchicalDRFAllocatorProcess;
 
 
+struct Slave
+{
+  Slave() {}
+
+  Slave(const SlaveInfo& _info)
+    : available(_info.resources()),
+      whitelisted(false),
+      info(_info) {}
+
+  Resources resources() const { return info.resources(); }
+
+  std::string hostname() const { return info.hostname(); }
+
+  // Returns true iff this slave is whitelisted and has sufficient
+  // free resources to allocate.
+  bool allocatable() const
+  {
+    // TODO(benh): For now, only make offers when there is some cpu
+    // and memory left. This is an artifact of the original code that
+    // only offered when there was at least 1 cpu "unit" available,
+    // and without doing this a framework might get offered resources
+    // with only memory available (which it obviously will decline)
+    // and then end up waiting the default Filters::refuse_seconds
+    // (unless the framework set it to something different).
+
+    Value::Scalar none;
+    Value::Scalar cpus = available.get("cpus", none);
+    Value::Scalar mem = available.get("mem", none);
+
+    return (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) &&
+      whitelisted;
+  }
+
+  // Contains all of the resources currently free on this slave.
+  Resources available;
+
+  // Indicates if the resources on this slave should be offered to
+  // frameworks.
+  bool whitelisted;
+
+private:
+  SlaveInfo info;
+};
+
+
+struct Framework
+{
+  Framework() {}
+
+  Framework(const FrameworkInfo& _info)
+    : info(_info) {}
+
+  std::string user() const { return info.user(); }
+
+  // Filters that have been added by this framework.
+  hashset<Filter*> filters;
+
+private:
+  FrameworkInfo info;
+};
+
+
 // Implements the basic allocator algorithm - first pick a user by
 // some criteria, then pick one of their frameworks to allocate to.
 template <typename UserSorter, typename FrameworkSorter>
@@ -147,21 +211,15 @@ protected:
   Flags flags;
   PID<Master> master;
 
-  // Maps FrameworkIDs to user names.
-  hashmap<FrameworkID, std::string> users;
+  // Contains all frameworks.
+  hashmap<FrameworkID, Framework> frameworks;
 
   // Maps user names to the Sorter object which contains
   // all of that user's frameworks.
   hashmap<std::string, FrameworkSorter*> sorters;
 
-  // Maps slaves to their allocatable resources.
-  hashmap<SlaveID, Resources> allocatable;
-
   // Contains all active slaves.
-  hashmap<SlaveID, SlaveInfo> slaves;
-
-  // Filters that have been added by frameworks.
-  multihashmap<FrameworkID, Filter*> filters;
+  hashmap<SlaveID, Slave> slaves;
 
   // Slaves to send offers for.
   Option<hashset<std::string> > whitelist;
@@ -250,7 +308,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
 {
   CHECK(initialized);
 
-  std::string user = frameworkInfo.user();
+  const std::string& user = frameworkInfo.user();
   if (!userSorter->contains(user)) {
     userSorter->add(user);
     sorters[user] = new FrameworkSorter();
@@ -264,7 +322,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
   sorters[user]->add(used);
   sorters[user]->allocated(frameworkId.value(), used);
 
-  users[frameworkId] = frameworkInfo.user();
+  frameworks[frameworkId] = Framework(frameworkInfo);
 
   LOG(INFO) << "Added framework " << frameworkId;
 
@@ -279,7 +337,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
 {
   CHECK(initialized);
 
-  std::string user = users[frameworkId];
+  CHECK(frameworks.contains(frameworkId));
+  const std::string& user = frameworks[frameworkId].user();
+
   // Might not be in 'sorters[user]' because it was previously
   // deactivated and never re-added.
   if (sorters[user]->contains(frameworkId.value())) {
@@ -289,7 +349,11 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
     sorters[user]->remove(frameworkId.value());
   }
 
-  users.erase(frameworkId);
+  // Do not delete the filters contained in this
+  // framework's 'filters' hashset yet, see comments in
+  // HierarchicalAllocatorProcess::offersRevived and
+  // HierarchicalAllocatorProcess::expire.
+  frameworks.erase(frameworkId);
 
   // If this user doesn't have any more active frameworks, remove it.
   if (sorters[user]->count() == 0) {
@@ -300,16 +364,6 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
     userSorter->remove(user);
   }
 
-  foreach (Filter* filter, filters.get(frameworkId)) {
-    filters.remove(frameworkId, filter);
-
-    // Do not delete the filter, see comments in
-    // HierarchicalAllocatorProcess::offersRevived and
-    // HierarchicalAllocatorProcess::expire.
-  }
-
-  filters.remove(frameworkId);
-
   LOG(INFO) << "Removed framework " << frameworkId;
 }
 
@@ -322,7 +376,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
 {
   CHECK(initialized);
 
-  std::string user = frameworkInfo.user();
+  const std::string& user = frameworkInfo.user();
   sorters[user]->activate(frameworkId.value());
 
   LOG(INFO) << "Activated framework " << frameworkId;
@@ -338,7 +392,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(
 {
   CHECK(initialized);
 
-  std::string user = users[frameworkId];
+  CHECK(frameworks.contains(frameworkId));
+  const std::string& user = frameworks[frameworkId].user();
+
   sorters[user]->deactivate(frameworkId.value());
 
   // Note that the Sorter *does not* remove the resources allocated
@@ -347,15 +403,11 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(
   // of the resources that it is using. We might be able to collapse
   // the added/removed and activated/deactivated in the future.
 
-  foreach (Filter* filter, filters.get(frameworkId)) {
-    filters.remove(frameworkId, filter);
-
-    // Do not delete the filter, see comments in
-    // HierarchicalAllocatorProcess::offersRevived and
-    // HierarchicalAllocatorProcess::expire.
-  }
-
-  filters.remove(frameworkId);
+  // Do not delete the filters contained in this
+  // framework's 'filters' hashset yet, see comments in
+  // HierarchicalAllocatorProcess::offersRevived and
+  // HierarchicalAllocatorProcess::expire.
+  frameworks[frameworkId].filters.clear();
 
   LOG(INFO) << "Deactivated framework " << frameworkId;
 }
@@ -372,7 +424,8 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
 
   CHECK(!slaves.contains(slaveId));
 
-  slaves[slaveId] = slaveInfo;
+  slaves[slaveId] = Slave(slaveInfo);
+  slaves[slaveId].whitelisted = isWhitelisted(slaveId);
 
   userSorter->add(slaveInfo.resources());
 
@@ -381,8 +434,8 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
   foreachpair (const FrameworkID& frameworkId,
                const Resources& resources,
                used) {
-    if (users.contains(frameworkId)) {
-      const std::string& user = users[frameworkId];
+    if (frameworks.contains(frameworkId)) {
+      const std::string& user = frameworks[frameworkId].user();
       sorters[user]->add(resources);
       sorters[user]->allocated(frameworkId.value(), resources);
       userSorter->allocated(user, resources);
@@ -391,13 +444,15 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
     unused -= resources; // Only want to allocate resources that are not used!
   }
 
-  allocatable[slaveId] = unused;
+  slaves[slaveId].available = unused;
 
   LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
             << ") with " << slaveInfo.resources() << " (and " << unused
             << " available)";
 
-  allocate(slaveId);
+  if (slaves[slaveId].allocatable()) {
+    allocate(slaveId);
+  }
 }
 
 
@@ -414,8 +469,6 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(
 
   slaves.erase(slaveId);
 
-  allocatable.erase(slaveId);
-
   // Note that we DO NOT actually delete any filters associated with
   // this slave, that will occur when the delayed
   // HierarchicalAllocatorProcess::expire gets invoked (or the framework
@@ -435,9 +488,10 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
   whitelist = _whitelist;
 
   if (whitelist.isSome()) {
-    LOG(INFO) << "Updated slave white list:";
-    foreach (const std::string& hostname, whitelist.get()) {
-      LOG(INFO) << "\t" << hostname;
+    LOG(INFO) << "Updated slave white list: " << stringify(whitelist.get());
+
+    foreachkey (const SlaveID& slaveId, slaves) {
+      slaves[slaveId].whitelisted = isWhitelisted(slaveId);
     }
   }
 }
@@ -478,16 +532,16 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
   // because resourcesUnused is only called as the
   // result of a valid task launch by an active
   // framework that doesn't use the entire offer.
-  CHECK(users.contains(frameworkId));
+  CHECK(frameworks.contains(frameworkId));
 
-  std::string user = users[frameworkId];
+  const std::string& user = frameworks[frameworkId].user();
   sorters[user]->unallocated(frameworkId.value(), resources);
   sorters[user]->remove(resources);
   userSorter->unallocated(user, resources);
 
   // Update resources allocatable on slave.
-  CHECK(allocatable.contains(slaveId));
-  allocatable[slaveId] += resources;
+  CHECK(slaves.contains(slaveId));
+  slaves[slaveId].available += resources;
 
   // Create a refused resources filter.
   Try<Duration> seconds_ = Duration::create(Filters().refuse_seconds());
@@ -517,10 +571,10 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
               << " for " << seconds;
 
     // Create a new filter and delay it's expiration.
-    mesos::internal::master::Filter* filter =
+    Filter* filter =
       new RefusedFilter(slaveId, resources, Timeout::in(seconds));
 
-    this->filters.put(frameworkId, filter);
+    frameworks[frameworkId].filters.insert(filter);
 
     delay(seconds, self(), &Self::expire, frameworkId, filter);
   }
@@ -545,9 +599,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
   // Master::offer before we received AllocatorProcess::frameworkRemoved
   // or AllocatorProcess::frameworkDeactivated, in which case we will
   // have already recovered all of its resources).
-  if (users.contains(frameworkId) &&
-      sorters[users[frameworkId]]->contains(frameworkId.value())) {
-    std::string user = users[frameworkId];
+  if (frameworks.contains(frameworkId) &&
+      sorters[frameworks[frameworkId].user()]->contains(frameworkId.value())) {
+    const std::string& user = frameworks[frameworkId].user();
     sorters[user]->unallocated(frameworkId.value(), resources);
     sorters[user]->remove(resources);
     userSorter->unallocated(user, resources);
@@ -556,12 +610,12 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
   // Update resources allocatable on slave (if slave still exists,
   // which it might not in the event that we dispatched Master::offer
   // before we received Allocator::slaveRemoved).
-  if (allocatable.contains(slaveId)) {
-    allocatable[slaveId] += resources;
+  if (slaves.contains(slaveId)) {
+    slaves[slaveId].available += resources;
 
     LOG(INFO) << "Recovered " << resources.allocatable()
-              << " (total allocatable: " << allocatable[slaveId] << ")"
-              << " on slave " << slaveId
+              << " (total allocatable: " << slaves[slaveId].available
+              << ") on slave " << slaveId
               << " from framework " << frameworkId;
   }
 }
@@ -574,18 +628,14 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(
 {
   CHECK(initialized);
 
-  foreach (Filter* filter, filters.get(frameworkId)) {
-    filters.remove(frameworkId, filter);
-
-    // We delete each actual Filter when
-    // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
-    // Filter here it's possible that the same Filter (i.e., same
-    // address) could get reused and HierarchicalAllocatorProcess::expire
-    // would expire that filter too soon. Note that this only works
-    // right now because ALL Filter types "expire".
-  }
+  frameworks[frameworkId].filters.clear();
 
-  filters.remove(frameworkId);
+  // We delete each actual Filter when
+  // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
+  // Filter here it's possible that the same Filter (i.e., same
+  // address) could get reused and HierarchicalAllocatorProcess::expire
+  // would expire that filter too soon. Note that this only works
+  // right now because ALL Filter types "expire".
 
   LOG(INFO) << "Removed filters for framework " << frameworkId;
 
@@ -651,38 +701,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
     return;
   }
 
-  // Get out only "available" resources (i.e., resources that are
-  // allocatable and above a certain threshold, see below).
-  hashmap<SlaveID, Resources> available;
-  foreachpair (const SlaveID& slaveId, Resources resources, allocatable) {
-    if (!slaveIds.contains(slaveId)) {
-      continue;
-    }
-
-    if (isWhitelisted(slaveId)) {
-      resources = resources.allocatable(); // Make sure they're allocatable.
-
-      // TODO(benh): For now, only make offers when there is some cpu
-      // and memory left. This is an artifact of the original code that
-      // only offered when there was at least 1 cpu "unit" available,
-      // and without doing this a framework might get offered resources
-      // with only memory available (which it obviously will decline)
-      // and then end up waiting the default Filters::refuse_seconds
-      // (unless the framework set it to something different).
-
-      Value::Scalar none;
-      Value::Scalar cpus = resources.get("cpus", none);
-      Value::Scalar mem = resources.get("mem", none);
-
-      if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
-        VLOG(1) << "Found available resources: " << resources
-                << " on slave " << slaveId;
-        available[slaveId] = resources;
-      }
-    }
-  }
-
-  if (available.size() == 0) {
+  if (slaveIds.empty()) {
     VLOG(1) << "No resources available to allocate!";
     return;
   }
@@ -694,9 +713,13 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
 
       Resources allocatedResources;
       hashmap<SlaveID, Resources> offerable;
-      foreachpair (const SlaveID& slaveId,
-                   const Resources& resources,
-                   available) {
+      foreach (const SlaveID& slaveId, slaveIds) {
+        if (!slaves[slaveId].allocatable()) {
+          continue;
+        }
+
+        Resources resources = slaves[slaveId].available;
+
         // Check whether or not this framework filters this slave.
         bool filtered = isFiltered(frameworkId, slaveId, resources);
 
@@ -708,16 +731,12 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
           offerable[slaveId] = resources;
 
           // Update framework and slave resources.
-          allocatable[slaveId] -= resources;
+          slaves[slaveId].available -= resources;
           allocatedResources += resources;
         }
       }
 
-      if (offerable.size() > 0) {
-        foreachkey (const SlaveID& slaveId, offerable) {
-          available.erase(slaveId);
-        }
-
+      if (!offerable.empty()) {
         sorters[user]->add(allocatedResources);
         sorters[user]->allocated(frameworkIdValue, allocatedResources);
         userSorter->allocated(user, allocatedResources);
@@ -740,9 +759,11 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
   // HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
   // keep the address from getting reused possibly causing premature
   // expiration).
-  if (users.contains(frameworkId) && filters.contains(frameworkId, filter)) {
-    filters.remove(frameworkId, filter);
+  if (frameworks.contains(frameworkId) &&
+      frameworks[frameworkId].filters.contains(filter)) {
+    frameworks[frameworkId].filters.erase(filter);
   }
+
   delete filter;
 }
 
@@ -769,7 +790,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
     const Resources& resources)
 {
   bool filtered = false;
-  foreach (Filter* filter, filters.get(frameworkId)) {
+
+  CHECK(frameworks.contains(frameworkId));
+  foreach (Filter* filter, frameworks[frameworkId].filters) {
     if (filter->filter(slaveId, resources)) {
       VLOG(1) << "Filtered " << resources
               << " on slave " << slaveId
@@ -781,6 +804,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
   return filtered;
 }
 
+} // namespace allocator {
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 3c1cf4c..19fcb9f 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -114,8 +114,10 @@ int main(int argc, char** argv)
   LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
   LOG(INFO) << "Starting Mesos master";
 
-  AllocatorProcess* allocatorProcess = new HierarchicalDRFAllocatorProcess();
-  Allocator* allocator = new Allocator(allocatorProcess);
+  allocator::AllocatorProcess* allocatorProcess =
+    new allocator::HierarchicalDRFAllocatorProcess();
+  allocator::Allocator* allocator =
+    new allocator::Allocator(allocatorProcess);
 
   Files files;
   Master* master = new Master(allocator, &files, flags);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d031b95..a2e4b90 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -60,6 +60,8 @@ namespace mesos {
 namespace internal {
 namespace master {
 
+using allocator::Allocator;
+
 class WhitelistWatcher : public Process<WhitelistWatcher> {
 public:
   WhitelistWatcher(const string& _path, Allocator* _allocator)

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 82e3596..8e7b74c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -57,7 +57,12 @@ namespace master {
 using namespace process; // Included to make code easier to read.
 
 // Forward declarations.
-class Allocator;
+namespace allocator {
+
+  class Allocator;
+
+}
+
 class SlaveObserver;
 class WhitelistWatcher;
 
@@ -68,8 +73,8 @@ struct Slave;
 class Master : public ProtobufProcess<Master>
 {
 public:
-  Master(Allocator* allocator, Files* files);
-  Master(Allocator* allocator,
+  Master(allocator::Allocator* allocator, Files* files);
+  Master(allocator::Allocator* allocator,
          Files* files,
          const Flags& flags);
 
@@ -210,7 +215,7 @@ private:
 
   bool elected;
 
-  Allocator* allocator;
+  allocator::Allocator* allocator;
   WhitelistWatcher* whitelistWatcher;
   Files* files;
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/sorter.hpp b/src/master/sorter.hpp
index 73db6b1..e54be3b 100644
--- a/src/master/sorter.hpp
+++ b/src/master/sorter.hpp
@@ -25,6 +25,7 @@
 namespace mesos {
 namespace internal {
 namespace master {
+namespace allocator {
 
 // Sorters implement the logic for determining the
 // order in which users or frameworks should receive
@@ -78,6 +79,7 @@ public:
   virtual int count() = 0;
 };
 
+} // namespace allocator {
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp
index b153dee..32f0a90 100644
--- a/src/tests/allocator_tests.cpp
+++ b/src/tests/allocator_tests.cpp
@@ -38,8 +38,9 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
-using mesos::internal::master::Allocator;
-using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::allocator::Allocator;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
@@ -693,6 +694,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited)
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
   slave::Flags flags = this->CreateSlaveFlags();
+
   flags.resources = Option<string>("cpus:3;mem:1024");
 
   EXPECT_CALL(this->allocator, slaveAdded(_, _, _));

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/allocator_zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_zookeeper_tests.cpp b/src/tests/allocator_zookeeper_tests.cpp
index 1034d72..1daaecd 100644
--- a/src/tests/allocator_zookeeper_tests.cpp
+++ b/src/tests/allocator_zookeeper_tests.cpp
@@ -35,8 +35,9 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
-using mesos::internal::master::Allocator;
-using mesos::internal::master::AllocatorProcess;
+using mesos::internal::master::allocator::Allocator;
+using mesos::internal::master::allocator::AllocatorProcess;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 25cd554..f743bb3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -77,7 +77,7 @@ public:
     // expected to outlive the launched master (i.e., until it is
     // stopped via Masters::stop).
     Try<process::PID<master::Master> > start(
-        master::AllocatorProcess* allocatorProcess,
+        master::allocator::AllocatorProcess* allocatorProcess,
         const master::Flags& flags = master::Flags());
 
     // Stops and cleans up a master at the specified PID.
@@ -106,8 +106,8 @@ public:
           detector(NULL) {}
 
       master::Master* master;
-      master::Allocator* allocator;
-      master::AllocatorProcess* allocatorProcess;
+      master::allocator::Allocator* allocator;
+      master::allocator::AllocatorProcess* allocatorProcess;
       MasterDetector* detector;
     };
 
@@ -221,8 +221,8 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
 
   Master master;
 
-  master.allocatorProcess = new master::HierarchicalDRFAllocatorProcess();
-  master.allocator = new master::Allocator(master.allocatorProcess);
+  master.allocatorProcess = new master::allocator::HierarchicalDRFAllocatorProcess();
+  master.allocator = new master::allocator::Allocator(master.allocatorProcess);
   master.master = new master::Master(master.allocator, &cluster->files, flags);
 
   process::PID<master::Master> pid = process::spawn(master.master);
@@ -240,7 +240,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
 
 
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
-  master::AllocatorProcess* allocatorProcess,
+  master::allocator::AllocatorProcess* allocatorProcess,
   const master::Flags& flags)
 {
   // Disallow multiple masters when not using ZooKeeper.
@@ -250,7 +250,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
 
   Master master;
   master.allocatorProcess = NULL;
-  master.allocator = new master::Allocator(allocatorProcess);
+  master.allocator = new master::allocator::Allocator(allocatorProcess);
   master.master = new master::Master(master.allocator, &cluster->files, flags);
 
   process::PID<master::Master> pid = process::spawn(master.master);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index d811267..6dbf7f3 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -54,7 +54,7 @@ Try<process::PID<master::Master> > MesosTest::StartMaster(
 
 
 Try<process::PID<master::Master> > MesosTest::StartMaster(
-    master::AllocatorProcess* allocator,
+    master::allocator::AllocatorProcess* allocator,
     const Option<master::Flags>& flags)
 {
   return cluster.masters.start(

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 12298ae..fca41aa 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -82,7 +82,7 @@ protected:
 
   // Starts a master with the specified allocator process and flags.
   virtual Try<process::PID<master::Master> > StartMaster(
-      master::AllocatorProcess* allocator,
+      master::allocator::AllocatorProcess* allocator,
       const Option<master::Flags>& flags = None());
 
   // Starts a slave with the specified flags.
@@ -321,8 +321,8 @@ public:
 };
 
 
-template <typename T = master::AllocatorProcess>
-class MockAllocatorProcess : public master::AllocatorProcess
+template <typename T = master::allocator::AllocatorProcess>
+class MockAllocatorProcess : public master::allocator::AllocatorProcess
 {
 public:
   MockAllocatorProcess()
@@ -404,7 +404,7 @@ public:
 };
 
 
-typedef ::testing::Types<master::HierarchicalDRFAllocatorProcess>
+typedef ::testing::Types<master::allocator::HierarchicalDRFAllocatorProcess>
 AllocatorTypes;
 
 
@@ -418,7 +418,7 @@ ACTION_P(InvokeInitialize, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::initialize,
+      &master::allocator::AllocatorProcess::initialize,
       arg0,
       arg1);
 }
@@ -428,7 +428,7 @@ ACTION_P(InvokeFrameworkAdded, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::frameworkAdded,
+      &master::allocator::AllocatorProcess::frameworkAdded,
       arg0,
       arg1,
       arg2);
@@ -439,7 +439,7 @@ ACTION_P(InvokeFrameworkRemoved, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::frameworkRemoved, arg0);
+      &master::allocator::AllocatorProcess::frameworkRemoved, arg0);
 }
 
 
@@ -447,7 +447,7 @@ ACTION_P(InvokeFrameworkActivated, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::frameworkActivated,
+      &master::allocator::AllocatorProcess::frameworkActivated,
       arg0,
       arg1);
 }
@@ -457,7 +457,7 @@ ACTION_P(InvokeFrameworkDeactivated, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::frameworkDeactivated,
+      &master::allocator::AllocatorProcess::frameworkDeactivated,
       arg0);
 }
 
@@ -466,7 +466,7 @@ ACTION_P(InvokeSlaveAdded, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::slaveAdded,
+      &master::allocator::AllocatorProcess::slaveAdded,
       arg0,
       arg1,
       arg2);
@@ -477,7 +477,7 @@ ACTION_P(InvokeSlaveRemoved, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::slaveRemoved,
+      &master::allocator::AllocatorProcess::slaveRemoved,
       arg0);
 }
 
@@ -486,7 +486,7 @@ ACTION_P(InvokeUpdateWhitelist, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::updateWhitelist,
+      &master::allocator::AllocatorProcess::updateWhitelist,
       arg0);
 }
 
@@ -495,7 +495,7 @@ ACTION_P(InvokeResourcesRequested, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::resourcesRequested,
+      &master::allocator::AllocatorProcess::resourcesRequested,
       arg0,
       arg1);
 }
@@ -506,7 +506,7 @@ ACTION_P(InvokeResourcesUnused, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::resourcesUnused,
+      &master::allocator::AllocatorProcess::resourcesUnused,
       arg0,
       arg1,
       arg2,
@@ -521,7 +521,7 @@ ACTION_P2(InvokeUnusedWithFilters, allocator, timeout)
 
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::resourcesUnused,
+      &master::allocator::AllocatorProcess::resourcesUnused,
       arg0,
       arg1,
       arg2,
@@ -533,7 +533,7 @@ ACTION_P(InvokeResourcesRecovered, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::resourcesRecovered,
+      &master::allocator::AllocatorProcess::resourcesRecovered,
       arg0,
       arg1,
       arg2);
@@ -544,7 +544,7 @@ ACTION_P(InvokeOffersRevived, allocator)
 {
   process::dispatch(
       allocator->real,
-      &master::AllocatorProcess::offersRevived,
+      &master::allocator::AllocatorProcess::offersRevived,
       arg0);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index b066403..3d5f02d 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -34,7 +34,8 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
-using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 95e8d28..0a50b43 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -25,8 +25,8 @@
 
 using namespace mesos::internal;
 
-using mesos::internal::master::Sorter;
-using mesos::internal::master::DRFSorter;
+using mesos::internal::master::allocator::Sorter;
+using mesos::internal::master::allocator::DRFSorter;
 
 using std::list;
 using std::string;