You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/02/03 00:12:56 UTC
[1/3] mesos git commit: Introduce a helper for injecting
AllocationInfo into offer operations.
Repository: mesos
Updated Branches:
refs/heads/master 36d191b02 -> a4946a30b
Introduce a helper for injecting AllocationInfo into offer operations.
Previously, `Resource` did not contain `AllocationInfo`. So for
backwards compatibility with old schedulers and tooling, we must
allow operations to contain `Resource`s without an `AllocationInfo`.
This introduces a function which allows the master to inject the
offer's `AllocationInfo` into the operation's resources.
Review: https://reviews.apache.org/r/55863
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ec12ed58
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ec12ed58
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ec12ed58
Branch: refs/heads/master
Commit: ec12ed58811e21229628b08221a6519e0d8c6825
Parents: 36d191b
Author: Benjamin Mahler <bm...@apache.org>
Authored: Sun Jan 22 16:56:12 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Feb 2 15:06:20 2017 -0800
----------------------------------------------------------------------
src/common/protobuf_utils.cpp | 92 +++++++++++++++++++++++++++++++++
src/common/protobuf_utils.hpp | 11 ++++
src/tests/protobuf_utils_tests.cpp | 87 +++++++++++++++++++++++++++++++
3 files changed, 190 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ec12ed58/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index b0fb2ab..ed84e9a 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -348,6 +348,98 @@ Label createLabel(const string& key, const Option<string>& value)
}
+void adjustOfferOperation(
+ Offer::Operation* operation,
+ const Resource::AllocationInfo& allocationInfo)
+{
+ auto adjustResources = [](
+ RepeatedPtrField<Resource>* resources,
+ const Resource::AllocationInfo& allocationInfo) {
+ foreach (Resource& resource, *resources) {
+ if (!resource.has_allocation_info()) {
+ resource.mutable_allocation_info()->CopyFrom(allocationInfo);
+ }
+ }
+ };
+
+ switch (operation->type()) {
+ case Offer::Operation::LAUNCH: {
+ Offer::Operation::Launch* launch = operation->mutable_launch();
+
+ foreach (TaskInfo& task, *launch->mutable_task_infos()) {
+ adjustResources(task.mutable_resources(), allocationInfo);
+
+ if (task.has_executor()) {
+ adjustResources(
+ task.mutable_executor()->mutable_resources(),
+ allocationInfo);
+ }
+ }
+ break;
+ }
+
+ case Offer::Operation::LAUNCH_GROUP: {
+ Offer::Operation::LaunchGroup* launchGroup =
+ operation->mutable_launch_group();
+
+ if (launchGroup->has_executor()) {
+ adjustResources(
+ launchGroup->mutable_executor()->mutable_resources(),
+ allocationInfo);
+ }
+
+ TaskGroupInfo* taskGroup = launchGroup->mutable_task_group();
+
+ foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
+ adjustResources(task.mutable_resources(), allocationInfo);
+
+ if (task.has_executor()) {
+ adjustResources(
+ task.mutable_executor()->mutable_resources(),
+ allocationInfo);
+ }
+ }
+ break;
+ }
+
+ case Offer::Operation::RESERVE: {
+ adjustResources(
+ operation->mutable_reserve()->mutable_resources(),
+ allocationInfo);
+
+ break;
+ }
+
+ case Offer::Operation::UNRESERVE: {
+ adjustResources(
+ operation->mutable_unreserve()->mutable_resources(),
+ allocationInfo);
+
+ break;
+ }
+
+ case Offer::Operation::CREATE: {
+ adjustResources(
+ operation->mutable_create()->mutable_volumes(),
+ allocationInfo);
+
+ break;
+ }
+
+ case Offer::Operation::DESTROY: {
+ adjustResources(
+ operation->mutable_destroy()->mutable_volumes(),
+ allocationInfo);
+
+ break;
+ }
+
+ case Offer::Operation::UNKNOWN:
+ break; // No-op.
+ }
+}
+
+
TimeInfo getCurrentTime()
{
TimeInfo timeInfo;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ec12ed58/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index d5436b3..3ba689f 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -116,6 +116,17 @@ Label createLabel(
const Option<std::string>& value = None());
+// Previously, `Resource` did not contain `AllocationInfo`.
+// So for backwards compatibility with old schedulers and
+// tooling, we must allow operations to contain `Resource`s
+// without an `AllocationInfo`. This allows the master to
+// inject the offer's `AllocationInfo` into the operation's
+// resources.
+void adjustOfferOperation(
+ Offer::Operation* operation,
+ const Resource::AllocationInfo& allocationInfo);
+
+
// Helper function that fills in a TimeInfo from the current time.
TimeInfo getCurrentTime();
http://git-wip-us.apache.org/repos/asf/mesos/blob/ec12ed58/src/tests/protobuf_utils_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/protobuf_utils_tests.cpp b/src/tests/protobuf_utils_tests.cpp
index 4cf748b..2c32d56 100644
--- a/src/tests/protobuf_utils_tests.cpp
+++ b/src/tests/protobuf_utils_tests.cpp
@@ -24,6 +24,8 @@
#include "common/protobuf_utils.hpp"
+#include "tests/mesos.hpp"
+
using std::set;
using std::string;
using std::vector;
@@ -62,6 +64,91 @@ TEST(ProtobufUtilTest, GetRoles)
}
+// Tests that offer operations can be adjusted to include
+// the appropriate allocation info, if not already set.
+TEST(ProtobufUtilTest, AdjustAllocationInfoInOfferOperation)
+{
+ Resources resources = Resources::parse("cpus:1").get();
+
+ Resources allocatedResources = resources;
+ allocatedResources.allocate("role");
+
+ Resource::AllocationInfo allocationInfo;
+ allocationInfo.set_role("role");
+
+ // Test the LAUNCH case. This should be constructing a valid
+ // task and executor, but for now this just sets the resources
+ // in order to verify the allocation info injection.
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_resources()->CopyFrom(resources);
+
+ TaskInfo taskInfo;
+ taskInfo.mutable_resources()->CopyFrom(resources);
+ taskInfo.mutable_executor()->CopyFrom(executorInfo);
+
+ Offer::Operation launch = LAUNCH({taskInfo});
+ protobuf::adjustOfferOperation(&launch, allocationInfo);
+
+ ASSERT_EQ(1, launch.launch().task_infos_size());
+
+ EXPECT_EQ(allocatedResources,
+ launch.launch().task_infos(0).resources());
+
+ EXPECT_EQ(allocatedResources,
+ launch.launch().task_infos(0).executor().resources());
+
+ // Test the LAUNCH_GROUP case. This should be constructing a valid
+ // task and executor, but for now this just sets the resources in
+ // order to verify the allocation info injection.
+ TaskGroupInfo taskGroupInfo;
+ taskGroupInfo.add_tasks()->CopyFrom(taskInfo);
+
+ Offer::Operation launchGroup = LAUNCH_GROUP(executorInfo, taskGroupInfo);
+ protobuf::adjustOfferOperation(&launchGroup, allocationInfo);
+
+ ASSERT_EQ(1, launchGroup.launch_group().task_group().tasks_size());
+
+ EXPECT_EQ(allocatedResources,
+ launchGroup.launch_group().task_group().tasks(0).resources());
+
+ EXPECT_EQ(allocatedResources,
+ launchGroup.launch_group().task_group().tasks(0).executor()
+ .resources());
+
+ // Test the RESERVE case. This should be constructing a valid
+ // reservation, but for now this just sets the resources in
+ // order to verify the allocation info injection.
+ Offer::Operation reserve = RESERVE(resources);
+ protobuf::adjustOfferOperation(&reserve, allocationInfo);
+
+ EXPECT_EQ(allocatedResources, reserve.reserve().resources());
+
+ // Test the UNRESERVE case. This should be constructing a valid
+ // reservation, but for now this just sets the resources in
+ // order to verify the allocation info injection.
+ Offer::Operation unreserve = UNRESERVE(resources);
+ protobuf::adjustOfferOperation(&unreserve, allocationInfo);
+
+ EXPECT_EQ(allocatedResources, unreserve.unreserve().resources());
+
+ // Test the CREATE case. This should be constructing a valid
+ // volume, but for now this just sets the resources in order
+ // to verify the allocation info injection.
+ Offer::Operation create = CREATE(resources);
+ protobuf::adjustOfferOperation(&create, allocationInfo);
+
+ EXPECT_EQ(allocatedResources, create.create().volumes());
+
+ // Test the DESTROY case. This should be constructing a valid
+ // volume, but for now this just sets the resources in order
+ // to verify the allocation info injection.
+ Offer::Operation destroy = DESTROY(resources);
+ protobuf::adjustOfferOperation(&destroy, allocationInfo);
+
+ EXPECT_EQ(allocatedResources, destroy.destroy().volumes());
+}
+
+
// This tests that Capabilities are correctly constructed
// from given FrameworkInfo Capabilities.
TEST(ProtobufUtilTest, FrameworkCapabilities)
[3/3] mesos git commit: Prevent unintended mutation in the allocator.
Posted by bm...@apache.org.
Prevent unintended mutation in the allocator.
Currently, a lot of code in the allocator makes use of the `[]`
operator to access the agents, frameworks and sorters, which
can lead to subtle bugs where insertion was unintended.
With this change, a few const functions can be marked as such.
Review: https://reviews.apache.org/r/55910
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a4946a30
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a4946a30
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a4946a30
Branch: refs/heads/master
Commit: a4946a30b4d811e6e13c06e0ce175ab8c5ef7f09
Parents: 33fd460
Author: Benjamin Mahler <bm...@apache.org>
Authored: Tue Jan 24 17:25:49 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Feb 2 16:05:46 2017 -0800
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.cpp | 394 +++++++++++++----------
src/master/allocator/mesos/hierarchical.hpp | 10 +-
2 files changed, 233 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a4946a30/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 1d54f4c..5f54056 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -62,7 +62,7 @@ class OfferFilter
public:
virtual ~OfferFilter() {}
- virtual bool filter(const Resources& resources) = 0;
+ virtual bool filter(const Resources& resources) const = 0;
};
@@ -71,7 +71,7 @@ class RefusedOfferFilter : public OfferFilter
public:
RefusedOfferFilter(const Resources& _resources) : resources(_resources) {}
- virtual bool filter(const Resources& _resources)
+ virtual bool filter(const Resources& _resources) const
{
// TODO(jieyu): Consider separating the superset check for regular
// and revocable resources. For example, frameworks might want
@@ -99,7 +99,7 @@ class InverseOfferFilter
public:
virtual ~InverseOfferFilter() {}
- virtual bool filter() = 0;
+ virtual bool filter() const = 0;
};
@@ -112,7 +112,7 @@ public:
RefusedInverseOfferFilter(const Timeout& _timeout)
: timeout(_timeout) {}
- virtual bool filter()
+ virtual bool filter() const
{
// See comment above why we currently don't do more fine-grained filtering.
return timeout.remaining() > Seconds(0);
@@ -123,6 +123,13 @@ private:
};
+HierarchicalAllocatorProcess::Framework::Framework(
+ const FrameworkInfo& frameworkInfo)
+ : role(frameworkInfo.role()),
+ suppressed(false),
+ capabilities(frameworkInfo.capabilities()) {}
+
+
void HierarchicalAllocatorProcess::initialize(
const Duration& _allocationInterval,
const lambda::function<
@@ -228,22 +235,26 @@ void HierarchicalAllocatorProcess::addFramework(
CHECK(initialized);
CHECK(!frameworks.contains(frameworkId));
- const string& role = frameworkInfo.role();
+ frameworks.insert({frameworkId, Framework(frameworkInfo)});
+
+ const Framework& framework = frameworks.at(frameworkId);
+
+ const string& role = framework.role;
// If this is the first framework to register as this role,
// initialize state as necessary.
if (!activeRoles.contains(role)) {
activeRoles[role] = 1;
roleSorter->add(role, roleWeight(role));
- frameworkSorters[role].reset(frameworkSorterFactory());
- frameworkSorters[role]->initialize(fairnessExcludeResourceNames);
+ frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});
+ frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
metrics.addRole(role);
} else {
activeRoles[role]++;
}
- CHECK(!frameworkSorters[role]->contains(frameworkId.value()));
- frameworkSorters[role]->add(frameworkId.value());
+ CHECK(!frameworkSorters.at(role)->contains(frameworkId.value()));
+ frameworkSorters.at(role)->add(frameworkId.value());
// TODO(bmahler): Validate that the reserved resources have the
// framework's role.
@@ -252,8 +263,8 @@ void HierarchicalAllocatorProcess::addFramework(
foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
if (slaves.contains(slaveId)) {
roleSorter->allocated(role, slaveId, allocated);
- frameworkSorters[role]->add(slaveId, allocated);
- frameworkSorters[role]->allocated(
+ frameworkSorters.at(role)->add(slaveId, allocated);
+ frameworkSorters.at(role)->allocated(
frameworkId.value(), slaveId, allocated);
if (quotas.contains(role)) {
@@ -263,12 +274,6 @@ void HierarchicalAllocatorProcess::addFramework(
}
}
- frameworks[frameworkId] = Framework();
- frameworks[frameworkId].role = frameworkInfo.role();
- frameworks[frameworkId].suppressed = false;
- frameworks[frameworkId].capabilities =
- Capabilities(frameworkInfo.capabilities());
-
LOG(INFO) << "Added framework " << frameworkId;
if (active) {
@@ -285,20 +290,20 @@ void HierarchicalAllocatorProcess::removeFramework(
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
- const string& role = frameworks[frameworkId].role;
+ const string& role = frameworks.at(frameworkId).role;
CHECK(activeRoles.contains(role));
// Might not be in 'frameworkSorters[role]' because it was previously
// deactivated and never re-added.
- if (frameworkSorters[role]->contains(frameworkId.value())) {
+ if (frameworkSorters.at(role)->contains(frameworkId.value())) {
hashmap<SlaveID, Resources> allocation =
- frameworkSorters[role]->allocation(frameworkId.value());
+ frameworkSorters.at(role)->allocation(frameworkId.value());
// Update the allocation for this framework.
foreachpair (
const SlaveID& slaveId, const Resources& allocated, allocation) {
roleSorter->unallocated(role, slaveId, allocated);
- frameworkSorters[role]->remove(slaveId, allocated);
+ frameworkSorters.at(role)->remove(slaveId, allocated);
if (quotas.contains(role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
@@ -306,7 +311,7 @@ void HierarchicalAllocatorProcess::removeFramework(
}
}
- frameworkSorters[role]->remove(frameworkId.value());
+ frameworkSorters.at(role)->remove(frameworkId.value());
}
// If this is the last framework that was registered for this role,
@@ -345,10 +350,10 @@ void HierarchicalAllocatorProcess::activateFramework(
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
- const string& role = frameworks[frameworkId].role;
+ const string& role = frameworks.at(frameworkId).role;
CHECK(frameworkSorters.contains(role));
- frameworkSorters[role]->activate(frameworkId.value());
+ frameworkSorters.at(role)->activate(frameworkId.value());
LOG(INFO) << "Activated framework " << frameworkId;
@@ -362,10 +367,11 @@ void HierarchicalAllocatorProcess::deactivateFramework(
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
- const string& role = frameworks[frameworkId].role;
+ Framework& framework = frameworks.at(frameworkId);
+ const string& role = framework.role;
CHECK(frameworkSorters.contains(role));
- frameworkSorters[role]->deactivate(frameworkId.value());
+ frameworkSorters.at(role)->deactivate(frameworkId.value());
// Note that the Sorter *does not* remove the resources allocated
// to this framework. For now, this is important because if the
@@ -377,12 +383,12 @@ void HierarchicalAllocatorProcess::deactivateFramework(
// framework's `offerFilters` hashset yet, see comments in
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
- frameworks[frameworkId].offerFilters.clear();
- frameworks[frameworkId].inverseOfferFilters.clear();
+ framework.offerFilters.clear();
+ framework.inverseOfferFilters.clear();
// Clear the suppressed flag to make sure the framework can be offered
// resources immediately after getting activated.
- frameworks[frameworkId].suppressed = false;
+ framework.suppressed = false;
LOG(INFO) << "Deactivated framework " << frameworkId;
}
@@ -395,13 +401,14 @@ void HierarchicalAllocatorProcess::updateFramework(
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
+ Framework& framework = frameworks.at(frameworkId);
+
// TODO(jmlvanre): Once we allow frameworks to re-register with a new 'role',
// we need to update our internal 'frameworks' structure. See MESOS-703 for
// progress on allowing these fields to be updated.
- CHECK_EQ(frameworks[frameworkId].role, frameworkInfo.role());
+ CHECK_EQ(framework.role, frameworkInfo.role());
- frameworks[frameworkId].capabilities =
- Capabilities(frameworkInfo.capabilities());
+ framework.capabilities = Capabilities(frameworkInfo.capabilities());
}
@@ -426,7 +433,7 @@ void HierarchicalAllocatorProcess::addSlave(
const Resources& allocated,
used) {
if (frameworks.contains(frameworkId)) {
- const string& role = frameworks[frameworkId].role;
+ const string& role = frameworks.at(frameworkId).role;
// TODO(bmahler): Validate that the reserved resources have the
// framework's role.
@@ -434,8 +441,8 @@ void HierarchicalAllocatorProcess::addSlave(
CHECK(frameworkSorters.contains(role));
roleSorter->allocated(role, slaveId, allocated);
- frameworkSorters[role]->add(slaveId, allocated);
- frameworkSorters[role]->allocated(
+ frameworkSorters.at(role)->add(slaveId, allocated);
+ frameworkSorters.at(role)->allocated(
frameworkId.value(), slaveId, allocated);
if (quotas.contains(role)) {
@@ -446,16 +453,18 @@ void HierarchicalAllocatorProcess::addSlave(
}
slaves[slaveId] = Slave();
- slaves[slaveId].total = total;
- slaves[slaveId].allocated = Resources::sum(used);
- slaves[slaveId].activated = true;
- slaves[slaveId].hostname = slaveInfo.hostname();
+
+ Slave& slave = slaves.at(slaveId);
+
+ slave.total = total;
+ slave.allocated = Resources::sum(used);
+ slave.activated = true;
+ slave.hostname = slaveInfo.hostname();
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
if (unavailability.isSome()) {
- slaves[slaveId].maintenance =
- Slave::Maintenance(unavailability.get());
+ slave.maintenance = Slave::Maintenance(unavailability.get());
}
// If we have just a number of recovered agents, we cannot distinguish
@@ -476,9 +485,9 @@ void HierarchicalAllocatorProcess::addSlave(
resume();
}
- LOG(INFO) << "Added agent " << slaveId << " (" << slaves[slaveId].hostname
- << ") with " << slaves[slaveId].total
- << " (allocated: " << slaves[slaveId].allocated << ")";
+ LOG(INFO) << "Added agent " << slaveId << " (" << slave.hostname << ")"
+ << " with " << slave.total
+ << " (allocated: " << slave.allocated << ")";
allocate(slaveId);
}
@@ -496,10 +505,10 @@ void HierarchicalAllocatorProcess::removeSlave(
// all the resources. Fixing this would require more information
// than what we currently track in the allocator.
- roleSorter->remove(slaveId, slaves[slaveId].total);
+ roleSorter->remove(slaveId, slaves.at(slaveId).total);
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
- quotaRoleSorter->remove(slaveId, slaves[slaveId].total.nonRevocable());
+ quotaRoleSorter->remove(slaveId, slaves.at(slaveId).total.nonRevocable());
slaves.erase(slaveId);
allocationCandidates.erase(slaveId);
@@ -523,7 +532,9 @@ void HierarchicalAllocatorProcess::updateSlave(
// Check that all the oversubscribed resources are revocable.
CHECK_EQ(oversubscribed, oversubscribed.revocable());
- const Resources oldRevocable = slaves[slaveId].total.revocable();
+ Slave& slave = slaves.at(slaveId);
+
+ const Resources oldRevocable = slave.total.revocable();
// Update the total resources.
//
@@ -535,7 +546,7 @@ void HierarchicalAllocatorProcess::updateSlave(
//
// TODO(alexr): Update this math once the source of revocable resources
// is extended beyond oversubscription.
- slaves[slaveId].total = slaves[slaveId].total.nonRevocable() + oversubscribed;
+ slave.total = slave.total.nonRevocable() + oversubscribed;
// Update the total resources in the `roleSorter` by removing the
// previous oversubscribed resources and adding the new
@@ -547,10 +558,10 @@ void HierarchicalAllocatorProcess::updateSlave(
// function only changes the revocable resources on the slave, but
// the quota role sorter only manages non-revocable resources.
- LOG(INFO) << "Agent " << slaveId << " (" << slaves[slaveId].hostname << ")"
+ LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
<< " updated with oversubscribed resources " << oversubscribed
- << " (total: " << slaves[slaveId].total
- << ", allocated: " << slaves[slaveId].allocated << ")";
+ << " (total: " << slave.total
+ << ", allocated: " << slave.allocated << ")";
allocate(slaveId);
}
@@ -562,9 +573,9 @@ void HierarchicalAllocatorProcess::activateSlave(
CHECK(initialized);
CHECK(slaves.contains(slaveId));
- slaves[slaveId].activated = true;
+ slaves.at(slaveId).activated = true;
- LOG(INFO)<< "Agent " << slaveId << " reactivated";
+ LOG(INFO) << "Agent " << slaveId << " reactivated";
}
@@ -574,7 +585,7 @@ void HierarchicalAllocatorProcess::deactivateSlave(
CHECK(initialized);
CHECK(slaves.contains(slaveId));
- slaves[slaveId].activated = false;
+ slaves.at(slaveId).activated = false;
LOG(INFO) << "Agent " << slaveId << " deactivated";
}
@@ -619,10 +630,12 @@ void HierarchicalAllocatorProcess::updateAllocation(
CHECK(slaves.contains(slaveId));
CHECK(frameworks.contains(frameworkId));
- const string& role = frameworks[frameworkId].role;
- CHECK(frameworkSorters.contains(role));
+ Slave& slave = slaves.at(slaveId);
+ const Framework& framework = frameworks.at(frameworkId);
- const Owned<Sorter>& frameworkSorter = frameworkSorters[role];
+ CHECK(frameworkSorters.contains(framework.role));
+
+ const Owned<Sorter>& frameworkSorter = frameworkSorters.at(framework.role);
// We keep a copy of the offered resources here and it is updated
// by the operations.
@@ -687,15 +700,15 @@ void HierarchicalAllocatorProcess::updateAllocation(
// resources already allocated to the framework (validated by the
// master, see the CHECK above), this doesn't have an impact on
// the allocator's allocation algorithm.
- slaves[slaveId].allocated += additional;
+ slave.allocated += additional;
frameworkSorter->add(slaveId, additional);
frameworkSorter->allocated(frameworkId.value(), slaveId, additional);
- roleSorter->allocated(role, slaveId, additional);
+ roleSorter->allocated(framework.role, slaveId, additional);
- if (quotas.contains(role)) {
+ if (quotas.contains(framework.role)) {
quotaRoleSorter->allocated(
- role, slaveId, additional.nonRevocable());
+ framework.role, slaveId, additional.nonRevocable());
}
}
@@ -707,18 +720,17 @@ void HierarchicalAllocatorProcess::updateAllocation(
// resource quantities remain unchanged.
// Update the per-slave allocation.
- Try<Resources> updatedSlaveAllocation =
- slaves[slaveId].allocated.apply(operation);
+ Try<Resources> updatedSlaveAllocation = slave.allocated.apply(operation);
CHECK_SOME(updatedSlaveAllocation);
- slaves[slaveId].allocated = updatedSlaveAllocation.get();
+ slave.allocated = updatedSlaveAllocation.get();
// Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operation);
+ Try<Resources> updatedTotal = slave.total.apply(operation);
CHECK_SOME(updatedTotal);
- slaves[slaveId].total = updatedTotal.get();
+ slave.total = updatedTotal.get();
// Update the total and allocated resources in each sorter.
Resources frameworkAllocation =
@@ -745,7 +757,7 @@ void HierarchicalAllocatorProcess::updateAllocation(
roleSorter->add(slaveId, updatedFrameworkAllocation.get());
roleSorter->update(
- role,
+ framework.role,
slaveId,
frameworkAllocation,
updatedFrameworkAllocation.get());
@@ -758,10 +770,10 @@ void HierarchicalAllocatorProcess::updateAllocation(
quotaRoleSorter->add(
slaveId, updatedFrameworkAllocation.get().nonRevocable());
- if (quotas.contains(role)) {
+ if (quotas.contains(framework.role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->update(
- role,
+ framework.role,
slaveId,
frameworkAllocation.nonRevocable(),
updatedFrameworkAllocation.get().nonRevocable());
@@ -783,7 +795,7 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable(
CHECK(initialized);
CHECK(slaves.contains(slaveId));
- Resources available = slaves[slaveId].available();
+ Slave& slave = slaves.at(slaveId);
// It's possible for this 'apply' to fail here because a call to
// 'allocate' could have been enqueued by the allocator itself
@@ -797,17 +809,17 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable(
// \___/ \___/
//
// where A = allocate, R = reserve, U = updateAvailable
- Try<Resources> updatedAvailable = available.apply(operations);
+ Try<Resources> updatedAvailable = slave.available().apply(operations);
if (updatedAvailable.isError()) {
return Failure(updatedAvailable.error());
}
// Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+ Try<Resources> updatedTotal = slave.total.apply(operations);
CHECK_SOME(updatedTotal);
- const Resources oldTotal = slaves[slaveId].total;
- slaves[slaveId].total = updatedTotal.get();
+ const Resources oldTotal = slave.total;
+ slave.total = updatedTotal.get();
// Now, update the total resources in the role sorters by removing
// the previous resources at this slave and adding the new resources.
@@ -829,6 +841,8 @@ void HierarchicalAllocatorProcess::updateUnavailability(
CHECK(initialized);
CHECK(slaves.contains(slaveId));
+ Slave& slave = slaves.at(slaveId);
+
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
@@ -842,12 +856,11 @@ void HierarchicalAllocatorProcess::updateUnavailability(
}
// Remove any old unavailability.
- slaves[slaveId].maintenance = None();
+ slave.maintenance = None();
// If we have a new unavailability.
if (unavailability.isSome()) {
- slaves[slaveId].maintenance =
- Slave::Maintenance(unavailability.get());
+ slave.maintenance = Slave::Maintenance(unavailability.get());
}
allocate(slaveId);
@@ -864,14 +877,18 @@ void HierarchicalAllocatorProcess::updateInverseOffer(
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
CHECK(slaves.contains(slaveId));
- CHECK(slaves[slaveId].maintenance.isSome());
+
+ Framework& framework = frameworks.at(frameworkId);
+ Slave& slave = slaves.at(slaveId);
+
+ CHECK(slave.maintenance.isSome());
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
// We use a reference by alias because we intend to modify the
// `maintenance` and to improve readability.
- Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
+ Slave::Maintenance& maintenance = slave.maintenance.get();
// Only handle inverse offers that we currently have outstanding. If it is not
// currently outstanding this means it is old and can be safely ignored.
@@ -927,8 +944,7 @@ void HierarchicalAllocatorProcess::updateInverseOffer(
InverseOfferFilter* inverseOfferFilter =
new RefusedInverseOfferFilter(Timeout::in(seconds.get()));
- frameworks[frameworkId]
- .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
+ framework.inverseOfferFilters[slaveId].insert(inverseOfferFilter);
// We need to disambiguate the function call to pick the correct
// `expire()` overload.
@@ -985,19 +1001,21 @@ void HierarchicalAllocatorProcess::recoverResources(
// MesosAllocatorProcess::deactivateFramework, in which case we will
// have already recovered all of its resources).
if (frameworks.contains(frameworkId)) {
- const string& role = frameworks[frameworkId].role;
+ const Framework& framework = frameworks.at(frameworkId);
- CHECK(frameworkSorters.contains(role));
+ CHECK(frameworkSorters.contains(framework.role));
- if (frameworkSorters[role]->contains(frameworkId.value())) {
- frameworkSorters[role]->unallocated(
- frameworkId.value(), slaveId, resources);
- frameworkSorters[role]->remove(slaveId, resources);
- roleSorter->unallocated(role, slaveId, resources);
+ const Owned<Sorter>& frameworkSorter = frameworkSorters.at(framework.role);
- if (quotas.contains(role)) {
+ if (frameworkSorter->contains(frameworkId.value())) {
+ frameworkSorter->unallocated(frameworkId.value(), slaveId, resources);
+ frameworkSorter->remove(slaveId, resources);
+ roleSorter->unallocated(framework.role, slaveId, resources);
+
+ if (quotas.contains(framework.role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
- quotaRoleSorter->unallocated(role, slaveId, resources.nonRevocable());
+ quotaRoleSorter->unallocated(
+ framework.role, slaveId, resources.nonRevocable());
}
}
}
@@ -1006,14 +1024,16 @@ void HierarchicalAllocatorProcess::recoverResources(
// which it might not in the event that we dispatched Master::offer
// before we received Allocator::removeSlave).
if (slaves.contains(slaveId)) {
- CHECK(slaves[slaveId].allocated.contains(resources));
+ Slave& slave = slaves.at(slaveId);
+
+ CHECK(slave.allocated.contains(resources));
- slaves[slaveId].allocated -= resources;
+ slave.allocated -= resources;
VLOG(1) << "Recovered " << resources
- << " (total: " << slaves[slaveId].total
- << ", allocated: " << slaves[slaveId].allocated
- << ") on agent " << slaveId
+ << " (total: " << slave.total
+ << ", allocated: " << slave.allocated << ")"
+ << " on agent " << slaveId
<< " from framework " << frameworkId;
}
@@ -1053,7 +1073,7 @@ void HierarchicalAllocatorProcess::recoverResources(
// Create a new filter.
OfferFilter* offerFilter = new RefusedOfferFilter(resources);
- frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
+ frameworks.at(frameworkId).offerFilters[slaveId].insert(offerFilter);
// Expire the filter after both an `allocationInterval` and the
// `timeout` have elapsed. This ensures that the filter does not
@@ -1089,16 +1109,18 @@ void HierarchicalAllocatorProcess::suppressOffers(
const FrameworkID& frameworkId)
{
CHECK(initialized);
+ CHECK(frameworks.contains(frameworkId));
- frameworks[frameworkId].suppressed = true;
+ Framework& framework = frameworks.at(frameworkId);
- const string& role = frameworks[frameworkId].role;
+ framework.suppressed = true;
+
+ CHECK(frameworkSorters.contains(framework.role));
- CHECK(frameworkSorters.contains(role));
// Deactivating the framework in the sorter is fine as long as
// SUPPRESS is not parameterized. When parameterization is added,
// we have to differentiate between the cases here.
- frameworkSorters[role]->deactivate(frameworkId.value());
+ frameworkSorters.at(framework.role)->deactivate(frameworkId.value());
LOG(INFO) << "Suppressed offers for framework " << frameworkId;
}
@@ -1108,21 +1130,22 @@ void HierarchicalAllocatorProcess::reviveOffers(
const FrameworkID& frameworkId)
{
CHECK(initialized);
+ CHECK(frameworks.contains(frameworkId));
- frameworks[frameworkId].offerFilters.clear();
- frameworks[frameworkId].inverseOfferFilters.clear();
+ Framework& framework = frameworks.at(frameworkId);
- if (frameworks[frameworkId].suppressed) {
- frameworks[frameworkId].suppressed = false;
+ framework.offerFilters.clear();
+ framework.inverseOfferFilters.clear();
- const string& role = frameworks[frameworkId].role;
+ if (framework.suppressed) {
+ framework.suppressed = false;
- CHECK(frameworkSorters.contains(role));
+ CHECK(frameworkSorters.contains(framework.role));
// Activating the framework in the sorter on REVIVE is fine as long as
// SUPPRESS is not parameterized. When parameterization is added,
// we may need to differentiate between the cases here.
- frameworkSorters[role]->activate(frameworkId.value());
+ frameworkSorters.at(framework.role)->activate(frameworkId.value());
}
// We delete each actual `OfferFilter` when
@@ -1346,10 +1369,12 @@ void HierarchicalAllocatorProcess::__allocate()
vector<SlaveID> slaveIds;
slaveIds.reserve(allocationCandidates.size());
- // Filter out non-whitelisted and deactivated slaves in order not to send
- // offers for them.
+ // Filter out non-whitelisted, removed, and deactivated slaves
+ // in order not to send offers for them.
foreach (const SlaveID& slaveId, allocationCandidates) {
- if (isWhitelisted(slaveId) && slaves[slaveId].activated) {
+ if (isWhitelisted(slaveId) &&
+ slaves.contains(slaveId) &&
+ slaves.at(slaveId).activated) {
slaveIds.push_back(slaveId);
}
}
@@ -1417,15 +1442,24 @@ void HierarchicalAllocatorProcess::__allocate()
// Fetch frameworks according to their fair share.
// NOTE: Suppressed frameworks are not included in the sort.
- foreach (const string& frameworkId_, frameworkSorters[role]->sort()) {
+ CHECK(frameworkSorters.contains(role));
+ const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
+
+ foreach (const string& frameworkId_, frameworkSorter->sort()) {
FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);
+ CHECK(slaves.contains(slaveId));
+ CHECK(frameworks.contains(frameworkId));
+
+ const Framework& framework = frameworks.at(frameworkId);
+ Slave& slave = slaves.at(slaveId);
+
// Only offer resources from slaves that have GPUs to
// frameworks that are capable of receiving GPUs.
// See MESOS-5634.
- if (!frameworks[frameworkId].capabilities.gpuResources &&
- slaves[slaveId].total.gpus().getOrElse(0) > 0) {
+ if (!framework.capabilities.gpuResources &&
+ slave.total.gpus().getOrElse(0) > 0) {
continue;
}
@@ -1435,12 +1469,12 @@ void HierarchicalAllocatorProcess::__allocate()
// Since shared resources are offerable even when they are in use, we
// make one copy of the shared resources available regardless of the
// past allocations.
- Resources available = slaves[slaveId].available().nonShared();
+ Resources available = slave.available().nonShared();
// Offer a shared resource only if it has not been offered in
// this offer cycle to a framework.
- if (frameworks[frameworkId].capabilities.sharedResources) {
- available += slaves[slaveId].total.shared();
+ if (framework.capabilities.sharedResources) {
+ available += slave.total.shared();
if (offeredSharedResources.contains(slaveId)) {
available -= offeredSharedResources[slaveId];
}
@@ -1491,14 +1525,14 @@ void HierarchicalAllocatorProcess::__allocate()
offerable[frameworkId][slaveId] += resources;
offeredSharedResources[slaveId] += resources.shared();
- slaves[slaveId].allocated += resources;
+ slave.allocated += resources;
// Resources allocated as part of the quota count towards the
// role's and the framework's fair share.
//
// NOTE: Revocable resources have already been excluded.
- frameworkSorters[role]->add(slaveId, resources);
- frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
+ frameworkSorter->add(slaveId, resources);
+ frameworkSorter->allocated(frameworkId_, slaveId, resources);
roleSorter->allocated(role, slaveId, resources);
quotaRoleSorter->allocated(role, slaveId, resources);
}
@@ -1571,16 +1605,24 @@ void HierarchicalAllocatorProcess::__allocate()
foreach (const string& role, roleSorter->sort()) {
// NOTE: Suppressed frameworks are not included in the sort.
- foreach (const string& frameworkId_,
- frameworkSorters[role]->sort()) {
+ CHECK(frameworkSorters.contains(role));
+ const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
+
+ foreach (const string& frameworkId_, frameworkSorter->sort()) {
FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);
+ CHECK(slaves.contains(slaveId));
+ CHECK(frameworks.contains(frameworkId));
+
+ const Framework& framework = frameworks.at(frameworkId);
+ Slave& slave = slaves.at(slaveId);
+
// Only offer resources from slaves that have GPUs to
// frameworks that are capable of receiving GPUs.
// See MESOS-5634.
- if (!frameworks[frameworkId].capabilities.gpuResources &&
- slaves[slaveId].total.gpus().getOrElse(0) > 0) {
+ if (!framework.capabilities.gpuResources &&
+ slave.total.gpus().getOrElse(0) > 0) {
continue;
}
@@ -1590,12 +1632,12 @@ void HierarchicalAllocatorProcess::__allocate()
// Since shared resources are offerable even when they are in use, we
// make one copy of the shared resources available regardless of the
// past allocations.
- Resources available = slaves[slaveId].available().nonShared();
+ Resources available = slave.available().nonShared();
// Offer a shared resource only if it has not been offered in
// this offer cycle to a framework.
- if (frameworks[frameworkId].capabilities.sharedResources) {
- available += slaves[slaveId].total.shared();
+ if (framework.capabilities.sharedResources) {
+ available += slave.total.shared();
if (offeredSharedResources.contains(slaveId)) {
available -= offeredSharedResources[slaveId];
}
@@ -1635,7 +1677,7 @@ void HierarchicalAllocatorProcess::__allocate()
}
// Remove revocable resources if the framework has not opted for them.
- if (!frameworks[frameworkId].capabilities.revocableResources) {
+ if (!framework.capabilities.revocableResources) {
resources = resources.nonRevocable();
}
@@ -1678,10 +1720,10 @@ void HierarchicalAllocatorProcess::__allocate()
offeredSharedResources[slaveId] += resources.shared();
allocatedStage2 += scalarQuantity;
- slaves[slaveId].allocated += resources;
+ slave.allocated += resources;
- frameworkSorters[role]->add(slaveId, resources);
- frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
+ frameworkSorter->add(slaveId, resources);
+ frameworkSorter->allocated(frameworkId_, slaveId, resources);
roleSorter->allocated(role, slaveId, resources);
if (quotas.contains(role)) {
@@ -1731,11 +1773,12 @@ void HierarchicalAllocatorProcess::deallocate()
foreach (const SlaveID& slaveId, allocationCandidates) {
CHECK(slaves.contains(slaveId));
- if (slaves[slaveId].maintenance.isSome()) {
+ Slave& slave = slaves.at(slaveId);
+
+ if (slave.maintenance.isSome()) {
// We use a reference by alias because we intend to modify the
// `maintenance` and to improve readability.
- Slave::Maintenance& maintenance =
- slaves[slaveId].maintenance.get();
+ Slave::Maintenance& maintenance = slave.maintenance.get();
hashmap<string, Resources> allocation =
frameworkSorter->allocation(slaveId);
@@ -1799,16 +1842,24 @@ void HierarchicalAllocatorProcess::_expire(
OfferFilter* offerFilter)
{
// The filter might have already been removed (e.g., if the
- // framework no longer exists or in
- // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
- // keep the address from getting reused possibly causing premature
- // expiration).
- if (frameworks.contains(frameworkId) &&
- frameworks[frameworkId].offerFilters.contains(slaveId) &&
- frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) {
- frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter);
- if (frameworks[frameworkId].offerFilters[slaveId].empty()) {
- frameworks[frameworkId].offerFilters.erase(slaveId);
+ // framework no longer exists or in `reviveOffers()`) but not
+ // yet deleted (to keep the address from getting reused
+ // possibly causing premature expiration).
+ //
+ // Since this is a performance-sensitive piece of code,
+ // we use find to avoid doing any redundant lookups.
+
+ auto frameworkIterator = frameworks.find(frameworkId);
+ if (frameworkIterator != frameworks.end()) {
+ Framework& framework = frameworkIterator->second;
+
+ auto filters = framework.offerFilters.find(slaveId);
+ if (filters != framework.offerFilters.end()) {
+ filters->second.erase(offerFilter);
+
+ if (filters->second.empty()) {
+ framework.offerFilters.erase(slaveId);
+ }
}
}
@@ -1840,15 +1891,21 @@ void HierarchicalAllocatorProcess::expire(
// HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
// keep the address from getting reused possibly causing premature
// expiration).
- if (frameworks.contains(frameworkId) &&
- frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
- frameworks[frameworkId].inverseOfferFilters[slaveId]
- .contains(inverseOfferFilter)) {
- frameworks[frameworkId].inverseOfferFilters[slaveId]
- .erase(inverseOfferFilter);
-
- if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
- frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
+ //
+ // Since this is a performance-sensitive piece of code,
+ // we use find to avoid doing any redundant lookups.
+
+ auto frameworkIterator = frameworks.find(frameworkId);
+ if (frameworkIterator != frameworks.end()) {
+ Framework& framework = frameworkIterator->second;
+
+ auto filters = framework.inverseOfferFilters.find(slaveId);
+ if (filters != framework.inverseOfferFilters.end()) {
+ filters->second.erase(inverseOfferFilter);
+
+ if (filters->second.empty()) {
+ framework.inverseOfferFilters.erase(slaveId);
+ }
}
}
@@ -1856,10 +1913,10 @@ void HierarchicalAllocatorProcess::expire(
}
-double HierarchicalAllocatorProcess::roleWeight(const string& name)
+double HierarchicalAllocatorProcess::roleWeight(const string& name) const
{
if (weights.contains(name)) {
- return weights[name];
+ return weights.at(name);
} else {
return 1.0; // Default weight.
}
@@ -1867,26 +1924,28 @@ double HierarchicalAllocatorProcess::roleWeight(const string& name)
bool HierarchicalAllocatorProcess::isWhitelisted(
- const SlaveID& slaveId)
+ const SlaveID& slaveId) const
{
CHECK(slaves.contains(slaveId));
- return whitelist.isNone() ||
- whitelist.get().contains(slaves[slaveId].hostname);
+ const Slave& slave = slaves.at(slaveId);
+
+ return whitelist.isNone() || whitelist->contains(slave.hostname);
}
bool HierarchicalAllocatorProcess::isFiltered(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- const Resources& resources)
+ const Resources& resources) const
{
CHECK(frameworks.contains(frameworkId));
CHECK(slaves.contains(slaveId));
- if (frameworks[frameworkId].offerFilters.contains(slaveId)) {
- foreach (
- OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) {
+ const Framework& framework = frameworks.at(frameworkId);
+
+ if (framework.offerFilters.contains(slaveId)) {
+ foreach (OfferFilter* offerFilter, framework.offerFilters.at(slaveId)) {
if (offerFilter->filter(resources)) {
VLOG(1) << "Filtered offer with " << resources
<< " on agent " << slaveId
@@ -1903,15 +1962,16 @@ bool HierarchicalAllocatorProcess::isFiltered(
bool HierarchicalAllocatorProcess::isFiltered(
const FrameworkID& frameworkId,
- const SlaveID& slaveId)
+ const SlaveID& slaveId) const
{
CHECK(frameworks.contains(frameworkId));
CHECK(slaves.contains(slaveId));
- if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
- foreach (
- InverseOfferFilter* inverseOfferFilter,
- frameworks[frameworkId].inverseOfferFilters[slaveId]) {
+ const Framework& framework = frameworks.at(frameworkId);
+
+ if (framework.inverseOfferFilters.contains(slaveId)) {
+ foreach (InverseOfferFilter* inverseOfferFilter,
+ framework.inverseOfferFilters.at(slaveId)) {
if (inverseOfferFilter->filter()) {
VLOG(1) << "Filtered unavailability on agent " << slaveId
<< " for framework " << frameworkId;
@@ -1988,7 +2048,7 @@ double HierarchicalAllocatorProcess::_offer_filters_active(
}
foreachkey (const SlaveID& slaveId, framework.offerFilters) {
- result += framework.offerFilters.get(slaveId)->size();
+ result += framework.offerFilters.at(slaveId).size();
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a4946a30/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 339b3d2..a99ed35 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -251,23 +251,23 @@ protected:
InverseOfferFilter* inverseOfferFilter);
// Returns the weight of the specified role name.
- double roleWeight(const std::string& name);
+ double roleWeight(const std::string& name) const;
// Checks whether the slave is whitelisted.
- bool isWhitelisted(const SlaveID& slaveId);
+ bool isWhitelisted(const SlaveID& slaveId) const;
// Returns true if there is a resource offer filter for this framework
// on this slave.
bool isFiltered(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- const Resources& resources);
+ const Resources& resources) const;
// Returns true if there is an inverse offer filter for this framework
// on this slave.
bool isFiltered(
const FrameworkID& frameworkID,
- const SlaveID& slaveID);
+ const SlaveID& slaveID) const;
static bool allocatable(const Resources& resources);
@@ -292,6 +292,8 @@ protected:
struct Framework
{
+ explicit Framework(const FrameworkInfo& frameworkInfo);
+
std::string role;
// Whether the framework suppresses offers.
[2/3] mesos git commit: Cleanups to the allocator tests.
Posted by bm...@apache.org.
Cleanups to the allocator tests.
This was necessary to greatly simplify the changes needed to the
allocator tests as we introduce support for multi-role frameworks.
The main improvement here is to establish and use equality on the
`Allocation` struct, which makes the tests more readable and avoids
the manual probing of the allocation structure across all the tests.
Review: https://reviews.apache.org/r/55868
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/33fd4608
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/33fd4608
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/33fd4608
Branch: refs/heads/master
Commit: 33fd4608b1430238ff8dbc00314a8c1e4b5a748d
Parents: ec12ed5
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Jan 23 18:07:18 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Feb 2 15:49:27 2017 -0800
----------------------------------------------------------------------
src/tests/hierarchical_allocator_tests.cpp | 1071 ++++++++++++-----------
1 file changed, 561 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/33fd4608/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 3ecedb8..1e0b945 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -66,6 +66,7 @@ using std::atomic;
using std::cout;
using std::endl;
using std::map;
+using std::ostream;
using std::set;
using std::string;
using std::vector;
@@ -79,11 +80,34 @@ namespace tests {
struct Allocation
{
+ Allocation() = default;
+
+ Allocation(
+ const FrameworkID& frameworkId_,
+ const hashmap<SlaveID, Resources>& resources_)
+ : frameworkId(frameworkId_),
+ resources(resources_) {}
+
FrameworkID frameworkId;
hashmap<SlaveID, Resources> resources;
};
+bool operator==(const Allocation& left, const Allocation& right)
+{
+ return left.frameworkId == right.frameworkId &&
+ left.resources == right.resources;
+}
+
+
+ostream& operator<<(ostream& stream, const Allocation& allocation)
+{
+ return stream
+ << "FrameworkID: " << allocation.frameworkId
+ << " Resource Allocation: " << stringify(allocation.resources);
+}
+
+
struct Deallocation
{
FrameworkID frameworkId;
@@ -261,10 +285,11 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
FrameworkInfo framework1 = createFrameworkInfo("role1");
allocator->addFramework(framework1.id(), framework1, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(slave1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{slave1.id(), slave1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 1 (cpus=2, mem=1024)
// framework1 share = 1
@@ -282,10 +307,11 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// framework2 will be offered all of slave2's resources since role2
// has the lowest user share, and framework2 is its only framework.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(slave2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{slave2.id(), slave2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 0.67 (cpus=2, mem=1024)
// framework1 share = 1
@@ -302,10 +328,11 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// framework2 will be offered all of slave3's resources since role2
// has the lowest share.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(slave3.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{slave3.id(), slave3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 0.33 (cpus=2, mem=1024)
// framework1 share = 1
@@ -327,10 +354,11 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// framework3 will be offered all of slave4's resources since role1
// has the lowest user share, and framework3 has the lowest share of
// role1's frameworks.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework3.id(), allocation->frameworkId);
- EXPECT_EQ(slave4.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework3.id(),
+ {{slave4.id(), slave4.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 0.67 (cpus=6, mem=5120)
// framework1 share = 0.33 (cpus=2, mem=1024)
@@ -353,10 +381,11 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// Even though framework4 doesn't have any resources, role2 has a
// lower share than role1, so framework2 receives slave5's resources.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(slave5.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{slave5.id(), slave5.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -379,10 +408,11 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
FrameworkInfo framework1 = createFrameworkInfo("role1");
allocator->addFramework(framework1.id(), framework1, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(slave1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{slave1.id(), slave1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
FrameworkInfo framework2 = createFrameworkInfo("role2");
allocator->addFramework(framework2.id(), framework2, {}, true);
@@ -391,20 +421,22 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:512;disk:0");
allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {});
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(slave2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{slave2.id(), slave2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Since `framework1` has more resources allocated to it than `framework2`,
// we expect `framework2` to receive this agent's resources.
SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:512;disk:0");
allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), {});
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(slave3.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{slave3.id(), slave3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Now add another framework in role1. Since the reserved resources
// should be allocated fairly between frameworks within a role, we
@@ -417,10 +449,11 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
"cpus(role1):2;mem(role1):1024;disk(role1):0");
allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), {});
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework3.id(), allocation->frameworkId);
- EXPECT_EQ(slave4.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework3.id(),
+ {{slave4.id(), slave4.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -455,10 +488,11 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
allocator->addFramework(framework1.id(), framework1, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{agent1.id(), agent1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 1 (cpus=2, mem=1024, (ignored) gpus=1)
// framework1 share = 1
@@ -476,10 +510,11 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
// framework2 will be offered all of agent2's resources since role2
// has the lowest user share, and framework2 is its only framework.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 0.67 (cpus=2, mem=1024, (ignored) gpus=1)
// framework1 share = 1
@@ -496,10 +531,11 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
// framework2 will be offered all of agent3's resources since role2
// has the lowest share.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent3.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{agent3.id(), agent3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 0.33 (cpus=2, mem=1024, (ignored)gpus=1)
// framework1 share = 1
@@ -521,10 +557,11 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
// framework3 will be offered all of agent4's resources since role1
// has the lowest user share, and framework3 has the lowest share of
// role1's frameworks.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework3.id(), allocation->frameworkId);
- EXPECT_EQ(agent4.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework3.id(),
+ {{agent4.id(), agent4.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// role1 share = 0.67 (cpus=6, mem=5120, (ignored) gpus=1)
// framework1 share = 0.33 (cpus=2, mem=1024, (ignored) gpus=1)
@@ -547,10 +584,11 @@ TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
// Even though framework4 doesn't have any resources, role2 has a
// lower share than role1, so framework2 receives agent5's resources.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent5.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{agent5.id(), agent5.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -577,10 +615,13 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
// `framework` will be offered all of `agent` resources
// because it is the only framework in the cluster.
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
+
// Now `framework` declines the offer and sets a filter
// with the duration greater than the allocation interval.
@@ -591,7 +632,7 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
allocator->recoverResources(
framework.id(),
agent.id(),
- allocation->resources.get(agent.id()).get(),
+ allocation->resources.at(agent.id()),
offerFilter);
// Ensure the offer filter timeout is set before advancing the clock.
@@ -609,7 +650,7 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
// There should be no allocation due to the offer filter.
allocation = allocations.get();
- ASSERT_TRUE(allocation.isPending());
+ EXPECT_TRUE(allocation.isPending());
// Ensure the offer filter times out (2x the allocation interval)
// and the next batch allocation occurs.
@@ -617,9 +658,11 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter)
Clock::settle();
// The next batch allocation should offer resources to `framework`.
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocation);
metrics = Metrics();
@@ -664,7 +707,7 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
agent1,
None(),
agent1.resources(),
- {std::make_pair(framework1.id(), agent1.resources())});
+ {{framework1.id(), agent1.resources()}});
// Process all triggered allocation events.
//
@@ -685,10 +728,13 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
// `framework2` will be offered all of `agent2` resources
// because its share (0) is smaller than `framework1`.
+ Allocation expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
+
// Total cluster resources (2 agents): cpus=2, mem=1024.
// ROLE1 share = 1 (cpus=2, mem=1024)
@@ -706,7 +752,7 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
allocator->recoverResources(
framework2.id(),
agent2.id(),
- allocation->resources.get(agent2.id()).get(),
+ allocation->resources.at(agent2.id()),
offerFilter);
// Total cluster resources (2 agents): cpus=2, mem=1024.
@@ -728,10 +774,12 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
// Since the filter is applied, resources are offered to `framework1`
// even though its share is greater than `framework2`.
+ expected = Allocation(
+ framework1.id(),
+ {{agent2.id(), agent2.resources()}});
+
allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Total cluster resources (2 agents): cpus=2, mem=1024.
// ROLE1 share = 1 (cpus=2, mem=1024)
@@ -745,7 +793,7 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
allocator->recoverResources(
framework1.id(),
agent2.id(),
- allocation->resources.get(agent2.id()).get(),
+ allocation->resources.at(agent2.id()),
None());
// Total cluster resources (2 agents): cpus=2, mem=1024.
@@ -757,10 +805,12 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
Clock::advance(flags.allocation_interval);
// Since the filter is removed, resources are offered to `framework2`.
+ expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
+
allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Total cluster resources (2 agents): cpus=2, mem=1024.
// ROLE1 share = 1 (cpus=2, mem=1024)
@@ -789,10 +839,11 @@ TEST_F(HierarchicalAllocatorTest, MaintenanceInverseOffers)
allocator->addFramework(framework.id(), framework, {}, true);
// Check that the resources go to the framework.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
const process::Time start = Clock::now() + Seconds(60);
@@ -845,11 +896,15 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
FrameworkInfo framework1 = createFrameworkInfo("role1");
allocator->addFramework(framework1.id(), framework1, {}, true);
+ Allocation expected = Allocation(
+ framework1.id(),
+ {
+ {slave1.id(), slave1.resources()},
+ {slave2.id(), slave2.resources()}
+ });
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(slave1.resources() + slave2.resources(),
- Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
allocator->recoverResources(
framework1.id(),
@@ -880,14 +935,14 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
// NOTE: `slave1` and `slave2` have the same resources, we don't care
// which framework received which slave, only that they each received one.
ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
- ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
- EXPECT_EQ(slave1.resources(),
- Resources::sum(frameworkAllocations[framework1.id()].resources));
+
+ allocation = frameworkAllocations.at(framework1.id());
+ EXPECT_EQ(slave1.resources(), Resources::sum(allocation->resources));
ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
- ASSERT_EQ(1u, frameworkAllocations[framework2.id()].resources.size());
- EXPECT_EQ(slave2.resources(),
- Resources::sum(frameworkAllocations[framework2.id()].resources));
+
+ allocation = frameworkAllocations.at(framework2.id());
+ EXPECT_EQ(slave2.resources(), Resources::sum(allocation->resources));
}
@@ -917,10 +972,14 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness)
for (int i = 0; i < 10; i++) {
Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
- counts[allocation->frameworkId]++;
- ASSERT_EQ(1u, allocation->resources.size());
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ allocation->frameworkId,
+ {{slave.id(), slave.resources()}});
+
+ EXPECT_EQ(expected, allocation.get());
+
+ counts[allocation->frameworkId]++;
allocator->recoverResources(
allocation->frameworkId,
@@ -963,26 +1022,24 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
FrameworkInfo framework1 = createFrameworkInfo("role1");
allocator->addFramework(framework1.id(), framework1, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(2u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave1.id()));
- EXPECT_TRUE(allocation->resources.contains(slave2.id()));
- EXPECT_EQ(slave1.resources() + Resources(slave2.resources()).unreserved(),
- Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {
+ {slave1.id(), slave1.resources()},
+ {slave2.id(), Resources(slave2.resources()).unreserved()}
+ });
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// framework2 should get all of its reserved resources on slave2.
FrameworkInfo framework2 = createFrameworkInfo("role2");
allocator->addFramework(framework2.id(), framework2, {}, true);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave2.id()));
- EXPECT_EQ(Resources(slave2.resources()).reserved("role2"),
- Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{slave2.id(), Resources(slave2.resources()).reserved("role2")}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -1002,15 +1059,16 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Recover the reserved resources, expect them to be re-offered.
- Resources reserved = Resources(slave.resources()).reserved("role1");
+ Resources reserved = allocation->resources.at(slave.id()).reserved("role1");
+ Resources unreserved = allocation->resources.at(slave.id()).unreserved();
allocator->recoverResources(
allocation->frameworkId,
@@ -1020,16 +1078,14 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
Clock::advance(flags.allocation_interval);
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), reserved}});
+
allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(reserved, Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Recover the unreserved resources, expect them to be re-offered.
- Resources unreserved = Resources(slave.resources()).unreserved();
-
allocator->recoverResources(
allocation->frameworkId,
slave.id(),
@@ -1038,12 +1094,12 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
Clock::advance(flags.allocation_interval);
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), unreserved}});
+
allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(unreserved, Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -1073,12 +1129,11 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
"disk:128");
allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {});
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave2.id()));
- EXPECT_EQ(slave2.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave2.id(), slave2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Enough memory to be considered allocatable.
SlaveInfo slave3 = createSlaveInfo(
@@ -1087,12 +1142,11 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
"disk:128");
allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), {});
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave3.id()));
- EXPECT_EQ(slave3.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave3.id(), slave3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// slave4 has enough cpu and memory to be considered allocatable,
// but it lies across unreserved and reserved resources!
@@ -1104,12 +1158,11 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
"disk:128");
allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), {});
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave4.id()));
- EXPECT_EQ(slave4.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave4.id(), slave4.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -1128,12 +1181,12 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Construct an offer operation for the framework's allocation.
Resource volume = Resources::parse("disk", "5", "*").get();
@@ -1146,7 +1199,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
// Ensure the offer operation can be applied.
Try<Resources> updated =
- Resources::sum(allocation->resources).apply(create);
+ allocation->resources.at(slave.id()).apply(create);
ASSERT_SOME(updated);
@@ -1154,7 +1207,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
allocator->updateAllocation(
framework.id(),
slave.id(),
- Resources::sum(allocation->resources),
+ allocation->resources.at(slave.id()),
{create});
// Now recover the resources, and expect the next allocation to
@@ -1167,21 +1220,15 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
Clock::advance(flags.allocation_interval);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
// The allocation should be the slave's resources with the offer
// operation applied.
- updated = Resources(slave.resources()).apply(create);
- ASSERT_SOME(updated);
-
- EXPECT_NE(Resources(slave.resources()),
- Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), updated.get()}});
- EXPECT_EQ(updated.get(), Resources::sum(allocation->resources));
+ allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -1203,12 +1250,12 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
allocator->addFramework(
framework.id(), framework, hashmap<SlaveID, Resources>(), true);
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Construct an offer operation for the framework's allocation.
// Create a shared volume.
@@ -1218,7 +1265,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
// Ensure the offer operation can be applied.
Try<Resources> update =
- Resources::sum(allocation->resources).apply(create);
+ allocation->resources.at(slave.id()).apply(create);
ASSERT_SOME(update);
@@ -1226,7 +1273,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
allocator->updateAllocation(
framework.id(),
slave.id(),
- Resources::sum(allocation->resources),
+ allocation->resources.at(slave.id()),
{create});
// Now recover the resources, and expect the next allocation to
@@ -1239,21 +1286,14 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
Clock::advance(flags.allocation_interval);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
-
// The allocation should be the slave's resources with the offer
// operation applied.
- update = Resources(slave.resources()).apply(create);
- ASSERT_SOME(update);
-
- EXPECT_NE(Resources(slave.resources()),
- Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), update.get()}});
- EXPECT_EQ(update.get(), Resources::sum(allocation->resources));
+ allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expected, allocation);
// Construct an offer operation for the framework's allocation to
// destroy the shared volume.
@@ -1263,31 +1303,28 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
allocator->updateAllocation(
framework.id(),
slave.id(),
- Resources::sum(allocation->resources),
+ allocation->resources.at(slave.id()),
{destroy});
// The resources to recover should be equal to the agent's original
// resources now that the shared volume is created and then destroyed.
- ASSERT_SOME_EQ(slave.resources(), update->apply(destroy));
+ update = update->apply(destroy);
+ ASSERT_SOME_EQ(slave.resources(), update);
- // Now recover the amount of `slave.resources()` and expect the
- // next allocation to equal `slave.resources()`.
allocator->recoverResources(
framework.id(),
slave.id(),
- slave.resources(),
+ update.get(),
None());
Clock::advance(flags.allocation_interval);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), update.get()}});
- EXPECT_EQ(Resources(slave.resources()),
- Resources::sum(allocation->resources));
+ allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -1307,12 +1344,12 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
allocator->addFramework(framework1.id(), framework1, {}, true);
// Initially, all the resources are allocated to `framework1`.
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{slave.id(), slave.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Create a shared volume.
Resource volume = createDiskResource(
@@ -1321,7 +1358,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
// Ensure the offer operation can be applied.
Try<Resources> update =
- Resources::sum(allocation->resources).apply(create);
+ allocation->resources.at(slave.id()).apply(create);
ASSERT_SOME(update);
@@ -1329,7 +1366,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
allocator->updateAllocation(
framework1.id(),
slave.id(),
- Resources::sum(allocation->resources),
+ allocation->resources.at(slave.id()),
{create});
// Now recover the resources, and expect the next allocation to
@@ -1344,12 +1381,12 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
// opted in for SHARED_RESOURCES.
Clock::advance(flags.allocation_interval);
+ expected = Allocation(
+ framework1.id(),
+ {{slave.id(), update.get() - volume}});
+
allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_TRUE(allocation->resources.at(slave.id()).shared().empty());
+ AWAIT_EXPECT_EQ(expected, allocation);
// Recover the resources for the offer in the next allocation cycle.
allocator->recoverResources(
@@ -1368,12 +1405,12 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
// has opted in for SHARED_RESOURCES.
Clock::advance(flags.allocation_interval);
+ expected = Allocation(
+ framework2.id(),
+ {{slave.id(), update.get()}});
+
allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(allocation->resources.at(slave.id()).shared(), Resources(volume));
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -1393,29 +1430,24 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
Offer::Operation reserve = RESERVE(dynamicallyReserved);
+ Try<Resources> update = Resources(slave.resources()).apply(reserve);
+ ASSERT_SOME(update);
+ EXPECT_NE(Resources(slave.resources()), update.get());
+
// Update the allocation in the allocator.
- Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve});
- AWAIT_EXPECT_READY(update);
+ AWAIT_READY(allocator->updateAvailable(slave.id(), {reserve}));
// Expect to receive the updated available resources.
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
-
// The allocation should be the slave's resources with the offer
// operation applied.
- Try<Resources> updated = Resources(slave.resources()).apply(reserve);
- ASSERT_SOME(updated);
-
- EXPECT_NE(Resources(slave.resources()),
- Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), update.get()}});
- EXPECT_EQ(updated.get(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -1432,12 +1464,11 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Construct an offer operation for the framework's allocation.
Resources unreserved = Resources::parse("cpus:25;mem:50").get();
@@ -1447,8 +1478,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
Offer::Operation reserve = RESERVE(dynamicallyReserved);
// Update the allocation in the allocator.
- Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve});
- AWAIT_EXPECT_FAILED(update);
+ AWAIT_FAILED(allocator->updateAvailable(slave.id(), {reserve}));
}
@@ -1471,28 +1501,33 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
allocator->addFramework(framework.id(), framework, {}, true);
// Initially, all the resources are allocated.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Update the slave with 10 oversubscribed cpus.
Resources oversubscribed = createRevocableResources("cpus", "10");
allocator->updateSlave(slave.id(), oversubscribed);
// The next allocation should be for 10 oversubscribed resources.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(oversubscribed, Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), oversubscribed}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Update the slave again with 12 oversubscribed cpus.
Resources oversubscribed2 = createRevocableResources("cpus", "12");
allocator->updateSlave(slave.id(), oversubscribed2);
// The next allocation should be for 2 oversubscribed cpus.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(oversubscribed2 - oversubscribed,
- Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), oversubscribed2 - oversubscribed}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Update the slave again with 5 oversubscribed cpus.
Resources oversubscribed3 = createRevocableResources("cpus", "5");
@@ -1501,8 +1536,9 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
// Since there are no more available oversubscribed resources there
// shouldn't be an allocation.
Clock::settle();
- allocation = allocations.get();
- ASSERT_TRUE(allocation.isPending());
+
+ Future<Allocation> allocation = allocations.get();
+ EXPECT_TRUE(allocation.isPending());
}
@@ -1523,9 +1559,11 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
allocator->addFramework(framework.id(), framework, {}, true);
// Initially, all the resources are allocated.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Update the slave with 10 oversubscribed cpus.
Resources oversubscribed = createRevocableResources("cpus", "10");
@@ -1534,8 +1572,9 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
// No allocation should be made for oversubscribed resources because
// the framework has not opted in for them.
Clock::settle();
- allocation = allocations.get();
- ASSERT_TRUE(allocation.isPending());
+
+ Future<Allocation> allocation = allocations.get();
+ EXPECT_TRUE(allocation.isPending());
}
@@ -1558,18 +1597,22 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
allocator->addFramework(framework.id(), framework, {}, true);
// Initially, all the resources are allocated.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Update the slave with 10 oversubscribed cpus.
Resources oversubscribed = createRevocableResources("cpus", "10");
allocator->updateSlave(slave.id(), oversubscribed);
// The next allocation should be for 10 oversubscribed cpus.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(oversubscribed, Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), oversubscribed}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Recover 6 oversubscribed cpus and 2 regular cpus.
Resources recovered = createRevocableResources("cpus", "6");
@@ -1581,9 +1624,11 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
// The next allocation should be for 6 oversubscribed and 2 regular
// cpus.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(recovered, Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{slave.id(), recovered}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -1623,11 +1668,11 @@ TEST_F(HierarchicalAllocatorTest, Whitelist)
Clock::advance(flags.allocation_interval);
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{slave.id(), slave.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -1759,10 +1804,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
// `framework1` will be offered all of `agent1`'s resources because it is
// the only framework in the only role with unsatisfied quota.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{agent1.id(), agent1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=1, mem=512.
// QUOTA_ROLE share = 1 (cpus=1, mem=512) [quota: cpus=2, mem=1024]
@@ -1776,10 +1822,12 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
// `framework1` will again be offered all of `agent2`'s resources
// because it is the only framework in the only role with unsatisfied
// quota. `framework2` has to wait.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework1.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ Future<Allocation> allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expected, allocation);
// Total cluster resources: cpus=2, mem=1024.
// QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
@@ -1798,7 +1846,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
allocator->recoverResources(
framework1.id(),
agent2.id(),
- allocation->resources.get(agent2.id()).get(),
+ allocation->resources.at(agent2.id()),
offerFilter);
// Total cluster resources: cpus=2, mem=1024.
@@ -1816,16 +1864,18 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
// There should be no allocation due to the offer filter.
allocation = allocations.get();
- ASSERT_TRUE(allocation.isPending());
+ EXPECT_TRUE(allocation.isPending());
// Ensure the offer filter times out (2x the allocation interval)
// and the next batch allocation occurs.
Clock::advance(flags.allocation_interval);
// Previously declined resources should be offered to the quota'ed role.
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework1.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocation);
// Total cluster resources: cpus=2, mem=1024.
// QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
@@ -1864,7 +1914,7 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
agent1,
None(),
agent1.resources(),
- {std::make_pair(framework1.id(), agent1.resources())});
+ {{framework1.id(), agent1.resources()}});
SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
allocator->addSlave(
@@ -1872,7 +1922,7 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
agent2,
None(),
agent2.resources(),
- {std::make_pair(framework1.id(), agent2.resources())});
+ {{framework1.id(), agent2.resources()}});
// Total cluster resources (2 identical agents): cpus=2, mem=1024.
// QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
@@ -1901,10 +1951,11 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota)
// Trigger the next batch allocation.
Clock::advance(flags.allocation_interval);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework2.id(),
+ {{agent1.id(), agent1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=2, mem=1024.
// QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
@@ -1967,10 +2018,11 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
// `framework1a` will be offered all of `agent1`'s resources because
// it is the only framework in the only role with unsatisfied quota.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1a.id(), allocation->frameworkId);
- EXPECT_EQ(agent1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1a.id(),
+ {{agent1.id(), agent1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=1, mem=512.
// QUOTA_ROLE share = 1 (cpus=1, mem=512) [quota: cpus=2, mem=1024]
@@ -1988,10 +2040,11 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
// `framework1b` will be offered all of `agent2`'s resources
// (coarse-grained allocation) because its share is 0 and it belongs
// to a role with unsatisfied quota.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1b.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework1b.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=3, mem=1536.
// QUOTA_ROLE share = 1 (cpus=3, mem=1536) [quota: cpus=4, mem=2048]
@@ -2006,10 +2059,11 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
// `framework1a` will be offered all of `agent3`'s resources because
// its share is less than `framework1b`'s and `QUOTA_ROLE` still
// has unsatisfied quota.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1a.id(), allocation->frameworkId);
- EXPECT_EQ(agent3.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework1a.id(),
+ {{agent3.id(), agent3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=4, mem=2048.
// QUOTA_ROLE share = 1 (cpus=4, mem=2048) [quota: cpus=4, mem=2048]
@@ -2031,10 +2085,12 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
// Trigger the next batch allocation.
Clock::advance(flags.allocation_interval);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1b.id(), allocation->frameworkId);
- EXPECT_EQ(agent3.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework1b.id(),
+ {{agent3.id(), agent3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
+
// Total cluster resources: cpus=4, mem=2048.
// QUOTA_ROLE share = 1 (cpus=4, mem=2048) [quota: cpus=4, mem=2048]
@@ -2084,11 +2140,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaAllocationGranularity)
// `framework1` will be offered all of `agent`'s resources because
// it is the only framework in the only role with unsatisfied quota
// and the allocator performs coarse-grained allocation.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
- EXPECT_TRUE(Resources(agent.resources()).contains(quota.info.guarantee()));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=1, mem=512.
// QUOTA_ROLE share = 1 (cpus=1, mem=512) [quota: cpus=0.5, mem=200]
@@ -2150,7 +2206,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DRFWithQuota)
agent1,
None(),
agent1.resources(),
- {std::make_pair(framework1.id(), Resources(quota.info.guarantee()))});
+ {{framework1.id(), Resources(quota.info.guarantee())}});
// Total cluster resources (1 agent): cpus=1, mem=512.
// QUOTA_ROLE share = 0.25 (cpus=0.25, mem=128) [quota: cpus=0.25, mem=128]
@@ -2164,11 +2220,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DRFWithQuota)
// `framework2` will be offered all of `agent1`'s resources because its
// share is 0.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent1.resources() - Resources(quota.info.guarantee()),
- Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework2.id(),
+ {{agent1.id(), Resources(agent1.resources()) - quota.info.guarantee()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
metrics = Metrics();
@@ -2205,10 +2261,12 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DRFWithQuota)
// `framework2` will be offered all of `agent2`'s resources (coarse-grained
// allocation). `framework1` does not receive them even though it has a
// smaller allocation, since we have already satisfied its role's quota.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+
+ expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -2246,12 +2304,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
agent1,
None(),
agent1.resources(),
- {std::make_pair(framework1.id(), agent1.resources())});
-
- // Process all triggered allocation events.
- //
- // NOTE: No allocations happen because all resources are already allocated.
- Clock::settle();
+ {{framework1.id(), agent1.resources()}});
// Total cluster resources (1 agent): cpus=1, mem=512.
// QUOTA_ROLE share = 1 (cpus=1, mem=512)
@@ -2265,10 +2318,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
// Free cluster resources on `agent2` will be allocated to `framework2`
// because its share is 0.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources (2 identical agents): cpus=2, mem=1024.
// QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
@@ -2296,10 +2350,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
// Trigger the next batch allocation.
Clock::advance(flags.allocation_interval);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// `framework2` continues declining offers.
allocator->recoverResources(
@@ -2315,10 +2370,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
// Since `QUOTA_ROLE` is under quota, `agent2`'s resources will
// be allocated to `framework1`.
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework1.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources: cpus=2, mem=1024.
// QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
@@ -2385,10 +2441,12 @@ TEST_F(HierarchicalAllocatorTest, QuotaAbsentFramework)
// `agent2`, `framework` is not allocated anything. However, we
// can't easily test for the absence of an allocation from the
// framework side, so we make due with this instead.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));
+
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent2.id(), agent2.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources (2 agents): cpus=3, mem=1536.
// QUOTA_ROLE share = 0 [quota: cpus=2, mem=1024], but
@@ -2435,10 +2493,11 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaAbsentFrameworks)
// Due to the coarse-grained nature of the allocations, `framework` will
// get all `agent`'s resources.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -2486,7 +2545,7 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
agent1,
None(),
agent1.resources(),
- {std::make_pair(framework1.id(), agent1.resources())});
+ {{framework1.id(), agent1.resources()}});
SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:1024;disk:0");
allocator->addSlave(
@@ -2494,8 +2553,10 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
agent2,
None(),
agent2.resources(),
- {std::make_pair(framework2.id(), agent2.resources())});
+ {{framework2.id(), agent2.resources()}});
+ // TODO(bmahler): Add assertions to test this is accurate!
+ //
// Total cluster resources (2 identical agents): cpus=2, mem=2048.
// QUOTA_ROLE1 share = 0.5 (cpus=1, mem=1024) [quota: cpus=1, mem=200]
// framework1 share = 1
@@ -2511,10 +2572,11 @@ TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
// `framework2` will get all agent3's resources because its role is under
// quota, while other roles' quotas are satisfied.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(agent3.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework2.id(),
+ {{agent3.id(), agent3.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Total cluster resources (3 agents): cpus=4, mem=4096.
// QUOTA_ROLE1 share = 0.25 (cpus=1, mem=1024) [quota: cpus=1, mem=200]
@@ -2561,19 +2623,19 @@ TEST_F(HierarchicalAllocatorTest, ReservationWithinQuota)
agent1,
None(),
agent1.resources(),
- {std::make_pair(
+ {{
framework1.id(),
// The `mem` portion is used to test that reserved resources are
// accounted for, and the `cpus` portion is allocated to show that
// the result of DRF would be different if `mem` was not accounted.
- Resources::parse("cpus:2;mem(" + QUOTA_ROLE + "):256").get())});
+ Resources::parse("cpus:2;mem(" + QUOTA_ROLE + "):256").get()
+ }});
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
+ Allocation expected = Allocation(
+ framework2.id(),
+ {{agent1.id(), Resources::parse("cpus:6").get()}});
- EXPECT_EQ(Resources::parse("cpus:6").get(),
- Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Since the reserved resources account towards the quota as well as being
// accounted for DRF, we expect these resources to also be allocated to
@@ -2581,12 +2643,11 @@ TEST_F(HierarchicalAllocatorTest, ReservationWithinQuota)
SlaveInfo agent2 = createSlaveInfo("cpus:4");
allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
+ expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), agent2.resources()}});
- EXPECT_EQ(Resources::parse("cpus:4").get(),
- Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -2634,10 +2695,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
// `framework1` will be offered resources at `agent1` because the
// resources at `agent2` are reserved for a different role.
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(agent1.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{agent1.id(), agent1.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// `framework1` declines the resources on `agent1` for the duration
// of the test.
@@ -2655,7 +2717,7 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
Clock::advance(flags.allocation_interval);
Clock::settle();
- allocation = allocations.get();
+ Future<Allocation> allocation = allocations.get();
EXPECT_TRUE(allocation.isPending());
// Create `framework2` in a non-quota'ed role.
@@ -2664,9 +2726,11 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
// `framework2` will be offered the reserved resources at `agent2`
// because those resources are reserved for its role.
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(dynamicallyReserved, Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework2.id(),
+ {{agent2.id(), dynamicallyReserved}});
+
+ AWAIT_EXPECT_EQ(expected, allocation);
// `framework2` declines the resources on `agent2` for the duration
// of the test.
@@ -2712,10 +2776,11 @@ TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework)
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
allocator->recoverResources(
framework.id(),
@@ -2734,7 +2799,7 @@ TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework)
// operations to be processed.
Clock::settle();
- allocation = allocations.get();
+ Future<Allocation> allocation = allocations.get();
EXPECT_TRUE(allocation.isPending());
// Reconnect the framework again.
@@ -2742,9 +2807,11 @@ TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework)
// Framework will be offered all of agent's resources again
// after getting activated.
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -2767,10 +2834,11 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
// Here the revival is totally unnecessary but we should tolerate the
// framework's redundant REVIVE calls.
@@ -2780,7 +2848,7 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
Clock::settle();
// Nothing is allocated because of no additional resources.
- allocation = allocations.get();
+ Future<Allocation> allocation = allocations.get();
EXPECT_TRUE(allocation.isPending());
allocator->recoverResources(
@@ -2803,9 +2871,11 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
// Framework will be offered all of agent's resources again after
// reviving offers.
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -3056,15 +3126,17 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
FrameworkInfo framework1 = createFrameworkInfo("roleA");
allocator->addFramework(framework1.id(), framework1, {}, true);
- Future<Allocation> allocation = allocations.get();
+ Allocation expectedAllocation = Allocation(
+ framework1.id(),
+ {{agent.id(), agent.resources()}});
- AWAIT_READY(allocation);
- ASSERT_EQ(framework1.id(), allocation->frameworkId);
+ Future<Allocation> allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expectedAllocation, allocation);
allocator->recoverResources(
allocation->frameworkId,
agent.id(),
- allocation->resources.get(agent.id()).get(),
+ allocation->resources.at(agent.id()),
offerFilter);
JSON::Object expected;
@@ -3079,15 +3151,17 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
FrameworkInfo framework2 = createFrameworkInfo("roleB");
allocator->addFramework(framework2.id(), framework2, {}, true);
- allocation = allocations.get();
+ expectedAllocation = Allocation(
+ framework2.id(),
+ {{agent.id(), agent.resources()}});
- AWAIT_READY(allocation);
- ASSERT_EQ(framework2.id(), allocation->frameworkId);
+ allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expectedAllocation, allocation);
allocator->recoverResources(
allocation->frameworkId,
agent.id(),
- allocation->resources.get(agent.id()).get(),
+ allocation->resources.at(agent.id()),
offerFilter);
expected.values = {
@@ -3102,15 +3176,17 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
FrameworkInfo framework3 = createFrameworkInfo("roleA");
allocator->addFramework(framework3.id(), framework3, {}, true);
- allocation = allocations.get();
+ expectedAllocation = Allocation(
+ framework3.id(),
+ {{agent.id(), agent.resources()}});
- AWAIT_READY(allocation);
- ASSERT_EQ(framework3.id(), allocation->frameworkId);
+ allocation = allocations.get();
+ AWAIT_EXPECT_EQ(expectedAllocation, allocation);
allocator->recoverResources(
allocation->frameworkId,
agent.id(),
- allocation->resources.get(agent.id()).get(),
+ allocation->resources.at(agent.id()),
offerFilter);
expected.values = {
@@ -3159,7 +3235,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DominantShareMetrics)
allocator->recoverResources(
allocation->frameworkId,
agent1.id(),
- allocation->resources.get(agent1.id()).get(),
+ allocation->resources.at(agent1.id()),
None());
Clock::settle();
@@ -3292,24 +3368,25 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
initialize();
- // Define some constants to make the code read easily.
- const string SINGLE_RESOURCE = "cpus:2;mem:1024";
- const string DOUBLE_RESOURCES = "cpus:4;mem:2048";
- const string TRIPLE_RESOURCES = "cpus:6;mem:3072";
- const string FOURFOLD_RESOURCES = "cpus:8;mem:4096";
- const string TOTAL_RESOURCES = "cpus:12;mem:6144";
+ const Resources SINGLE_RESOURCES = Resources::parse("cpus:2;mem:1024").get();
+ const Resources DOUBLE_RESOURCES = SINGLE_RESOURCES + SINGLE_RESOURCES;
+ const Resources TRIPLE_RESOURCES = DOUBLE_RESOURCES + SINGLE_RESOURCES;
+ const Resources FOURFOLD_RESOURCES = DOUBLE_RESOURCES + DOUBLE_RESOURCES;
+
+ // There will be 6 agents.
+ const Resources TOTAL_RESOURCES = FOURFOLD_RESOURCES + DOUBLE_RESOURCES;
auto awaitAllocationsAndRecoverResources = [this](
- Resources& totalAllocatedResources,
- hashmap<FrameworkID, Allocation>& frameworkAllocations,
+ Resources* totalAllocatedResources,
+ hashmap<FrameworkID, Allocation>* frameworkAllocations,
int allocationsCount,
bool recoverResources) {
for (int i = 0; i < allocationsCount; i++) {
Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
- frameworkAllocations[allocation->frameworkId] = allocation.get();
- totalAllocatedResources += Resources::sum(allocation->resources);
+ (*frameworkAllocations)[allocation->frameworkId] = allocation.get();
+ (*totalAllocatedResources) += Resources::sum(allocation->resources);
if (recoverResources) {
// Recover the allocated resources so they can be offered
@@ -3330,16 +3407,15 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
// Register six agents with the same resources (cpus:2;mem:1024).
vector<SlaveInfo> agents;
for (size_t i = 0; i < 6; i++) {
- SlaveInfo agent = createSlaveInfo(SINGLE_RESOURCE);
+ SlaveInfo agent = createSlaveInfo(SINGLE_RESOURCES);
agents.push_back(agent);
allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});
}
- // Total cluster resources (6 agents): cpus=12, mem=6144.
-
- // Framework1 registers with 'role1' which uses the default weight (1.0),
- // and all resources will be offered to this framework since it is the only
- // framework running so far.
+ // Add two frameworks with the same weight, both should receive
+ // the same amount of resources once the agents are added. However,
+ // since framework1 is added first, it will receive all of the
+ // resources, so we recover them once both frameworks are added.
FrameworkInfo framework1 = createFrameworkInfo("role1");
allocator->addFramework(framework1.id(), framework1, {}, true);
@@ -3352,35 +3428,15 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
// allocation.
Clock::settle();
- // Framework2 registers with 'role2' which also uses the default weight.
- // It will not get any offers due to all resources having outstanding offers
- // to framework1 when it registered.
FrameworkInfo framework2 = createFrameworkInfo("role2");
allocator->addFramework(framework2.id(), framework2, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
-
- // role1 share = 1 (cpus=12, mem=6144)
- // framework1 share = 1
- // role2 share = 0
- // framework2 share = 0
-
- ASSERT_EQ(framework1.id(), allocation->frameworkId);
- ASSERT_EQ(6u, allocation->resources.size());
- EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(),
- Resources::sum(allocation->resources));
-
- // Recover all resources so they can be offered again next time.
- foreachpair (const SlaveID& slaveId,
- const Resources& resources,
- allocation->resources) {
- allocator->recoverResources(
- allocation->frameworkId,
- slaveId,
- resources,
- None());
- }
+ // Recover the allocation to framework1 so that the allocator
+ // can offer to both frameworks.
+ hashmap<FrameworkID, Allocation> frameworkAllocations;
+ Resources totalAllocatedResources;
+ awaitAllocationsAndRecoverResources(
+ &totalAllocatedResources, &frameworkAllocations, 1, true);
// Tests whether `framework1` and `framework2` each get half of the resources
// when their roles' weights are 1:1.
@@ -3397,22 +3453,24 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
// since each framework's role has a weight of 1.0 by default.
hashmap<FrameworkID, Allocation> frameworkAllocations;
Resources totalAllocatedResources;
- awaitAllocationsAndRecoverResources(totalAllocatedResources,
- frameworkAllocations, 2, true);
+ awaitAllocationsAndRecoverResources(
+ &totalAllocatedResources, &frameworkAllocations, 2, true);
+
+ // Both frameworks should get one allocation with three agents.
+ ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
+ ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
- // Framework1 should get one allocation with three agents.
- ASSERT_EQ(3u, frameworkAllocations[framework1.id()].resources.size());
- EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
- Resources::sum(frameworkAllocations[framework1.id()].resources));
- // Framework2 should also get one allocation with three agents.
- ASSERT_EQ(3u, frameworkAllocations[framework2.id()].resources.size());
- EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
- Resources::sum(frameworkAllocations[framework2.id()].resources));
+ Allocation allocation1 = frameworkAllocations.at(framework1.id());
+ Allocation allocation2 = frameworkAllocations.at(framework2.id());
+
+ EXPECT_EQ(TRIPLE_RESOURCES, Resources::sum(allocation1.resources));
+ EXPECT_EQ(TRIPLE_RESOURCES, Resources::sum(allocation2.resources));
// Check to ensure that these two allocations sum to the total resources;
// this check can ensure there are only two allocations in this case.
- EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources);
+ EXPECT_EQ(TOTAL_RESOURCES,
+ totalAllocatedResources.createStrippedScalarQuantity());
}
// Tests whether `framework1` gets 1/3 of the resources and `framework2` gets
@@ -3435,22 +3493,22 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
// resources are offered with a ratio of 1:2 between both frameworks.
hashmap<FrameworkID, Allocation> frameworkAllocations;
Resources totalAllocatedResources;
- awaitAllocationsAndRecoverResources(totalAllocatedResources,
- frameworkAllocations, 2, true);
+ awaitAllocationsAndRecoverResources(
+ &totalAllocatedResources, &frameworkAllocations, 2, true);
+
+ ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
+ ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
- // Framework1 should get one allocation with two agents.
- ASSERT_EQ(2u, frameworkAllocations[framework1.id()].resources.size());
- EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
- Resources::sum(frameworkAllocations[framework1.id()].resources));
+ Allocation allocation1 = frameworkAllocations.at(framework1.id());
+ Allocation allocation2 = frameworkAllocations.at(framework2.id());
- // Framework2 should get one allocation with four agents.
- ASSERT_EQ(4u, frameworkAllocations[framework2.id()].resources.size());
- EXPECT_EQ(Resources::parse(FOURFOLD_RESOURCES).get(),
- Resources::sum(frameworkAllocations[framework2.id()].resources));
+ EXPECT_EQ(DOUBLE_RESOURCES, Resources::sum(allocation1.resources));
+ EXPECT_EQ(FOURFOLD_RESOURCES, Resources::sum(allocation2.resources));
// Check to ensure that these two allocations sum to the total resources;
// this check can ensure there are only two allocations in this case.
- EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources);
+ EXPECT_EQ(TOTAL_RESOURCES,
+ totalAllocatedResources.createStrippedScalarQuantity());
}
// Tests whether `framework1` gets 1/6 of the resources, `framework2` gets
@@ -3484,27 +3542,27 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
// will get the proper resource ratio of 1:2:3.
hashmap<FrameworkID, Allocation> frameworkAllocations;
Resources totalAllocatedResources;
- awaitAllocationsAndRecoverResources(totalAllocatedResources,
- frameworkAllocations, 3, false);
+ awaitAllocationsAndRecoverResources(
+ &totalAllocatedResources, &frameworkAllocations, 3, false);
+
+ // All three frameworks should have received an allocation.
+ ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
+ ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
+ ASSERT_TRUE(frameworkAllocations.contains(framework3.id()));
- // Framework1 should get one allocation with one agent.
- ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
- EXPECT_EQ(Resources::parse(SINGLE_RESOURCE).get(),
- Resources::sum(frameworkAllocations[framework1.id()].resources));
- // Framework2 should get one allocation with two agents.
- ASSERT_EQ(2u, frameworkAllocations[framework2.id()].resources.size());
- EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
- Resources::sum(frameworkAllocations[framework2.id()].resources));
+ Allocation allocation1 = frameworkAllocations.at(framework1.id());
+ Allocation allocation2 = frameworkAllocations.at(framework2.id());
+ Allocation allocation3 = frameworkAllocations.at(framework3.id());
- // Framework3 should get one allocation with three agents.
- ASSERT_EQ(3u, frameworkAllocations[framework3.id()].resources.size());
- EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
- Resources::sum(frameworkAllocations[framework3.id()].resources));
+ EXPECT_EQ(SINGLE_RESOURCES, Resources::sum(allocation1.resources));
+ EXPECT_EQ(DOUBLE_RESOURCES, Resources::sum(allocation2.resources));
+ EXPECT_EQ(TRIPLE_RESOURCES, Resources::sum(allocation3.resources));
- // Check to ensure that these three allocations sum to the total resources;
+ // Check to ensure that these three allocations sum to the total resources;
// this check can ensure there are only three allocations in this case.
- EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources);
+ EXPECT_EQ(TOTAL_RESOURCES,
+ totalAllocatedResources.createStrippedScalarQuantity());
}
}
@@ -3530,10 +3588,11 @@ TEST_F(HierarchicalAllocatorTest, ReviveOffers)
FrameworkInfo framework = createFrameworkInfo("role1");
allocator->addFramework(framework.id(), framework, {}, true);
- Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ Allocation expected = Allocation(
+ framework.id(),
+ {{agent.id(), agent.resources()}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
Filters filter1000s;
filter1000s.set_refuse_seconds(1000.);
@@ -3547,16 +3606,14 @@ TEST_F(HierarchicalAllocatorTest, ReviveOffers)
Clock::advance(flags.allocation_interval);
Clock::settle();
- allocation = allocations.get();
+ Future<Allocation> allocation = allocations.get();
EXPECT_TRUE(allocation.isPending());
allocator->reviveOffers(framework.id());
// Framework will be offered all of agent's resources again
// after reviving offers.
- AWAIT_READY(allocation);
- EXPECT_EQ(framework.id(), allocation->frameworkId);
- EXPECT_EQ(agent.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
}
@@ -3605,12 +3662,12 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
allocator->addSlave(slave.id(), slave, None(), slave.resources(), {});
// Initially, all the resources are allocated to `framework1`.
+ Allocation expected = Allocation(
+ framework1.id(),
+ {{slave.id(), slave.resources()}});
+
Future<Allocation> allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework1.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(slave.resources(), Resources::sum(allocation->resources));
+ AWAIT_EXPECT_EQ(expected, allocation);
// Create a shared volume.
Resource volume = createDiskResource(
@@ -3626,7 +3683,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
// Ensure the CREATE operation can be applied.
Try<Resources> updated =
- Resources::sum(allocation->resources).apply(create);
+ allocation->resources.at(slave.id()).apply(create);
ASSERT_SOME(updated);
@@ -3635,7 +3692,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
allocator->updateAllocation(
framework1.id(),
slave.id(),
- Resources::sum(allocation->resources),
+ allocation->resources.at(slave.id()),
{create, launch});
// Now recover the resources, and expect the next allocation to contain
@@ -3650,12 +3707,11 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
// The offer to 'framework2` should contain the shared volume.
Clock::advance(flags.allocation_interval);
- allocation = allocations.get();
- AWAIT_READY(allocation);
- EXPECT_EQ(framework2.id(), allocation->frameworkId);
- EXPECT_EQ(1u, allocation->resources.size());
- EXPECT_TRUE(allocation->resources.contains(slave.id()));
- EXPECT_EQ(allocation->resources.at(slave.id()).shared(), Resources(volume));
+ expected = Allocation(
+ framework2.id(),
+ {{slave.id(), updated.get() - task.resources() + volume}});
+
+ AWAIT_EXPECT_EQ(expected, allocations.get());
}
@@ -4100,26 +4156,21 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers)
// Pause the clock because we want to manually drive the allocations.
Clock::pause();
- struct Allocation
+ struct OfferedResources
{
FrameworkID frameworkId;
SlaveID slaveId;
Resources resources;
};
- vector<Allocation> allocations;
+ vector<OfferedResources> offers;
- auto offerCallback = [&allocations](
+ auto offerCallback = [&offers](
const FrameworkID& frameworkId,
const hashmap<SlaveID, Resources>& resources)
{
foreachpair (const SlaveID& slaveId, const Resources& r, resources) {
- Allocation allocation;
- allocation.frameworkId = frameworkId;
- allocation.slaveId = slaveId;
- allocation.resources = r;
-
- allocations.push_back(std::move(allocation));
+ offers.push_back(OfferedResources{frameworkId, slaveId, r});
}
};
@@ -4197,17 +4248,17 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers)
for (size_t i = 0; i < allocationsCount; ++i) {
// Recover resources with no filters because we want to test the
// effect of suppression alone.
- foreach (const Allocation& allocation, allocations) {
+ foreach (const OfferedResources& offer, offers) {
allocator->recoverResources(
- allocation.frameworkId,
- allocation.slaveId,
- allocation.resources,
+ offer.frameworkId,
+ offer.slaveId,
+ offer.resources,
None());
}
// Wait for all declined offers to be processed.
Clock::settle();
- allocations.clear();
+ offers.clear();
// Suppress another batch of frameworks. For simplicity and readability
// we loop on allocationsCount. The implication here is that there can be
@@ -4231,7 +4282,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers)
watch.stop();
cout << "allocate() took " << watch.elapsed()
- << " to make " << allocations.size() << " offers with "
+ << " to make " << offers.size() << " offers with "
<< suppressCount << " out of "
<< frameworkCount << " frameworks suppressing offers"
<< endl;