You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/03/07 01:26:12 UTC

[6/7] mesos git commit: Updated the allocator to handle frameworks that change their roles.

Updated the allocator to handle frameworks that change their roles.

Review: https://reviews.apache.org/r/57111


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a7b912e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a7b912e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a7b912e

Branch: refs/heads/master
Commit: 2a7b912e0b1c5b52862660209c34fec65c536c46
Parents: 68bb8d1
Author: Michael Park <mp...@apache.org>
Authored: Sat Mar 4 10:53:28 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Mon Mar 6 16:06:20 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp | 184 +++++++++++++++++------
 src/master/allocator/mesos/hierarchical.hpp |  21 ++-
 2 files changed, 154 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2a7b912e/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index dcafc79..0059cce 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -242,20 +242,7 @@ void HierarchicalAllocatorProcess::addFramework(
   const Framework& framework = frameworks.at(frameworkId);
 
   foreach (const string& role, framework.roles) {
-    // If this is the first framework to register as this role,
-    // initialize state as necessary.
-    if (!activeRoles.contains(role)) {
-      activeRoles[role] = 1;
-      roleSorter->add(role, roleWeight(role));
-      frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});
-      frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
-      metrics.addRole(role);
-    } else {
-      activeRoles[role]++;
-    }
-
-    CHECK(!frameworkSorters.at(role)->contains(frameworkId.value()));
-    frameworkSorters.at(role)->add(frameworkId.value());
+    trackFrameworkUnderRole(frameworkId, role);
   }
 
   // TODO(bmahler): Validate that the reserved resources have the
@@ -326,31 +313,7 @@ void HierarchicalAllocatorProcess::removeFramework(
       }
     }
 
-    frameworkSorters.at(role)->remove(frameworkId.value());
-  }
-
-  foreach (const string& role, framework.roles) {
-    CHECK(activeRoles.contains(role));
-
-    // If this is the last framework that was registered for this role,
-    // cleanup associated state. This is not necessary for correctness
-    // (roles with no registered frameworks will not be offered any
-    // resources), but since many different role names might be used
-    // over time, we want to avoid leaking resources for no-longer-used
-    // role names. Note that we don't remove the role from
-    // `quotaRoleSorter` if it exists there, since roles with a quota
-    // set still influence allocation even if they don't have any
-    // registered frameworks.
-    activeRoles[role]--;
-    if (activeRoles[role] == 0) {
-      activeRoles.erase(role);
-      roleSorter->remove(role);
-
-      CHECK(frameworkSorters.contains(role));
-      frameworkSorters.erase(role);
-
-      metrics.removeRole(role);
-    }
+    untrackFrameworkUnderRole(frameworkId, role);
   }
 
   // Do not delete the filters contained in this
@@ -424,14 +387,55 @@ void HierarchicalAllocatorProcess::updateFramework(
   CHECK(frameworks.contains(frameworkId));
 
   Framework& framework = frameworks.at(frameworkId);
+
+  set<string> oldRoles = framework.roles;
   set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
 
-  // TODO(bmahler): Allow frameworks to update their roles, see MESOS-6627.
-  CHECK(framework.roles == newRoles)
-    << "Expected: " << stringify(framework.roles)
-    << " vs Actual: " << stringify(newRoles);
+  const set<string> removedRoles = [&]() {
+    set<string> result = oldRoles;
+    foreach (const string& role, newRoles) {
+      result.erase(role);
+    }
+    return result;
+  }();
+
+  foreach (const string& role, removedRoles) {
+    CHECK(frameworkSorters.contains(role));
+    frameworkSorters.at(role)->deactivate(frameworkId.value());
+
+    // Stop tracking the framework under this role if there are
+    // no longer any resources allocated to it.
+    if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) {
+      untrackFrameworkUnderRole(frameworkId, role);
+    }
+
+    if (framework.offerFilters.contains(role)) {
+      framework.offerFilters.erase(role);
+    }
+  }
+
+  const set<string> addedRoles = [&]() {
+    set<string> result = newRoles;
+    foreach (const string& role, oldRoles) {
+      result.erase(role);
+    }
+    return result;
+  }();
+
+  foreach (const string& role, addedRoles) {
+    // NOTE: It's possible that we're already tracking this framework
+    // under the role because a framework can unsubscribe from a role
+    // while it still has resources allocated to the role.
+    if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
+      trackFrameworkUnderRole(frameworkId, role);
+    }
+
+    CHECK(frameworkSorters.contains(role));
+    frameworkSorters.at(role)->activate(frameworkId.value());
+  }
 
-  framework.capabilities = Capabilities(frameworkInfo.capabilities());
+  framework.roles = newRoles;
+  framework.capabilities = frameworkInfo.capabilities();
 }
 
 
@@ -463,6 +467,13 @@ void HierarchicalAllocatorProcess::addSlave(
     foreachpair (const string& role,
                  const Resources& allocated,
                  used_.allocations()) {
+      // The framework has resources allocated to this role but it may
+      // or may not be subscribed to the role. Either way, we need to
+      // track the framework under the role.
+      if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
+        trackFrameworkUnderRole(frameworkId, role);
+      }
+
       // TODO(bmahler): Validate that the reserved resources have the
       // framework's role.
       CHECK(roleSorter->contains(role));
@@ -1076,6 +1087,13 @@ void HierarchicalAllocatorProcess::recoverResources(
         quotaRoleSorter->unallocated(
             role, slaveId, resources.nonRevocable());
       }
+
+      // Stop tracking the framework under this role if it's no longer
+      // subscribed and no longer has resources allocated to the role.
+      if (frameworks.at(frameworkId).roles.count(role) == 0 &&
+          frameworkSorter->allocation(frameworkId.value()).empty()) {
+        untrackFrameworkUnderRole(frameworkId, role);
+      }
     }
   }
 
@@ -1500,7 +1518,7 @@ void HierarchicalAllocatorProcess::__allocate()
 
       // If there are no active frameworks in this role, we do not
       // need to do any allocations for this role.
-      if (!activeRoles.contains(role)) {
+      if (!roles.contains(role)) {
         continue;
       }
 
@@ -1649,7 +1667,7 @@ void HierarchicalAllocatorProcess::__allocate()
   // argument to the `allocate()` call) so that frameworks in roles without
   // quota are not unnecessarily deprived of resources.
   Resources remainingClusterResources = roleSorter->totalScalarQuantities();
-  foreachkey (const string& role, activeRoles) {
+  foreachkey (const string& role, roles) {
     remainingClusterResources -= roleSorter->allocationScalarQuantities(role);
   }
 
@@ -1848,7 +1866,7 @@ void HierarchicalAllocatorProcess::__allocate()
 void HierarchicalAllocatorProcess::deallocate()
 {
   // If no frameworks are currently registered, no work to do.
-  if (activeRoles.empty()) {
+  if (roles.empty()) {
     return;
   }
   CHECK(!frameworkSorters.empty());
@@ -2188,6 +2206,78 @@ double HierarchicalAllocatorProcess::_offer_filters_active(
 }
 
 
+bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole(
+    const FrameworkID& frameworkId,
+    const std::string& role) const
+{
+  return roles.contains(role) &&
+         roles.at(role).contains(frameworkId);
+}
+
+
+void HierarchicalAllocatorProcess::trackFrameworkUnderRole(
+    const FrameworkID& frameworkId,
+    const string& role)
+{
+  CHECK(initialized);
+
+  // If this is the first framework to subscribe to this role, or have
+  // resources allocated to this role, initialize state as necessary.
+  if (!roles.contains(role)) {
+    roles[role] = {};
+    CHECK(!roleSorter->contains(role));
+    roleSorter->add(role, roleWeight(role));
+
+    CHECK(!frameworkSorters.contains(role));
+    frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});
+    frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
+    metrics.addRole(role);
+  }
+
+  CHECK(!roles.at(role).contains(frameworkId));
+  roles.at(role).insert(frameworkId);
+
+  CHECK(!frameworkSorters.at(role)->contains(frameworkId.value()));
+  frameworkSorters.at(role)->add(frameworkId.value());
+}
+
+
+void HierarchicalAllocatorProcess::untrackFrameworkUnderRole(
+    const FrameworkID& frameworkId,
+    const string& role)
+{
+  CHECK(initialized);
+
+  CHECK(roles.contains(role));
+  CHECK(roles.at(role).contains(frameworkId));
+  CHECK(frameworkSorters.contains(role));
+  CHECK(frameworkSorters.at(role)->contains(frameworkId.value()));
+
+  roles.at(role).erase(frameworkId);
+  frameworkSorters.at(role)->remove(frameworkId.value());
+
+  // If no more frameworks are subscribed to this role or have resources
+  // allocated to this role, cleanup associated state. This is not necessary
+  // for correctness (roles with no registered frameworks will not be offered
+  // any resources), but since many different role names might be used over
+  // time, we want to avoid leaking resources for no-longer-used role names.
+  // Note that we don't remove the role from `quotaRoleSorter` if it exists
+  // there, since roles with a quota set still influence allocation even if
+  // they don't have any registered frameworks.
+
+  if (roles.at(role).empty()) {
+    CHECK_EQ(frameworkSorters.at(role)->count(), 0);
+
+    roles.erase(role);
+    roleSorter->remove(role);
+
+    frameworkSorters.erase(role);
+
+    metrics.removeRole(role);
+  }
+}
+
+
 void HierarchicalAllocatorProcess::updateSlaveTotal(
     const SlaveID& slaveId,
     const Resources& total)

http://git-wip-us.apache.org/repos/asf/mesos/blob/2a7b912e/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 0bb24be..646f66e 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -427,10 +427,11 @@ protected:
   // ready after the allocation run is complete.
   Option<process::Future<Nothing>> allocation;
 
-  // Number of registered frameworks for each role. When a role's active
-  // count drops to zero, it is removed from this map; the role is also
-  // removed from `roleSorter` and its `frameworkSorter` is deleted.
-  hashmap<std::string, size_t> activeRoles;
+  // Roles that we're aware of in the system, each mapped to the set of
+  // frameworks tracked under it. A framework is tracked under a role while
+  // it is subscribed to the role and/or has resources allocated to the role
+  // (e.g. some tasks and/or executors are consuming resources under the role).
+  hashmap<std::string, hashset<FrameworkID>> roles;
 
   // Configured weight for each role, if any; if a role does not
   // appear here, it has the default weight of 1.
@@ -514,6 +515,18 @@ protected:
   const std::function<Sorter*()> frameworkSorterFactory;
 
 private:
+  bool isFrameworkTrackedUnderRole(
+      const FrameworkID& frameworkId,
+      const std::string& role) const;
+
+  void trackFrameworkUnderRole(
+      const FrameworkID& frameworkId,
+      const std::string& role);
+
+  void untrackFrameworkUnderRole(
+      const FrameworkID& frameworkId,
+      const std::string& role);
+
   // Helper to update the agent's total resources maintained in the allocator
   // and the role and quota sorters (whose total resources match the agent's
   // total resources).