You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/02/07 23:14:28 UTC
mesos git commit: Consolidate update of allocations in
`updateAllocation()`.
Repository: mesos
Updated Branches:
refs/heads/master e1fbab8fb -> e73a778dd
Consolidate update of allocations in `updateAllocation()`.
We handle shared resources for `LAUNCH` operations separately in the
`updateAllocation()` API to add additional copies of shared resources.
But the updates to the allocations in the allocator and sorters
can be consolidated together.
Review: https://reviews.apache.org/r/55359/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e73a778d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e73a778d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e73a778d
Branch: refs/heads/master
Commit: e73a778dde10b46e3d03dfa643f7300abbaf951d
Parents: e1fbab8
Author: Anindya Sinha <an...@apple.com>
Authored: Tue Feb 7 14:35:40 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Feb 7 15:13:38 2017 -0800
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.cpp | 192 +++++++++++------------
1 file changed, 88 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e73a778d/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 56d6791..2eab005 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -673,10 +673,25 @@ void HierarchicalAllocatorProcess::updateAllocation(
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
+ const Resources frameworkAllocation =
+ frameworkSorter->allocation(frameworkId.value(), slaveId);
// We keep a copy of the offered resources here and it is updated
// by the operations.
- Resources _offeredResources = offeredResources;
+ Resources updatedOfferedResources = offeredResources;
+
+ // Accumulate consumed resources for all tasks in all `LAUNCH` operations.
+ //
+ // For LAUNCH operations we support tasks requesting more instances of
+ // shared resources than those being offered. We keep track of total
+ // consumed resources to determine the additional instances and allocate
+ // them as part of updating the framework's allocation (i.e., add
+ // them to the allocated resources in the allocator and in each
+ // of the sorters).
+ Resources consumed;
+
+ // Used for logging.
+ hashset<TaskID> taskIds;
foreach (const Offer::Operation& operation, operations) {
// The operations should have been normalized by the master via
@@ -689,23 +704,14 @@ void HierarchicalAllocatorProcess::updateAllocation(
// CHECK_NONE(validateOperationOnAllocatedResources(operation));
// }
- Try<Resources> updatedOfferedResources = _offeredResources.apply(operation);
- CHECK_SOME(updatedOfferedResources);
- _offeredResources = updatedOfferedResources.get();
-
- if (operation.type() == Offer::Operation::LAUNCH) {
- // Additional allocation needed for the operation.
- //
- // For LAUNCH operations we support tasks requesting more
- // instances of shared resources than those being offered. We
- // keep track of these additional instances and allocate them
- // as part of updating the framework's allocation (i.e., add
- // them to the allocated resources in the allocator and in each
- // of the sorters).
- Resources additional;
+ // Update the offered resources based on this operation.
+ Try<Resources> _updatedOfferedResources = updatedOfferedResources.apply(
+ operation);
- hashset<TaskID> taskIds;
+ CHECK_SOME(_updatedOfferedResources);
+ updatedOfferedResources = _updatedOfferedResources.get();
+ if (operation.type() == Offer::Operation::LAUNCH) {
foreach (const TaskInfo& task, operation.launch().task_infos()) {
taskIds.insert(task.task_id());
@@ -720,109 +726,87 @@ void HierarchicalAllocatorProcess::updateAllocation(
// The TODO is to support it. We need to pass in the information
// pertaining to the executor before enabling shared resources
// in the executor.
- const Resources& consumed = task.resources();
- additional += consumed.shared() - _offeredResources.shared();
-
- // (Non-shared) executor resources are not removed from
- // _offeredResources but it's OK because we only care about
- // shared resources in this variable.
- _offeredResources -= consumed;
- }
-
- if (!additional.empty()) {
- LOG(INFO) << "Allocating additional resources " << additional
- << " for tasks " << stringify(taskIds);
-
- CHECK_EQ(additional.shared(), additional);
-
- const Resources frameworkAllocation =
- frameworkSorter->allocation(frameworkId.value(), slaveId);
-
- foreach (const Resource& resource, additional) {
- CHECK(frameworkAllocation.contains(resource));
- }
-
- // Allocate these additional resources to this framework. Because
- // they are merely additional instances of the same shared
- // resources already allocated to the framework (validated by the
- // master, see the CHECK above), this doesn't have an impact on
- // the allocator's allocation algorithm.
- slave.allocated += additional;
-
- frameworkSorter->add(slaveId, additional);
- frameworkSorter->allocated(frameworkId.value(), slaveId, additional);
-
- roleSorter->allocated(role, slaveId, additional);
-
- if (quotas.contains(role)) {
- quotaRoleSorter->allocated(
- role, slaveId, additional.nonRevocable());
- }
+ consumed += task.resources();
}
-
- continue;
}
+ }
- // Here we apply offer operations to the allocated and total
- // resources in the allocator and each of the sorters. The available
- // resource quantities remain unchanged.
+ // Check that offered resources contain at least one copy of each
+ // consumed shared resource (guaranteed by master validation).
+ Resources consumedShared = consumed.shared();
+ Resources updatedOfferedShared = updatedOfferedResources.shared();
- // Update the total resources in the allocator and the role and
- // quota sorters.
- Try<Resources> updatedTotal = slave.total.apply(operation);
- CHECK_SOME(updatedTotal);
+ foreach (const Resource& resource, consumedShared) {
+ CHECK(updatedOfferedShared.contains(resource));
+ }
- updateSlaveTotal(slaveId, updatedTotal.get());
+ // Determine the additional instances of shared resources needed to be
+ // added to the allocations.
+ Resources additional = consumedShared - updatedOfferedShared;
- // Update the total resources in the framework sorter.
- Resources frameworkAllocation =
- frameworkSorter->allocation(frameworkId.value(), slaveId);
+ if (!additional.empty()) {
+ LOG(INFO) << "Allocating additional resources " << additional
+ << " for tasks " << stringify(taskIds)
+ << " of framework " << frameworkId << " on agent " << slaveId;
- Try<Resources> updatedFrameworkAllocation =
- frameworkAllocation.apply(operation);
+ updatedOfferedResources += additional;
+ }
- CHECK_SOME(updatedFrameworkAllocation);
+ // Update the per-slave allocation.
+ slave.allocated -= offeredResources;
+ slave.allocated += updatedOfferedResources;
- frameworkSorter->remove(slaveId, frameworkAllocation);
- frameworkSorter->add(slaveId, updatedFrameworkAllocation.get());
+ // Update the allocation in the framework sorter.
+ frameworkSorter->update(
+ frameworkId.value(),
+ slaveId,
+ offeredResources,
+ updatedOfferedResources);
- // Update the per-slave allocation.
- Try<Resources> updatedSlaveAllocation = slave.allocated.apply(operation);
- CHECK_SOME(updatedSlaveAllocation);
+ // Update the allocation in the role sorter.
+ roleSorter->update(
+ role,
+ slaveId,
+ offeredResources,
+ updatedOfferedResources);
+
+ // Update the allocated resources in the quota sorter. We only update
+ // the allocated resources if this role has quota set.
+ if (quotas.contains(role)) {
+ // See comment at `quotaRoleSorter` declaration regarding non-revocable.
+ quotaRoleSorter->update(
+ role,
+ slaveId,
+ offeredResources.nonRevocable(),
+ updatedOfferedResources.nonRevocable());
+ }
- slave.allocated = updatedSlaveAllocation.get();
+ // Update the agent total resources so they are consistent with the updated
+ // allocation. We do not directly use `updatedOfferedResources` here because
+ // the agent's total resources shouldn't contain:
+ // 1. The additionally allocated shared resources.
+ // 2. `AllocationInfo` as set in `updatedOfferedResources`.
+ Try<Resources> updatedTotal = slave.total.apply(operations);
+ CHECK_SOME(updatedTotal);
+ updateSlaveTotal(slaveId, updatedTotal.get());
- // Update the allocation in the framework sorter.
- frameworkSorter->update(
- frameworkId.value(),
- slaveId,
- frameworkAllocation,
- updatedFrameworkAllocation.get());
+ // Update the total resources in the framework sorter.
+ frameworkSorter->remove(slaveId, offeredResources);
+ frameworkSorter->add(slaveId, updatedOfferedResources);
- // Update the allocated resources in the role sorter.
- // We only update the allocated resources if this role
- // has quota set.
- roleSorter->update(
- role,
- slaveId,
- frameworkAllocation,
- updatedFrameworkAllocation.get());
+ // Check that the `flattened` quantities for framework allocations
+ // have not changed by the above operations.
+ const Resources updatedFrameworkAllocation =
+ frameworkSorter->allocation(frameworkId.value(), slaveId);
- if (quotas.contains(role)) {
- // See comment at `quotaRoleSorter` declaration regarding non-revocable.
- quotaRoleSorter->update(
- role,
- slaveId,
- frameworkAllocation.nonRevocable(),
- updatedFrameworkAllocation.get().nonRevocable());
- }
+ CHECK_EQ(
+ frameworkAllocation.flatten().createStrippedScalarQuantity(),
+ updatedFrameworkAllocation.flatten().createStrippedScalarQuantity());
- LOG(INFO) << "Updated allocation of framework " << frameworkId
- << " on agent " << slaveId
- << " from " << frameworkAllocation
- << " to " << updatedFrameworkAllocation.get() << " with "
- << operation.Type_Name(operation.type()) << " operation";
- }
+ LOG(INFO) << "Updated allocation of framework " << frameworkId
+ << " on agent " << slaveId
+ << " from " << frameworkAllocation
+ << " to " << updatedFrameworkAllocation;
}