You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/20 20:37:37 UTC
[3/5] mesos git commit: Added filter support for Inverse Offers.
Added filter support for Inverse Offers.
Review: https://reviews.apache.org/r/38324
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d03297a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d03297a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d03297a
Branch: refs/heads/master
Commit: 9d03297a9064dcde3ec920db4ef66003b4d323da
Parents: eec3fec
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:24:35 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:15 2015 -0400
----------------------------------------------------------------------
include/mesos/master/allocator.hpp | 3 +-
src/master/allocator/mesos/allocator.hpp | 12 +-
src/master/allocator/mesos/hierarchical.hpp | 205 +++++++++++++++++++++--
src/master/master.cpp | 6 +-
src/tests/mesos.hpp | 11 +-
5 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 7301058..3fea47f 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -160,7 +160,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<InverseOfferStatus>& status) = 0;
+ const Option<InverseOfferStatus>& status,
+ const Option<Filters>& filters = None()) = 0;
// Informs the Allocator to recover resources that are considered
// used by the framework.
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 4f02dd1..904dc62 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -120,7 +120,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status);
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters);
void recoverResources(
const FrameworkID& frameworkId,
@@ -228,7 +229,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status) = 0;
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters = None()) = 0;
virtual void recoverResources(
const FrameworkID& frameworkId,
@@ -489,7 +491,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status)
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters)
{
return process::dispatch(
process,
@@ -497,7 +500,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
slaveId,
frameworkId,
unavailableResources,
- status);
+ status,
+ filters);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index a4c4107..d3496bc 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -52,7 +52,7 @@ namespace allocator {
// Forward declarations.
class OfferFilter;
-
+class InverseOfferFilter;
// We forward declare the hierarchical allocator process so that we
// can typedef an instantiation of it with DRF sorters.
@@ -158,7 +158,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status);
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters);
void recoverResources(
const FrameworkID& frameworkId,
@@ -192,12 +193,18 @@ protected:
// Send inverse offers from the specified slaves.
void deallocate(const hashset<SlaveID>& slaveIds);
- // Remove a filter for the specified framework.
+ // Remove an offer filter for the specified framework.
void expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
OfferFilter* offerFilter);
+ // Remove an inverse offer filter for the specified framework.
+ void expire(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ InverseOfferFilter* inverseOfferFilter);
+
// Checks whether the slave is whitelisted.
bool isWhitelisted(const SlaveID& slaveId);
@@ -208,6 +215,12 @@ protected:
const SlaveID& slaveId,
const Resources& resources);
+ // Returns true if there is an inverse offer filter for this framework
+ // on this slave.
+ bool isFiltered(
+ const FrameworkID& frameworkID,
+ const SlaveID& slaveID);
+
bool allocatable(const Resources& resources);
bool initialized;
@@ -251,8 +264,9 @@ protected:
// Whether the framework desires revocable resources.
bool revocable;
- // Active filters on offers for the framework.
+ // Active offer and inverse offer filters for the framework.
hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
+ hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
};
double _event_queue_dispatches()
@@ -382,6 +396,41 @@ public:
};
+// Used to represent "filters" for inverse offers.
+// NOTE: Since this specific allocator implementation only sends inverse offers
+// for maintenance primitives, and those are at the whole slave level, we only
+// need to filter based on the time-out.
+// If this allocator implementation starts sending out more resource specific
+// inverse offers, then we can capture the `unavailableResources` in the filter
+// function.
+class InverseOfferFilter
+{
+public:
+ virtual ~InverseOfferFilter() {}
+
+ virtual bool filter() = 0;
+};
+
+
+// NOTE: See comment above `InverseOfferFilter` regarding capturing
+// `unavailableResources` if this allocator starts sending fine-grained inverse
+// offers.
+class RefusedInverseOfferFilter: public InverseOfferFilter
+{
+public:
+ RefusedInverseOfferFilter(const process::Timeout& _timeout)
+ : timeout(_timeout) {}
+
+ virtual bool filter()
+ {
+ // See comment above why we currently don't do more fine-grained filtering.
+ return timeout.remaining() > Seconds(0);
+ }
+
+ const process::Timeout timeout;
+};
+
+
template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
@@ -541,6 +590,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
frameworks[frameworkId].offerFilters.clear();
+ frameworks[frameworkId].inverseOfferFilters.clear();
LOG(INFO) << "Deactivated framework " << frameworkId;
}
@@ -860,6 +910,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
+ // We explicitly remove all filters for the inverse offers of this slave. We
+ // do this because we want to force frameworks to reassess the calculations
+ // they have made to respond to the inverse offer. Unavailability of a slave
+ // can have a large effect on failure domain calculations and inter-leaved
+ // unavailability schedules.
+ foreachvalue (Framework& framework, frameworks) {
+ framework.inverseOfferFilters.erase(slaveId);
+ }
+
// Remove any old unavailability.
slaves[slaveId].maintenance = None();
@@ -879,7 +938,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status)
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
@@ -915,6 +975,58 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
maintenance.statuses[frameworkId].CopyFrom(status.get());
}
}
+
+ // No need to install filters if `filters` is none.
+ if (filters.isNone()) {
+ return;
+ }
+
+ // Create a refused resource filter.
+ Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+ if (seconds.isError()) {
+ LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+ << "the refused inverse offer filter because the input value "
+ << "is invalid: " << seconds.error();
+
+ seconds = Duration::create(Filters().refuse_seconds());
+ } else if (seconds.get() < Duration::zero()) {
+ LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+ << "the refused inverse offer filter because the input value "
+ << "is negative";
+
+ seconds = Duration::create(Filters().refuse_seconds());
+ }
+
+ CHECK_SOME(seconds);
+
+ if (seconds.get() != Duration::zero()) {
+ VLOG(1) << "Framework " << frameworkId
+ << " filtered inverse offers from slave " << slaveId
+ << " for " << seconds.get();
+
+ // Create a new inverse offer filter and delay its expiration.
+ InverseOfferFilter* inverseOfferFilter =
+ new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
+
+ frameworks[frameworkId]
+ .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
+
+ // We need to disambiguate the function call to pick the correct
+ // expire() overload.
+ void (Self::*expireInverseOffer)(
+ const FrameworkID&,
+ const SlaveID&,
+ InverseOfferFilter*) = &Self::expire;
+
+ delay(
+ seconds.get(),
+ self(),
+ expireInverseOffer,
+ frameworkId,
+ slaveId,
+ inverseOfferFilter);
+ }
}
@@ -1009,10 +1121,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
+ // We need to disambiguate the function call to pick the correct
+ // expire() overload.
+ void (Self::*expireOffer)(
+ const FrameworkID&,
+ const SlaveID&,
+ OfferFilter*) = &Self::expire;
+
delay(
seconds.get(),
self(),
- &Self::expire,
+ expireOffer,
frameworkId,
slaveId,
offerFilter);
@@ -1038,6 +1157,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
CHECK(initialized);
frameworks[frameworkId].offerFilters.clear();
+ frameworks[frameworkId].inverseOfferFilters.clear();
frameworks[frameworkId].quiesced = false;
// We delete each actual `OfferFilter` when
@@ -1239,14 +1359,26 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
// If there isn't already an outstanding inverse offer to this
// framework for the specified slave.
if (!maintenance.offersOutstanding.contains(frameworkId)) {
+ // Ignore in case the framework filters inverse offers for this
+ // slave.
+ // NOTE: Since this specific allocator implementation only sends
+ // inverse offers for maintenance primitives, and those are at the
+ // whole slave level, we only need to filter based on the
+ // time-out.
+ if (isFiltered(frameworkId, slaveId)) {
+ continue;
+ }
+
+ const UnavailableResources unavailableResources =
+ UnavailableResources{
+ Resources(),
+ maintenance.unavailability};
+
// For now we send inverse offers with empty resources when the
// inverse offer represents maintenance on the machine. In the
// future we could be more specific about the resources on the
// host, as we have the information available.
- offerable[frameworkId][slaveId] =
- UnavailableResources{
- Resources(),
- maintenance.unavailability};
+ offerable[frameworkId][slaveId] = unavailableResources;
// Mark this framework as having an offer oustanding for the
// specified slave.
@@ -1295,6 +1427,34 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ InverseOfferFilter* inverseOfferFilter)
+{
+ // The filter might have already been removed (e.g., if the
+ // framework no longer exists or in
+ // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
+ // keep the address from getting reused possibly causing premature
+ // expiration).
+ if (frameworks.contains(frameworkId) &&
+ frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
+ frameworks[frameworkId].inverseOfferFilters[slaveId]
+ .contains(inverseOfferFilter)) {
+ frameworks[frameworkId].inverseOfferFilters[slaveId]
+ .erase(inverseOfferFilter);
+
+ if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
+ frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
+ }
+ }
+
+ delete inverseOfferFilter;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
const SlaveID& slaveId)
@@ -1345,6 +1505,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
template <class RoleSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId)
+{
+ CHECK(frameworks.contains(frameworkId));
+ CHECK(slaves.contains(slaveId));
+
+ if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
+ foreach (
+ InverseOfferFilter* inverseOfferFilter,
+ frameworks[frameworkId].inverseOfferFilters[slaveId]) {
+ if (inverseOfferFilter->filter()) {
+ VLOG(1) << "Filtered unavailability on slave " << slaveId
+ << " for framework " << frameworkId;
+
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
const Resources& resources)
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5393ee8..6c0db21 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2850,7 +2850,8 @@ void Master::accept(
UnavailableResources{
inverseOffer->resources(),
inverseOffer->unavailability()},
- status);
+ status,
+ accept.filters());
removeInverseOffer(inverseOffer);
continue;
@@ -3324,7 +3325,8 @@ void Master::decline(
UnavailableResources{
inverseOffer->resources(),
inverseOffer->unavailability()},
- status);
+ status,
+ decline.filters());
removeInverseOffer(inverseOffer);
continue;
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index dd587bb..e1c0635 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
ACTION_P(InvokeUpdateInverseOffer, allocator)
{
- return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3);
+ return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3, arg4);
}
@@ -1499,9 +1499,9 @@ public:
EXPECT_CALL(*this, updateUnavailability(_, _))
.WillRepeatedly(DoDefault());
- ON_CALL(*this, updateInverseOffer(_, _, _, _))
+ ON_CALL(*this, updateInverseOffer(_, _, _, _, _))
.WillByDefault(InvokeUpdateInverseOffer(this));
- EXPECT_CALL(*this, updateInverseOffer(_, _, _, _))
+ EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _))
.WillRepeatedly(DoDefault());
ON_CALL(*this, recoverResources(_, _, _, _))
@@ -1590,11 +1590,12 @@ public:
const SlaveID&,
const Option<Unavailability>&));
- MOCK_METHOD4(updateInverseOffer, void(
+ MOCK_METHOD5(updateInverseOffer, void(
const SlaveID&,
const FrameworkID&,
const Option<UnavailableResources>&,
- const Option<mesos::master::InverseOfferStatus>&));
+ const Option<mesos::master::InverseOfferStatus>&,
+ const Option<Filters>&));
MOCK_METHOD4(recoverResources, void(
const FrameworkID&,