Posted to commits@mesos.apache.org by jo...@apache.org on 2015/10/13 20:03:49 UTC

mesos git commit: Added static->dynamic transformation to Allocator.

Repository: mesos
Updated Branches:
  refs/heads/master 38dbadc94 -> 2d0b65ede


Added static->dynamic transformation to Allocator.

This improves the compilation time of Mesos significantly, allowing
developers to iterate more quickly on allocator changes.

Review: https://reviews.apache.org/r/38869
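
The pattern behind the change: the allocator used to be a class template
parameterized on its sorter types, so its member function definitions had
to live in hierarchical.hpp and were recompiled by every translation unit
that included the header. The commit swaps the template parameters for a
small polymorphic sorter factory, moves the definitions into
hierarchical.cpp, and keeps a thin template wrapper for source
compatibility. A minimal, self-contained sketch of the idea (the
AllocatorProcess and DRFSorter names here are simplified illustrations;
the real code uses the Sorter, SorterFactoryBase and SorterFactory types
shown in the diff below):

    #include <iostream>
    #include <memory>

    struct Sorter
    {
      virtual ~Sorter() {}
      virtual const char* name() const = 0;
    };

    struct DRFSorter : Sorter
    {
      const char* name() const override { return "drf"; }
    };

    // The polymorphic factory that stands in for the old template
    // parameters.
    struct SorterFactoryBase
    {
      virtual ~SorterFactoryBase() {}
      virtual Sorter* createRoleSorter() const = 0;
    };

    template <typename RoleSorter>
    struct SorterFactory : SorterFactoryBase
    {
      Sorter* createRoleSorter() const override { return new RoleSorter(); }
    };

    // The allocator is no longer a template, so its member functions
    // can be defined once, in a single .cpp file.
    class AllocatorProcess
    {
    public:
      explicit AllocatorProcess(SorterFactoryBase* factory)
        : roleSorter(factory->createRoleSorter()) {}

      void run() { std::cout << roleSorter->name() << std::endl; }

    private:
      std::unique_ptr<Sorter> roleSorter;
    };

    int main()
    {
      SorterFactory<DRFSorter> factory;
      AllocatorProcess process(&factory);
      process.run();  // prints "drf"
      return 0;
    }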


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d0b65ed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d0b65ed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d0b65ed

Branch: refs/heads/master
Commit: 2d0b65edeea129b427e60a3ca360fc29f77aa0d5
Parents: 38dbadc
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Sep 29 17:01:13 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Tue Oct 13 20:03:04 2015 +0200

----------------------------------------------------------------------
 src/CMakeLists.txt                          |    1 +
 src/Makefile.am                             |    1 +
 src/master/allocator/mesos/hierarchical.cpp | 1191 ++++++++++++++++++++
 src/master/allocator/mesos/hierarchical.hpp | 1273 +---------------------
 4 files changed, 1249 insertions(+), 1217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 536a99f..98e76ce 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -142,6 +142,7 @@ set(MASTER_SRC
   master/repairer.cpp
   master/validation.cpp
   master/allocator/allocator.cpp
+  master/allocator/mesos/hierarchical.cpp
   master/allocator/sorter/drf/sorter.cpp
   )
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index d855cb8..4a0eeb8 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -495,6 +495,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	master/repairer.cpp						\
 	master/validation.cpp						\
 	master/allocator/allocator.cpp					\
+	master/allocator/mesos/hierarchical.cpp				\
 	master/allocator/sorter/drf/sorter.cpp				\
 	messages/flags.proto						\
 	messages/messages.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
new file mode 100644
index 0000000..0a6f8a6
--- /dev/null
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -0,0 +1,1191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "master/allocator/mesos/hierarchical.hpp"
+
+#include <algorithm>
+#include <vector>
+
+#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/event.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/check.hpp>
+#include <stout/hashset.hpp>
+#include <stout/stopwatch.hpp>
+#include <stout/stringify.hpp>
+
+namespace mesos {
+namespace internal {
+namespace master {
+namespace allocator {
+namespace internal {
+
+// Used to represent "filters" for resources unused in offers.
+class OfferFilter
+{
+public:
+  virtual ~OfferFilter() {}
+
+  virtual bool filter(const Resources& resources) = 0;
+};
+
+
+class RefusedOfferFilter : public OfferFilter
+{
+public:
+  RefusedOfferFilter(
+      const Resources& _resources,
+      const process::Timeout& _timeout)
+    : resources(_resources), timeout(_timeout) {}
+
+  virtual bool filter(const Resources& _resources)
+  {
+    // TODO(jieyu): Consider separating the superset check for regular
+    // and revocable resources. For example, frameworks might want
+    // more revocable resources only or non-revocable resources only,
+    // but currently the filter only expires if there is more of both
+    // revocable and non-revocable resources.
+    // Filter if the refused resources are a superset of the candidate
+    // resources and the filter has not yet timed out.
+    return resources.contains(_resources) &&
+           timeout.remaining() > Seconds(0);
+  }
+
+  const Resources resources;
+  const process::Timeout timeout;
+};
+
+
+// Used to represent "filters" for inverse offers.
+// NOTE: Since this specific allocator implementation only sends inverse offers
+// for maintenance primitives, and those are at the whole slave level, we only
+// need to filter based on the time-out.
+// If this allocator implementation starts sending out more resource specific
+// inverse offers, then we can capture the `unavailableResources` in the filter
+// function.
+class InverseOfferFilter
+{
+public:
+  virtual ~InverseOfferFilter() {}
+
+  virtual bool filter() = 0;
+};
+
+
+// NOTE: See comment above `InverseOfferFilter` regarding capturing
+// `unavailableResources` if this allocator starts sending fine-grained inverse
+// offers.
+class RefusedInverseOfferFilter : public InverseOfferFilter
+{
+public:
+  RefusedInverseOfferFilter(const process::Timeout& _timeout)
+    : timeout(_timeout) {}
+
+  virtual bool filter()
+  {
+    // See comment above why we currently don't do more fine-grained filtering.
+    return timeout.remaining() > Seconds(0);
+  }
+
+  const process::Timeout timeout;
+};
+
+
+void HierarchicalAllocatorProcess::initialize(
+    const Duration& _allocationInterval,
+    const lambda::function<
+        void(const FrameworkID&,
+             const hashmap<SlaveID, Resources>&)>& _offerCallback,
+    const lambda::function<
+        void(const FrameworkID&,
+             const hashmap<SlaveID, UnavailableResources>&)>&
+      _inverseOfferCallback,
+    const hashmap<std::string, mesos::master::RoleInfo>& _roles)
+{
+  allocationInterval = _allocationInterval;
+  offerCallback = _offerCallback;
+  inverseOfferCallback = _inverseOfferCallback;
+  roles = _roles;
+  initialized = true;
+
+  roleSorter = sorterFactory->createRoleSorter();
+  foreachpair (
+      const std::string& name, const mesos::master::RoleInfo& roleInfo, roles) {
+    roleSorter->add(name, roleInfo.weight());
+    frameworkSorters[name] = sorterFactory->createFrameworkSorter();
+  }
+
+  if (roleSorter->count() == 0) {
+    LOG(ERROR) << "No roles specified, cannot allocate resources!";
+  }
+
+  VLOG(1) << "Initialized hierarchical allocator process";
+
+  delay(allocationInterval, self(), &Self::batch);
+}
+
+
+void HierarchicalAllocatorProcess::addFramework(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const hashmap<SlaveID, Resources>& used)
+{
+  CHECK(initialized);
+
+  const std::string& role = frameworkInfo.role();
+
+  CHECK(roles.contains(role));
+
+  CHECK(!frameworkSorters[role]->contains(frameworkId.value()));
+  frameworkSorters[role]->add(frameworkId.value());
+
+  // TODO(bmahler): Validate that the reserved resources have the
+  // framework's role.
+
+  // Update the allocation to this framework.
+  foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
+    roleSorter->allocated(role, slaveId, allocated.unreserved());
+    frameworkSorters[role]->add(slaveId, allocated);
+    frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated);
+  }
+
+  frameworks[frameworkId] = Framework();
+  frameworks[frameworkId].role = frameworkInfo.role();
+  frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint();
+
+  // Check if the framework desires revocable resources.
+  frameworks[frameworkId].revocable = false;
+  foreach (const FrameworkInfo::Capability& capability,
+           frameworkInfo.capabilities()) {
+    if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
+      frameworks[frameworkId].revocable = true;
+    }
+  }
+
+  frameworks[frameworkId].suppressed = false;
+
+  LOG(INFO) << "Added framework " << frameworkId;
+
+  allocate();
+}
+
+
+void HierarchicalAllocatorProcess::removeFramework(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  CHECK(frameworks.contains(frameworkId));
+  const std::string& role = frameworks[frameworkId].role;
+
+  // Might not be in 'frameworkSorters[role]' because it was previously
+  // deactivated and never re-added.
+  if (frameworkSorters[role]->contains(frameworkId.value())) {
+    hashmap<SlaveID, Resources> allocation =
+      frameworkSorters[role]->allocation(frameworkId.value());
+
+    foreachpair (
+        const SlaveID& slaveId, const Resources& allocated, allocation) {
+      roleSorter->unallocated(role, slaveId, allocated.unreserved());
+      frameworkSorters[role]->remove(slaveId, allocated);
+    }
+
+    frameworkSorters[role]->remove(frameworkId.value());
+  }
+
+  // Do not delete the filters contained in this
+  // framework's `offerFilters` hashset yet; see comments in
+  // HierarchicalAllocatorProcess::reviveOffers and
+  // HierarchicalAllocatorProcess::expire.
+  frameworks.erase(frameworkId);
+
+  LOG(INFO) << "Removed framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::activateFramework(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  CHECK(frameworks.contains(frameworkId));
+  const std::string& role = frameworks[frameworkId].role;
+
+  frameworkSorters[role]->activate(frameworkId.value());
+
+  LOG(INFO) << "Activated framework " << frameworkId;
+
+  allocate();
+}
+
+
+void HierarchicalAllocatorProcess::deactivateFramework(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  CHECK(frameworks.contains(frameworkId));
+  const std::string& role = frameworks[frameworkId].role;
+
+  frameworkSorters[role]->deactivate(frameworkId.value());
+
+  // Note that the Sorter *does not* remove the resources allocated
+  // to this framework. For now, this is important because if the
+  // framework fails over and is activated, we still want a record
+  // of the resources that it is using. We might be able to collapse
+  // the added/removed and activated/deactivated in the future.
+
+  // Do not delete the filters contained in this
+  // framework's `offerFilters` hashset yet; see comments in
+  // HierarchicalAllocatorProcess::reviveOffers and
+  // HierarchicalAllocatorProcess::expire.
+  frameworks[frameworkId].offerFilters.clear();
+  frameworks[frameworkId].inverseOfferFilters.clear();
+
+  LOG(INFO) << "Deactivated framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::updateFramework(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo)
+{
+  CHECK(initialized);
+
+  CHECK(frameworks.contains(frameworkId));
+
+  // TODO(jmlvanre): Once we allow frameworks to re-register with a
+  // new 'role' or 'checkpoint' flag, we need to update our internal
+  // 'frameworks' structure. See MESOS-703 for progress on allowing
+  // these fields to be updated.
+  CHECK_EQ(frameworks[frameworkId].role, frameworkInfo.role());
+  CHECK_EQ(frameworks[frameworkId].checkpoint, frameworkInfo.checkpoint());
+
+  frameworks[frameworkId].revocable = false;
+
+  foreach (const FrameworkInfo::Capability& capability,
+           frameworkInfo.capabilities()) {
+    if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
+      frameworks[frameworkId].revocable = true;
+    }
+  }
+}
+
+
+void HierarchicalAllocatorProcess::addSlave(
+    const SlaveID& slaveId,
+    const SlaveInfo& slaveInfo,
+    const Option<Unavailability>& unavailability,
+    const Resources& total,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  CHECK(initialized);
+  CHECK(!slaves.contains(slaveId));
+
+  roleSorter->add(slaveId, total.unreserved());
+
+  foreachpair (const FrameworkID& frameworkId,
+               const Resources& allocated,
+               used) {
+    if (frameworks.contains(frameworkId)) {
+      const std::string& role = frameworks[frameworkId].role;
+
+      // TODO(bmahler): Validate that the reserved resources have the
+      // framework's role.
+
+      roleSorter->allocated(role, slaveId, allocated.unreserved());
+      frameworkSorters[role]->add(slaveId, allocated);
+      frameworkSorters[role]->allocated(
+          frameworkId.value(), slaveId, allocated);
+    }
+  }
+
+  slaves[slaveId] = Slave();
+  slaves[slaveId].total = total;
+  slaves[slaveId].allocated = Resources::sum(used);
+  slaves[slaveId].activated = true;
+  slaves[slaveId].checkpoint = slaveInfo.checkpoint();
+  slaves[slaveId].hostname = slaveInfo.hostname();
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
+  if (unavailability.isSome()) {
+    slaves[slaveId].maintenance =
+      typename Slave::Maintenance(unavailability.get());
+  }
+
+  LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname
+            << ") with " << slaves[slaveId].total
+            << " (allocated: " << slaves[slaveId].allocated << ")";
+
+  allocate(slaveId);
+}
+
+
+void HierarchicalAllocatorProcess::removeSlave(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // TODO(bmahler): Per MESOS-621, this should remove the allocations
+  // that any frameworks have on this slave. Otherwise the caller may
+  // "leak" allocated resources accidentally if they forget to recover
+  // all the resources. Fixing this would require more information
+  // than what we currently track in the allocator.
+
+  roleSorter->remove(slaveId, slaves[slaveId].total.unreserved());
+
+  slaves.erase(slaveId);
+
+  // Note that we DO NOT actually delete any filters associated with
+  // this slave; that will occur when the delayed
+  // HierarchicalAllocatorProcess::expire gets invoked (or the framework
+  // that applied the filters gets removed).
+
+  LOG(INFO) << "Removed slave " << slaveId;
+}
+
+
+void HierarchicalAllocatorProcess::updateSlave(
+    const SlaveID& slaveId,
+    const Resources& oversubscribed)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // Check that all the oversubscribed resources are revocable.
+  CHECK_EQ(oversubscribed, oversubscribed.revocable());
+
+  // Update the total resources.
+
+  // First remove the old oversubscribed resources from the total.
+  slaves[slaveId].total -= slaves[slaveId].total.revocable();
+
+  // Now add the new estimate of oversubscribed resources.
+  slaves[slaveId].total += oversubscribed;
+
+  // Now, update the total resources in the role sorter.
+  roleSorter->update(
+      slaveId,
+      slaves[slaveId].total.unreserved());
+
+  LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname << ")"
+            << " updated with oversubscribed resources " << oversubscribed
+            << " (total: " << slaves[slaveId].total
+            << ", allocated: " << slaves[slaveId].allocated << ")";
+
+  allocate(slaveId);
+}
+
+
+void HierarchicalAllocatorProcess::activateSlave(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  slaves[slaveId].activated = true;
+
+  LOG(INFO) << "Slave " << slaveId << " reactivated";
+}
+
+
+void HierarchicalAllocatorProcess::deactivateSlave(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  slaves[slaveId].activated = false;
+
+  LOG(INFO) << "Slave " << slaveId << " deactivated";
+}
+
+
+void HierarchicalAllocatorProcess::updateWhitelist(
+    const Option<hashset<std::string>>& _whitelist)
+{
+  CHECK(initialized);
+
+  whitelist = _whitelist;
+
+  if (whitelist.isSome()) {
+    LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get());
+
+    if (whitelist.get().empty()) {
+      LOG(WARNING) << "Whitelist is empty, no offers will be made!";
+    }
+  } else {
+    LOG(INFO) << "Advertising offers for all slaves";
+  }
+}
+
+
+void HierarchicalAllocatorProcess::requestResources(
+    const FrameworkID& frameworkId,
+    const std::vector<Request>& requests)
+{
+  CHECK(initialized);
+
+  LOG(INFO) << "Received resource request from framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::updateAllocation(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const std::vector<Offer::Operation>& operations)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+  CHECK(frameworks.contains(frameworkId));
+
+  // Here we apply offer operations to the allocated resources, which
+  // in turn leads to an update of the total. The available resources
+  // remain unchanged.
+
+  // Update the allocated resources.
+  Sorter* frameworkSorter = frameworkSorters[frameworks[frameworkId].role];
+
+  Resources frameworkAllocation =
+    frameworkSorter->allocation(frameworkId.value(), slaveId);
+
+  Try<Resources> updatedFrameworkAllocation =
+    frameworkAllocation.apply(operations);
+
+  CHECK_SOME(updatedFrameworkAllocation);
+
+  frameworkSorter->update(
+      frameworkId.value(),
+      slaveId,
+      frameworkAllocation,
+      updatedFrameworkAllocation.get());
+
+  roleSorter->update(
+      frameworks[frameworkId].role,
+      slaveId,
+      frameworkAllocation.unreserved(),
+      updatedFrameworkAllocation.get().unreserved());
+
+  Try<Resources> updatedSlaveAllocation =
+    slaves[slaveId].allocated.apply(operations);
+
+  CHECK_SOME(updatedSlaveAllocation);
+
+  slaves[slaveId].allocated = updatedSlaveAllocation.get();
+
+  // Update the total resources.
+  Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+  CHECK_SOME(updatedTotal);
+
+  slaves[slaveId].total = updatedTotal.get();
+
+  LOG(INFO) << "Updated allocation of framework " << frameworkId
+            << " on slave " << slaveId
+            << " from " << frameworkAllocation
+            << " to " << updatedFrameworkAllocation.get();
+}
+
+
+process::Future<Nothing>
+HierarchicalAllocatorProcess::updateAvailable(
+    const SlaveID& slaveId,
+    const std::vector<Offer::Operation>& operations)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
+
+  // It's possible for this 'apply' to fail here because a call to
+  // 'allocate' could have been enqueued by the allocator itself
+  // just before the master's request to enqueue 'updateAvailable'
+  // arrives at the allocator.
+  //
+  //   Master -------R------------
+  //                  \----+
+  //                       |
+  //   Allocator --A-----A-U---A--
+  //                \___/ \___/
+  //
+  //   where A = allocate, R = reserve, U = updateAvailable
+  Try<Resources> updatedAvailable = available.apply(operations);
+  if (updatedAvailable.isError()) {
+    return process::Failure(updatedAvailable.error());
+  }
+
+  // Update the total resources.
+  Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+  CHECK_SOME(updatedTotal);
+
+  slaves[slaveId].total = updatedTotal.get();
+
+  // Now, update the total resources in the role sorter.
+  roleSorter->update(slaveId, slaves[slaveId].total.unreserved());
+
+  return Nothing();
+}
+
+
+void HierarchicalAllocatorProcess::updateUnavailability(
+    const SlaveID& slaveId,
+    const Option<Unavailability>& unavailability)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
+
+  // We explicitly remove all filters for the inverse offers of this slave. We
+  // do this because we want to force frameworks to reassess the calculations
+  // they have made to respond to the inverse offer. Unavailability of a slave
+  // can have a large effect on failure domain calculations and interleaved
+  // unavailability schedules.
+  foreachvalue (Framework& framework, frameworks) {
+    framework.inverseOfferFilters.erase(slaveId);
+  }
+
+  // Remove any old unavailability.
+  slaves[slaveId].maintenance = None();
+
+  // If we have a new unavailability.
+  if (unavailability.isSome()) {
+    slaves[slaveId].maintenance =
+      typename Slave::Maintenance(unavailability.get());
+  }
+
+  allocate(slaveId);
+}
+
+
+void HierarchicalAllocatorProcess::updateInverseOffer(
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const Option<UnavailableResources>& unavailableResources,
+    const Option<mesos::master::InverseOfferStatus>& status,
+    const Option<Filters>& filters)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+  CHECK(slaves[slaveId].maintenance.isSome());
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
+
+  // We take a reference to `maintenance` because we intend to modify it,
+  // and a shorter name improves readability.
+  typename Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
+
+  // Only handle inverse offers that are currently outstanding. If an inverse
+  // offer is not outstanding, it is old and can be safely ignored.
+  if (maintenance.offersOutstanding.contains(frameworkId)) {
+    // We always remove the outstanding offer so that we will send a new offer
+    // out the next time we schedule inverse offers.
+    maintenance.offersOutstanding.erase(frameworkId);
+
+    // If the response is `Some`, this means the framework responded. Otherwise
+    // if it is `None` the inverse offer timed out or was rescinded.
+    if (status.isSome()) {
+      // For now we don't allow frameworks to respond with `UNKNOWN`. The caller
+      // should guard against this. This goes against the pattern of not
+      // checking external invariants; however, the allocator and master are
+      // currently so tightly coupled that this check is valuable.
+      CHECK_NE(
+          status.get().status(),
+          mesos::master::InverseOfferStatus::UNKNOWN);
+
+      // If the framework responded, we update our state to match.
+      maintenance.statuses[frameworkId].CopyFrom(status.get());
+    }
+  }
+
+  // No need to install filters if `filters` is none.
+  if (filters.isNone()) {
+    return;
+  }
+
+  // Create a refused resource filter.
+  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+  if (seconds.isError()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is invalid: " << seconds.error();
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  } else if (seconds.get() < Duration::zero()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is negative";
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  }
+
+  CHECK_SOME(seconds);
+
+  if (seconds.get() != Duration::zero()) {
+    VLOG(1) << "Framework " << frameworkId
+            << " filtered inverse offers from slave " << slaveId
+            << " for " << seconds.get();
+
+    // Create a new inverse offer filter and delay its expiration.
+    InverseOfferFilter* inverseOfferFilter =
+      new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
+
+    frameworks[frameworkId]
+      .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
+
+    // We need to disambiguate the function call to pick the correct
+    // expire() overload.
+    void (Self::*expireInverseOffer)(
+             const FrameworkID&,
+             const SlaveID&,
+             InverseOfferFilter*) = &Self::expire;
+
+    delay(
+        seconds.get(),
+        self(),
+        expireInverseOffer,
+        frameworkId,
+        slaveId,
+        inverseOfferFilter);
+  }
+}
+
+
+process::Future<
+    hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+HierarchicalAllocatorProcess::getInverseOfferStatuses()
+{
+  CHECK(initialized);
+
+  hashmap<
+      SlaveID,
+      hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result;
+
+  // Make a copy of the most recent statuses.
+  foreachpair (const SlaveID& id, const Slave& slave, slaves) {
+    if (slave.maintenance.isSome()) {
+      result[id] = slave.maintenance.get().statuses;
+    }
+  }
+
+  return result;
+}
+
+
+void HierarchicalAllocatorProcess::recoverResources(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources,
+    const Option<Filters>& filters)
+{
+  CHECK(initialized);
+
+  if (resources.empty()) {
+    return;
+  }
+
+  // Update the resources allocated to the framework (if the framework still
+  // exists, which it might not in the event that we dispatched
+  // Master::offer before we received
+  // MesosAllocatorProcess::removeFramework or
+  // MesosAllocatorProcess::deactivateFramework, in which case we will
+  // have already recovered all of its resources).
+  if (frameworks.contains(frameworkId)) {
+    const std::string& role = frameworks[frameworkId].role;
+
+    CHECK(frameworkSorters.contains(role));
+
+    if (frameworkSorters[role]->contains(frameworkId.value())) {
+      frameworkSorters[role]->unallocated(
+          frameworkId.value(), slaveId, resources);
+      frameworkSorters[role]->remove(slaveId, resources);
+      roleSorter->unallocated(role, slaveId, resources.unreserved());
+    }
+  }
+
+  // Update resources allocated on slave (if slave still exists,
+  // which it might not in the event that we dispatched Master::offer
+  // before we received Allocator::removeSlave).
+  if (slaves.contains(slaveId)) {
+    // NOTE: We cannot add the following CHECK due to the double
+    // precision errors. See MESOS-1187 for details.
+    // CHECK(slaves[slaveId].allocated.contains(resources));
+
+    slaves[slaveId].allocated -= resources;
+
+    LOG(INFO) << "Recovered " << resources
+              << " (total: " << slaves[slaveId].total
+              << ", allocated: " << slaves[slaveId].allocated
+              << ") on slave " << slaveId
+              << " from framework " << frameworkId;
+  }
+
+  // No need to install the filter if 'filters' is none.
+  if (filters.isNone()) {
+    return;
+  }
+
+  // No need to install the filter if slave/framework does not exist.
+  if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
+    return;
+  }
+
+  // Create a refused resources filter.
+  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+  if (seconds.isError()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused resources filter because the input value "
+                 << "is invalid: " << seconds.error();
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  } else if (seconds.get() < Duration::zero()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused resources filter because the input value "
+                 << "is negative";
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  }
+
+  CHECK_SOME(seconds);
+
+  if (seconds.get() != Duration::zero()) {
+    VLOG(1) << "Framework " << frameworkId
+            << " filtered slave " << slaveId
+            << " for " << seconds.get();
+
+    // Create a new filter and delay its expiration.
+    OfferFilter* offerFilter = new RefusedOfferFilter(
+        resources,
+        process::Timeout::in(seconds.get()));
+
+    frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
+
+    // We need to disambiguate the function call to pick the correct
+    // expire() overload.
+    void (Self::*expireOffer)(
+              const FrameworkID&,
+              const SlaveID&,
+              OfferFilter*) = &Self::expire;
+
+    delay(
+        seconds.get(),
+        self(),
+        expireOffer,
+        frameworkId,
+        slaveId,
+        offerFilter);
+  }
+}
+
+
+void HierarchicalAllocatorProcess::suppressOffers(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+  frameworks[frameworkId].suppressed = true;
+
+  LOG(INFO) << "Suppressed offers for framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::reviveOffers(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  frameworks[frameworkId].offerFilters.clear();
+  frameworks[frameworkId].inverseOfferFilters.clear();
+  frameworks[frameworkId].suppressed = false;
+
+  // We delete each actual `OfferFilter` when
+  // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
+  // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
+  // address) could get reused and `HierarchicalAllocatorProcess::expire`
+  // would expire that filter too soon. Note that this only works
+  // right now because ALL Filter types "expire".
+
+  LOG(INFO) << "Removed offer filters for framework " << frameworkId;
+
+  allocate();
+}
+
+
+void HierarchicalAllocatorProcess::batch()
+{
+  allocate();
+  delay(allocationInterval, self(), &Self::batch);
+}
+
+
+void HierarchicalAllocatorProcess::allocate()
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  allocate(slaves.keys());
+
+  VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
+          << stopwatch.elapsed();
+}
+
+
+void HierarchicalAllocatorProcess::allocate(
+    const SlaveID& slaveId)
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  // TODO(bmahler): Add initializer list constructor for hashset.
+  hashset<SlaveID> slaves;
+  slaves.insert(slaveId);
+  allocate(slaves);
+
+  VLOG(1) << "Performed allocation for slave " << slaveId << " in "
+          << stopwatch.elapsed();
+}
+
+
+void HierarchicalAllocatorProcess::allocate(
+    const hashset<SlaveID>& slaveIds_)
+{
+  if (roleSorter->count() == 0) {
+    LOG(ERROR) << "No roles specified, cannot allocate resources!";
+    return;
+  }
+
+  // Compute the offerable resources, per framework:
+  //   (1) For reserved resources on the slave, allocate these to a
+  //       framework having the corresponding role.
+  //   (2) For unreserved resources on the slave, allocate these
+  //       to a framework of any role.
+  hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;
+
+  // Randomize the order in which slaves' resources are allocated.
+  // TODO(vinod): Implement a smarter sorting algorithm.
+  std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
+  std::random_shuffle(slaveIds.begin(), slaveIds.end());
+
+  foreach (const SlaveID& slaveId, slaveIds) {
+    // Don't send offers for non-whitelisted and deactivated slaves.
+    if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) {
+      continue;
+    }
+
+    foreach (const std::string& role, roleSorter->sort()) {
+      foreach (const std::string& frameworkId_,
+               frameworkSorters[role]->sort()) {
+        FrameworkID frameworkId;
+        frameworkId.set_value(frameworkId_);
+
+        // If the framework has suppressed offers, ignore.
+        if (frameworks[frameworkId].suppressed) {
+          continue;
+        }
+
+        // Calculate the currently available resources on the slave.
+        Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
+
+        // NOTE: Currently, frameworks are allowed to have '*' role.
+        // Calling reserved('*') returns an empty Resources object.
+        Resources resources = available.unreserved() + available.reserved(role);
+
+        // Remove revocable resources if the framework has not opted
+        // for them.
+        if (!frameworks[frameworkId].revocable) {
+          resources -= resources.revocable();
+        }
+
+        // If the resources are not allocatable, ignore.
+        if (!allocatable(resources)) {
+          continue;
+        }
+
+        // If the framework filters these resources, ignore.
+        if (isFiltered(frameworkId, slaveId, resources)) {
+          continue;
+        }
+
+        VLOG(2) << "Allocating " << resources << " on slave " << slaveId
+                << " to framework " << frameworkId;
+
+        // Note that we perform "coarse-grained" allocation,
+        // meaning that we always allocate the entire remaining
+        // slave resources to a single framework.
+        offerable[frameworkId][slaveId] = resources;
+        slaves[slaveId].allocated += resources;
+
+        // Reserved resources are only accounted for in the framework
+        // sorter, since the reserved resources are not shared across
+        // roles.
+        frameworkSorters[role]->add(slaveId, resources);
+        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
+        roleSorter->allocated(role, slaveId, resources.unreserved());
+      }
+    }
+  }
+
+  if (offerable.empty()) {
+    VLOG(1) << "No resources available to allocate!";
+  } else {
+    // Now offer the resources to each framework.
+    foreachkey (const FrameworkID& frameworkId, offerable) {
+      offerCallback(frameworkId, offerable[frameworkId]);
+    }
+  }
+
+  // NOTE: For now, we implement maintenance inverse offers within the
+  // allocator. We leverage the existing timer/cycle of offers to also do any
+  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
+  deallocate(slaveIds_);
+}
+
+
+void HierarchicalAllocatorProcess::deallocate(
+    const hashset<SlaveID>& slaveIds_)
+{
+  if (frameworkSorters.empty()) {
+    LOG(ERROR) << "No frameworks specified, cannot send inverse offers!";
+    return;
+  }
+
+  // In this case, `offerable` is actually the slaves and/or resources that we
+  // want the master to create `InverseOffer`s from.
+  hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
+
+  // For maintenance, we use the framework sorters to determine which frameworks
+  // have (1) reserved and/or (2) unreserved resources on the specified
+  // slaveIds. This way we only send inverse offers to frameworks that have the
+  // potential to lose something. We keep track of which frameworks already have
+  // an outstanding inverse offer for the given slave in the
+  // UnavailabilityStatus of the specific slave using the `offersOutstanding`
+  // flag. This is equivalent to the accounting we do for resources when we send
+  // regular offers. If we didn't keep track of outstanding offers then we would
+  // keep generating new inverse offers even though the framework had not
+  // responded yet.
+
+  foreachvalue (Sorter* frameworkSorter, frameworkSorters) {
+    foreach (const SlaveID& slaveId, slaveIds_) {
+      CHECK(slaves.contains(slaveId));
+
+      if (slaves[slaveId].maintenance.isSome()) {
+        // We take a reference to `maintenance` because we intend to modify
+        // it, and a shorter name improves readability.
+        typename Slave::Maintenance& maintenance =
+          slaves[slaveId].maintenance.get();
+
+        hashmap<std::string, Resources> allocation =
+          frameworkSorter->allocation(slaveId);
+
+        foreachkey (const std::string& frameworkId_, allocation) {
+          FrameworkID frameworkId;
+          frameworkId.set_value(frameworkId_);
+
+          // If this framework doesn't already have inverse offers for the
+          // specified slave.
+          if (!offerable[frameworkId].contains(slaveId)) {
+            // If there isn't already an outstanding inverse offer to this
+            // framework for the specified slave.
+            if (!maintenance.offersOutstanding.contains(frameworkId)) {
+              // Ignore in case the framework filters inverse offers for this
+              // slave.
+              // NOTE: Since this specific allocator implementation only sends
+              // inverse offers for maintenance primitives, and those are at the
+              // whole slave level, we only need to filter based on the
+              // time-out.
+              if (isFiltered(frameworkId, slaveId)) {
+                continue;
+              }
+
+              const UnavailableResources unavailableResources =
+                UnavailableResources{
+                    Resources(),
+                    maintenance.unavailability};
+
+              // For now we send inverse offers with empty resources when the
+              // inverse offer represents maintenance on the machine. In the
+              // future we could be more specific about the resources on the
+              // host, as we have the information available.
+              offerable[frameworkId][slaveId] = unavailableResources;
+
+              // Mark this framework as having an offer outstanding for the
+              // specified slave.
+              maintenance.offersOutstanding.insert(frameworkId);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  if (offerable.empty()) {
+    VLOG(1) << "No inverse offers to send out!";
+  } else {
+    // Now send inverse offers to each framework.
+    foreachkey (const FrameworkID& frameworkId, offerable) {
+      inverseOfferCallback(frameworkId, offerable[frameworkId]);
+    }
+  }
+}
+
+
+void HierarchicalAllocatorProcess::expire(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    OfferFilter* offerFilter)
+{
+  // The filter might have already been removed (e.g., if the
+  // framework no longer exists or in
+  // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
+  // keep the address from getting reused, possibly causing premature
+  // expiration).
+  if (frameworks.contains(frameworkId) &&
+      frameworks[frameworkId].offerFilters.contains(slaveId) &&
+      frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) {
+    frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter);
+    if (frameworks[frameworkId].offerFilters[slaveId].empty()) {
+      frameworks[frameworkId].offerFilters.erase(slaveId);
+    }
+  }
+
+  delete offerFilter;
+}
+
+
+void HierarchicalAllocatorProcess::expire(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    InverseOfferFilter* inverseOfferFilter)
+{
+  // The filter might have already been removed (e.g., if the
+  // framework no longer exists or in
+  // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
+  // keep the address from getting reused, possibly causing premature
+  // expiration).
+  if (frameworks.contains(frameworkId) &&
+      frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
+      frameworks[frameworkId].inverseOfferFilters[slaveId]
+        .contains(inverseOfferFilter)) {
+    frameworks[frameworkId].inverseOfferFilters[slaveId]
+      .erase(inverseOfferFilter);
+
+    if (frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
+      frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
+    }
+  }
+
+  delete inverseOfferFilter;
+}
+
+
+bool
+HierarchicalAllocatorProcess::isWhitelisted(
+    const SlaveID& slaveId)
+{
+  CHECK(slaves.contains(slaveId));
+
+  return whitelist.isNone() ||
+         whitelist.get().contains(slaves[slaveId].hostname);
+}
+
+
+bool
+HierarchicalAllocatorProcess::isFiltered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources)
+{
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+
+  // Do not offer a non-checkpointing slave's resources to a checkpointing
+  // framework. This is a short term fix until the following is resolved:
+  // https://issues.apache.org/jira/browse/MESOS-444.
+  if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
+    VLOG(1) << "Filtered offer with " << resources
+            << " on non-checkpointing slave " << slaveId
+            << " for checkpointing framework " << frameworkId;
+
+    return true;
+  }
+
+  if (frameworks[frameworkId].offerFilters.contains(slaveId)) {
+    foreach (
+        OfferFilter* offerFilter,
+        frameworks[frameworkId].offerFilters[slaveId]) {
+      if (offerFilter->filter(resources)) {
+        VLOG(1) << "Filtered offer with " << resources
+                << " on slave " << slaveId
+                << " for framework " << frameworkId;
+
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+
+bool HierarchicalAllocatorProcess::isFiltered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId)
+{
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+
+  if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
+    foreach (
+        InverseOfferFilter* inverseOfferFilter,
+        frameworks[frameworkId].inverseOfferFilters[slaveId]) {
+      if (inverseOfferFilter->filter()) {
+        VLOG(1) << "Filtered unavailability on slave " << slaveId
+                << " for framework " << frameworkId;
+
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+
+bool
+HierarchicalAllocatorProcess::allocatable(
+    const Resources& resources)
+{
+  Option<double> cpus = resources.cpus();
+  Option<Bytes> mem = resources.mem();
+
+  return (cpus.isSome() && cpus.get() >= MIN_CPUS) ||
+         (mem.isSome() && mem.get() >= MIN_MEM);
+}
+
+} // namespace internal {
+} // namespace allocator {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
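
Why the move helps build times: the body of a class template's member
function must be visible wherever the template is instantiated, which in
practice means it lives in the header; every translation unit that
included hierarchical.hpp therefore re-parsed the whole allocator, and
any unit instantiating it re-compiled it. Once the allocator is a plain
class, the bulk of its code (the hierarchical.cpp above) compiles exactly
once; the header diff below shows the factory indirection that makes this
possible. A small illustration with hypothetical files, not Mesos code:

    // widget.hpp -- illustrative only.
    #ifndef WIDGET_HPP
    #define WIDGET_HPP

    // Template: the body of work() must be visible here, so every
    // .cpp that includes this header and instantiates the template
    // compiles it again.
    template <typename T>
    struct TemplatedWidget
    {
      T value;
      void work() { /* ...large body, recompiled in many units... */ }
    };

    // Non-template: only the declaration needs to live in the header.
    struct PlainWidget
    {
      void work();
    };

    #endif // WIDGET_HPP

    // widget.cpp -- the large body now compiles exactly once.
    #include "widget.hpp"

    void PlainWidget::work() { /* ...large body... */ }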

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index d57c55e..e468b5a 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -19,26 +19,19 @@
 #ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
 #define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
 
-#include <algorithm>
-#include <vector>
+#include <string>
 
-#include <mesos/resources.hpp>
-#include <mesos/type_utils.hpp>
+#include <mesos/mesos.hpp>
 
-#include <process/event.hpp>
-#include <process/delay.hpp>
 #include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/metrics/gauge.hpp>
 #include <process/metrics/metrics.hpp>
-#include <process/timeout.hpp>
 
-#include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
-#include <stout/stopwatch.hpp>
-#include <stout/stringify.hpp>
+#include <stout/option.hpp>
 
 #include "master/allocator/mesos/allocator.hpp"
 #include "master/allocator/sorter/drf/sorter.hpp"
@@ -50,10 +43,6 @@ namespace internal {
 namespace master {
 namespace allocator {
 
-// Forward declarations.
-class OfferFilter;
-class InverseOfferFilter;
-
 // We forward declare the hierarchical allocator process so that we
 // can typedef an instantiation of it with DRF sorters.
 template <typename RoleSorter, typename FrameworkSorter>
@@ -66,16 +55,52 @@ typedef MesosAllocator<HierarchicalDRFAllocatorProcess>
 HierarchicalDRFAllocator;
 
 
+namespace internal {
+
+// Forward declarations.
+class OfferFilter;
+class InverseOfferFilter;
+
+
+// A level of indirection that allows us to keep the allocator implementation
+// in an implementation file: `hierarchical.cpp`. This maps the static
+// templatization of `HierarchicalAllocatorProcess` to a polymorphic
+// implementation in the internal namespace.
+struct SorterFactoryBase
+{
+  virtual ~SorterFactoryBase() {}
+
+  virtual Sorter* createRoleSorter() const = 0;
+
+  virtual Sorter* createFrameworkSorter() const = 0;
+};
+
+
+template <typename RoleSorter, typename FrameworkSorter>
+struct SorterFactory : public SorterFactoryBase
+{
+  virtual Sorter* createRoleSorter() const
+  {
+    return new RoleSorter();
+  }
+
+  virtual Sorter* createFrameworkSorter() const
+  {
+    return new FrameworkSorter();
+  }
+};
+
+
 // Implements the basic allocator algorithm - first pick a role by
 // some criteria, then pick one of their frameworks to allocate to.
-template <typename RoleSorter, typename FrameworkSorter>
 class HierarchicalAllocatorProcess : public MesosAllocatorProcess
 {
 public:
-  HierarchicalAllocatorProcess()
+  HierarchicalAllocatorProcess(SorterFactoryBase* _sorterFactory)
     : ProcessBase(process::ID::generate("hierarchical-allocator")),
       initialized(false),
       metrics(*this),
+      sorterFactory(_sorterFactory),
       roleSorter(NULL) {}
 
   virtual ~HierarchicalAllocatorProcess() {}
@@ -179,8 +204,8 @@ public:
 
 protected:
   // Useful typedefs for dispatch/delay/defer to self()/this.
-  typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> Self;
-  typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> This;
+  typedef HierarchicalAllocatorProcess Self;
+  typedef HierarchicalAllocatorProcess This;
 
   // Callback for doing batch allocations.
   void batch();
@@ -361,1215 +386,29 @@ protected:
   // resources as regular resources when doing fairness calculations.
   // TODO(vinod): Consider using a different fairness algorithm for
   // oversubscribed resources.
-  RoleSorter* roleSorter;
-  hashmap<std::string, FrameworkSorter*> frameworkSorters;
-};
-
-
-// Used to represent "filters" for resources unused in offers.
-class OfferFilter
-{
-public:
-  virtual ~OfferFilter() {}
-
-  virtual bool filter(const Resources& resources) = 0;
-};
-
-
-class RefusedOfferFilter: public OfferFilter
-{
-public:
-  RefusedOfferFilter(
-      const Resources& _resources,
-      const process::Timeout& _timeout)
-    : resources(_resources), timeout(_timeout) {}
-
-  virtual bool filter(const Resources& _resources)
-  {
-    // TODO(jieyu): Consider separating the superset check for regular
-    // and revocable resources. For example, frameworks might want
-    // more revocable resources only or non-revocable resources only,
-    // but currently the filter only expires if there is more of both
-    // revocable and non-revocable resources.
-    return resources.contains(_resources) && // Refused resources are superset.
-           timeout.remaining() > Seconds(0);
-  }
-
-  const Resources resources;
-  const process::Timeout timeout;
+  SorterFactoryBase* sorterFactory;
+  Sorter* roleSorter;
+  hashmap<std::string, Sorter*> frameworkSorters;
 };
 
 
-// Used to represent "filters" for inverse offers.
-// NOTE: Since this specific allocator implementation only sends inverse offers
-// for maintenance primitives, and those are at the whole slave level, we only
-// need to filter based on the time-out.
-// If this allocator implementation starts sending out more resource specific
-// inverse offers, then we can capture the `unavailableResources` in the filter
-// function.
-class InverseOfferFilter
-{
-public:
-  virtual ~InverseOfferFilter() {}
-
-  virtual bool filter() = 0;
-};
+} // namespace internal {
 
 
-// NOTE: See comment above `InverseOfferFilter` regarding capturing
-// `unavailableResources` if this allocator starts sending fine-grained inverse
-// offers.
-class RefusedInverseOfferFilter: public InverseOfferFilter
+// We map the templatized version of the `HierarchicalAllocatorProcess` to one
+// that relies on polymorphic sorters in the internal namespace. This allows us
+// to keep the implementation of the allocator in the implementation file.
+template <typename RoleSorter, typename FrameworkSorter>
+class HierarchicalAllocatorProcess
+  : public internal::HierarchicalAllocatorProcess
 {
 public:
-  RefusedInverseOfferFilter(const process::Timeout& _timeout)
-    : timeout(_timeout) {}
-
-  virtual bool filter()
-  {
-    // See comment above why we currently don't do more fine-grained filtering.
-    return timeout.remaining() > Seconds(0);
-  }
-
-  const process::Timeout timeout;
+  HierarchicalAllocatorProcess()
+    : internal::HierarchicalAllocatorProcess(
+          new internal::SorterFactory<RoleSorter, FrameworkSorter>()) {}
 };
 
 
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
-    const Duration& _allocationInterval,
-    const lambda::function<
-        void(const FrameworkID&,
-             const hashmap<SlaveID, Resources>&)>& _offerCallback,
-    const lambda::function<
-        void(const FrameworkID&,
-             const hashmap<SlaveID, UnavailableResources>&)>&
-      _inverseOfferCallback,
-    const hashmap<std::string, mesos::master::RoleInfo>& _roles)
-{
-  allocationInterval = _allocationInterval;
-  offerCallback = _offerCallback;
-  inverseOfferCallback = _inverseOfferCallback;
-  roles = _roles;
-  initialized = true;
-
-  roleSorter = new RoleSorter();
-  foreachpair (
-      const std::string& name, const mesos::master::RoleInfo& roleInfo, roles) {
-    roleSorter->add(name, roleInfo.weight());
-    frameworkSorters[name] = new FrameworkSorter();
-  }
-
-  if (roleSorter->count() == 0) {
-    LOG(ERROR) << "No roles specified, cannot allocate resources!";
-  }
-
-  VLOG(1) << "Initialized hierarchical allocator process";
-
-  delay(allocationInterval, self(), &Self::batch);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
-    const FrameworkID& frameworkId,
-    const FrameworkInfo& frameworkInfo,
-    const hashmap<SlaveID, Resources>& used)
-{
-  CHECK(initialized);
-
-  const std::string& role = frameworkInfo.role();
-
-  CHECK(roles.contains(role));
-
-  CHECK(!frameworkSorters[role]->contains(frameworkId.value()));
-  frameworkSorters[role]->add(frameworkId.value());
-
-  // TODO(bmahler): Validate that the reserved resources have the
-  // framework's role.
-
-  // Update the allocation to this framework.
-  foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
-    roleSorter->allocated(role, slaveId, allocated.unreserved());
-    frameworkSorters[role]->add(slaveId, allocated);
-    frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated);
-  }
-
-  frameworks[frameworkId] = Framework();
-  frameworks[frameworkId].role = frameworkInfo.role();
-  frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint();
-
-  // Check if the framework desires revocable resources.
-  frameworks[frameworkId].revocable = false;
-  foreach (const FrameworkInfo::Capability& capability,
-           frameworkInfo.capabilities()) {
-    if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
-      frameworks[frameworkId].revocable = true;
-    }
-  }
-
-  frameworks[frameworkId].suppressed = false;
-
-  LOG(INFO) << "Added framework " << frameworkId;
-
-  allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
-    const FrameworkID& frameworkId)
-{
-  CHECK(initialized);
-
-  CHECK(frameworks.contains(frameworkId));
-  const std::string& role = frameworks[frameworkId].role;
-
-  // Might not be in 'frameworkSorters[role]' because it was previously
-  // deactivated and never re-added.
-  if (frameworkSorters[role]->contains(frameworkId.value())) {
-    hashmap<SlaveID, Resources> allocation =
-      frameworkSorters[role]->allocation(frameworkId.value());
-
-    foreachpair (
-        const SlaveID& slaveId, const Resources& allocated, allocation) {
-      roleSorter->unallocated(role, slaveId, allocated.unreserved());
-      frameworkSorters[role]->remove(slaveId, allocated);
-    }
-
-    frameworkSorters[role]->remove(frameworkId.value());
-  }
-
-  // Do not delete the filters contained in this
-  // framework's `offerFilters` hashset yet, see comments in
-  // HierarchicalAllocatorProcess::reviveOffers and
-  // HierarchicalAllocatorProcess::expire.
-  frameworks.erase(frameworkId);
-
-  LOG(INFO) << "Removed framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateFramework(
-    const FrameworkID& frameworkId)
-{
-  CHECK(initialized);
-
-  CHECK(frameworks.contains(frameworkId));
-  const std::string& role = frameworks[frameworkId].role;
-
-  frameworkSorters[role]->activate(frameworkId.value());
-
-  LOG(INFO) << "Activated framework " << frameworkId;
-
-  allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
-    const FrameworkID& frameworkId)
-{
-  CHECK(initialized);
-
-  CHECK(frameworks.contains(frameworkId));
-  const std::string& role = frameworks[frameworkId].role;
-
-  frameworkSorters[role]->deactivate(frameworkId.value());
-
-  // Note that the Sorter *does not* remove the resources allocated
-  // to this framework. For now, this is important because if the
-  // framework fails over and is activated, we still want a record
-  // of the resources that it is using. We might be able to collapse
-  // the added/removed and activated/deactivated in the future.
-
-  // Do not delete the filters contained in this
-  // framework's `offerFilters` hashset yet, see comments in
-  // HierarchicalAllocatorProcess::reviveOffers and
-  // HierarchicalAllocatorProcess::expire.
-  frameworks[frameworkId].offerFilters.clear();
-  frameworks[frameworkId].inverseOfferFilters.clear();
-
-  LOG(INFO) << "Deactivated framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateFramework(
-    const FrameworkID& frameworkId,
-    const FrameworkInfo& frameworkInfo)
-{
-  CHECK(initialized);
-
-  CHECK(frameworks.contains(frameworkId));
-
-  // TODO(jmlvanre): Once we allow frameworks to re-register with a
-  // new 'role' or 'checkpoint' flag, we need to update our internal
-  // 'frameworks' structure. See MESOS-703 for progress on allowing
-  // these fields to be updated.
-  CHECK_EQ(frameworks[frameworkId].role, frameworkInfo.role());
-  CHECK_EQ(frameworks[frameworkId].checkpoint, frameworkInfo.checkpoint());
-
-  frameworks[frameworkId].revocable = false;
-
-  foreach (const FrameworkInfo::Capability& capability,
-           frameworkInfo.capabilities()) {
-    if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
-      frameworks[frameworkId].revocable = true;
-    }
-  }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
-    const SlaveID& slaveId,
-    const SlaveInfo& slaveInfo,
-    const Option<Unavailability>& unavailability,
-    const Resources& total,
-    const hashmap<FrameworkID, Resources>& used)
-{
-  CHECK(initialized);
-  CHECK(!slaves.contains(slaveId));
-
-  roleSorter->add(slaveId, total.unreserved());
-
-  foreachpair (const FrameworkID& frameworkId,
-               const Resources& allocated,
-               used) {
-    if (frameworks.contains(frameworkId)) {
-      const std::string& role = frameworks[frameworkId].role;
-
-      // TODO(bmahler): Validate that the reserved resources have the
-      // framework's role.
-
-      roleSorter->allocated(role, slaveId, allocated.unreserved());
-      frameworkSorters[role]->add(slaveId, allocated);
-      frameworkSorters[role]->allocated(
-          frameworkId.value(), slaveId, allocated);
-    }
-  }
-
-  slaves[slaveId] = Slave();
-  slaves[slaveId].total = total;
-  slaves[slaveId].allocated = Resources::sum(used);
-  slaves[slaveId].activated = true;
-  slaves[slaveId].checkpoint = slaveInfo.checkpoint();
-  slaves[slaveId].hostname = slaveInfo.hostname();
-
-  // NOTE: We currently implement maintenance in the allocator to be able to
-  // leverage state and features such as the FrameworkSorter and OfferFilter.
-  if (unavailability.isSome()) {
-    slaves[slaveId].maintenance =
-      typename Slave::Maintenance(unavailability.get());
-  }
-
-  LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname
-            << ") with " << slaves[slaveId].total
-            << " (allocated: " << slaves[slaveId].allocated << ")";
-
-  allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave(
-    const SlaveID& slaveId)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-
-  // TODO(bmahler): Per MESOS-621, this should remove the allocations
-  // that any frameworks have on this slave. Otherwise the caller may
-  // "leak" allocated resources accidentally if they forget to recover
-  // all the resources. Fixing this would require more information
-  // than what we currently track in the allocator.
-
-  roleSorter->remove(slaveId, slaves[slaveId].total.unreserved());
-
-  slaves.erase(slaveId);
-
-  // Note that we DO NOT actually delete any filters associated with
-  // this slave, that will occur when the delayed
-  // HierarchicalAllocatorProcess::expire gets invoked (or the framework
-  // that applied the filters gets removed).
-
-  LOG(INFO) << "Removed slave " << slaveId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateSlave(
-    const SlaveID& slaveId,
-    const Resources& oversubscribed)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-
-  // Check that all the oversubscribed resources are revocable.
-  CHECK_EQ(oversubscribed, oversubscribed.revocable());
-
-  // Update the total resources.
-
-  // First remove the old oversubscribed resources from the total.
-  slaves[slaveId].total -= slaves[slaveId].total.revocable();
-
-  // Now add the new estimate of oversubscribed resources.
-  slaves[slaveId].total += oversubscribed;
-
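-  // For example (illustrative values only): if the slave's total was
-  // 8 regular cpus plus 2 revocable cpus and the new estimate is
-  // 3 revocable cpus, the total becomes 8 regular plus 3 revocable cpus.
-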
-  // Now, update the total resources in the role sorter.
-  roleSorter->update(
-      slaveId,
-      slaves[slaveId].total.unreserved());
-
-  LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname << ")"
-            << " updated with oversubscribed resources " << oversubscribed
-            << " (total: " << slaves[slaveId].total
-            << ", allocated: " << slaves[slaveId].allocated << ")";
-
-  allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateSlave(
-    const SlaveID& slaveId)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-
-  slaves[slaveId].activated = true;
-
-  LOG(INFO) << "Slave " << slaveId << " reactivated";
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateSlave(
-    const SlaveID& slaveId)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-
-  slaves[slaveId].activated = false;
-
-  LOG(INFO) << "Slave " << slaveId << " deactivated";
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateWhitelist(
-    const Option<hashset<std::string>>& _whitelist)
-{
-  CHECK(initialized);
-
-  whitelist = _whitelist;
-
-  if (whitelist.isSome()) {
-    LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get());
-
-    if (whitelist.get().empty()) {
-      LOG(WARNING) << "Whitelist is empty, no offers will be made!";
-    }
-  } else {
-    LOG(INFO) << "Advertising offers for all slaves";
-  }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
-    const FrameworkID& frameworkId,
-    const std::vector<Request>& requests)
-{
-  CHECK(initialized);
-
-  LOG(INFO) << "Received resource request from framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    const std::vector<Offer::Operation>& operations)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-  CHECK(frameworks.contains(frameworkId));
-
-  // Here we apply offer operations to the allocated resources, which
-  // in turn leads to an update of the total. The available resources
-  // remain unchanged.
-
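-  // As a hypothetical illustration: a RESERVE operation that reserves
-  // 2 of the framework's allocated unreserved cpus for its role changes
-  // both the allocation and the total accordingly, while the quantity of
-  // available resources (total - allocated) stays the same.
-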
-  // Update the allocated resources.
-  FrameworkSorter* frameworkSorter =
-    frameworkSorters[frameworks[frameworkId].role];
-
-  Resources frameworkAllocation =
-    frameworkSorter->allocation(frameworkId.value(), slaveId);
-
-  Try<Resources> updatedFrameworkAllocation =
-    frameworkAllocation.apply(operations);
-
-  CHECK_SOME(updatedFrameworkAllocation);
-
-  frameworkSorter->update(
-      frameworkId.value(),
-      slaveId,
-      frameworkAllocation,
-      updatedFrameworkAllocation.get());
-
-  roleSorter->update(
-      frameworks[frameworkId].role,
-      slaveId,
-      frameworkAllocation.unreserved(),
-      updatedFrameworkAllocation.get().unreserved());
-
-  Try<Resources> updatedSlaveAllocation =
-    slaves[slaveId].allocated.apply(operations);
-
-  CHECK_SOME(updatedSlaveAllocation);
-
-  slaves[slaveId].allocated = updatedSlaveAllocation.get();
-
-  // Update the total resources.
-  Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
-  CHECK_SOME(updatedTotal);
-
-  slaves[slaveId].total = updatedTotal.get();
-
-  LOG(INFO) << "Updated allocation of framework " << frameworkId
-            << " on slave " << slaveId
-            << " from " << frameworkAllocation
-            << " to " << updatedFrameworkAllocation.get();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-process::Future<Nothing>
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
-    const SlaveID& slaveId,
-    const std::vector<Offer::Operation>& operations)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-
-  Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
-
-  // It's possible for this 'apply' to fail here because a call to
-  // 'allocate' could have been enqueued by the allocator itself
-  // just before master's request to enqueue 'updateAvailable'
-  // arrives to the allocator.
-  //
-  //   Master -------R------------
-  //                  \----+
-  //                       |
-  //   Allocator --A-----A-U---A--
-  //                \___/ \___/
-  //
-  //   where A = allocate, R = reserve, U = updateAvailable
-  Try<Resources> updatedAvailable = available.apply(operations);
-  if (updatedAvailable.isError()) {
-    return process::Failure(updatedAvailable.error());
-  }
-
-  // Update the total resources.
-  Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
-  CHECK_SOME(updatedTotal);
-
-  slaves[slaveId].total = updatedTotal.get();
-
-  // Now, update the total resources in the role sorter.
-  roleSorter->update(slaveId, slaves[slaveId].total.unreserved());
-
-  return Nothing();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
-    const SlaveID& slaveId,
-    const Option<Unavailability>& unavailability)
-{
-  CHECK(initialized);
-  CHECK(slaves.contains(slaveId));
-
-  // NOTE: We currently implement maintenance in the allocator to be able to
-  // leverage state and features such as the FrameworkSorter and OfferFilter.
-
-  // We explicitly remove all filters for the inverse offers of this slave. We
-  // do this because we want to force frameworks to reassess the calculations
-  // they have made to respond to the inverse offer. Unavailability of a slave
-  // can have a large effect on failure domain calculations and interleaved
-  // unavailability schedules.
-  foreachvalue (Framework& framework, frameworks) {
-    framework.inverseOfferFilters.erase(slaveId);
-  }
-
-  // Remove any old unavailability.
-  slaves[slaveId].maintenance = None();
-
-  // If we have a new unavailability.
-  if (unavailability.isSome()) {
-    slaves[slaveId].maintenance =
-      typename Slave::Maintenance(unavailability.get());
-  }
-
-  allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
-    const SlaveID& slaveId,
-    const FrameworkID& frameworkId,
-    const Option<UnavailableResources>& unavailableResources,
-    const Option<mesos::master::InverseOfferStatus>& status,
-    const Option<Filters>& filters)
-{
-  CHECK(initialized);
-  CHECK(frameworks.contains(frameworkId));
-  CHECK(slaves.contains(slaveId));
-  CHECK(slaves[slaveId].maintenance.isSome());
-
-  // NOTE: We currently implement maintenance in the allocator to be able to
-  // leverage state and features such as the FrameworkSorter and OfferFilter.
-
-  // We take a reference to `maintenance` both because we intend to
-  // modify it and to improve readability.
-  typename Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
-
-  // Only handle inverse offers that we currently have outstanding. If the
-  // offer is not currently outstanding, it is old and can be safely ignored.
-  if (maintenance.offersOutstanding.contains(frameworkId)) {
-    // We always remove the outstanding offer so that we will send a new offer
-    // out the next time we schedule inverse offers.
-    maintenance.offersOutstanding.erase(frameworkId);
-
-    // If the response is `Some`, the framework responded. Otherwise, if it
-    // is `None`, the inverse offer timed out or was rescinded.
-    if (status.isSome()) {
-      // For now we don't allow frameworks to respond with `UNKNOWN`. The caller
-      // should guard against this. This goes against the pattern of not
-      // checking external invariants; however, the allocator and master are
-      // currently so tightly coupled that this check is valuable.
-      CHECK_NE(
-          status.get().status(),
-          mesos::master::InverseOfferStatus::UNKNOWN);
-
-      // If the framework responded, we update our state to match.
-      maintenance.statuses[frameworkId].CopyFrom(status.get());
-    }
-  }
-
-  // No need to install filters if `filters` is none.
-  if (filters.isNone()) {
-    return;
-  }
-
-  // Create a refused resource filter.
-  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
-
-  if (seconds.isError()) {
-    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
-                 << "the refused inverse offer filter because the input value "
-                 << "is invalid: " << seconds.error();
-
-    seconds = Duration::create(Filters().refuse_seconds());
-  } else if (seconds.get() < Duration::zero()) {
-    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
-                 << "the refused inverse offer filter because the input value "
-                 << "is negative";
-
-    seconds = Duration::create(Filters().refuse_seconds());
-  }
-
-  CHECK_SOME(seconds);
-
-  if (seconds.get() != Duration::zero()) {
-    VLOG(1) << "Framework " << frameworkId
-            << " filtered inverse offers from slave " << slaveId
-            << " for " << seconds.get();
-
-    // Create a new inverse offer filter and delay its expiration.
-    InverseOfferFilter* inverseOfferFilter =
-      new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
-
-    frameworks[frameworkId]
-      .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
-
-    // We need to disambiguate the function call to pick the correct
-    // expire() overload.
-    void (Self::*expireInverseOffer)(
-             const FrameworkID&,
-             const SlaveID&,
-             InverseOfferFilter*) = &Self::expire;
-
-    delay(
-        seconds.get(),
-        self(),
-        expireInverseOffer,
-        frameworkId,
-        slaveId,
-        inverseOfferFilter);
-  }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-process::Future<
-    hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
-HierarchicalAllocatorProcess<
-    RoleSorter, FrameworkSorter>::getInverseOfferStatuses()
-{
-  CHECK(initialized);
-
-  hashmap<
-      SlaveID,
-      hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result;
-
-  // Make a copy of the most recent statuses.
-  foreachpair (const SlaveID& id, const Slave& slave, slaves) {
-    if (slave.maintenance.isSome()) {
-      result[id] = slave.maintenance.get().statuses;
-    }
-  }
-
-  return result;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    const Resources& resources,
-    const Option<Filters>& filters)
-{
-  CHECK(initialized);
-
-  if (resources.empty()) {
-    return;
-  }
-
-  // Update the resources allocated to the framework (if the framework
-  // still exists, which it might not in the event that we dispatched
-  // Master::offer before we received
-  // MesosAllocatorProcess::removeFramework or
-  // MesosAllocatorProcess::deactivateFramework, in which case we will
-  // have already recovered all of its resources).
-  if (frameworks.contains(frameworkId)) {
-    const std::string& role = frameworks[frameworkId].role;
-
-    CHECK(frameworkSorters.contains(role));
-
-    if (frameworkSorters[role]->contains(frameworkId.value())) {
-      frameworkSorters[role]->unallocated(
-          frameworkId.value(), slaveId, resources);
-      frameworkSorters[role]->remove(slaveId, resources);
-      roleSorter->unallocated(role, slaveId, resources.unreserved());
-    }
-  }
-
-  // Update resources allocated on slave (if slave still exists,
-  // which it might not in the event that we dispatched Master::offer
-  // before we received Allocator::removeSlave).
-  if (slaves.contains(slaveId)) {
-    // NOTE: We cannot add the following CHECK due to double
-    // precision errors. See MESOS-1187 for details.
-    // CHECK(slaves[slaveId].allocated.contains(resources));
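-    // (Scalar resources such as cpus are doubles, so repeated additions
-    // and subtractions need not cancel exactly; e.g., in double
-    // arithmetic 0.1 + 0.2 != 0.3.)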
-
-    slaves[slaveId].allocated -= resources;
-
-    LOG(INFO) << "Recovered " << resources
-              << " (total: " << slaves[slaveId].total
-              << ", allocated: " << slaves[slaveId].allocated
-              << ") on slave " << slaveId
-              << " from framework " << frameworkId;
-  }
-
-  // No need to install the filter if 'filters' is none.
-  if (filters.isNone()) {
-    return;
-  }
-
-  // No need to install the filter if slave/framework does not exist.
-  if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
-    return;
-  }
-
-  // Create a refused resources filter.
-  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
-
-  if (seconds.isError()) {
-    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
-                 << "the refused resources filter because the input value "
-                 << "is invalid: " << seconds.error();
-
-    seconds = Duration::create(Filters().refuse_seconds());
-  } else if (seconds.get() < Duration::zero()) {
-    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
-                 << "the refused resources filter because the input value "
-                 << "is negative";
-
-    seconds = Duration::create(Filters().refuse_seconds());
-  }
-
-  CHECK_SOME(seconds);
-
-  if (seconds.get() != Duration::zero()) {
-    VLOG(1) << "Framework " << frameworkId
-            << " filtered slave " << slaveId
-            << " for " << seconds.get();
-
-    // Create a new filter and delay its expiration.
-    OfferFilter* offerFilter = new RefusedOfferFilter(
-        resources,
-        process::Timeout::in(seconds.get()));
-
-    frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
-
-    // We need to disambiguate the function call to pick the correct
-    // expire() overload.
-    void (Self::*expireOffer)(
-             const FrameworkID&,
-             const SlaveID&,
-             OfferFilter*) = &Self::expire;
-
-    delay(
-        seconds.get(),
-        self(),
-        expireOffer,
-        frameworkId,
-        slaveId,
-        offerFilter);
-  }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::suppressOffers(
-    const FrameworkID& frameworkId)
-{
-  CHECK(initialized);
-  frameworks[frameworkId].suppressed = true;
-
-  LOG(INFO) << "Suppressed offers for framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
-    const FrameworkID& frameworkId)
-{
-  CHECK(initialized);
-
-  frameworks[frameworkId].offerFilters.clear();
-  frameworks[frameworkId].inverseOfferFilters.clear();
-  frameworks[frameworkId].suppressed = false;
-
-  // We delete each actual `OfferFilter` when
-  // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
-  // `OfferFilter` here, it's possible that the same `OfferFilter` (i.e., same
-  // address) could get reused and `HierarchicalAllocatorProcess::expire`
-  // would expire that filter too soon. Note that this only works
-  // right now because ALL Filter types "expire".
-
-  LOG(INFO) << "Removed offer filters for framework " << frameworkId;
-
-  allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::batch()
-{
-  allocate();
-  delay(allocationInterval, self(), &Self::batch);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate()
-{
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  allocate(slaves.keys());
-
-  VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
-          << stopwatch.elapsed();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
-    const SlaveID& slaveId)
-{
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  // TODO(bmahler): Add initializer list constructor for hashset.
-  hashset<SlaveID> slaves;
-  slaves.insert(slaveId);
-  allocate(slaves);
-
-  VLOG(1) << "Performed allocation for slave " << slaveId << " in "
-          << stopwatch.elapsed();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
-    const hashset<SlaveID>& slaveIds_)
-{
-  if (roleSorter->count() == 0) {
-    LOG(ERROR) << "No roles specified, cannot allocate resources!";
-    return;
-  }
-
-  // Compute the offerable resources, per framework:
-  //   (1) For reserved resources on the slave, allocate these to a
-  //       framework having the corresponding role.
-  //   (2) For unreserved resources on the slave, allocate these
-  //       to a framework of any role.
-  hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;
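-
-  // For example (illustrative values): 2 cpus reserved for role "web"
-  // can only be offered to a framework registered in role "web", while
-  // 4 unreserved cpus may be offered to a framework of any role.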
-
-  // Randomize the order in which slaves' resources are allocated.
-  // TODO(vinod): Implement a smarter sorting algorithm.
-  std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
-  std::random_shuffle(slaveIds.begin(), slaveIds.end());
-
-  foreach (const SlaveID& slaveId, slaveIds) {
-    // Don't send offers for non-whitelisted or deactivated slaves.
-    if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) {
-      continue;
-    }
-
-    foreach (const std::string& role, roleSorter->sort()) {
-      foreach (const std::string& frameworkId_,
-               frameworkSorters[role]->sort()) {
-        FrameworkID frameworkId;
-        frameworkId.set_value(frameworkId_);
-
-        // If the framework has suppressed offers, ignore.
-        if (frameworks[frameworkId].suppressed) {
-          continue;
-        }
-
-        // Calculate the currently available resources on the slave.
-        Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
-
-        // NOTE: Currently, frameworks are allowed to have '*' role.
-        // Calling reserved('*') returns an empty Resources object.
-        Resources resources = available.unreserved() + available.reserved(role);
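-
-        // For example (illustrative values): if the available resources
-        // are 4 unreserved cpus plus 2 cpus reserved for "web", a
-        // framework in role "web" is considered for all 6 cpus, while a
-        // framework in role '*' is considered for only the 4 unreserved
-        // cpus.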
-
-        // Remove revocable resources if the framework has not opted
-        // for them.
-        if (!frameworks[frameworkId].revocable) {
-          resources -= resources.revocable();
-        }
-
-        // If the resources are not allocatable, ignore.
-        if (!allocatable(resources)) {
-          continue;
-        }
-
-        // If the framework filters these resources, ignore.
-        if (isFiltered(frameworkId, slaveId, resources)) {
-          continue;
-        }
-
-        VLOG(2) << "Allocating " << resources << " on slave " << slaveId
-                << " to framework " << frameworkId;
-
-        // Note that we perform "coarse-grained" allocation,
-        // meaning that we always allocate the entire remaining
-        // slave resources to a single framework.
-        offerable[frameworkId][slaveId] = resources;
-        slaves[slaveId].allocated += resources;
-
-        // Reserved resources are only accounted for in the framework
-        // sorter, since the reserved resources are not shared across
-        // roles.
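-        // For example (illustrative values): 4 unreserved cpus plus
-        // 2 cpus reserved for "web" all count toward the "web" framework
-        // sorter below, but only the 4 unreserved cpus count toward the
-        // role sorter.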
-        frameworkSorters[role]->add(slaveId, resources);
-        frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
-        roleSorter->allocated(role, slaveId, resources.unreserved());
-      }
-    }
-  }
-
-  if (offerable.empty()) {
-    VLOG(1) << "No resources available to allocate!";
-  } else {
-    // Now offer the resources to each framework.
-    foreachkey (const FrameworkID& frameworkId, offerable) {
-      offerCallback(frameworkId, offerable[frameworkId]);
-    }
-  }
-
-  // NOTE: For now, we implement maintenance inverse offers within the
-  // allocator. We leverage the existing timer/cycle of offers to also do any
-  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
-  deallocate(slaveIds_);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
-    const hashset<SlaveID>& slaveIds_)
-{
-  if (frameworkSorters.empty()) {
-    LOG(ERROR) << "No frameworks specified, cannot send inverse offers!";
-    return;
-  }
-
-  // In this case, `offerable` is actually the slaves and/or resources that we
-  // want the master to create `InverseOffer`s from.
-  hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
-
-  // For maintenance, we use the framework sorters to determine which
-  // frameworks have (1) reserved and/or (2) unreserved resources on the
-  // specified slaves. This way we only send inverse offers to frameworks
-  // that have the potential to lose something. We keep track of which
-  // frameworks already have an outstanding inverse offer for a given
-  // slave in that slave's maintenance state, using the `offersOutstanding`
-  // set. This is equivalent to the accounting we do for resources when we
-  // send regular offers. If we didn't keep track of outstanding offers,
-  // we would keep generating new inverse offers even though the framework
-  // had not yet responded.
-
-  foreachvalue (FrameworkSorter* frameworkSorter, frameworkSorters) {
-    foreach (const SlaveID& slaveId, slaveIds_) {
-      CHECK(slaves.contains(slaveId));
-
-      if (slaves[slaveId].maintenance.isSome()) {
-        // We take a reference to `maintenance` both because we intend
-        // to modify it and to improve readability.
-        typename Slave::Maintenance& maintenance =
-          slaves[slaveId].maintenance.get();
-
-        hashmap<std::string, Resources> allocation =
-          frameworkSorter->allocation(slaveId);
-
-        foreachkey (const std::string& frameworkId_, allocation) {
-          FrameworkID frameworkId;
-          frameworkId.set_value(frameworkId_);
-
-          // If this framework doesn't already have inverse offers for the
-          // specified slave.
-          if (!offerable[frameworkId].contains(slaveId)) {
-            // If there isn't already an outstanding inverse offer to this
-            // framework for the specified slave.
-            if (!maintenance.offersOutstanding.contains(frameworkId)) {
-              // Ignore in case the framework filters inverse offers for this
-              // slave.
-              // NOTE: Since this specific allocator implementation only sends
-              // inverse offers for maintenance primitives, and those are at the
-              // whole slave level, we only need to filter based on the
-              // time-out.
-              if (isFiltered(frameworkId, slaveId)) {
-                continue;
-              }
-
-              const UnavailableResources unavailableResources =
-                UnavailableResources{
-                    Resources(),
-                    maintenance.unavailability};
-
-              // For now we send inverse offers with empty resources when the
-              // inverse offer represents maintenance on the machine. In the
-              // future we could be more specific about the resources on the
-              // host, as we have the information available.
-              offerable[frameworkId][slaveId] = unavailableResources;
-
-              // Mark this framework as having an offer outstanding for the
-              // specified slave.
-              maintenance.offersOutstanding.insert(frameworkId);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  if (offerable.empty()) {
-    VLOG(1) << "No inverse offers to send out!";
-  } else {
-    // Now send inverse offers to each framework.
-    foreachkey (const FrameworkID& frameworkId, offerable) {
-      inverseOfferCallback(frameworkId, offerable[frameworkId]);
-    }
-  }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    OfferFilter* offerFilter)
-{
-  // The filter might have already been removed (e.g., if the
-  // framework no longer exists or in
-  // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
-  // keep the address from getting reused, possibly causing premature
-  // expiration).
-  if (frameworks.contains(frameworkId) &&
-      frameworks[frameworkId].offerFilters.contains(slaveId) &&
-      frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) {
-    frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter);
-    if (frameworks[frameworkId].offerFilters[slaveId].empty()) {
-      frameworks[frameworkId].offerFilters.erase(slaveId);
-    }
-  }
-
-  delete offerFilter;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    InverseOfferFilter* inverseOfferFilter)
-{
-  // The filter might have already been removed (e.g., if the
-  // framework no longer exists or in
-  // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
-  // keep the address from getting reused, possibly causing premature
-  // expiration).
-  if (frameworks.contains(frameworkId) &&
-      frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
-      frameworks[frameworkId].inverseOfferFilters[slaveId]
-        .contains(inverseOfferFilter)) {
-    frameworks[frameworkId].inverseOfferFilters[slaveId]
-      .erase(inverseOfferFilter);
-
-    if (frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
-      frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
-    }
-  }
-
-  delete inverseOfferFilter;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
-    const SlaveID& slaveId)
-{
-  CHECK(slaves.contains(slaveId));
-
-  return whitelist.isNone() ||
-         whitelist.get().contains(slaves[slaveId].hostname);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    const Resources& resources)
-{
-  CHECK(frameworks.contains(frameworkId));
-  CHECK(slaves.contains(slaveId));
-
-  // Do not offer a non-checkpointing slave's resources to a checkpointing
-  // framework. This is a short-term fix until the following is resolved:
-  // https://issues.apache.org/jira/browse/MESOS-444.
-  if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
-    VLOG(1) << "Filtered offer with " << resources
-            << " on non-checkpointing slave " << slaveId
-            << " for checkpointing framework " << frameworkId;
-
-    return true;
-  }
-
-  if (frameworks[frameworkId].offerFilters.contains(slaveId)) {
-    foreach (
-      OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) {
-      if (offerFilter->filter(resources)) {
-        VLOG(1) << "Filtered offer with " << resources
-                << " on slave " << slaveId
-                << " for framework " << frameworkId;
-
-        return true;
-      }
-    }
-  }
-
-  return false;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId)
-{
-  CHECK(frameworks.contains(frameworkId));
-  CHECK(slaves.contains(slaveId));
-
-  if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
-    foreach (
-        InverseOfferFilter* inverseOfferFilter,
-        frameworks[frameworkId].inverseOfferFilters[slaveId]) {
-      if (inverseOfferFilter->filter()) {
-        VLOG(1) << "Filtered unavailability on slave " << slaveId
-                << " for framework " << frameworkId;
-
-        return true;
-      }
-    }
-  }
-
-  return false;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
-    const Resources& resources)
-{
-  Option<double> cpus = resources.cpus();
-  Option<Bytes> mem = resources.mem();
-
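-  // Note that clearing either minimum is sufficient. For example
-  // (assuming illustrative values of MIN_CPUS = 0.01 and MIN_MEM = 32MB),
-  // "mem:64" alone is allocatable even with no cpus, while "cpus:0.001"
-  // alone is not.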
-  return (cpus.isSome() && cpus.get() >= MIN_CPUS) ||
-         (mem.isSome() && mem.get() >= MIN_MEM);
-}
-
 } // namespace allocator {
 } // namespace master {
 } // namespace internal {