You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/02/07 23:14:28 UTC

mesos git commit: Consolidate update of allocations in `updateAllocation()`.

Repository: mesos
Updated Branches:
  refs/heads/master e1fbab8fb -> e73a778dd


Consolidate update of allocations in `updateAllocation()`.

Shared resources for `LAUNCH` operations are handled separately in
`updateAllocation()` so that additional copies of the shared resources
can be added. However, the updates to the allocations in the allocator
and in the sorters can be consolidated.

Review: https://reviews.apache.org/r/55359/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e73a778d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e73a778d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e73a778d

Branch: refs/heads/master
Commit: e73a778dde10b46e3d03dfa643f7300abbaf951d
Parents: e1fbab8
Author: Anindya Sinha <an...@apple.com>
Authored: Tue Feb 7 14:35:40 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Feb 7 15:13:38 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp | 192 +++++++++++------------
 1 file changed, 88 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e73a778d/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 56d6791..2eab005 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -673,10 +673,25 @@ void HierarchicalAllocatorProcess::updateAllocation(
   CHECK(frameworkSorters.contains(role));
 
   const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
+  const Resources frameworkAllocation =
+    frameworkSorter->allocation(frameworkId.value(), slaveId);
 
   // We keep a copy of the offered resources here and it is updated
   // by the operations.
-  Resources _offeredResources = offeredResources;
+  Resources updatedOfferedResources = offeredResources;
+
+  // Accumulate consumed resources for all tasks in all `LAUNCH` operations.
+  //
+  // For LAUNCH operations we support tasks requesting more instances of
+  // shared resources than those being offered. We keep track of total
+  // consumed resources to determine the additional instances and allocate
+  // them as part of updating the framework's allocation (i.e., add
+  // them to the allocated resources in the allocator and in each
+  // of the sorters).
+  Resources consumed;
+
+  // Used for logging.
+  hashset<TaskID> taskIds;
 
   foreach (const Offer::Operation& operation, operations) {
     // The operations should have been normalized by the master via
@@ -689,23 +704,14 @@ void HierarchicalAllocatorProcess::updateAllocation(
     //    CHECK_NONE(validateOperationOnAllocatedResources(operation));
     //  }
 
-    Try<Resources> updatedOfferedResources = _offeredResources.apply(operation);
-    CHECK_SOME(updatedOfferedResources);
-    _offeredResources = updatedOfferedResources.get();
-
-    if (operation.type() == Offer::Operation::LAUNCH) {
-      // Additional allocation needed for the operation.
-      //
-      // For LAUNCH operations we support tasks requesting more
-      // instances of shared resources than those being offered. We
-      // keep track of these additional instances and allocate them
-      // as part of updating the framework's allocation (i.e., add
-      // them to the allocated resources in the allocator and in each
-      // of the sorters).
-      Resources additional;
+    // Update the offered resources based on this operation.
+    Try<Resources> _updatedOfferedResources = updatedOfferedResources.apply(
+        operation);
 
-      hashset<TaskID> taskIds;
+    CHECK_SOME(_updatedOfferedResources);
+    updatedOfferedResources = _updatedOfferedResources.get();
 
+    if (operation.type() == Offer::Operation::LAUNCH) {
       foreach (const TaskInfo& task, operation.launch().task_infos()) {
         taskIds.insert(task.task_id());
 
@@ -720,109 +726,87 @@ void HierarchicalAllocatorProcess::updateAllocation(
         // The TODO is to support it. We need to pass in the information
         // pertaining to the executor before enabling shared resources
         // in the executor.
-        const Resources& consumed = task.resources();
-        additional += consumed.shared() - _offeredResources.shared();
-
-        // (Non-shared) executor resources are not removed from
-        // _offeredResources but it's OK because we only care about
-        // shared resources in this variable.
-        _offeredResources -= consumed;
-      }
-
-      if (!additional.empty()) {
-        LOG(INFO) << "Allocating additional resources " << additional
-                  << " for tasks " << stringify(taskIds);
-
-        CHECK_EQ(additional.shared(), additional);
-
-        const Resources frameworkAllocation =
-          frameworkSorter->allocation(frameworkId.value(), slaveId);
-
-        foreach (const Resource& resource, additional) {
-          CHECK(frameworkAllocation.contains(resource));
-        }
-
-        // Allocate these additional resources to this framework. Because
-        // they are merely additional instances of the same shared
-        // resources already allocated to the framework (validated by the
-        // master, see the CHECK above), this doesn't have an impact on
-        // the allocator's allocation algorithm.
-        slave.allocated += additional;
-
-        frameworkSorter->add(slaveId, additional);
-        frameworkSorter->allocated(frameworkId.value(), slaveId, additional);
-
-        roleSorter->allocated(role, slaveId, additional);
-
-        if (quotas.contains(role)) {
-          quotaRoleSorter->allocated(
-              role, slaveId, additional.nonRevocable());
-        }
+        consumed += task.resources();
       }
-
-      continue;
     }
+  }
 
-    // Here we apply offer operations to the allocated and total
-    // resources in the allocator and each of the sorters. The available
-    // resource quantities remain unchanged.
+  // Check that offered resources contain at least one copy of each
+  // consumed shared resource (guaranteed by master validation).
+  Resources consumedShared = consumed.shared();
+  Resources updatedOfferedShared = updatedOfferedResources.shared();
 
-    // Update the total resources in the allocator and the role and
-    // quota sorters.
-    Try<Resources> updatedTotal = slave.total.apply(operation);
-    CHECK_SOME(updatedTotal);
+  foreach (const Resource& resource, consumedShared) {
+    CHECK(updatedOfferedShared.contains(resource));
+  }
 
-    updateSlaveTotal(slaveId, updatedTotal.get());
+  // Determine the additional instances of shared resources needed to be
+  // added to the allocations.
+  Resources additional = consumedShared - updatedOfferedShared;
 
-    // Update the total resources in the framework sorter.
-    Resources frameworkAllocation =
-      frameworkSorter->allocation(frameworkId.value(), slaveId);
+  if (!additional.empty()) {
+    LOG(INFO) << "Allocating additional resources " << additional
+              << " for tasks " << stringify(taskIds)
+              << " of framework " << frameworkId << " on agent " << slaveId;
 
-    Try<Resources> updatedFrameworkAllocation =
-      frameworkAllocation.apply(operation);
+    updatedOfferedResources += additional;
+  }
 
-    CHECK_SOME(updatedFrameworkAllocation);
+  // Update the per-slave allocation.
+  slave.allocated -= offeredResources;
+  slave.allocated += updatedOfferedResources;
 
-    frameworkSorter->remove(slaveId, frameworkAllocation);
-    frameworkSorter->add(slaveId, updatedFrameworkAllocation.get());
+  // Update the allocation in the framework sorter.
+  frameworkSorter->update(
+      frameworkId.value(),
+      slaveId,
+      offeredResources,
+      updatedOfferedResources);
 
-    // Update the per-slave allocation.
-    Try<Resources> updatedSlaveAllocation = slave.allocated.apply(operation);
-    CHECK_SOME(updatedSlaveAllocation);
+  // Update the allocation in the role sorter.
+  roleSorter->update(
+      role,
+      slaveId,
+      offeredResources,
+      updatedOfferedResources);
+
+  // Update the allocated resources in the quota sorter. We only update
+  // the allocated resources if this role has quota set.
+  if (quotas.contains(role)) {
+    // See comment at `quotaRoleSorter` declaration regarding non-revocable.
+    quotaRoleSorter->update(
+        role,
+        slaveId,
+        offeredResources.nonRevocable(),
+        updatedOfferedResources.nonRevocable());
+  }
 
-    slave.allocated = updatedSlaveAllocation.get();
+  // Update the agent total resources so they are consistent with the updated
+  // allocation. We do not directly use `updatedOfferedResources` here because
+  // the agent's total resources shouldn't contain:
+  // 1. The additionally allocated shared resources.
+  // 2. `AllocationInfo` as set in `updatedOfferedResources`.
+  Try<Resources> updatedTotal = slave.total.apply(operations);
+  CHECK_SOME(updatedTotal);
+  updateSlaveTotal(slaveId, updatedTotal.get());
 
-    // Update the allocation in the framework sorter.
-    frameworkSorter->update(
-        frameworkId.value(),
-        slaveId,
-        frameworkAllocation,
-        updatedFrameworkAllocation.get());
+  // Update the total resources in the framework sorter.
+  frameworkSorter->remove(slaveId, offeredResources);
+  frameworkSorter->add(slaveId, updatedOfferedResources);
 
-    // Update the allocated resources in the role sorter.
-    // We only update the allocated resources if this role
-    // has quota set.
-    roleSorter->update(
-        role,
-        slaveId,
-        frameworkAllocation,
-        updatedFrameworkAllocation.get());
+  // Check that the `flattened` quantities for framework allocations
+  // have not been changed by the above operations.
+  const Resources updatedFrameworkAllocation =
+    frameworkSorter->allocation(frameworkId.value(), slaveId);
 
-    if (quotas.contains(role)) {
-      // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-      quotaRoleSorter->update(
-          role,
-          slaveId,
-          frameworkAllocation.nonRevocable(),
-          updatedFrameworkAllocation.get().nonRevocable());
-    }
+  CHECK_EQ(
+      frameworkAllocation.flatten().createStrippedScalarQuantity(),
+      updatedFrameworkAllocation.flatten().createStrippedScalarQuantity());
 
-    LOG(INFO) << "Updated allocation of framework " << frameworkId
-              << " on agent " << slaveId
-              << " from " << frameworkAllocation
-              << " to " << updatedFrameworkAllocation.get() << " with "
-              << operation.Type_Name(operation.type()) << " operation";
-  }
+  LOG(INFO) << "Updated allocation of framework " << frameworkId
+            << " on agent " << slaveId
+            << " from " << frameworkAllocation
+            << " to " << updatedFrameworkAllocation;
 }