You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/10/13 20:03:49 UTC
mesos git commit: Added static->dynamic transformation to Allocator.
Repository: mesos
Updated Branches:
refs/heads/master 38dbadc94 -> 2d0b65ede
Added static->dynamic transformation to Allocator.
This improves the compilation time of Mesos significantly, allowing
developers to iterate more quickly on allocator changes.
Review: https://reviews.apache.org/r/38869
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d0b65ed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d0b65ed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d0b65ed
Branch: refs/heads/master
Commit: 2d0b65edeea129b427e60a3ca360fc29f77aa0d5
Parents: 38dbadc
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Sep 29 17:01:13 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Tue Oct 13 20:03:04 2015 +0200
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
src/Makefile.am | 1 +
src/master/allocator/mesos/hierarchical.cpp | 1191 ++++++++++++++++++++
src/master/allocator/mesos/hierarchical.hpp | 1273 +---------------------
4 files changed, 1249 insertions(+), 1217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 536a99f..98e76ce 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -142,6 +142,7 @@ set(MASTER_SRC
master/repairer.cpp
master/validation.cpp
master/allocator/allocator.cpp
+ master/allocator/mesos/hierarchical.cpp
master/allocator/sorter/drf/sorter.cpp
)
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index d855cb8..4a0eeb8 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -495,6 +495,7 @@ libmesos_no_3rdparty_la_SOURCES = \
master/repairer.cpp \
master/validation.cpp \
master/allocator/allocator.cpp \
+ master/allocator/mesos/hierarchical.cpp \
master/allocator/sorter/drf/sorter.cpp \
messages/flags.proto \
messages/messages.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
new file mode 100644
index 0000000..0a6f8a6
--- /dev/null
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -0,0 +1,1191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "master/allocator/mesos/hierarchical.hpp"
+
+#include <algorithm>
+#include <vector>
+
+#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/event.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/check.hpp>
+#include <stout/hashset.hpp>
+#include <stout/stopwatch.hpp>
+#include <stout/stringify.hpp>
+
+namespace mesos {
+namespace internal {
+namespace master {
+namespace allocator {
+namespace internal {
+
+// Base interface for "filters" applied to resources that a framework left
+// unused in an earlier offer. A concrete filter decides, for a candidate set
+// of resources, whether it is currently in effect.
+class OfferFilter
+{
+public:
+  virtual ~OfferFilter() = default;
+
+  // Returns true if this filter applies to (i.e. withholds) `resources`.
+  virtual bool filter(const Resources& resources) = 0;
+};
+
+
+// Filter created when a framework refuses resources. It applies to any later
+// candidate that is contained in the refused resources, and only until
+// `timeout` has elapsed.
+class RefusedOfferFilter : public OfferFilter
+{
+public:
+  RefusedOfferFilter(
+      const Resources& _resources,
+      const process::Timeout& _timeout)
+    : resources(_resources),
+      timeout(_timeout) {}
+
+  bool filter(const Resources& _resources) override
+  {
+    // TODO(jieyu): Consider separating the superset check for regular
+    // and revocable resources. For example, frameworks might want
+    // more revocable resources only or non-revocable resources only,
+    // but currently the filter only expires if there is more of both
+    // revocable and non-revocable resources.
+
+    // The candidate must be a subset of what was refused...
+    if (!resources.contains(_resources)) {
+      return false;
+    }
+
+    // ...and the refusal must not have timed out yet.
+    return timeout.remaining() > Seconds(0);
+  }
+
+  const Resources resources;
+  const process::Timeout timeout;
+};
+
+
+// Base interface for "filters" applied to inverse offers.
+//
+// NOTE: This allocator only sends inverse offers for maintenance primitives,
+// which operate on a whole slave, so filtering on the time-out alone is
+// sufficient. Should the allocator start sending inverse offers for specific
+// resources, `filter()` could take the `unavailableResources` into account.
+class InverseOfferFilter
+{
+public:
+  virtual ~InverseOfferFilter() = default;
+
+  // Returns true while this filter is in effect.
+  virtual bool filter() = 0;
+};
+
+
+// Filter created when a framework refuses an inverse offer; it stays in
+// effect until `timeout` elapses.
+//
+// NOTE: See the comment above `InverseOfferFilter` regarding capturing
+// `unavailableResources` if this allocator starts sending fine-grained
+// inverse offers.
+class RefusedInverseOfferFilter : public InverseOfferFilter
+{
+public:
+  // `explicit` so that a `process::Timeout` can never be implicitly
+  // converted into a filter object.
+  explicit RefusedInverseOfferFilter(const process::Timeout& _timeout)
+    : timeout(_timeout) {}
+
+  bool filter() override
+  {
+    // Only the time-out matters; see the comment above `InverseOfferFilter`
+    // for why no finer-grained filtering is done.
+    return timeout.remaining() > Seconds(0);
+  }
+
+  const process::Timeout timeout;
+};
+
+
+void HierarchicalAllocatorProcess::initialize(
+    const Duration& _allocationInterval,
+    const lambda::function<
+        void(const FrameworkID&,
+             const hashmap<SlaveID, Resources>&)>& _offerCallback,
+    const lambda::function<
+        void(const FrameworkID&,
+             const hashmap<SlaveID, UnavailableResources>&)>&
+      _inverseOfferCallback,
+    const hashmap<std::string, mesos::master::RoleInfo>& _roles)
+{
+  // Remember the configuration handed to us.
+  allocationInterval = _allocationInterval;
+  offerCallback = _offerCallback;
+  inverseOfferCallback = _inverseOfferCallback;
+  roles = _roles;
+  initialized = true;
+
+  // One role sorter spanning all roles, plus one framework sorter per role.
+  roleSorter = sorterFactory->createRoleSorter();
+
+  foreachpair (const std::string& role,
+               const mesos::master::RoleInfo& info,
+               roles) {
+    roleSorter->add(role, info.weight());
+    frameworkSorters[role] = sorterFactory->createFrameworkSorter();
+  }
+
+  const bool noRoles = roleSorter->count() == 0;
+  if (noRoles) {
+    LOG(ERROR) << "No roles specified, cannot allocate resources!";
+  }
+
+  VLOG(1) << "Initialized hierarchical allocator process";
+
+  // Start the periodic allocation cycle; `batch` re-schedules itself.
+  delay(allocationInterval, self(), &Self::batch);
+}
+
+
+void HierarchicalAllocatorProcess::addFramework(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const hashmap<SlaveID, Resources>& used)
+{
+  CHECK(initialized);
+
+  const std::string& role = frameworkInfo.role();
+  CHECK(roles.contains(role));
+
+  Sorter* sorter = frameworkSorters[role];
+  const std::string& client = frameworkId.value();
+
+  CHECK(!sorter->contains(client));
+  sorter->add(client);
+
+  // TODO(bmahler): Validate that the reserved resources have the
+  // framework's role.
+
+  // Account for the resources the framework is already using. The role
+  // sorter only tracks the unreserved portion.
+  foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
+    roleSorter->allocated(role, slaveId, allocated.unreserved());
+    sorter->add(slaveId, allocated);
+    sorter->allocated(client, slaveId, allocated);
+  }
+
+  frameworks[frameworkId] = Framework();
+
+  Framework& framework = frameworks[frameworkId];
+  framework.role = frameworkInfo.role();
+  framework.checkpoint = frameworkInfo.checkpoint();
+
+  // Revocable resources are only offered to frameworks that advertise the
+  // REVOCABLE_RESOURCES capability.
+  bool wantsRevocable = false;
+  foreach (const FrameworkInfo::Capability& capability,
+           frameworkInfo.capabilities()) {
+    if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
+      wantsRevocable = true;
+      break;
+    }
+  }
+  framework.revocable = wantsRevocable;
+
+  framework.suppressed = false;
+
+  LOG(INFO) << "Added framework " << frameworkId;
+
+  allocate();
+}
+
+
+void HierarchicalAllocatorProcess::removeFramework(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+
+  const std::string& role = frameworks[frameworkId].role;
+  Sorter* sorter = frameworkSorters[role];
+  const std::string& client = frameworkId.value();
+
+  // The framework may be absent from its role's sorter if it was
+  // previously deactivated and never re-added; then nothing is released.
+  if (sorter->contains(client)) {
+    // Hand everything still allocated to the framework back to the sorters
+    // before dropping the framework from its sorter.
+    hashmap<SlaveID, Resources> allocation = sorter->allocation(client);
+
+    foreachpair (const SlaveID& slaveId,
+                 const Resources& allocated,
+                 allocation) {
+      roleSorter->unallocated(role, slaveId, allocated.unreserved());
+      sorter->remove(slaveId, allocated);
+    }
+
+    sorter->remove(client);
+  }
+
+  // The filter objects referenced from this framework's `offerFilters` are
+  // deliberately not deleted yet; see the comments in
+  // HierarchicalAllocatorProcess::reviveOffers and
+  // HierarchicalAllocatorProcess::expire.
+  frameworks.erase(frameworkId);
+
+  LOG(INFO) << "Removed framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::activateFramework(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+
+  // Re-activate the framework within its role's sorter.
+  Sorter* sorter = frameworkSorters[frameworks[frameworkId].role];
+  sorter->activate(frameworkId.value());
+
+  LOG(INFO) << "Activated framework " << frameworkId;
+
+  allocate();
+}
+
+
+void HierarchicalAllocatorProcess::deactivateFramework(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+
+  Framework& framework = frameworks[frameworkId];
+
+  frameworkSorters[framework.role]->deactivate(frameworkId.value());
+
+  // NOTE: Deactivation in the Sorter does *not* release the resources
+  // allocated to this framework. For now this matters: if the framework
+  // fails over and is re-activated, we still want a record of the resources
+  // it is using. The added/removed and activated/deactivated states might be
+  // collapsed in the future.
+
+  // Forget the framework's filters. Only the containers are cleared; the
+  // filter objects themselves are not deleted here (see the comments in
+  // HierarchicalAllocatorProcess::reviveOffers and
+  // HierarchicalAllocatorProcess::expire).
+  framework.offerFilters.clear();
+  framework.inverseOfferFilters.clear();
+
+  LOG(INFO) << "Deactivated framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::updateFramework(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+
+  Framework& framework = frameworks[frameworkId];
+
+  // TODO(jmlvanre): Once we allow frameworks to re-register with a
+  // new 'role' or 'checkpoint' flag, we need to update our internal
+  // 'frameworks' structure. See MESOS-703 for progress on allowing
+  // these fields to be updated.
+  CHECK_EQ(framework.role, frameworkInfo.role());
+  CHECK_EQ(framework.checkpoint, frameworkInfo.checkpoint());
+
+  // Recompute whether the framework opted in to revocable resources.
+  framework.revocable = std::any_of(
+      frameworkInfo.capabilities().begin(),
+      frameworkInfo.capabilities().end(),
+      [](const FrameworkInfo::Capability& capability) {
+        return capability.type() ==
+          FrameworkInfo::Capability::REVOCABLE_RESOURCES;
+      });
+}
+
+
+void HierarchicalAllocatorProcess::addSlave(
+    const SlaveID& slaveId,
+    const SlaveInfo& slaveInfo,
+    const Option<Unavailability>& unavailability,
+    const Resources& total,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  CHECK(initialized);
+  CHECK(!slaves.contains(slaveId));
+
+  // The role sorter only tracks unreserved resources.
+  roleSorter->add(slaveId, total.unreserved());
+
+  // Charge the resources already in use on this slave to the frameworks we
+  // know about. Usage by unknown frameworks is not charged to any sorter,
+  // but is still included in the slave's `allocated` below.
+  foreachpair (const FrameworkID& frameworkId,
+               const Resources& allocated,
+               used) {
+    if (frameworks.contains(frameworkId)) {
+      const std::string& role = frameworks[frameworkId].role;
+      Sorter* sorter = frameworkSorters[role];
+
+      // TODO(bmahler): Validate that the reserved resources have the
+      // framework's role.
+
+      roleSorter->allocated(role, slaveId, allocated.unreserved());
+      sorter->add(slaveId, allocated);
+      sorter->allocated(frameworkId.value(), slaveId, allocated);
+    }
+  }
+
+  slaves[slaveId] = Slave();
+
+  Slave& slave = slaves[slaveId];
+  slave.total = total;
+  slave.allocated = Resources::sum(used);
+  slave.activated = true;
+  slave.checkpoint = slaveInfo.checkpoint();
+  slave.hostname = slaveInfo.hostname();
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
+  if (unavailability.isSome()) {
+    slave.maintenance = Slave::Maintenance(unavailability.get());
+  }
+
+  LOG(INFO) << "Added slave " << slaveId << " (" << slave.hostname
+            << ") with " << slave.total
+            << " (allocated: " << slave.allocated << ")";
+
+  allocate(slaveId);
+}
+
+
+void HierarchicalAllocatorProcess::removeSlave(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // TODO(bmahler): Per MESOS-621, this should remove the allocations
+  // that any frameworks have on this slave. Otherwise the caller may
+  // "leak" allocated resources accidentally if they forget to recover
+  // all the resources. Fixing this would require more information
+  // than what we currently track in the allocator.
+
+  // Withdraw the slave's unreserved resources from the role sorter, then
+  // forget the slave.
+  const Resources unreserved = slaves[slaveId].total.unreserved();
+  roleSorter->remove(slaveId, unreserved);
+
+  slaves.erase(slaveId);
+
+  // Filters associated with this slave are intentionally NOT deleted here;
+  // that happens when the delayed HierarchicalAllocatorProcess::expire is
+  // invoked (or when the framework that applied the filters is removed).
+
+  LOG(INFO) << "Removed slave " << slaveId;
+}
+
+
+void HierarchicalAllocatorProcess::updateSlave(
+    const SlaveID& slaveId,
+    const Resources& oversubscribed)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // Every oversubscribed resource must be revocable.
+  CHECK_EQ(oversubscribed, oversubscribed.revocable());
+
+  Slave& slave = slaves[slaveId];
+
+  // Swap the previous oversubscription estimate (the revocable part of the
+  // total) for the new one.
+  slave.total -= slave.total.revocable();
+  slave.total += oversubscribed;
+
+  // Keep the role sorter's view of the slave in sync.
+  roleSorter->update(slaveId, slave.total.unreserved());
+
+  LOG(INFO) << "Slave " << slaveId << " (" << slave.hostname << ")"
+            << " updated with oversubscribed resources " << oversubscribed
+            << " (total: " << slave.total
+            << ", allocated: " << slave.allocated << ")";
+
+  allocate(slaveId);
+}
+
+
+void HierarchicalAllocatorProcess::activateSlave(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // Activated slaves are considered again by `allocate`.
+  Slave& slave = slaves[slaveId];
+  slave.activated = true;
+
+  LOG(INFO) << "Slave " << slaveId << " reactivated";
+}
+
+
+void HierarchicalAllocatorProcess::deactivateSlave(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // Deactivated slaves are skipped by `allocate` until re-activated.
+  Slave& slave = slaves[slaveId];
+  slave.activated = false;
+
+  LOG(INFO) << "Slave " << slaveId << " deactivated";
+}
+
+
+void HierarchicalAllocatorProcess::updateWhitelist(
+    const Option<hashset<std::string>>& _whitelist)
+{
+  CHECK(initialized);
+
+  whitelist = _whitelist;
+
+  // No whitelist means every slave is eligible for offers.
+  if (whitelist.isNone()) {
+    LOG(INFO) << "Advertising offers for all slaves";
+    return;
+  }
+
+  LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get());
+
+  // An empty whitelist excludes every slave.
+  if (whitelist.get().empty()) {
+    LOG(WARNING) << "Whitelist is empty, no offers will be made!";
+  }
+}
+
+
+void HierarchicalAllocatorProcess::requestResources(
+    const FrameworkID& frameworkId,
+    const std::vector<Request>& requests)
+{
+  CHECK(initialized);
+
+  // NOTE: This allocator only logs resource requests; `requests` itself is
+  // not acted upon.
+  LOG(INFO) << "Received resource request from framework " << frameworkId;
+}
+
+
+void HierarchicalAllocatorProcess::updateAllocation(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const std::vector<Offer::Operation>& operations)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+  CHECK(frameworks.contains(frameworkId));
+
+  // The offer operations are applied to resources already allocated to the
+  // framework. The slave's total is updated accordingly, while its
+  // available resources remain unchanged.
+
+  const std::string& role = frameworks[frameworkId].role;
+  Sorter* sorter = frameworkSorters[role];
+  Slave& slave = slaves[slaveId];
+
+  // (1) The framework's allocation on this slave.
+  Resources frameworkAllocation =
+    sorter->allocation(frameworkId.value(), slaveId);
+
+  Try<Resources> updatedFrameworkAllocation =
+    frameworkAllocation.apply(operations);
+  CHECK_SOME(updatedFrameworkAllocation);
+
+  sorter->update(
+      frameworkId.value(),
+      slaveId,
+      frameworkAllocation,
+      updatedFrameworkAllocation.get());
+
+  // The role sorter only sees the unreserved portion.
+  roleSorter->update(
+      role,
+      slaveId,
+      frameworkAllocation.unreserved(),
+      updatedFrameworkAllocation.get().unreserved());
+
+  // (2) The slave's allocated resources.
+  Try<Resources> updatedSlaveAllocation = slave.allocated.apply(operations);
+  CHECK_SOME(updatedSlaveAllocation);
+  slave.allocated = updatedSlaveAllocation.get();
+
+  // (3) The slave's total resources.
+  Try<Resources> updatedTotal = slave.total.apply(operations);
+  CHECK_SOME(updatedTotal);
+  slave.total = updatedTotal.get();
+
+  LOG(INFO) << "Updated allocation of framework " << frameworkId
+            << " on slave " << slaveId
+            << " from " << frameworkAllocation
+            << " to " << updatedFrameworkAllocation.get();
+}
+
+
+process::Future<Nothing>
+HierarchicalAllocatorProcess::updateAvailable(
+    const SlaveID& slaveId,
+    const std::vector<Offer::Operation>& operations)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  Slave& slave = slaves[slaveId];
+
+  // First verify that the operations apply to the currently unallocated
+  // resources. This can legitimately fail: a call to `allocate` may have
+  // been enqueued by the allocator itself just before the master's request
+  // to enqueue `updateAvailable` arrived.
+  //
+  //   Master    -------R------------
+  //                     \----+
+  //                          |
+  //   Allocator --A-----A-U---A--
+  //                \___/ \___/
+  //
+  //   where A = allocate, R = reserve, U = updateAvailable
+  Resources available = slave.total - slave.allocated;
+
+  Try<Resources> updatedAvailable = available.apply(operations);
+  if (updatedAvailable.isError()) {
+    return process::Failure(updatedAvailable.error());
+  }
+
+  // The operations are valid: apply them to the slave's total and keep the
+  // role sorter in sync.
+  Try<Resources> updatedTotal = slave.total.apply(operations);
+  CHECK_SOME(updatedTotal);
+  slave.total = updatedTotal.get();
+
+  roleSorter->update(slaveId, slave.total.unreserved());
+
+  return Nothing();
+}
+
+
+void HierarchicalAllocatorProcess::updateUnavailability(
+    const SlaveID& slaveId,
+    const Option<Unavailability>& unavailability)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
+
+  // Drop every framework's inverse offer filters for this slave so that
+  // frameworks are forced to reassess their responses to inverse offers: a
+  // change in a slave's unavailability can strongly affect failure-domain
+  // calculations and inter-leaved unavailability schedules.
+  foreachvalue (Framework& framework, frameworks) {
+    framework.inverseOfferFilters.erase(slaveId);
+  }
+
+  // Replace any previous unavailability with the new one, if there is one.
+  Slave& slave = slaves[slaveId];
+  if (unavailability.isSome()) {
+    slave.maintenance = Slave::Maintenance(unavailability.get());
+  } else {
+    slave.maintenance = None();
+  }
+
+  allocate(slaveId);
+}
+
+
+void HierarchicalAllocatorProcess::updateInverseOffer(
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const Option<UnavailableResources>& unavailableResources,
+    const Option<mesos::master::InverseOfferStatus>& status,
+    const Option<Filters>& filters)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+  CHECK(slaves[slaveId].maintenance.isSome());
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
+
+  // Bound by reference because the maintenance state is modified in place.
+  Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
+
+  // Only inverse offers that are still outstanding are handled; anything
+  // else is stale and safely ignored.
+  const bool outstanding = maintenance.offersOutstanding.contains(frameworkId);
+
+  if (outstanding) {
+    // Clear the outstanding marker unconditionally so that a new inverse
+    // offer goes out the next time inverse offers are scheduled.
+    maintenance.offersOutstanding.erase(frameworkId);
+
+    // `Some` means the framework responded; `None` means the inverse offer
+    // timed out or was rescinded.
+    if (status.isSome()) {
+      // For now frameworks may not respond with `UNKNOWN`; the caller should
+      // guard against this. Checking an external invariant here goes against
+      // our usual pattern, but the allocator and master are currently so
+      // tightly coupled that the check is valuable.
+      CHECK_NE(
+          status.get().status(),
+          mesos::master::InverseOfferStatus::UNKNOWN);
+
+      // Record the framework's response.
+      maintenance.statuses[frameworkId].CopyFrom(status.get());
+    }
+  }
+
+  // Without `filters` there is nothing to install.
+  if (filters.isNone()) {
+    return;
+  }
+
+  // Work out how long to filter, falling back to the value of
+  // `Filters().refuse_seconds()` when the requested value is invalid or
+  // negative.
+  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+  if (seconds.isError()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is invalid: " << seconds.error();
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  } else if (seconds.get() < Duration::zero()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is negative";
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  }
+
+  CHECK_SOME(seconds);
+
+  // A zero duration installs no filter.
+  if (seconds.get() == Duration::zero()) {
+    return;
+  }
+
+  VLOG(1) << "Framework " << frameworkId
+          << " filtered inverse offers from slave " << slaveId
+          << " for " << seconds.get();
+
+  // Install the filter now and schedule its expiration.
+  InverseOfferFilter* inverseOfferFilter =
+    new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
+
+  frameworks[frameworkId].inverseOfferFilters[slaveId].insert(
+      inverseOfferFilter);
+
+  // `expire` is overloaded; pick the inverse-offer variant explicitly.
+  void (Self::*expireInverseOffer)(
+      const FrameworkID&,
+      const SlaveID&,
+      InverseOfferFilter*) = &Self::expire;
+
+  delay(
+      seconds.get(),
+      self(),
+      expireInverseOffer,
+      frameworkId,
+      slaveId,
+      inverseOfferFilter);
+}
+
+
+process::Future<
+    hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
+HierarchicalAllocatorProcess::getInverseOfferStatuses()
+{
+  CHECK(initialized);
+
+  // Copy out the most recent inverse offer statuses of every slave that
+  // currently has maintenance information.
+  hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>
+    snapshot;
+
+  foreachpair (const SlaveID& slaveId, const Slave& slave, slaves) {
+    if (slave.maintenance.isSome()) {
+      snapshot[slaveId] = slave.maintenance.get().statuses;
+    }
+  }
+
+  return snapshot;
+}
+
+
+void HierarchicalAllocatorProcess::recoverResources(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources,
+    const Option<Filters>& filters)
+{
+  CHECK(initialized);
+
+  // Nothing to recover, and nothing to filter.
+  if (resources.empty()) {
+    return;
+  }
+
+  // Neither map is modified below, so these stay accurate for the whole
+  // call.
+  const bool frameworkKnown = frameworks.contains(frameworkId);
+  const bool slaveKnown = slaves.contains(slaveId);
+
+  // Release the resources from the sorters. The framework may already be
+  // gone if we dispatched Master::offer before we received
+  // MesosAllocatorProcess::removeFramework or
+  // MesosAllocatorProcess::deactivateFramework, in which case all of its
+  // resources were already recovered.
+  if (frameworkKnown) {
+    const std::string& role = frameworks[frameworkId].role;
+    CHECK(frameworkSorters.contains(role));
+
+    Sorter* sorter = frameworkSorters[role];
+    if (sorter->contains(frameworkId.value())) {
+      sorter->unallocated(frameworkId.value(), slaveId, resources);
+      sorter->remove(slaveId, resources);
+      roleSorter->unallocated(role, slaveId, resources.unreserved());
+    }
+  }
+
+  // Return the resources to the slave. The slave may already be gone if we
+  // dispatched Master::offer before we received Allocator::removeSlave.
+  if (slaveKnown) {
+    Slave& slave = slaves[slaveId];
+
+    // NOTE: We cannot add the following CHECK due to the double
+    // precision errors. See MESOS-1187 for details.
+    // CHECK(slave.allocated.contains(resources));
+
+    slave.allocated -= resources;
+
+    LOG(INFO) << "Recovered " << resources
+              << " (total: " << slave.total
+              << ", allocated: " << slave.allocated
+              << ") on slave " << slaveId
+              << " from framework " << frameworkId;
+  }
+
+  // A filter is only installed when one was requested and both the
+  // framework and the slave still exist.
+  if (filters.isNone() || !frameworkKnown || !slaveKnown) {
+    return;
+  }
+
+  // Work out how long to filter, falling back to the value of
+  // `Filters().refuse_seconds()` when the requested value is invalid or
+  // negative.
+  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+  if (seconds.isError()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused resources filter because the input value "
+                 << "is invalid: " << seconds.error();
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  } else if (seconds.get() < Duration::zero()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused resources filter because the input value "
+                 << "is negative";
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  }
+
+  CHECK_SOME(seconds);
+
+  // A zero duration installs no filter.
+  if (seconds.get() == Duration::zero()) {
+    return;
+  }
+
+  VLOG(1) << "Framework " << frameworkId
+          << " filtered slave " << slaveId
+          << " for " << seconds.get();
+
+  // Install the filter now and schedule its expiration.
+  OfferFilter* offerFilter = new RefusedOfferFilter(
+      resources,
+      process::Timeout::in(seconds.get()));
+
+  frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
+
+  // `expire` is overloaded; pick the offer-filter variant explicitly.
+  void (Self::*expireOffer)(
+      const FrameworkID&,
+      const SlaveID&,
+      OfferFilter*) = &Self::expire;
+
+  delay(seconds.get(), self(), expireOffer, frameworkId, slaveId, offerFilter);
+}
+
+
+// Marks the framework as suppressed: `allocate` skips it until
+// `reviveOffers` clears the flag.
+void HierarchicalAllocatorProcess::suppressOffers(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  // Require a known framework, like every other framework-scoped call.
+  // Indexing `frameworks` with an unknown ID would silently insert a default
+  // `Framework` (with an empty role) and corrupt the allocator's state.
+  CHECK(frameworks.contains(frameworkId));
+
+  frameworks[frameworkId].suppressed = true;
+
+  LOG(INFO) << "Suppressed offers for framework " << frameworkId;
+}
+
+
+// Clears all of the framework's offer and inverse offer filters, lifts any
+// suppression, and then runs an allocation pass.
+void HierarchicalAllocatorProcess::reviveOffers(
+    const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  // Require a known framework, like every other framework-scoped call.
+  // Indexing `frameworks` with an unknown ID would silently insert a default
+  // `Framework` (with an empty role) and corrupt the allocator's state.
+  CHECK(frameworks.contains(frameworkId));
+
+  Framework& framework = frameworks[frameworkId];
+  framework.offerFilters.clear();
+  framework.inverseOfferFilters.clear();
+  framework.suppressed = false;
+
+  // We delete each actual `OfferFilter` when
+  // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
+  // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
+  // address) could get reused and `HierarchicalAllocatorProcess::expire`
+  // would expire that filter too soon. Note that this only works
+  // right now because ALL Filter types "expire".
+
+  LOG(INFO) << "Removed offer filters for framework " << frameworkId;
+
+  allocate();
+}
+
+
+void HierarchicalAllocatorProcess::batch()
+{
+  // One allocation pass over all slaves...
+  allocate();
+
+  // ...then schedule the next pass after `allocationInterval`.
+  delay(allocationInterval, self(), &Self::batch);
+}
+
+
+void HierarchicalAllocatorProcess::allocate()
+{
+  // Allocate across every known slave, timing the whole pass.
+  Stopwatch watch;
+  watch.start();
+
+  allocate(slaves.keys());
+
+  VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
+          << watch.elapsed();
+}
+
+
+// Runs an allocation pass restricted to the single slave `slaveId`.
+void HierarchicalAllocatorProcess::allocate(
+    const SlaveID& slaveId)
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  // Named `slaveIds` rather than `slaves` so it does not shadow the member
+  // `slaves` map.
+  // TODO(bmahler): Add initializer list constructor for hashset.
+  hashset<SlaveID> slaveIds;
+  slaveIds.insert(slaveId);
+  allocate(slaveIds);
+
+  VLOG(1) << "Performed allocation for slave " << slaveId << " in "
+          << stopwatch.elapsed();
+}
+
+
+// Core allocation pass over the slaves in `slaveIds_`:
+// - slaves are visited in random order, skipping non-whitelisted and
+// deactivated ones;
+// - on each slave, roles are visited in `roleSorter->sort()` order and,
+// within a role, frameworks in that role's framework sorter order;
+// - each non-suppressed framework is offered all of the slave's remaining
+// available resources it may use (unreserved + reserved for its role,
+// minus revocable resources unless it opted in), unless those resources
+// are not allocatable or are filtered;
+// - offered resources are immediately recorded as allocated in the slave
+// and in the sorters, `offerCallback` is invoked once per framework, and
+// finally `deallocate` is called for maintenance inverse offers.
+// Returns early (without calling `deallocate`) when no roles exist.
+void HierarchicalAllocatorProcess::allocate(
+ const hashset<SlaveID>& slaveIds_)
+{
+ if (roleSorter->count() == 0) {
+ LOG(ERROR) << "No roles specified, cannot allocate resources!";
+ return;
+ }
+
+ // Compute the offerable resources, per framework:
+ // (1) For reserved resources on the slave, allocate these to a
+ // framework having the corresponding role.
+ // (2) For unreserved resources on the slave, allocate these
+ // to a framework of any role.
+ hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;
+
+ // Randomize the order in which slaves' resources are allocated.
+ // TODO(vinod): Implement a smarter sorting algorithm.
+ // NOTE(review): `std::random_shuffle` is deprecated in C++14 and removed
+ // in C++17; consider `std::shuffle` with an explicit random engine.
+ std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
+ std::random_shuffle(slaveIds.begin(), slaveIds.end());
+
+ foreach (const SlaveID& slaveId, slaveIds) {
+ // Don't send offers for non-whitelisted and deactivated slaves.
+ // NOTE(review): `slaves[slaveId]` would insert a default `Slave` for an
+ // ID that is not in `slaves`; callers appear to pass only known slave
+ // IDs -- confirm.
+ if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) {
+ continue;
+ }
+
+ foreach (const std::string& role, roleSorter->sort()) {
+ foreach (const std::string& frameworkId_,
+ frameworkSorters[role]->sort()) {
+ FrameworkID frameworkId;
+ frameworkId.set_value(frameworkId_);
+
+ // If the framework has suppressed offers, ignore.
+ if (frameworks[frameworkId].suppressed) {
+ continue;
+ }
+
+ // Calculate the currently available resources on the slave.
+ // This is recomputed for every framework because earlier iterations
+ // of this loop add to `slaves[slaveId].allocated`.
+ Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
+
+ // NOTE: Currently, frameworks are allowed to have '*' role.
+ // Calling reserved('*') returns an empty Resources object.
+ Resources resources = available.unreserved() + available.reserved(role);
+
+ // Remove revocable resources if the framework has not opted
+ // for them.
+ if (!frameworks[frameworkId].revocable) {
+ resources -= resources.revocable();
+ }
+
+ // If the resources are not allocatable, ignore.
+ if (!allocatable(resources)) {
+ continue;
+ }
+
+ // If the framework filters these resources, ignore.
+ if (isFiltered(frameworkId, slaveId, resources)) {
+ continue;
+ }
+
+ VLOG(2) << "Allocating " << resources << " on slave " << slaveId
+ << " to framework " << frameworkId;
+
+ // Note that we perform "coarse-grained" allocation,
+ // meaning that we always allocate the entire remaining
+ // slave resources to a single framework.
+ offerable[frameworkId][slaveId] = resources;
+ slaves[slaveId].allocated += resources;
+
+ // Reserved resources are only accounted for in the framework
+ // sorter, since the reserved resources are not shared across
+ // roles.
+ frameworkSorters[role]->add(slaveId, resources);
+ frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
+ roleSorter->allocated(role, slaveId, resources.unreserved());
+ }
+ }
+ }
+
+ if (offerable.empty()) {
+ VLOG(1) << "No resources available to allocate!";
+ } else {
+ // Now offer the resources to each framework.
+ // `offerCallback` is invoked once per framework, with all of the slaves
+ // it was offered resources on during this pass.
+ foreachkey (const FrameworkID& frameworkId, offerable) {
+ offerCallback(frameworkId, offerable[frameworkId]);
+ }
+ }
+
+ // NOTE: For now, we implement maintenance inverse offers within the
+ // allocator. We leverage the existing timer/cycle of offers to also do any
+ // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
+ deallocate(slaveIds_);
+}
+
+
+// Generates maintenance inverse offers for those of `slaveIds_` that have an
+// unavailability scheduled. Every framework with an allocation on such a slave
+// gets one inverse offer for it, unless it already has one outstanding for
+// that slave or has an inverse offer filter installed for it. The result is
+// handed to `inverseOfferCallback`, at most once per framework.
+void HierarchicalAllocatorProcess::deallocate(
+ const hashset<SlaveID>& slaveIds_)
+{
+ // `frameworkSorters` holds one sorter per role, so an empty map means no
+ // roles exist and there are no framework allocations to inspect.
+ if (frameworkSorters.empty()) {
+ LOG(ERROR) << "No roles specified, cannot send inverse offers!";
+ return;
+ }
+
+ // In this case, `offerable` is actually the slaves and/or resources that we
+ // want the master to create `InverseOffer`s from.
+ // NOTE: Entries are only created when an inverse offer is actually added
+ // (never as a side effect of a lookup). So an empty `offerable` really means
+ // there is nothing to send, and no framework is ever handed an empty map.
+ hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
+
+ // For maintenance, we use the framework sorters to determine which frameworks
+ // have (1) reserved and / or (2) unreserved resources on the specified
+ // slaves. This way we only send inverse offers to frameworks that have the
+ // potential to lose something.
+ //
+ // A slave's `Maintenance::offersOutstanding` set records which frameworks
+ // already have an outstanding inverse offer for it. This is equivalent to the
+ // accounting we do for resources when we send regular offers. Without it we
+ // would keep generating new inverse offers even though the framework had not
+ // responded yet.
+ foreach (const SlaveID& slaveId, slaveIds_) {
+ CHECK(slaves.contains(slaveId));
+
+ // Only slaves with a scheduled unavailability produce inverse offers.
+ if (slaves[slaveId].maintenance.isNone()) {
+ continue;
+ }
+
+ // We use a reference by alias because we intend to modify the
+ // `maintenance` and to improve readability.
+ Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
+
+ foreachvalue (Sorter* frameworkSorter, frameworkSorters) {
+ const hashmap<std::string, Resources> allocation =
+ frameworkSorter->allocation(slaveId);
+
+ foreachkey (const std::string& frameworkId_, allocation) {
+ FrameworkID frameworkId;
+ frameworkId.set_value(frameworkId_);
+
+ // Skip frameworks that already have an inverse offer outstanding for
+ // this slave. This also covers inverse offers added earlier in this
+ // call, because every insertion into `offerable` below is paired
+ // with an insertion into `offersOutstanding`.
+ if (maintenance.offersOutstanding.contains(frameworkId)) {
+ continue;
+ }
+
+ // Ignore in case the framework filters inverse offers for this slave.
+ // NOTE: Since this specific allocator implementation only sends
+ // inverse offers for maintenance primitives, and those are at the
+ // whole slave level, we only need to filter based on the time-out.
+ if (isFiltered(frameworkId, slaveId)) {
+ continue;
+ }
+
+ // For now we send inverse offers with empty resources when the
+ // inverse offer represents maintenance on the machine. In the future
+ // we could be more specific about the resources on the host, as we
+ // have the information available.
+ offerable[frameworkId][slaveId] =
+ UnavailableResources{Resources(), maintenance.unavailability};
+
+ // Mark this framework as having an offer outstanding for the
+ // specified slave.
+ maintenance.offersOutstanding.insert(frameworkId);
+ }
+ }
+ }
+
+ if (offerable.empty()) {
+ VLOG(1) << "No inverse offers to send out!";
+ } else {
+ // Now send inverse offers to each framework.
+ foreachkey (const FrameworkID& frameworkId, offerable) {
+ inverseOfferCallback(frameworkId, offerable[frameworkId]);
+ }
+ }
+}
+
+
+// Expiration handler for an offer filter installed by `frameworkId` on
+// `slaveId`. It detaches the filter if the framework still holds it, and
+// always frees it.
+void HierarchicalAllocatorProcess::expire(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ OfferFilter* offerFilter)
+{
+ // A filter can be detached earlier without being deleted, e.g. when the
+ // framework is removed or in HierarchicalAllocatorProcess::reviveOffers.
+ // Deleting it only here keeps its address from being reused, which could
+ // otherwise make a newer filter expire prematurely.
+ const bool stillInstalled =
+ frameworks.contains(frameworkId) &&
+ frameworks[frameworkId].offerFilters.contains(slaveId) &&
+ frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter);
+
+ if (stillInstalled) {
+ Framework& framework = frameworks[frameworkId];
+ framework.offerFilters[slaveId].erase(offerFilter);
+
+ // Drop the per-slave entry once its last filter is gone.
+ if (framework.offerFilters[slaveId].empty()) {
+ framework.offerFilters.erase(slaveId);
+ }
+ }
+
+ delete offerFilter;
+}
+
+
+// Expiration handler for an inverse offer filter installed by `frameworkId`
+// on `slaveId`. It detaches the filter if the framework still holds it, and
+// always frees it.
+void HierarchicalAllocatorProcess::expire(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ InverseOfferFilter* inverseOfferFilter)
+{
+ // A filter can be detached earlier without being deleted, e.g. when the
+ // framework no longer exists or its inverse offer filters for this slave
+ // were cleared. Deleting it only here keeps its address from being reused,
+ // which could otherwise make a newer filter expire prematurely.
+ const bool stillInstalled =
+ frameworks.contains(frameworkId) &&
+ frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
+ frameworks[frameworkId].inverseOfferFilters[slaveId]
+ .contains(inverseOfferFilter);
+
+ if (stillInstalled) {
+ Framework& framework = frameworks[frameworkId];
+ framework.inverseOfferFilters[slaveId].erase(inverseOfferFilter);
+
+ // Drop the per-slave entry once its last filter is gone.
+ if (framework.inverseOfferFilters[slaveId].empty()) {
+ framework.inverseOfferFilters.erase(slaveId);
+ }
+ }
+
+ delete inverseOfferFilter;
+}
+
+
+// Returns whether offers may be made for `slaveId`. Without a whitelist every
+// slave qualifies; otherwise the slave's hostname must be whitelisted.
+bool HierarchicalAllocatorProcess::isWhitelisted(const SlaveID& slaveId)
+{
+ CHECK(slaves.contains(slaveId));
+
+ if (whitelist.isNone()) {
+ return true;
+ }
+
+ return whitelist.get().contains(slaves[slaveId].hostname);
+}
+
+
+// Returns true if `resources` on `slaveId` must not be offered to
+// `frameworkId`. This happens either because of the checkpointing mismatch
+// below, or because one of the framework's offer filters for the slave
+// rejects them.
+bool HierarchicalAllocatorProcess::isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources)
+{
+ CHECK(frameworks.contains(frameworkId));
+ CHECK(slaves.contains(slaveId));
+
+ Framework& framework = frameworks[frameworkId];
+
+ // A checkpointing framework never receives a non-checkpointing slave's
+ // resources. This is a short term fix until the following is resolved:
+ // https://issues.apache.org/jira/browse/MESOS-444.
+ if (framework.checkpoint && !slaves[slaveId].checkpoint) {
+ VLOG(1) << "Filtered offer with " << resources
+ << " on non-checkpointing slave " << slaveId
+ << " for checkpointing framework " << frameworkId;
+
+ return true;
+ }
+
+ // No filters installed for this slave: nothing can reject the offer.
+ if (!framework.offerFilters.contains(slaveId)) {
+ return false;
+ }
+
+ foreach (OfferFilter* offerFilter, framework.offerFilters[slaveId]) {
+ if (offerFilter->filter(resources)) {
+ VLOG(1) << "Filtered offer with " << resources
+ << " on slave " << slaveId
+ << " for framework " << frameworkId;
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+// Returns true if one of `frameworkId`'s inverse offer filters for `slaveId`
+// is still active, i.e. the framework should not get an inverse offer for it.
+bool HierarchicalAllocatorProcess::isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId)
+{
+ CHECK(frameworks.contains(frameworkId));
+ CHECK(slaves.contains(slaveId));
+
+ Framework& framework = frameworks[frameworkId];
+
+ // No inverse offer filters installed for this slave: nothing to apply.
+ if (!framework.inverseOfferFilters.contains(slaveId)) {
+ return false;
+ }
+
+ foreach (InverseOfferFilter* inverseOfferFilter,
+ framework.inverseOfferFilters[slaveId]) {
+ if (inverseOfferFilter->filter()) {
+ VLOG(1) << "Filtered unavailability on slave " << slaveId
+ << " for framework " << frameworkId;
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+// Returns true when `resources` are worth offering: at least MIN_CPUS cpus
+// or at least MIN_MEM memory.
+bool HierarchicalAllocatorProcess::allocatable(const Resources& resources)
+{
+ const Option<double> cpus = resources.cpus();
+ if (cpus.isSome() && cpus.get() >= MIN_CPUS) {
+ return true;
+ }
+
+ const Option<Bytes> mem = resources.mem();
+ return mem.isSome() && mem.get() >= MIN_MEM;
+}
+
+} // namespace internal {
+} // namespace allocator {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index d57c55e..e468b5a 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -19,26 +19,19 @@
#ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
#define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
-#include <algorithm>
-#include <vector>
+#include <string>
-#include <mesos/resources.hpp>
-#include <mesos/type_utils.hpp>
+#include <mesos/mesos.hpp>
-#include <process/event.hpp>
-#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/metrics/gauge.hpp>
#include <process/metrics/metrics.hpp>
-#include <process/timeout.hpp>
-#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
-#include <stout/stopwatch.hpp>
-#include <stout/stringify.hpp>
+#include <stout/option.hpp>
#include "master/allocator/mesos/allocator.hpp"
#include "master/allocator/sorter/drf/sorter.hpp"
@@ -50,10 +43,6 @@ namespace internal {
namespace master {
namespace allocator {
-// Forward declarations.
-class OfferFilter;
-class InverseOfferFilter;
-
// We forward declare the hierarchical allocator process so that we
// can typedef an instantiation of it with DRF sorters.
template <typename RoleSorter, typename FrameworkSorter>
@@ -66,16 +55,52 @@ typedef MesosAllocator<HierarchicalDRFAllocatorProcess>
HierarchicalDRFAllocator;
+namespace internal {
+
+// Forward declarations.
+class OfferFilter;
+class InverseOfferFilter;
+
+
+// Polymorphic sorter factory. It lets the templated
+// `HierarchicalAllocatorProcess` hand its concrete sorter types to the
+// non-template process in the internal namespace, so the allocator's
+// implementation can live in `hierarchical.cpp` instead of this header.
+struct SorterFactoryBase
+{
+ virtual ~SorterFactoryBase() = default;
+
+ // Returns a sorter used to order roles.
+ virtual Sorter* createRoleSorter() const = 0;
+
+ // Returns a sorter used to order the frameworks within a role.
+ virtual Sorter* createFrameworkSorter() const = 0;
+};
+
+
+// Concrete `SorterFactoryBase` that the templated `HierarchicalAllocatorProcess`
+// instantiates with its `RoleSorter` and `FrameworkSorter` types.
+// `override` makes the compiler reject any drift from the base interface.
+template <typename RoleSorter, typename FrameworkSorter>
+struct SorterFactory : public SorterFactoryBase
+{
+ // Returns a new heap-allocated `RoleSorter`; the factory keeps no
+ // reference to it, so the caller owns the result.
+ Sorter* createRoleSorter() const override
+ {
+ return new RoleSorter();
+ }
+
+ // Returns a new heap-allocated `FrameworkSorter`; the factory keeps no
+ // reference to it, so the caller owns the result.
+ Sorter* createFrameworkSorter() const override
+ {
+ return new FrameworkSorter();
+ }
+};
+
+
// Implements the basic allocator algorithm - first pick a role by
// some criteria, then pick one of their frameworks to allocate to.
-template <typename RoleSorter, typename FrameworkSorter>
class HierarchicalAllocatorProcess : public MesosAllocatorProcess
{
public:
- HierarchicalAllocatorProcess()
+ // The factory supplies the role sorter and the per-role framework sorters
+ // (see `SorterFactoryBase`). `explicit` stops a bare `SorterFactoryBase*`
+ // from implicitly converting into an allocator process.
+ // NOTE: `ProcessBase` is a virtual base, so the `ProcessBase(...)`
+ // initializer below only takes effect when this class is the most-derived
+ // type. Subclasses must initialize `ProcessBase` themselves to keep the
+ // "hierarchical-allocator" ID.
+ // NOTE(review): `sorterFactory` is stored as a raw pointer and the
+ // destructor below does not delete it; confirm who owns it.
+ explicit HierarchicalAllocatorProcess(SorterFactoryBase* _sorterFactory)
: ProcessBase(process::ID::generate("hierarchical-allocator")),
initialized(false),
metrics(*this),
+ sorterFactory(_sorterFactory),
roleSorter(NULL) {}
virtual ~HierarchicalAllocatorProcess() {}
@@ -179,8 +204,8 @@ public:
protected:
// Useful typedefs for dispatch/delay/defer to self()/this.
- typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> Self;
- typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> This;
+ typedef HierarchicalAllocatorProcess Self;
+ typedef HierarchicalAllocatorProcess This;
// Callback for doing batch allocations.
void batch();
@@ -361,1215 +386,29 @@ protected:
// resources as regular resources when doing fairness calculations.
// TODO(vinod): Consider using a different fairness algorithm for
// oversubscribed resources.
- RoleSorter* roleSorter;
- hashmap<std::string, FrameworkSorter*> frameworkSorters;
-};
-
-
-// Used to represent "filters" for resources unused in offers.
-class OfferFilter
-{
-public:
- virtual ~OfferFilter() {}
-
- virtual bool filter(const Resources& resources) = 0;
-};
-
-
-class RefusedOfferFilter: public OfferFilter
-{
-public:
- RefusedOfferFilter(
- const Resources& _resources,
- const process::Timeout& _timeout)
- : resources(_resources), timeout(_timeout) {}
-
- virtual bool filter(const Resources& _resources)
- {
- // TODO(jieyu): Consider separating the superset check for regular
- // and revocable resources. For example, frameworks might want
- // more revocable resources only or non-revocable resources only,
- // but currently the filter only expires if there is more of both
- // revocable and non-revocable resources.
- return resources.contains(_resources) && // Refused resources are superset.
- timeout.remaining() > Seconds(0);
- }
-
- const Resources resources;
- const process::Timeout timeout;
+ // Factory for the sorters below (see `SorterFactoryBase`).
+ // NOTE(review): held as a raw pointer and not deleted by the destructor
+ // declared in this header; confirm who owns it.
+ SorterFactoryBase* sorterFactory;
+ Sorter* roleSorter;
+ // One framework sorter per role, keyed by role name.
+ hashmap<std::string, Sorter*> frameworkSorters;
};
-// Used to represent "filters" for inverse offers.
-// NOTE: Since this specific allocator implementation only sends inverse offers
-// for maintenance primitives, and those are at the whole slave level, we only
-// need to filter based on the time-out.
-// If this allocator implementation starts sending out more resource specific
-// inverse offers, then we can capture the `unavailableResources` in the filter
-// function.
-class InverseOfferFilter
-{
-public:
- virtual ~InverseOfferFilter() {}
-
- virtual bool filter() = 0;
-};
+} // namespace internal {
-// NOTE: See comment above `InverseOfferFilter` regarding capturing
-// `unavailableResources` if this allocator starts sending fine-grained inverse
-// offers.
-class RefusedInverseOfferFilter: public InverseOfferFilter
+// We map the templatized version of the `HierarchicalAllocatorProcess` to one
+// that relies on polymorphic sorters in the internal namespace. This allows us
+// to keep the implementation of the allocator in the implementation file.
+template <typename RoleSorter, typename FrameworkSorter>
+class HierarchicalAllocatorProcess
+ : public internal::HierarchicalAllocatorProcess
{
public:
- RefusedInverseOfferFilter(const process::Timeout& _timeout)
- : timeout(_timeout) {}
-
- virtual bool filter()
- {
- // See comment above why we currently don't do more fine-grained filtering.
- return timeout.remaining() > Seconds(0);
- }
-
- const process::Timeout timeout;
+ // `ProcessBase` is a virtual base class, so only the most-derived class's
+ // constructor initializes it. The `ProcessBase(...)` initializer in
+ // `internal::HierarchicalAllocatorProcess` is skipped when constructing
+ // this subclass, and `ProcessBase` would otherwise be default-constructed,
+ // losing the "hierarchical-allocator" ID.
+ // NOTE(review): the factory is heap-allocated here and handed to the base
+ // class by raw pointer; confirm it is deleted somewhere, otherwise it leaks
+ // once per allocator instance.
+ HierarchicalAllocatorProcess()
+ : ProcessBase(process::ID::generate("hierarchical-allocator")),
+ internal::HierarchicalAllocatorProcess(
+ new internal::SorterFactory<RoleSorter, FrameworkSorter>()) {}
};
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
- const Duration& _allocationInterval,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& _offerCallback,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, UnavailableResources>&)>&
- _inverseOfferCallback,
- const hashmap<std::string, mesos::master::RoleInfo>& _roles)
-{
- allocationInterval = _allocationInterval;
- offerCallback = _offerCallback;
- inverseOfferCallback = _inverseOfferCallback;
- roles = _roles;
- initialized = true;
-
- roleSorter = new RoleSorter();
- foreachpair (
- const std::string& name, const mesos::master::RoleInfo& roleInfo, roles) {
- roleSorter->add(name, roleInfo.weight());
- frameworkSorters[name] = new FrameworkSorter();
- }
-
- if (roleSorter->count() == 0) {
- LOG(ERROR) << "No roles specified, cannot allocate resources!";
- }
-
- VLOG(1) << "Initialized hierarchical allocator process";
-
- delay(allocationInterval, self(), &Self::batch);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const hashmap<SlaveID, Resources>& used)
-{
- CHECK(initialized);
-
- const std::string& role = frameworkInfo.role();
-
- CHECK(roles.contains(role));
-
- CHECK(!frameworkSorters[role]->contains(frameworkId.value()));
- frameworkSorters[role]->add(frameworkId.value());
-
- // TODO(bmahler): Validate that the reserved resources have the
- // framework's role.
-
- // Update the allocation to this framework.
- foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
- roleSorter->allocated(role, slaveId, allocated.unreserved());
- frameworkSorters[role]->add(slaveId, allocated);
- frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated);
- }
-
- frameworks[frameworkId] = Framework();
- frameworks[frameworkId].role = frameworkInfo.role();
- frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint();
-
- // Check if the framework desires revocable resources.
- frameworks[frameworkId].revocable = false;
- foreach (const FrameworkInfo::Capability& capability,
- frameworkInfo.capabilities()) {
- if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
- frameworks[frameworkId].revocable = true;
- }
- }
-
- frameworks[frameworkId].suppressed = false;
-
- LOG(INFO) << "Added framework " << frameworkId;
-
- allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
- const std::string& role = frameworks[frameworkId].role;
-
- // Might not be in 'frameworkSorters[role]' because it was previously
- // deactivated and never re-added.
- if (frameworkSorters[role]->contains(frameworkId.value())) {
- hashmap<SlaveID, Resources> allocation =
- frameworkSorters[role]->allocation(frameworkId.value());
-
- foreachpair (
- const SlaveID& slaveId, const Resources& allocated, allocation) {
- roleSorter->unallocated(role, slaveId, allocated.unreserved());
- frameworkSorters[role]->remove(slaveId, allocated);
- }
-
- frameworkSorters[role]->remove(frameworkId.value());
- }
-
- // Do not delete the filters contained in this
- // framework's `offerFilters` hashset yet, see comments in
- // HierarchicalAllocatorProcess::reviveOffers and
- // HierarchicalAllocatorProcess::expire.
- frameworks.erase(frameworkId);
-
- LOG(INFO) << "Removed framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateFramework(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
- const std::string& role = frameworks[frameworkId].role;
-
- frameworkSorters[role]->activate(frameworkId.value());
-
- LOG(INFO) << "Activated framework " << frameworkId;
-
- allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
- const std::string& role = frameworks[frameworkId].role;
-
- frameworkSorters[role]->deactivate(frameworkId.value());
-
- // Note that the Sorter *does not* remove the resources allocated
- // to this framework. For now, this is important because if the
- // framework fails over and is activated, we still want a record
- // of the resources that it is using. We might be able to collapse
- // the added/removed and activated/deactivated in the future.
-
- // Do not delete the filters contained in this
- // framework's `offerFilters` hashset yet, see comments in
- // HierarchicalAllocatorProcess::reviveOffers and
- // HierarchicalAllocatorProcess::expire.
- frameworks[frameworkId].offerFilters.clear();
- frameworks[frameworkId].inverseOfferFilters.clear();
-
- LOG(INFO) << "Deactivated framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
-
- // TODO(jmlvanre): Once we allow frameworks to re-register with a
- // new 'role' or 'checkpoint' flag, we need to update our internal
- // 'frameworks' structure. See MESOS-703 for progress on allowing
- // these fields to be updated.
- CHECK_EQ(frameworks[frameworkId].role, frameworkInfo.role());
- CHECK_EQ(frameworks[frameworkId].checkpoint, frameworkInfo.checkpoint());
-
- frameworks[frameworkId].revocable = false;
-
- foreach (const FrameworkInfo::Capability& capability,
- frameworkInfo.capabilities()) {
- if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
- frameworks[frameworkId].revocable = true;
- }
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Option<Unavailability>& unavailability,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used)
-{
- CHECK(initialized);
- CHECK(!slaves.contains(slaveId));
-
- roleSorter->add(slaveId, total.unreserved());
-
- foreachpair (const FrameworkID& frameworkId,
- const Resources& allocated,
- used) {
- if (frameworks.contains(frameworkId)) {
- const std::string& role = frameworks[frameworkId].role;
-
- // TODO(bmahler): Validate that the reserved resources have the
- // framework's role.
-
- roleSorter->allocated(role, slaveId, allocated.unreserved());
- frameworkSorters[role]->add(slaveId, allocated);
- frameworkSorters[role]->allocated(
- frameworkId.value(), slaveId, allocated);
- }
- }
-
- slaves[slaveId] = Slave();
- slaves[slaveId].total = total;
- slaves[slaveId].allocated = Resources::sum(used);
- slaves[slaveId].activated = true;
- slaves[slaveId].checkpoint = slaveInfo.checkpoint();
- slaves[slaveId].hostname = slaveInfo.hostname();
-
- // NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and OfferFilter.
- if (unavailability.isSome()) {
- slaves[slaveId].maintenance =
- typename Slave::Maintenance(unavailability.get());
- }
-
- LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname
- << ") with " << slaves[slaveId].total
- << " (allocated: " << slaves[slaveId].allocated << ")";
-
- allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave(
- const SlaveID& slaveId)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- // TODO(bmahler): Per MESOS-621, this should remove the allocations
- // that any frameworks have on this slave. Otherwise the caller may
- // "leak" allocated resources accidentally if they forget to recover
- // all the resources. Fixing this would require more information
- // than what we currently track in the allocator.
-
- roleSorter->remove(slaveId, slaves[slaveId].total.unreserved());
-
- slaves.erase(slaveId);
-
- // Note that we DO NOT actually delete any filters associated with
- // this slave, that will occur when the delayed
- // HierarchicalAllocatorProcess::expire gets invoked (or the framework
- // that applied the filters gets removed).
-
- LOG(INFO) << "Removed slave " << slaveId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateSlave(
- const SlaveID& slaveId,
- const Resources& oversubscribed)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- // Check that all the oversubscribed resources are revocable.
- CHECK_EQ(oversubscribed, oversubscribed.revocable());
-
- // Update the total resources.
-
- // First remove the old oversubscribed resources from the total.
- slaves[slaveId].total -= slaves[slaveId].total.revocable();
-
- // Now add the new estimate of oversubscribed resources.
- slaves[slaveId].total += oversubscribed;
-
- // Now, update the total resources in the role sorter.
- roleSorter->update(
- slaveId,
- slaves[slaveId].total.unreserved());
-
- LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname << ")"
- << " updated with oversubscribed resources " << oversubscribed
- << " (total: " << slaves[slaveId].total
- << ", allocated: " << slaves[slaveId].allocated << ")";
-
- allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateSlave(
- const SlaveID& slaveId)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- slaves[slaveId].activated = true;
-
- LOG(INFO)<< "Slave " << slaveId << " reactivated";
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateSlave(
- const SlaveID& slaveId)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- slaves[slaveId].activated = false;
-
- LOG(INFO) << "Slave " << slaveId << " deactivated";
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateWhitelist(
- const Option<hashset<std::string>>& _whitelist)
-{
- CHECK(initialized);
-
- whitelist = _whitelist;
-
- if (whitelist.isSome()) {
- LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get());
-
- if (whitelist.get().empty()) {
- LOG(WARNING) << "Whitelist is empty, no offers will be made!";
- }
- } else {
- LOG(INFO) << "Advertising offers for all slaves";
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests)
-{
- CHECK(initialized);
-
- LOG(INFO) << "Received resource request from framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
- CHECK(frameworks.contains(frameworkId));
-
- // Here we apply offer operations to the allocated resources, which
- // in turns leads to an update of the total. The available resources
- // remain unchanged.
-
- // Update the allocated resources.
- FrameworkSorter* frameworkSorter =
- frameworkSorters[frameworks[frameworkId].role];
-
- Resources frameworkAllocation =
- frameworkSorter->allocation(frameworkId.value(), slaveId);
-
- Try<Resources> updatedFrameworkAllocation =
- frameworkAllocation.apply(operations);
-
- CHECK_SOME(updatedFrameworkAllocation);
-
- frameworkSorter->update(
- frameworkId.value(),
- slaveId,
- frameworkAllocation,
- updatedFrameworkAllocation.get());
-
- roleSorter->update(
- frameworks[frameworkId].role,
- slaveId,
- frameworkAllocation.unreserved(),
- updatedFrameworkAllocation.get().unreserved());
-
- Try<Resources> updatedSlaveAllocation =
- slaves[slaveId].allocated.apply(operations);
-
- CHECK_SOME(updatedSlaveAllocation);
-
- slaves[slaveId].allocated = updatedSlaveAllocation.get();
-
- // Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
- CHECK_SOME(updatedTotal);
-
- slaves[slaveId].total = updatedTotal.get();
-
- LOG(INFO) << "Updated allocation of framework " << frameworkId
- << " on slave " << slaveId
- << " from " << frameworkAllocation
- << " to " << updatedFrameworkAllocation.get();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-process::Future<Nothing>
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
-
- // It's possible for this 'apply' to fail here because a call to
- // 'allocate' could have been enqueued by the allocator itself
- // just before master's request to enqueue 'updateAvailable'
- // arrives to the allocator.
- //
- // Master -------R------------
- // \----+
- // |
- // Allocator --A-----A-U---A--
- // \___/ \___/
- //
- // where A = allocate, R = reserve, U = updateAvailable
- Try<Resources> updatedAvailable = available.apply(operations);
- if (updatedAvailable.isError()) {
- return process::Failure(updatedAvailable.error());
- }
-
- // Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
- CHECK_SOME(updatedTotal);
-
- slaves[slaveId].total = updatedTotal.get();
-
- // Now, update the total resources in the role sorter.
- roleSorter->update(slaveId, slaves[slaveId].total.unreserved());
-
- return Nothing();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
- const SlaveID& slaveId,
- const Option<Unavailability>& unavailability)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- // NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and OfferFilter.
-
- // We explicitly remove all filters for the inverse offers of this slave. We
- // do this because we want to force frameworks to reassess the calculations
- // they have made to respond to the inverse offer. Unavailability of a slave
- // can have a large effect on failure domain calculations and inter-leaved
- // unavailability schedules.
- foreachvalue (Framework& framework, frameworks) {
- framework.inverseOfferFilters.erase(slaveId);
- }
-
- // Remove any old unavailability.
- slaves[slaveId].maintenance = None();
-
- // If we have a new unavailability.
- if (unavailability.isSome()) {
- slaves[slaveId].maintenance =
- typename Slave::Maintenance(unavailability.get());
- }
-
- allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status,
- const Option<Filters>& filters)
-{
- CHECK(initialized);
- CHECK(frameworks.contains(frameworkId));
- CHECK(slaves.contains(slaveId));
- CHECK(slaves[slaveId].maintenance.isSome());
-
- // NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and OfferFilter.
-
- // We use a reference by alias because we intend to modify the
- // `maintenance` and to improve readability.
- typename Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
-
- // Only handle inverse offers that we currently have outstanding. If it is not
- // currently outstanding this means it is old and can be safely ignored.
- if (maintenance.offersOutstanding.contains(frameworkId)) {
- // We always remove the outstanding offer so that we will send a new offer
- // out the next time we schedule inverse offers.
- maintenance.offersOutstanding.erase(frameworkId);
-
- // If the response is `Some`, this means the framework responded. Otherwise
- // if it is `None` the inverse offer timed out or was rescinded.
- if (status.isSome()) {
- // For now we don't allow frameworks to respond with `UNKNOWN`. The caller
- // should guard against this. This goes against the pattern of not
- // checking external invariants; however, the allocator and master are
- // currently so tightly coupled that this check is valuable.
- CHECK_NE(
- status.get().status(),
- mesos::master::InverseOfferStatus::UNKNOWN);
-
- // If the framework responded, we update our state to match.
- maintenance.statuses[frameworkId].CopyFrom(status.get());
- }
- }
-
- // No need to install filters if `filters` is none.
- if (filters.isNone()) {
- return;
- }
-
- // Create a refused resource filter.
- Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
-
- if (seconds.isError()) {
- LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
- << "the refused inverse offer filter because the input value "
- << "is invalid: " << seconds.error();
-
- seconds = Duration::create(Filters().refuse_seconds());
- } else if (seconds.get() < Duration::zero()) {
- LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
- << "the refused inverse offer filter because the input value "
- << "is negative";
-
- seconds = Duration::create(Filters().refuse_seconds());
- }
-
- CHECK_SOME(seconds);
-
- if (seconds.get() != Duration::zero()) {
- VLOG(1) << "Framework " << frameworkId
- << " filtered inverse offers from slave " << slaveId
- << " for " << seconds.get();
-
- // Create a new inverse offer filter and delay its expiration.
- InverseOfferFilter* inverseOfferFilter =
- new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
-
- frameworks[frameworkId]
- .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
-
- // We need to disambiguate the function call to pick the correct
- // expire() overload.
- void (Self::*expireInverseOffer)(
- const FrameworkID&,
- const SlaveID&,
- InverseOfferFilter*) = &Self::expire;
-
- delay(
- seconds.get(),
- self(),
- expireInverseOffer,
- frameworkId,
- slaveId,
- inverseOfferFilter);
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-process::Future<
- hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
-HierarchicalAllocatorProcess<
- RoleSorter, FrameworkSorter>::getInverseOfferStatuses()
-{
- CHECK(initialized);
-
- hashmap<
- SlaveID,
- hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result;
-
- // Make a copy of the most recent statuses.
- foreachpair (const SlaveID& id, const Slave& slave, slaves) {
- if (slave.maintenance.isSome()) {
- result[id] = slave.maintenance.get().statuses;
- }
- }
-
- return result;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters)
-{
- CHECK(initialized);
-
- if (resources.empty()) {
- return;
- }
-
- // Updated resources allocated to framework (if framework still
- // exists, which it might not in the event that we dispatched
- // Master::offer before we received
- // MesosAllocatorProcess::removeFramework or
- // MesosAllocatorProcess::deactivateFramework, in which case we will
- // have already recovered all of its resources).
- if (frameworks.contains(frameworkId)) {
- const std::string& role = frameworks[frameworkId].role;
-
- CHECK(frameworkSorters.contains(role));
-
- if (frameworkSorters[role]->contains(frameworkId.value())) {
- frameworkSorters[role]->unallocated(
- frameworkId.value(), slaveId, resources);
- frameworkSorters[role]->remove(slaveId, resources);
- roleSorter->unallocated(role, slaveId, resources.unreserved());
- }
- }
-
- // Update resources allocated on slave (if slave still exists,
- // which it might not in the event that we dispatched Master::offer
- // before we received Allocator::removeSlave).
- if (slaves.contains(slaveId)) {
- // NOTE: We cannot add the following CHECK due to the double
- // precision errors. See MESOS-1187 for details.
- // CHECK(slaves[slaveId].allocated.contains(resources));
-
- slaves[slaveId].allocated -= resources;
-
- LOG(INFO) << "Recovered " << resources
- << " (total: " << slaves[slaveId].total
- << ", allocated: " << slaves[slaveId].allocated
- << ") on slave " << slaveId
- << " from framework " << frameworkId;
- }
-
- // No need to install the filter if 'filters' is none.
- if (filters.isNone()) {
- return;
- }
-
- // No need to install the filter if slave/framework does not exist.
- if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
- return;
- }
-
- // Create a refused resources filter.
- Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
-
- if (seconds.isError()) {
- LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
- << "the refused resources filter because the input value "
- << "is invalid: " << seconds.error();
-
- seconds = Duration::create(Filters().refuse_seconds());
- } else if (seconds.get() < Duration::zero()) {
- LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
- << "the refused resources filter because the input value "
- << "is negative";
-
- seconds = Duration::create(Filters().refuse_seconds());
- }
-
- CHECK_SOME(seconds);
-
- if (seconds.get() != Duration::zero()) {
- VLOG(1) << "Framework " << frameworkId
- << " filtered slave " << slaveId
- << " for " << seconds.get();
-
- // Create a new filter and delay its expiration.
- OfferFilter* offerFilter = new RefusedOfferFilter(
- resources,
- process::Timeout::in(seconds.get()));
-
- frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
-
- // We need to disambiguate the function call to pick the correct
- // expire() overload.
- void (Self::*expireOffer)(
- const FrameworkID&,
- const SlaveID&,
- OfferFilter*) = &Self::expire;
-
- delay(
- seconds.get(),
- self(),
- expireOffer,
- frameworkId,
- slaveId,
- offerFilter);
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::suppressOffers(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
- frameworks[frameworkId].suppressed = true;
-
- LOG(INFO) << "Suppressed offers for framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- frameworks[frameworkId].offerFilters.clear();
- frameworks[frameworkId].inverseOfferFilters.clear();
- frameworks[frameworkId].suppressed = false;
-
- // We delete each actual `OfferFilter` when
- // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
- // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
- // address) could get reused and `HierarchicalAllocatorProcess::expire`
- // would expire that filter too soon. Note that this only works
- // right now because ALL Filter types "expire".
-
- LOG(INFO) << "Removed offer filters for framework " << frameworkId;
-
- allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::batch()
-{
- allocate();
- delay(allocationInterval, self(), &Self::batch);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate()
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- allocate(slaves.keys());
-
- VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
- << stopwatch.elapsed();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
- const SlaveID& slaveId)
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- // TODO(bmahler): Add initializer list constructor for hashset.
- hashset<SlaveID> slaves;
- slaves.insert(slaveId);
- allocate(slaves);
-
- VLOG(1) << "Performed allocation for slave " << slaveId << " in "
- << stopwatch.elapsed();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
- const hashset<SlaveID>& slaveIds_)
-{
- if (roleSorter->count() == 0) {
- LOG(ERROR) << "No roles specified, cannot allocate resources!";
- return;
- }
-
- // Compute the offerable resources, per framework:
- // (1) For reserved resources on the slave, allocate these to a
- // framework having the corresponding role.
- // (2) For unreserved resources on the slave, allocate these
- // to a framework of any role.
- hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;
-
- // Randomize the order in which slaves' resources are allocated.
- // TODO(vinod): Implement a smarter sorting algorithm.
- std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
- std::random_shuffle(slaveIds.begin(), slaveIds.end());
-
- foreach (const SlaveID& slaveId, slaveIds) {
- // Don't send offers for non-whitelisted and deactivated slaves.
- if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) {
- continue;
- }
-
- foreach (const std::string& role, roleSorter->sort()) {
- foreach (const std::string& frameworkId_,
- frameworkSorters[role]->sort()) {
- FrameworkID frameworkId;
- frameworkId.set_value(frameworkId_);
-
- // If the framework has suppressed offers, ignore.
- if (frameworks[frameworkId].suppressed) {
- continue;
- }
-
- // Calculate the currently available resources on the slave.
- Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
-
- // NOTE: Currently, frameworks are allowed to have '*' role.
- // Calling reserved('*') returns an empty Resources object.
- Resources resources = available.unreserved() + available.reserved(role);
-
- // Remove revocable resources if the framework has not opted
- // for them.
- if (!frameworks[frameworkId].revocable) {
- resources -= resources.revocable();
- }
-
- // If the resources are not allocatable, ignore.
- if (!allocatable(resources)) {
- continue;
- }
-
- // If the framework filters these resources, ignore.
- if (isFiltered(frameworkId, slaveId, resources)) {
- continue;
- }
-
- VLOG(2) << "Allocating " << resources << " on slave " << slaveId
- << " to framework " << frameworkId;
-
- // Note that we perform "coarse-grained" allocation,
- // meaning that we always allocate the entire remaining
- // slave resources to a single framework.
- offerable[frameworkId][slaveId] = resources;
- slaves[slaveId].allocated += resources;
-
- // Reserved resources are only accounted for in the framework
- // sorter, since the reserved resources are not shared across
- // roles.
- frameworkSorters[role]->add(slaveId, resources);
- frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
- roleSorter->allocated(role, slaveId, resources.unreserved());
- }
- }
- }
-
- if (offerable.empty()) {
- VLOG(1) << "No resources available to allocate!";
- } else {
- // Now offer the resources to each framework.
- foreachkey (const FrameworkID& frameworkId, offerable) {
- offerCallback(frameworkId, offerable[frameworkId]);
- }
- }
-
- // NOTE: For now, we implement maintenance inverse offers within the
- // allocator. We leverage the existing timer/cycle of offers to also do any
- // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
- deallocate(slaveIds_);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
- const hashset<SlaveID>& slaveIds_)
-{
- if (frameworkSorters.empty()) {
- LOG(ERROR) << "No frameworks specified, cannot send inverse offers!";
- return;
- }
-
- // In this case, `offerable` is actually the slaves and/or resources that we
- // want the master to create `InverseOffer`s from.
- hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
-
- // For maintenance, we use the framework sorters to determine which frameworks
- // have (1) reserved and / or (2) unreserved resource on the specified
- // slaveIds. This way we only send inverse offers to frameworks that have the
- // potential to lose something. We keep track of which frameworks already have
- // an outstanding inverse offer for the given slave in the
- // UnavailabilityStatus of the specific slave using the `offerOutstanding`
- // flag. This is equivalent to the accounting we do for resources when we send
- // regular offers. If we didn't keep track of outstanding offers then we would
- // keep generating new inverse offers even though the framework had not
- // responded yet.
-
- foreachvalue (FrameworkSorter* frameworkSorter, frameworkSorters) {
- foreach (const SlaveID& slaveId, slaveIds_) {
- CHECK(slaves.contains(slaveId));
-
- if (slaves[slaveId].maintenance.isSome()) {
- // We use a reference by alias because we intend to modify the
- // `maintenance` and to improve readability.
- typename Slave::Maintenance& maintenance =
- slaves[slaveId].maintenance.get();
-
- hashmap<std::string, Resources> allocation =
- frameworkSorter->allocation(slaveId);
-
- foreachkey (const std::string& frameworkId_, allocation) {
- FrameworkID frameworkId;
- frameworkId.set_value(frameworkId_);
-
- // If this framework doesn't already have inverse offers for the
- // specified slave.
- if (!offerable[frameworkId].contains(slaveId)) {
- // If there isn't already an outstanding inverse offer to this
- // framework for the specified slave.
- if (!maintenance.offersOutstanding.contains(frameworkId)) {
- // Ignore in case the framework filters inverse offers for this
- // slave.
- // NOTE: Since this specific allocator implementation only sends
- // inverse offers for maintenance primitives, and those are at the
- // whole slave level, we only need to filter based on the
- // time-out.
- if (isFiltered(frameworkId, slaveId)) {
- continue;
- }
-
- const UnavailableResources unavailableResources =
- UnavailableResources{
- Resources(),
- maintenance.unavailability};
-
- // For now we send inverse offers with empty resources when the
- // inverse offer represents maintenance on the machine. In the
- // future we could be more specific about the resources on the
- // host, as we have the information available.
- offerable[frameworkId][slaveId] = unavailableResources;
-
- // Mark this framework as having an offer oustanding for the
- // specified slave.
- maintenance.offersOutstanding.insert(frameworkId);
- }
- }
- }
- }
- }
- }
-
- if (offerable.empty()) {
- VLOG(1) << "No inverse offers to send out!";
- } else {
- // Now send inverse offers to each framework.
- foreachkey (const FrameworkID& frameworkId, offerable) {
- inverseOfferCallback(frameworkId, offerable[frameworkId]);
- }
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- OfferFilter* offerFilter)
-{
- // The filter might have already been removed (e.g., if the
- // framework no longer exists or in
- // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
- // keep the address from getting reused possibly causing premature
- // expiration).
- if (frameworks.contains(frameworkId) &&
- frameworks[frameworkId].offerFilters.contains(slaveId) &&
- frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) {
- frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter);
- if (frameworks[frameworkId].offerFilters[slaveId].empty()) {
- frameworks[frameworkId].offerFilters.erase(slaveId);
- }
- }
-
- delete offerFilter;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- InverseOfferFilter* inverseOfferFilter)
-{
- // The filter might have already been removed (e.g., if the
- // framework no longer exists or in
- // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
- // keep the address from getting reused possibly causing premature
- // expiration).
- if (frameworks.contains(frameworkId) &&
- frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
- frameworks[frameworkId].inverseOfferFilters[slaveId]
- .contains(inverseOfferFilter)) {
- frameworks[frameworkId].inverseOfferFilters[slaveId]
- .erase(inverseOfferFilter);
-
- if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
- frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
- }
- }
-
- delete inverseOfferFilter;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
- const SlaveID& slaveId)
-{
- CHECK(slaves.contains(slaveId));
-
- return whitelist.isNone() ||
- whitelist.get().contains(slaves[slaveId].hostname);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources)
-{
- CHECK(frameworks.contains(frameworkId));
- CHECK(slaves.contains(slaveId));
-
- // Do not offer a non-checkpointing slave's resources to a checkpointing
- // framework. This is a short term fix until the following is resolved:
- // https://issues.apache.org/jira/browse/MESOS-444.
- if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
- VLOG(1) << "Filtered offer with " << resources
- << " on non-checkpointing slave " << slaveId
- << " for checkpointing framework " << frameworkId;
-
- return true;
- }
-
- if (frameworks[frameworkId].offerFilters.contains(slaveId)) {
- foreach (
- OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) {
- if (offerFilter->filter(resources)) {
- VLOG(1) << "Filtered offer with " << resources
- << " on slave " << slaveId
- << " for framework " << frameworkId;
-
- return true;
- }
- }
- }
-
- return false;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId)
-{
- CHECK(frameworks.contains(frameworkId));
- CHECK(slaves.contains(slaveId));
-
- if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
- foreach (
- InverseOfferFilter* inverseOfferFilter,
- frameworks[frameworkId].inverseOfferFilters[slaveId]) {
- if (inverseOfferFilter->filter()) {
- VLOG(1) << "Filtered unavailability on slave " << slaveId
- << " for framework " << frameworkId;
-
- return true;
- }
- }
- }
-
- return false;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
- const Resources& resources)
-{
- Option<double> cpus = resources.cpus();
- Option<Bytes> mem = resources.mem();
-
- return (cpus.isSome() && cpus.get() >= MIN_CPUS) ||
- (mem.isSome() && mem.get() >= MIN_MEM);
-}
-
} // namespace allocator {
} // namespace master {
} // namespace internal {