You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/08/23 21:51:45 UTC

[mesos] 01/04: Optimized the allocation loop.

This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ec6b7b34215e821a63cb79e7d52d94ff08c1e110
Author: Meng Zhu <mz...@mesosphere.io>
AuthorDate: Thu Aug 22 17:54:25 2019 -0700

    Optimized the allocation loop.
    
    Master:
    
    HierarchicalAllocator_WithQuotaParam.LargeAndSmallQuota/2
    Made 3500 allocations in 23.37 secs
    Made 0 allocation in 19.72 secs
    
    Master + this patch:
    
    HierarchicalAllocator_WithQuotaParam.LargeAndSmallQuota/2
    Made 3500 allocations in 16.831380548secs
    Made 0 allocation in 15.102885644secs
    
    Review: https://reviews.apache.org/r/71359
---
 src/master/allocator/mesos/hierarchical.cpp | 86 ++++++++++++++++++-----------
 1 file changed, 53 insertions(+), 33 deletions(-)

diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 649de3b..dd73d5b 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -1965,16 +1965,30 @@ void HierarchicalAllocatorProcess::__allocate()
         break; // Nothing left on this agent.
       }
 
+      ResourceQuantities unsatisfiedQuotaGuarantees =
+        quotaGuarantees -
+        rolesConsumedQuota.get(role).getOrElse(ResourceQuantities());
+
+      // We only allocate to roles with unsatisfied guarantees
+      // in the first stage.
+      if (unsatisfiedQuotaGuarantees.empty()) {
+        continue;
+      }
+
       // Fetch frameworks in the order provided by the sorter.
       // NOTE: Suppressed frameworks are not included in the sort.
       Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));
 
       foreach (const string& frameworkId_, frameworkSorter->sort()) {
-        Resources available = slave.getAvailable().allocatableTo(role);
+        if (unsatisfiedQuotaGuarantees.empty()) {
+          break;
+        }
 
         // Offer a shared resource only if it has not been offered in this
         // offer cycle to a framework.
-        available -= offeredSharedResources.get(slaveId).getOrElse(Resources());
+        Resources available =
+          slave.getAvailable().allocatableTo(role) -
+          offeredSharedResources.get(slaveId).getOrElse(Resources());
 
         if (available.empty()) {
           break; // Nothing left for the role.
@@ -2048,13 +2062,16 @@ void HierarchicalAllocatorProcess::__allocate()
         // which only enforces quantity. Nevertheless, We choose to allow
         // such bursting for less resource fragmentation.
 
-        ResourceQuantities unsatisfiedQuotaGuarantees =
-          quotaGuarantees -
-          rolesConsumedQuota.get(role).getOrElse(ResourceQuantities());
-
         // Resources that can be used to to increase a role's quota consumption.
+        //
+        // This is a hot path, so we use explicit filter calls to avoid
+        // multiple traversals.
         Resources quotaResources =
-          available.scalars().unreserved().nonRevocable();
+          available.filter([&](const Resource& resource) {
+            return resource.type() == Value::SCALAR &&
+                   Resources::isUnreserved(resource) &&
+                   !Resources::isRevocable(resource);
+          });
 
         Resources guaranteesAllocation =
           shrinkResources(quotaResources, unsatisfiedQuotaGuarantees);
@@ -2076,17 +2093,23 @@ void HierarchicalAllocatorProcess::__allocate()
           continue;
         }
 
-        // First, reservations and guarantees are always allocated.
+        // This role's reservations, non-scalar resources and revocable
+        // resources, as well as guarantees are always allocated.
         //
         // We need to allocate guarantees unconditionally here so that
         // even the cluster is overcommitted by guarantees (thus deficit in
         // headroom), this role's guarantees can still be allocated.
-        Resources toAllocate = available.reserved(role) + guaranteesAllocation;
+        Resources toAllocate = guaranteesAllocation +
+                               available.filter([&](const Resource& resource) {
+                                 return Resources::isReserved(resource, role) ||
+                                        resource.type() != Value::SCALAR ||
+                                        Resources::isRevocable(resource);
+                               });
 
         Resources additionalScalarAllocation =
           quotaResources - guaranteesAllocation;
 
-        // Second, non-guaranteed quota resources are subject to quota limits
+        // Then, non-guaranteed quota resources are subject to quota limits
         // and global headroom enforcements.
 
         // Limits enforcement.
@@ -2112,13 +2135,6 @@ void HierarchicalAllocatorProcess::__allocate()
 
         toAllocate += additionalScalarAllocation;
 
-        // Lastly, non-scalar resources and revocable resources
-        // are all allocated.
-        toAllocate += available.filter([&](const Resource& resource) {
-          return resource.type() != Value::SCALAR ||
-                 Resources::isRevocable(resource);
-        });
-
         // If the framework filters these resources, ignore.
         if (!allocatable(toAllocate, role, framework) ||
             isFiltered(framework, role, slave, toAllocate)) {
@@ -2140,6 +2156,7 @@ void HierarchicalAllocatorProcess::__allocate()
           ResourceQuantities::fromScalarResources(
               guaranteesAllocation + additionalScalarAllocation);
 
+        unsatisfiedQuotaGuarantees -= increasedQuotaConsumption;
         rolesConsumedQuota[role] += increasedQuotaConsumption;
         for (const string& ancestor : roles::ancestors(role)) {
           rolesConsumedQuota[ancestor] += increasedQuotaConsumption;
@@ -2196,11 +2213,11 @@ void HierarchicalAllocatorProcess::__allocate()
       Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));
 
       foreach (const string& frameworkId_, frameworkSorter->sort()) {
-        Resources available = slave.getAvailable().allocatableTo(role);
-
         // Offer a shared resource only if it has not been offered in this
         // offer cycle to a framework.
-        available -= offeredSharedResources.get(slaveId).getOrElse(Resources());
+        Resources available =
+          slave.getAvailable().allocatableTo(role) -
+          offeredSharedResources.get(slaveId).getOrElse(Resources());
 
         if (available.empty()) {
           break; // Nothing left for the role.
@@ -2222,15 +2239,25 @@ void HierarchicalAllocatorProcess::__allocate()
 
         available = stripIncapableResources(available, framework.capabilities);
 
-        // First, reservations are always allocated. This also includes the
-        // roles ancestors' reservations.
-        Resources toAllocate = available.reserved();
+        // Reservations (including the role's ancestors' reservations),
+        // non-scalar resources and revocable resources are always allocated.
+        Resources toAllocate = available.filter([&](const Resource& resource) {
+          return Resources::isReserved(resource) ||
+                 resource.type() != Value::SCALAR ||
+                 Resources::isRevocable(resource);
+        });
 
-        // Second, unreserved scalar resources are subject to quota limits
+        // Then, unreserved scalar resources are subject to quota limits
         // and global headroom enforcement.
-
+        //
+        // This is a hot path, so we use explicit filter calls to avoid
+        // multiple traversals.
         Resources additionalScalarAllocation =
-          available.scalars().unreserved().nonRevocable();
+          available.filter([&](const Resource& resource) {
+            return resource.type() == Value::SCALAR &&
+                   Resources::isUnreserved(resource) &&
+                   !Resources::isRevocable(resource);
+          });
 
         // Limits enforcement.
         if (!quotaLimits.empty()) {
@@ -2258,13 +2285,6 @@ void HierarchicalAllocatorProcess::__allocate()
 
         toAllocate += additionalScalarAllocation;
 
-        // Lastly, non-scalar resources and revocable resources
-        // are all allocated.
-        toAllocate += available.filter([&](const Resource& resource) {
-          return resource.type() != Value::SCALAR ||
-                 Resources::isRevocable(resource);
-        });
-
         // If the framework filters these resources, ignore.
         if (!allocatable(toAllocate, role, framework) ||
             isFiltered(framework, role, slave, toAllocate)) {