You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/05/14 01:02:56 UTC
mesos git commit: Updated sorter to take resources by SlaveID.
Repository: mesos
Updated Branches:
refs/heads/master 8e6df2a28 -> 2c6d487a8
Updated sorter to take resources by SlaveID.
Review: https://reviews.apache.org/r/31667
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c6d487a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c6d487a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c6d487a
Branch: refs/heads/master
Commit: 2c6d487a87cc192bc5a4b6355e76ee61c47ff4e9
Parents: 8e6df2a
Author: Michael Park <mc...@gmail.com>
Authored: Wed May 13 11:53:29 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Wed May 13 15:52:29 2015 -0700
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.hpp | 53 ++++----
src/master/allocator/sorter/drf/sorter.cpp | 54 +++++---
src/master/allocator/sorter/drf/sorter.hpp | 13 +-
src/master/allocator/sorter/sorter.hpp | 16 +--
src/tests/sorter_tests.cpp | 157 ++++++++++++++++++-----
5 files changed, 207 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 09adced..c22f044 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -296,7 +296,7 @@ void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
- const hashmap<SlaveID, Resources>& used_)
+ const hashmap<SlaveID, Resources>& used)
{
CHECK(initialized);
@@ -311,13 +311,11 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
// framework's role.
// Update the allocation to this framework.
- // TODO(mpark): Once the sorter API is updated to operate on
- // 'hashmap<SlaveID, Resources>' rather than 'Resources', update
- // the sorters for each slave instead.
- Resources used = Resources::sum(used_);
- roleSorter->allocated(role, used.unreserved());
- frameworkSorters[role]->add(used);
- frameworkSorters[role]->allocated(frameworkId.value(), used);
+ foreachpair (const SlaveID& slaveId, const Resources& allocated, used) {
+ roleSorter->allocated(role, slaveId, allocated.unreserved());
+ frameworkSorters[role]->add(slaveId, allocated);
+ frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated);
+ }
frameworks[frameworkId] = Framework();
frameworks[frameworkId].role = frameworkInfo.role();
@@ -342,11 +340,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
// Might not be in 'frameworkSorters[role]' because it was previously
// deactivated and never re-added.
if (frameworkSorters[role]->contains(frameworkId.value())) {
- Resources allocation =
+ hashmap<SlaveID, Resources> allocation =
frameworkSorters[role]->allocation(frameworkId.value());
- roleSorter->unallocated(role, allocation.unreserved());
- frameworkSorters[role]->remove(allocation);
+ foreachpair (
+ const SlaveID& slaveId, const Resources& allocated, allocation) {
+ roleSorter->unallocated(role, slaveId, allocated.unreserved());
+ frameworkSorters[role]->remove(slaveId, allocated);
+ }
+
frameworkSorters[role]->remove(frameworkId.value());
}
@@ -417,7 +419,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
CHECK(initialized);
CHECK(!slaves.contains(slaveId));
- roleSorter->add(total.unreserved());
+ roleSorter->add(slaveId, total.unreserved());
foreachpair (const FrameworkID& frameworkId,
const Resources& allocated,
@@ -428,9 +430,10 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
// TODO(bmahler): Validate that the reserved resources have the
// framework's role.
- roleSorter->allocated(role, allocated.unreserved());
- frameworkSorters[role]->add(allocated);
- frameworkSorters[role]->allocated(frameworkId.value(), allocated);
+ roleSorter->allocated(role, slaveId, allocated.unreserved());
+ frameworkSorters[role]->add(slaveId, allocated);
+ frameworkSorters[role]->allocated(
+ frameworkId.value(), slaveId, allocated);
}
}
@@ -463,7 +466,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave(
// all the resources. Fixing this would require more information
// than what we currently track in the allocator.
- roleSorter->remove(slaves[slaveId].total.unreserved());
+ roleSorter->remove(slaveId, slaves[slaveId].total.unreserved());
slaves.erase(slaveId);
@@ -560,7 +563,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
FrameworkSorter* frameworkSorter =
frameworkSorters[frameworks[frameworkId].role];
- Resources allocation = frameworkSorter->allocation(frameworkId.value());
+ Resources allocation =
+ frameworkSorter->allocation(frameworkId.value())[slaveId];
// Update the allocated resources.
Try<Resources> updatedAllocation = allocation.apply(operations);
@@ -568,11 +572,13 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
frameworkSorter->update(
frameworkId.value(),
+ slaveId,
allocation,
updatedAllocation.get());
roleSorter->update(
frameworks[frameworkId].role,
+ slaveId,
allocation.unreserved(),
updatedAllocation.get().unreserved());
@@ -622,9 +628,10 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
CHECK(frameworkSorters.contains(role));
if (frameworkSorters[role]->contains(frameworkId.value())) {
- frameworkSorters[role]->unallocated(frameworkId.value(), resources);
- frameworkSorters[role]->remove(resources);
- roleSorter->unallocated(role, resources.unreserved());
+ frameworkSorters[role]->unallocated(
+ frameworkId.value(), slaveId, resources);
+ frameworkSorters[role]->remove(slaveId, resources);
+ roleSorter->unallocated(role, slaveId, resources.unreserved());
}
}
@@ -812,9 +819,9 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
// Reserved resources are only accounted for in the framework
// sorter, since the reserved resources are not shared across
// roles.
- frameworkSorters[role]->add(resources);
- frameworkSorters[role]->allocated(frameworkId_, resources);
- roleSorter->allocated(role, resources.unreserved());
+ frameworkSorters[role]->add(slaveId, resources);
+ frameworkSorters[role]->allocated(frameworkId_, slaveId, resources);
+ roleSorter->allocated(role, slaveId, resources.unreserved());
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 2f69f38..f53a934 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -47,7 +47,7 @@ void DRFSorter::add(const string& name, double weight)
Client client(name, 0, 0);
clients.insert(client);
- allocations[name] = Resources();
+ allocations[name] = hashmap<SlaveID, Resources>();
weights[name] = weight;
}
@@ -90,6 +90,7 @@ void DRFSorter::deactivate(const string& name)
void DRFSorter::allocated(
const string& name,
+ const SlaveID& slaveId,
const Resources& resources)
{
set<Client, DRFComparator>::iterator it = find(name);
@@ -106,7 +107,7 @@ void DRFSorter::allocated(
clients.insert(client);
}
- allocations[name] += resources;
+ allocations[name][slaveId] += resources;
// If the total resources have changed, we're going to
// recalculate all the shares, so don't bother just
@@ -119,6 +120,7 @@ void DRFSorter::allocated(
void DRFSorter::update(
const string& name,
+ const SlaveID& slaveId,
const Resources& oldAllocation,
const Resources& newAllocation)
{
@@ -129,23 +131,26 @@ void DRFSorter::update(
// Otherwise, we need to ensure we re-calculate the shares, as
// is being currently done, for safety.
- CHECK(resources.contains(oldAllocation));
+ CHECK(resources[slaveId].contains(oldAllocation));
- resources -= oldAllocation;
- resources += newAllocation;
+ resources[slaveId] -= oldAllocation;
+ resources[slaveId] += newAllocation;
- CHECK(allocations[name].contains(oldAllocation));
+ CHECK(allocations[name][slaveId].contains(oldAllocation));
- allocations[name] -= oldAllocation;
- allocations[name] += newAllocation;
+ allocations[name][slaveId] -= oldAllocation;
+ if (allocations[name][slaveId].empty()) {
+ allocations[name].erase(slaveId);
+ }
+
+ allocations[name][slaveId] += newAllocation;
// Just assume the total has changed, per the TODO above.
dirty = true;
}
-Resources DRFSorter::allocation(
- const string& name)
+hashmap<SlaveID, Resources> DRFSorter::allocation(const string& name)
{
return allocations[name];
}
@@ -153,9 +158,13 @@ Resources DRFSorter::allocation(
void DRFSorter::unallocated(
const string& name,
+ const SlaveID& slaveId,
const Resources& resources)
{
- allocations[name] -= resources;
+ allocations[name][slaveId] -= resources;
+ if (allocations[name][slaveId].empty()) {
+ allocations[name].erase(slaveId);
+ }
if (!dirty) {
update(name);
@@ -163,9 +172,9 @@ void DRFSorter::unallocated(
}
-void DRFSorter::add(const Resources& _resources)
+void DRFSorter::add(const SlaveID& slaveId, const Resources& _resources)
{
- resources += _resources;
+ resources[slaveId] += _resources;
// We have to recalculate all shares when the total resources
// change, but we put it off until sort is called
@@ -175,9 +184,13 @@ void DRFSorter::add(const Resources& _resources)
}
-void DRFSorter::remove(const Resources& _resources)
+void DRFSorter::remove(const SlaveID& slaveId, const Resources& _resources)
{
- resources -= _resources;
+ resources[slaveId] -= _resources;
+ if (resources[slaveId].empty()) {
+ resources.erase(slaveId);
+ }
+
dirty = true;
}
@@ -248,22 +261,27 @@ double DRFSorter::calculateShare(const string& name)
// currently does not take into account resources that are not
// scalars.
+ // NOTE: Summation is incorrect for non-scalars, but since we
+ // only care about scalar resources, this is safe.
+ Resources totalResources = Resources::sum(resources);
+ Resources clientAllocation = Resources::sum(allocations[name]);
+
// Scalar resources may be spread across multiple 'Resource'
// objects. E.g. persistent volumes. So we first collect the names
// of the scalar resources, before computing the totals.
hashset<string> scalars;
- foreach (const Resource& resource, resources) {
+ foreach (const Resource& resource, totalResources) {
if (resource.type() == Value::SCALAR) {
scalars.insert(resource.name());
}
}
foreach (const string& scalar, scalars) {
- Option<Value::Scalar> total = resources.get<Value::Scalar>(scalar);
+ Option<Value::Scalar> total = totalResources.get<Value::Scalar>(scalar);
if (total.isSome() && total.get().value() > 0) {
Option<Value::Scalar> allocation =
- allocations[name].get<Value::Scalar>(scalar);
+ clientAllocation.get<Value::Scalar>(scalar);
if (allocation.isNone()) {
allocation = Value::Scalar();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 4366710..fd00c8c 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -74,20 +74,23 @@ public:
virtual void deactivate(const std::string& name);
virtual void allocated(const std::string& name,
+ const SlaveID& slaveId,
const Resources& resources);
virtual void update(const std::string& name,
+ const SlaveID& slaveId,
const Resources& oldAllocation,
const Resources& newAllocation);
virtual void unallocated(const std::string& name,
+ const SlaveID& slaveId,
const Resources& resources);
- virtual Resources allocation(const std::string& name);
+ virtual hashmap<SlaveID, Resources> allocation(const std::string& name);
- virtual void add(const Resources& resources);
+ virtual void add(const SlaveID& slaveId, const Resources& resources);
- virtual void remove(const Resources& resources);
+ virtual void remove(const SlaveID& slaveId, const Resources& resources);
virtual std::list<std::string> sort();
@@ -114,13 +117,13 @@ private:
std::set<Client, DRFComparator> clients;
// Maps client names to the resources they have been allocated.
- hashmap<std::string, Resources> allocations;
+ hashmap<std::string, hashmap<SlaveID, Resources>> allocations;
// Maps client names to the weights that should be applied to their shares.
hashmap<std::string, double> weights;
// Total resources.
- Resources resources;
+ hashmap<SlaveID, Resources> resources;
};
} // namespace allocator {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index e2efb27..f06b946 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -23,6 +23,7 @@
#include <string>
#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
namespace mesos {
namespace internal {
@@ -33,12 +34,6 @@ namespace allocator {
// order in which users or frameworks should receive
// resource allocations.
//
-// TODO(bmahler): The total and allocated resources are currently
-// aggregated across slaves, which only works for scalar resources.
-// Also, persistent disks are a bit tricky because there will be
-// duplicated persistence IDs within the resources. Consider storing
-// maps keyed off of the slave ID to fix these issues.
-//
// TODO(bmahler): Templatize this on Client, so that callers can
// don't need to do string conversion, e.g. FrameworkID, string role,
// etc.
@@ -62,6 +57,7 @@ public:
// Specify that resources have been allocated to the given client.
virtual void allocated(const std::string& client,
+ const SlaveID& slaveId,
const Resources& resources) = 0;
// Updates a portion of the allocation for the client, in order to
@@ -69,22 +65,24 @@ public:
// This means that the new allocation must not affect the static
// roles, or the overall quantities of resources!
virtual void update(const std::string& client,
+ const SlaveID& slaveId,
const Resources& oldAllocation,
const Resources& newAllocation) = 0;
// Specify that resources have been unallocated from the given client.
virtual void unallocated(const std::string& client,
+ const SlaveID& slaveId,
const Resources& resources) = 0;
// Returns the resources that have been allocated to this client.
- virtual Resources allocation(const std::string& client) = 0;
+ virtual hashmap<SlaveID, Resources> allocation(const std::string& client) = 0;
// Add resources to the total pool of resources this
// Sorter should consider.
- virtual void add(const Resources& resources) = 0;
+ virtual void add(const SlaveID& slaveId, const Resources& resources) = 0;
// Remove resources from the total pool.
- virtual void remove(const Resources& resources) = 0;
+ virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0;
// Returns a list of all clients, in the order that they
// should be allocated to, according to this Sorter's policy.
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c6d487a/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 4244235..6b0389b 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -30,6 +30,8 @@
#include "master/allocator/sorter/drf/sorter.hpp"
+#include "tests/mesos.hpp"
+
using mesos::internal::master::allocator::DRFSorter;
using std::list;
@@ -44,59 +46,62 @@ TEST(SorterTest, DRFSorter)
{
DRFSorter sorter;
+ SlaveID slaveId;
+ slaveId.set_value("slaveId");
+
Resources totalResources = Resources::parse("cpus:100;mem:100").get();
- sorter.add(totalResources);
+ sorter.add(slaveId, totalResources);
sorter.add("a");
Resources aResources = Resources::parse("cpus:5;mem:5").get();
- sorter.allocated("a", aResources);
+ sorter.allocated("a", slaveId, aResources);
Resources bResources = Resources::parse("cpus:6;mem:6").get();
sorter.add("b");
- sorter.allocated("b", bResources);
+ sorter.allocated("b", slaveId, bResources);
// shares: a = .05, b = .06
EXPECT_EQ(list<string>({"a", "b"}), sorter.sort());
Resources cResources = Resources::parse("cpus:1;mem:1").get();
sorter.add("c");
- sorter.allocated("c", cResources);
+ sorter.allocated("c", slaveId, cResources);
Resources dResources = Resources::parse("cpus:3;mem:1").get();
sorter.add("d");
- sorter.allocated("d", dResources);
+ sorter.allocated("d", slaveId, dResources);
// shares: a = .05, b = .06, c = .01, d = .03
EXPECT_EQ(list<string>({"c", "d", "a", "b"}), sorter.sort());
sorter.remove("a");
Resources bUnallocated = Resources::parse("cpus:4;mem:4").get();
- sorter.unallocated("b", bUnallocated);
+ sorter.unallocated("b", slaveId, bUnallocated);
// shares: b = .02, c = .01, d = .03
EXPECT_EQ(list<string>({"c", "b", "d"}), sorter.sort());
Resources eResources = Resources::parse("cpus:1;mem:5").get();
sorter.add("e");
- sorter.allocated("e", eResources);
+ sorter.allocated("e", slaveId, eResources);
Resources removedResources = Resources::parse("cpus:50;mem:0").get();
- sorter.remove(removedResources);
+ sorter.remove(slaveId, removedResources);
// total resources is now cpus = 50, mem = 100
// shares: b = .04, c = .02, d = .06, e = .05
EXPECT_EQ(list<string>({"c", "b", "e", "d"}), sorter.sort());
Resources addedResources = Resources::parse("cpus:0;mem:100").get();
- sorter.add(addedResources);
+ sorter.add(slaveId, addedResources);
// total resources is now cpus = 50, mem = 200
Resources fResources = Resources::parse("cpus:5;mem:1").get();
sorter.add("f");
- sorter.allocated("f", fResources);
+ sorter.allocated("f", slaveId, fResources);
Resources cResources2 = Resources::parse("cpus:0;mem:15").get();
- sorter.allocated("c", cResources2);
+ sorter.allocated("c", slaveId, cResources2);
// shares: b = .04, c = .08, d = .06, e = .025, f = .1
EXPECT_EQ(list<string>({"e", "b", "d", "c", "f"}), sorter.sort());
@@ -125,26 +130,29 @@ TEST(SorterTest, WDRFSorter)
{
DRFSorter sorter;
- sorter.add(Resources::parse("cpus:100;mem:100").get());
+ SlaveID slaveId;
+ slaveId.set_value("slaveId");
+
+ sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
sorter.add("a");
- sorter.allocated("a", Resources::parse("cpus:5;mem:5").get());
+ sorter.allocated("a", slaveId, Resources::parse("cpus:5;mem:5").get());
sorter.add("b", 2);
- sorter.allocated("b", Resources::parse("cpus:6;mem:6").get());
+ sorter.allocated("b", slaveId, Resources::parse("cpus:6;mem:6").get());
// shares: a = .05, b = .03
EXPECT_EQ(list<string>({"b", "a"}), sorter.sort());
sorter.add("c");
- sorter.allocated("c", Resources::parse("cpus:4;mem:4").get());
+ sorter.allocated("c", slaveId, Resources::parse("cpus:4;mem:4").get());
// shares: a = .05, b = .03, c = .04
EXPECT_EQ(list<string>({"b", "c", "a"}), sorter.sort());
sorter.add("d", 10);
- sorter.allocated("d", Resources::parse("cpus:10;mem:20").get());
+ sorter.allocated("d", slaveId, Resources::parse("cpus:10;mem:20").get());
// shares: a = .05, b = .03, c = .04, d = .02
EXPECT_EQ(list<string>({"d", "b", "c", "a"}), sorter.sort());
@@ -153,13 +161,13 @@ TEST(SorterTest, WDRFSorter)
EXPECT_EQ(list<string>({"d", "c", "a"}), sorter.sort());
- sorter.allocated("d", Resources::parse("cpus:10;mem:25").get());
+ sorter.allocated("d", slaveId, Resources::parse("cpus:10;mem:25").get());
// shares: a = .05, c = .04, d = .045
EXPECT_EQ(list<string>({"c", "d", "a"}), sorter.sort());
sorter.add("e", .1);
- sorter.allocated("e", Resources::parse("cpus:1;mem:1").get());
+ sorter.allocated("e", slaveId, Resources::parse("cpus:1;mem:1").get());
// shares: a = .05, c = .04, d = .045, e = .1
EXPECT_EQ(list<string>({"c", "d", "a", "e"}), sorter.sort());
@@ -177,6 +185,9 @@ TEST(SorterTest, SplitResourceShares)
{
DRFSorter sorter;
+ SlaveID slaveId;
+ slaveId.set_value("slaveId");
+
sorter.add("a");
sorter.add("b");
@@ -188,13 +199,16 @@ TEST(SorterTest, SplitResourceShares)
disk2.mutable_disk()->mutable_persistence()->set_id("ID2");
disk2.mutable_disk()->mutable_volume()->set_container_path("data");
- sorter.add(Resources::parse("cpus:100;mem:100;disk:95").get()
- + disk1 + disk2);
+ sorter.add(
+ slaveId,
+ Resources::parse("cpus:100;mem:100;disk:95").get() + disk1 + disk2);
// Now, allocate resources to "a" and "b". Note that "b" will have
// more disk if the shares are accounted correctly!
- sorter.allocated("a", Resources::parse("cpus:9;mem:9;disk:9").get());
- sorter.allocated("b", Resources::parse("cpus:9;mem:9").get() + disk1 + disk2);
+ sorter.allocated(
+ "a", slaveId, Resources::parse("cpus:9;mem:9;disk:9").get());
+ sorter.allocated(
+ "b", slaveId, Resources::parse("cpus:9;mem:9").get() + disk1 + disk2);
EXPECT_EQ(list<string>({"a", "b"}), sorter.sort());
}
@@ -204,31 +218,112 @@ TEST(SorterTest, Update)
{
DRFSorter sorter;
+ SlaveID slaveId;
+ slaveId.set_value("slaveId");
+
sorter.add("a");
sorter.add("b");
- sorter.add(Resources::parse("cpus:10;mem:10;disk:10").get());
+ sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
- sorter.allocated("a", Resources::parse("cpus:10;mem:10;disk:10").get());
+ sorter.allocated(
+ "a", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get());
// Construct an offer operation.
Resource volume = Resources::parse("disk", "5", "*").get();
volume.mutable_disk()->mutable_persistence()->set_id("ID");
volume.mutable_disk()->mutable_volume()->set_container_path("data");
- Offer::Operation create;
- create.set_type(Offer::Operation::CREATE);
- create.mutable_create()->add_volumes()->CopyFrom(volume);
+ // Compute the updated allocation.
+ Resources oldAllocation = sorter.allocation("a")[slaveId];
+ Try<Resources> newAllocation = oldAllocation.apply(CREATE(volume));
+ ASSERT_SOME(newAllocation);
+
+ // Update the resources for the client.
+ sorter.update("a", slaveId, oldAllocation, newAllocation.get());
+
+ hashmap<SlaveID, Resources> allocation = sorter.allocation("a");
+ EXPECT_EQ(1u, allocation.size());
+ EXPECT_EQ(newAllocation.get(), allocation[slaveId]);
+}
+
+
+// We aggregate resources from multiple slaves into the sorter.
+// Since non-scalar resources don't aggregate well across slaves,
+// we need to keep track of the SlaveIDs of the resources. This
+// tests that no resources vanish in the process of aggregation
+// by inspecting the result of 'allocation'.
+TEST(SorterTest, MultipleSlaves)
+{
+ DRFSorter sorter;
+
+ SlaveID slaveA;
+ slaveA.set_value("slaveA");
+
+ SlaveID slaveB;
+ slaveB.set_value("slaveB");
+
+ sorter.add("framework");
+
+ Resources slaveResources =
+ Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get();
+
+ sorter.add(slaveA, slaveResources);
+ sorter.add(slaveB, slaveResources);
+
+ sorter.allocated("framework", slaveA, slaveResources);
+ sorter.allocated("framework", slaveB, slaveResources);
+
+ hashmap<SlaveID, Resources> allocation = sorter.allocation("framework");
+ EXPECT_EQ(2u, allocation.size());
+ EXPECT_EQ(slaveResources, allocation[slaveA]);
+ EXPECT_EQ(slaveResources, allocation[slaveB]);
+}
+
+
+// We aggregate resources from multiple slaves into the sorter.
+// Since non-scalar resources don't aggregate well across slaves,
+// we need to keep track of the SlaveIDs of the resources.
+// This tests that no resources vanish in the process of aggregation
+// by performing a updates from unreserved to reserved resources.
+TEST(SorterTest, MultipleSlaveUpdates)
+{
+ DRFSorter sorter;
+
+ SlaveID slaveA;
+ slaveA.set_value("slaveA");
+
+ SlaveID slaveB;
+ slaveB.set_value("slaveB");
+
+ sorter.add("framework");
+
+ Resources slaveResources =
+ Resources::parse("cpus:2;mem:512;disk:10;ports:[31000-32000]").get();
+
+ sorter.add(slaveA, slaveResources);
+ sorter.add(slaveB, slaveResources);
+
+ sorter.allocated("framework", slaveA, slaveResources);
+ sorter.allocated("framework", slaveB, slaveResources);
+
+ // Construct an offer operation.
+ Resource volume = Resources::parse("disk", "5", "*").get();
+ volume.mutable_disk()->mutable_persistence()->set_id("ID");
+ volume.mutable_disk()->mutable_volume()->set_container_path("data");
// Compute the updated allocation.
- Resources allocation = sorter.allocation("a");
- Try<Resources> newAllocation = allocation.apply(create);
+ Try<Resources> newAllocation = slaveResources.apply(CREATE(volume));
ASSERT_SOME(newAllocation);
// Update the resources for the client.
- sorter.update("a", allocation, newAllocation.get());
+ sorter.update("framework", slaveA, slaveResources, newAllocation.get());
+ sorter.update("framework", slaveB, slaveResources, newAllocation.get());
- EXPECT_EQ(newAllocation.get(), sorter.allocation("a"));
+ hashmap<SlaveID, Resources> allocation = sorter.allocation("framework");
+ EXPECT_EQ(2u, allocation.size());
+ EXPECT_EQ(newAllocation.get(), allocation[slaveA]);
+ EXPECT_EQ(newAllocation.get(), allocation[slaveB]);
}
} // namespace tests {