You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/20 20:37:37 UTC

[3/5] mesos git commit: Added filter support for Inverse Offers.

Added filter support for Inverse Offers.

Review: https://reviews.apache.org/r/38324


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d03297a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d03297a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d03297a

Branch: refs/heads/master
Commit: 9d03297a9064dcde3ec920db4ef66003b4d323da
Parents: eec3fec
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:24:35 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:15 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |   3 +-
 src/master/allocator/mesos/allocator.hpp    |  12 +-
 src/master/allocator/mesos/hierarchical.hpp | 205 +++++++++++++++++++++--
 src/master/master.cpp                       |   6 +-
 src/tests/mesos.hpp                         |  11 +-
 5 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 7301058..3fea47f 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -160,7 +160,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<InverseOfferStatus>& status) = 0;
+      const Option<InverseOfferStatus>& status,
+      const Option<Filters>& filters = None()) = 0;
 
   // Informs the Allocator to recover resources that are considered
   // used by the framework.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 4f02dd1..904dc62 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -120,7 +120,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<mesos::master::InverseOfferStatus>& status);
+      const Option<mesos::master::InverseOfferStatus>& status,
+      const Option<Filters>& filters);
 
   void recoverResources(
       const FrameworkID& frameworkId,
@@ -228,7 +229,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<mesos::master::InverseOfferStatus>& status) = 0;
+      const Option<mesos::master::InverseOfferStatus>& status,
+      const Option<Filters>& filters = None()) = 0;
 
   virtual void recoverResources(
       const FrameworkID& frameworkId,
@@ -489,7 +491,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
     const Option<UnavailableResources>& unavailableResources,
-    const Option<mesos::master::InverseOfferStatus>& status)
+    const Option<mesos::master::InverseOfferStatus>& status,
+    const Option<Filters>& filters)
 {
   return process::dispatch(
       process,
@@ -497,7 +500,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
       slaveId,
       frameworkId,
       unavailableResources,
-      status);
+      status,
+      filters);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index a4c4107..d3496bc 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -52,7 +52,7 @@ namespace allocator {
 
 // Forward declarations.
 class OfferFilter;
-
+class InverseOfferFilter;
 
 // We forward declare the hierarchical allocator process so that we
 // can typedef an instantiation of it with DRF sorters.
@@ -158,7 +158,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<mesos::master::InverseOfferStatus>& status);
+      const Option<mesos::master::InverseOfferStatus>& status,
+      const Option<Filters>& filters);
 
   void recoverResources(
       const FrameworkID& frameworkId,
@@ -192,12 +193,18 @@ protected:
   // Send inverse offers from the specified slaves.
   void deallocate(const hashset<SlaveID>& slaveIds);
 
-  // Remove a filter for the specified framework.
+  // Remove an offer filter for the specified framework.
   void expire(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
       OfferFilter* offerFilter);
 
+  // Remove an inverse offer filter (if still installed) and delete it.
+  void expire(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      InverseOfferFilter* inverseOfferFilter);
+
   // Checks whether the slave is whitelisted.
   bool isWhitelisted(const SlaveID& slaveId);
 
@@ -208,6 +215,12 @@ protected:
       const SlaveID& slaveId,
       const Resources& resources);
 
+  // Returns true if the framework has an active (unexpired) inverse offer
+  // filter on the slave, in which case inverse offers are withheld.
+  bool isFiltered(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId);
+
   bool allocatable(const Resources& resources);
 
   bool initialized;
@@ -251,8 +264,9 @@ protected:
     // Whether the framework desires revocable resources.
     bool revocable;
 
-    // Active filters on offers for the framework.
+    // Active offer and inverse offer filters for the framework.
     hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
+    hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
   };
 
   double _event_queue_dispatches()
@@ -382,6 +396,41 @@ public:
 };
 
 
+// Used to represent "filters" for inverse offers: while `filter()` returns
+// true, the allocator withholds inverse offers for the filtered slave.
+// NOTE: Since this specific allocator implementation only sends inverse offers
+// for maintenance primitives, and those are at the whole slave level, we only
+// need to filter based on the time-out. If this allocator starts sending more
+// resource specific inverse offers, the `unavailableResources` can be captured
+// in the filter function.
+class InverseOfferFilter
+{
+public:
+  virtual ~InverseOfferFilter() {}
+
+  virtual bool filter() = 0;
+};
+
+
+// Withholds inverse offers for a slave until `timeout` elapses.
+// NOTE: See comment above `InverseOfferFilter` regarding capturing
+// `unavailableResources` if this allocator sends fine-grained inverse offers.
+class RefusedInverseOfferFilter: public InverseOfferFilter
+{
+public:
+  explicit RefusedInverseOfferFilter(const process::Timeout& _timeout)
+    : timeout(_timeout) {}
+
+  virtual bool filter()
+  {
+    // See comment above why we currently don't do more fine-grained filtering.
+    return timeout.remaining() > Seconds(0);
+  }
+
+  const process::Timeout timeout;
+};
+
+
 template <class RoleSorter, class FrameworkSorter>
 void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
@@ -541,6 +590,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
   // HierarchicalAllocatorProcess::reviveOffers and
   // HierarchicalAllocatorProcess::expire.
   frameworks[frameworkId].offerFilters.clear();
+  frameworks[frameworkId].inverseOfferFilters.clear();
 
   LOG(INFO) << "Deactivated framework " << frameworkId;
 }
@@ -860,6 +910,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
   // NOTE: We currently implement maintenance in the allocator to be able to
   // leverage state and features such as the FrameworkSorter and OfferFilter.
 
+  // We explicitly remove all filters for the inverse offers of this slave. We
+  // do this because we want to force frameworks to reassess the calculations
+  // they have made to respond to the inverse offer. Unavailability of a slave
+  // can have a large effect on failure domain calculations and inter-leaved
+  // unavailability schedules.
+  foreachvalue (Framework& framework, frameworks) {
+    framework.inverseOfferFilters.erase(slaveId);
+  }
+
   // Remove any old unavailability.
   slaves[slaveId].maintenance = None();
 
@@ -879,7 +938,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
     const Option<UnavailableResources>& unavailableResources,
-    const Option<mesos::master::InverseOfferStatus>& status)
+    const Option<mesos::master::InverseOfferStatus>& status,
+    const Option<Filters>& filters)
 {
   CHECK(initialized);
   CHECK(frameworks.contains(frameworkId));
@@ -915,6 +975,58 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
       maintenance.statuses[frameworkId].CopyFrom(status.get());
     }
   }
+
+  // No need to install filters if `filters` is none.
+  if (filters.isNone()) {
+    return;
+  }
+
+  // Create a refused inverse offer filter unless the duration is zero.
+  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+  if (seconds.isError()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is invalid: " << seconds.error();
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  } else if (seconds.get() < Duration::zero()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is negative";
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  }
+
+  CHECK_SOME(seconds);
+
+  if (seconds.get() != Duration::zero()) {
+    VLOG(1) << "Framework " << frameworkId
+            << " filtered inverse offers from slave " << slaveId
+            << " for " << seconds.get();
+
+    // Create a new inverse offer filter and delay its expiration.
+    InverseOfferFilter* inverseOfferFilter =
+      new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
+
+    frameworks[frameworkId]
+      .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
+
+    // We need to disambiguate the function call to pick the correct
+    // expire() overload.
+    void (Self::*expireInverseOffer)(
+             const FrameworkID&,
+             const SlaveID&,
+             InverseOfferFilter*) = &Self::expire;
+
+    delay(
+        seconds.get(),
+        self(),
+        expireInverseOffer,
+        frameworkId,
+        slaveId,
+        inverseOfferFilter);
+  }
 }
 
 
@@ -1009,10 +1121,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
 
     frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
 
+    // We need to disambiguate the function call to pick the correct
+    // expire() overload.
+    void (Self::*expireOffer)(
+              const FrameworkID&,
+              const SlaveID&,
+              OfferFilter*) = &Self::expire;
+
     delay(
         seconds.get(),
         self(),
-        &Self::expire,
+        expireOffer,
         frameworkId,
         slaveId,
         offerFilter);
@@ -1038,6 +1157,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
   CHECK(initialized);
 
   frameworks[frameworkId].offerFilters.clear();
+  frameworks[frameworkId].inverseOfferFilters.clear();
   frameworks[frameworkId].quiesced = false;
 
   // We delete each actual `OfferFilter` when
@@ -1239,14 +1359,26 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
             // If there isn't already an outstanding inverse offer to this
             // framework for the specified slave.
             if (!maintenance.offersOutstanding.contains(frameworkId)) {
+              // Ignore in case the framework filters inverse offers for this
+              // slave.
+              // NOTE: Since this specific allocator implementation only sends
+              // inverse offers for maintenance primitives, and those are at the
+              // whole slave level, we only need to filter based on the
+              // time-out.
+              if (isFiltered(frameworkId, slaveId)) {
+                continue;
+              }
+
+              const UnavailableResources unavailableResources =
+                UnavailableResources{
+                    Resources(),
+                    maintenance.unavailability};
+
               // For now we send inverse offers with empty resources when the
               // inverse offer represents maintenance on the machine. In the
               // future we could be more specific about the resources on the
               // host, as we have the information available.
-              offerable[frameworkId][slaveId] =
-                UnavailableResources{
-                    Resources(),
-                    maintenance.unavailability};
+              offerable[frameworkId][slaveId] = unavailableResources;
 
               // Mark this framework as having an offer oustanding for the
               // specified slave.
@@ -1295,6 +1427,34 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
 
 
 template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    InverseOfferFilter* inverseOfferFilter)
+{
+  // The filter might have already been removed (e.g., if the framework
+  // no longer exists, or by deactivateFramework(), reviveOffers() or
+  // updateUnavailability()) but not yet deleted: it is only deleted
+  // here, so that its address cannot be reused by a newer filter and
+  // cause that filter to expire prematurely.
+  if (frameworks.contains(frameworkId) &&
+      frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
+      frameworks[frameworkId].inverseOfferFilters[slaveId]
+        .contains(inverseOfferFilter)) {
+    frameworks[frameworkId].inverseOfferFilters[slaveId]
+      .erase(inverseOfferFilter);
+
+    if (frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
+      frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
+    }
+  }
+
+  delete inverseOfferFilter;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
 bool
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
     const SlaveID& slaveId)
@@ -1345,6 +1505,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
 
 
 template <class RoleSorter, class FrameworkSorter>
+bool
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId)
+{
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+
+  // Any active (unexpired) filter on this slave suppresses inverse offers.
+  Framework& framework = frameworks[frameworkId];
+  if (framework.inverseOfferFilters.contains(slaveId)) {
+    foreach (InverseOfferFilter* inverseOfferFilter,
+             framework.inverseOfferFilters[slaveId]) {
+      if (inverseOfferFilter->filter()) {
+        VLOG(1) << "Filtered unavailability on slave " << slaveId
+                << " for framework " << frameworkId;
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
 bool
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
     const Resources& resources)

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5393ee8..6c0db21 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2850,7 +2850,8 @@ void Master::accept(
             UnavailableResources{
                 inverseOffer->resources(),
                 inverseOffer->unavailability()},
-            status);
+            status,
+            accept.filters());
 
         removeInverseOffer(inverseOffer);
         continue;
@@ -3324,7 +3325,8 @@ void Master::decline(
           UnavailableResources{
               inverseOffer->resources(),
               inverseOffer->unavailability()},
-          status);
+          status,
+          decline.filters());
 
       removeInverseOffer(inverseOffer);
       continue;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index dd587bb..e1c0635 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
 
 ACTION_P(InvokeUpdateInverseOffer, allocator)
 {
-  return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3);
+  return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3, arg4);
 }
 
 
@@ -1499,9 +1499,9 @@ public:
     EXPECT_CALL(*this, updateUnavailability(_, _))
       .WillRepeatedly(DoDefault());
 
-    ON_CALL(*this, updateInverseOffer(_, _, _, _))
+    ON_CALL(*this, updateInverseOffer(_, _, _, _, _))
       .WillByDefault(InvokeUpdateInverseOffer(this));
-    EXPECT_CALL(*this, updateInverseOffer(_, _, _, _))
+    EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _))
       .WillRepeatedly(DoDefault());
 
     ON_CALL(*this, recoverResources(_, _, _, _))
@@ -1590,11 +1590,12 @@ public:
       const SlaveID&,
       const Option<Unavailability>&));
 
-  MOCK_METHOD4(updateInverseOffer, void(
+  MOCK_METHOD5(updateInverseOffer, void(
       const SlaveID&,
       const FrameworkID&,
       const Option<UnavailableResources>&,
-      const Option<mesos::master::InverseOfferStatus>&));
+      const Option<mesos::master::InverseOfferStatus>&,
+      const Option<Filters>&));
 
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,