You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by me...@apache.org on 2016/02/19 10:16:34 UTC
[1/2] mesos git commit: Extended allocator interface to support
dynamic weights.
Repository: mesos
Updated Branches:
refs/heads/master 83bf5671e -> 46cce8e35
Extended allocator interface to support dynamic weights.
Review: https://reviews.apache.org/r/41597/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/815a596f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/815a596f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/815a596f
Branch: refs/heads/master
Commit: 815a596f77ef3113ccb1b2d3d87ac6a3ff3b9e16
Parents: 83bf567
Author: Yongqiao Wang <yq...@cn.ibm.com>
Authored: Thu Feb 18 23:50:07 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Thu Feb 18 23:50:07 2016 -0800
----------------------------------------------------------------------
include/mesos/master/allocator.hpp | 7 ++++++
include/mesos/mesos.proto | 12 +++++++++
include/mesos/v1/mesos.proto | 12 +++++++++
src/master/allocator/mesos/allocator.hpp | 17 +++++++++++++
src/master/allocator/mesos/hierarchical.cpp | 31 ++++++++++++++++++++++++
src/master/allocator/mesos/hierarchical.hpp | 3 +++
src/master/allocator/sorter/drf/sorter.cpp | 7 ++++++
src/master/allocator/sorter/drf/sorter.hpp | 2 ++
src/master/allocator/sorter/sorter.hpp | 3 +++
src/tests/allocator.hpp | 14 +++++++++++
10 files changed, 108 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index e163669..a4743c5 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -379,6 +379,13 @@ public:
*/
virtual void removeQuota(
const std::string& role) = 0;
+
+ /**
+ * Updates the weight of each provided role.
+ * Subsequent allocation calculations will use these updated weights.
+ */
+ virtual void updateWeights(
+ const std::vector<WeightInfo>& weightInfos) = 0;
};
} // namespace allocator {
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8047946..636550f 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1764,3 +1764,15 @@ message DiscoveryInfo {
optional Ports ports = 6;
optional Labels labels = 7;
}
+
+
+/**
+ * Named WeightInfo to indicate resource allocation
+ * priority between the different roles.
+ */
+message WeightInfo {
+ required double weight = 1;
+
+ // Related role name.
+ optional string role = 2;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index d909e60..1d5af88 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1665,3 +1665,15 @@ message DiscoveryInfo {
optional Ports ports = 6;
optional Labels labels = 7;
}
+
+
+/**
+ * Named WeightInfo to indicate resource allocation
+ * priority between the different roles.
+ */
+message WeightInfo {
+ required double weight = 1;
+
+ // Related role name.
+ optional string role = 2;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 581eaad..64bce0f 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -149,6 +149,9 @@ public:
void removeQuota(
const std::string& role);
+ void updateWeights(
+ const std::vector<WeightInfo>& weightInfos);
+
private:
MesosAllocator();
MesosAllocator(const MesosAllocator&); // Not copyable.
@@ -272,6 +275,9 @@ public:
virtual void removeQuota(
const std::string& role) = 0;
+
+ virtual void updateWeights(
+ const std::vector<WeightInfo>& weightInfos) = 0;
};
@@ -620,6 +626,17 @@ inline void MesosAllocator<AllocatorProcess>::removeQuota(
role);
}
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateWeights(
+ const std::vector<WeightInfo>& weightInfos)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateWeights,
+ weightInfos);
+}
+
} // namespace allocator {
} // namespace master {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index a9d2c23..5ef29f2 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -1053,6 +1053,37 @@ void HierarchicalAllocatorProcess::removeQuota(
}
+void HierarchicalAllocatorProcess::updateWeights(
+ const vector<WeightInfo>& weightInfos)
+{
+ CHECK(initialized);
+
+ bool rebalance = false;
+
+ // Update the weight for each specified role.
+ foreach (const WeightInfo& weightInfo, weightInfos) {
+ CHECK(weightInfo.has_role());
+ weights[weightInfo.role()] = weightInfo.weight();
+
+ if (quotas.contains(weightInfo.role())) {
+ quotaRoleSorter->update(weightInfo.role(), weightInfo.weight());
+ }
+
+ if (roleSorter->contains(weightInfo.role())) {
+ rebalance = true;
+ roleSorter->update(weightInfo.role(), weightInfo.weight());
+ }
+ }
+
+ // If at least one of the updated roles has registered frameworks,
+ // then trigger the allocation explicitly in order to promptly
+ // react to the operator's request.
+ if (rebalance) {
+ allocate();
+ }
+}
+
+
void HierarchicalAllocatorProcess::pause()
{
if (!paused) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 20d7ceb..0d39d3f 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -187,6 +187,9 @@ public:
void removeQuota(
const std::string& role);
+ void updateWeights(
+ const std::vector<WeightInfo>& weightInfos);
+
protected:
// Useful typedefs for dispatch/delay/defer to self()/this.
typedef HierarchicalAllocatorProcess Self;
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 18797e4..9e863dd 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -49,6 +49,13 @@ void DRFSorter::add(const string& name, double weight)
}
+void DRFSorter::update(const string& name, double weight)
+{
+ CHECK(weights.contains(name));
+ weights[name] = weight;
+}
+
+
void DRFSorter::remove(const string& name)
{
set<Client, DRFComparator>::iterator it = find(name);
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 4669149..46b2a9c 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -65,6 +65,8 @@ public:
virtual void add(const std::string& name, double weight = 1);
+ virtual void update(const std::string& name, double weight);
+
virtual void remove(const std::string& name);
virtual void activate(const std::string& name);
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index a0a779b..ba91a38 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -44,6 +44,9 @@ public:
// may be a user or a framework.
virtual void add(const std::string& client, double weight = 1) = 0;
+ // Update weight of a client.
+ virtual void update(const std::string& client, double weight) = 0;
+
// Removes a client.
virtual void remove(const std::string& client) = 0;
http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/tests/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp
index 206e9ac..4081193 100644
--- a/src/tests/allocator.hpp
+++ b/src/tests/allocator.hpp
@@ -196,6 +196,12 @@ ACTION_P(InvokeRemoveQuota, allocator)
}
+ACTION_P(InvokeUpdateWeights, allocator)
+{
+ allocator->real->updateWeights(arg0);
+}
+
+
template <typename T = master::allocator::HierarchicalDRFAllocator>
mesos::master::allocator::Allocator* createAllocator()
{
@@ -342,6 +348,11 @@ public:
.WillByDefault(InvokeRemoveQuota(this));
EXPECT_CALL(*this, removeQuota(_))
.WillRepeatedly(DoDefault());
+
+ ON_CALL(*this, updateWeights(_))
+ .WillByDefault(InvokeUpdateWeights(this));
+ EXPECT_CALL(*this, updateWeights(_))
+ .WillRepeatedly(DoDefault());
}
virtual ~TestAllocator() {}
@@ -449,6 +460,9 @@ public:
MOCK_METHOD1(removeQuota, void(
const std::string&));
+ MOCK_METHOD1(updateWeights, void(
+ const std::vector<WeightInfo>&));
+
process::Owned<mesos::master::allocator::Allocator> real;
};
[2/2] mesos git commit: Test cases for dynamic weights + allocation
behaviour.
Posted by me...@apache.org.
Test cases for dynamic weights + allocation behaviour.
Review: https://reviews.apache.org/r/41672/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46cce8e3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46cce8e3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46cce8e3
Branch: refs/heads/master
Commit: 46cce8e3523eaa58ea8b28d35508bf1b3d09e5a4
Parents: 815a596
Author: Yongqiao Wang <yq...@cn.ibm.com>
Authored: Thu Feb 18 23:51:10 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Thu Feb 18 23:51:10 2016 -0800
----------------------------------------------------------------------
src/tests/hierarchical_allocator_tests.cpp | 206 ++++++++++++++++++++++++
1 file changed, 206 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/46cce8e3/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 990f372..5f771f0 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -219,6 +219,15 @@ protected:
return resource;
}
+ static WeightInfo createWeightInfo(const string& role, double weight)
+ {
+ WeightInfo weightInfo;
+ weightInfo.set_role(role);
+ weightInfo.set_weight(weight);
+
+ return weightInfo;
+ }
+
protected:
master::Flags flags;
@@ -2594,6 +2603,203 @@ TEST_F(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers)
Clock::resume();
}
+
+// This test ensures that resource allocation is done per role's weight.
+// This is done by having six slaves and three frameworks and making sure each
+// framework gets the number of resources by their role's weight.
+TEST_F(HierarchicalAllocatorTest, UpdateWeight)
+{
+ // Pausing the clock is not necessary, but ensures that the test
+ // doesn't rely on the periodic allocation in the allocator, which
+ // would slow down the test.
+ Clock::pause();
+
+ initialize();
+
+ // Register six slaves with the same resources (cpus:2;mem:1024).
+ vector<SlaveInfo> slaves;
+ const string SINGLE_RESOURCE = "cpus:2;mem:1024";
+ const string DOUBLE_RESOURCES = "cpus:4;mem:2048";
+ const string TRIPLE_RESOURCES = "cpus:6;mem:3072";
+ const string FOURFOLD_RESOURCES = "cpus:8;mem:4096";
+ const string TOTAL_RESOURCES = "cpus:12;mem:6144";
+ for (unsigned i = 0; i < 6; i++) {
+ slaves.push_back(createSlaveInfo(SINGLE_RESOURCE));
+ }
+
+ foreach (const SlaveInfo& slave, slaves) {
+ allocator->addSlave(
+ slave.id(),
+ slave,
+ None(),
+ slave.resources(),
+ hashmap<FrameworkID, Resources>());
+ }
+
+ // Framework1 registers with 'role1' which uses the default
+ // weight (1.0), and all resources will be offered to this framework.
+ FrameworkInfo framework1 = createFrameworkInfo("role1");
+ allocator->addFramework(
+ framework1.id(), framework1, hashmap<SlaveID, Resources>());
+
+ // Framework2 registers with 'role2' which also uses the
+ // default weight (1.0).
+ FrameworkInfo framework2 = createFrameworkInfo("role2");
+ allocator->addFramework(
+ framework2.id(), framework2, hashmap<SlaveID, Resources>());
+
+ // Framework1 gets one allocation with all resources, and Framework2
+ // does not get any offers due to all resources having outstanding
+ // offers to framework1 when it registered.
+ // Recover all resources owned by framework1 so they can be offered
+ // again next time.
+ Future<Allocation> allocation = allocations.get();
+ AWAIT_READY(allocation);
+ ASSERT_EQ(allocation.get().frameworkId, framework1.id());
+ ASSERT_EQ(6u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(),
+ Resources::sum(allocation.get().resources));
+ foreachpair (const SlaveID& slaveId,
+ const Resources& resources,
+ allocation.get().resources) {
+ allocator->recoverResources(
+ allocation.get().frameworkId,
+ slaveId,
+ resources,
+ None());
+ }
+
+ // Because each framework's role has a weight of 1.0 by default, test to
+ // ensure that all resources are offered equally between both frameworks.
+ hashmap<FrameworkID, size_t> counts;
+ Clock::advance(flags.allocation_interval);
+ Resources totalAllocatedResources1;
+ for (unsigned i = 0; i < 2; i++) {
+ Future<Allocation> allocation = allocations.get();
+ AWAIT_READY(allocation);
+ counts[allocation.get().frameworkId]++;
+ totalAllocatedResources1 += Resources::sum(allocation.get().resources);
+
+ // Each framework will get one allocation with three slaves.
+ ASSERT_EQ(3u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
+ Resources::sum(allocation.get().resources));
+
+ // Recover the offered resources so they can be offered again next time.
+ foreachpair (const SlaveID& slaveId,
+ const Resources& resources,
+ allocation.get().resources) {
+ allocator->recoverResources(
+ allocation.get().frameworkId,
+ slaveId,
+ resources,
+ None());
+ }
+ }
+
+ // Check to ensure that these two allocations sum to the total resources,
+ // this check can ensure there are only two allocations in this case.
+ EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources1);
+ EXPECT_EQ(1u, counts[framework1.id()]);
+ EXPECT_EQ(1u, counts[framework2.id()]);
+
+ // Update the weight of framework2's role to 2.0, then their
+ // weights should be 1:2.
+ vector<WeightInfo> weightInfos1;
+ weightInfos1.push_back(createWeightInfo(framework2.role(), 2.0));
+ allocator->updateWeights(weightInfos1);
+
+ // Now that the frameworks's weights are 1:2, test to ensure that all
+ // resources are offered with a ratio of 1:2 between both frameworks.
+ counts.clear();
+ Resources totalAllocatedResources2;
+ Clock::advance(flags.allocation_interval);
+ for (unsigned i = 0; i < 2; i++) {
+ Future<Allocation> allocation = allocations.get();
+ AWAIT_READY(allocation);
+ counts[allocation.get().frameworkId]++;
+ totalAllocatedResources2 += Resources::sum(allocation.get().resources);
+
+ // Framework1 should get one allocation with two slaves.
+ if (allocation.get().frameworkId == framework1.id()) {
+ ASSERT_EQ(2u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
+ Resources::sum(allocation.get().resources));
+ } else {
+ // Framework2 should get one allocation with four slaves.
+ ASSERT_EQ(allocation.get().frameworkId, framework2.id());
+ ASSERT_EQ(4u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(FOURFOLD_RESOURCES).get(),
+ Resources::sum(allocation.get().resources));
+ }
+
+ // Recover the allocated resources so they can be offered again next time.
+ foreachpair (const SlaveID& slaveId,
+ const Resources& resources,
+ allocation.get().resources) {
+ allocator->recoverResources(
+ allocation.get().frameworkId,
+ slaveId,
+ resources,
+ None());
+ }
+ }
+ // Check to ensure that these two allocations sum to the total resources,
+ // this check can ensure there are only two allocations in this case.
+ EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources2);
+ EXPECT_EQ(1u, counts[framework1.id()]);
+ EXPECT_EQ(1u, counts[framework2.id()]);
+
+ // Add a new role with a weight of 3.0.
+ vector<WeightInfo> weightInfos2;
+ weightInfos2.push_back(createWeightInfo("role3", 3.0));
+ allocator->updateWeights(weightInfos2);
+
+ // Framework3 registers with 'role3'.
+ FrameworkInfo framework3 = createFrameworkInfo("role3");
+ allocator->addFramework(
+ framework3.id(), framework3, hashmap<SlaveID, Resources>());
+
+ // Currently, there are three frameworks and six slaves in this cluster,
+ // and the weight ratio of these frameworks is 1:2:3, therefore frameworks
+ // will get the proper resource ratio of 1:2:3.
+ counts.clear();
+ Resources totalAllocatedResources3;
+ for (unsigned i = 0; i < 3; i++) {
+ Future<Allocation> allocation = allocations.get();
+ AWAIT_READY(allocation);
+ counts[allocation.get().frameworkId]++;
+ totalAllocatedResources3 += Resources::sum(allocation.get().resources);
+
+ // Framework1 should get one allocation with one slave.
+ if (allocation.get().frameworkId == framework1.id()) {
+ ASSERT_EQ(1u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(SINGLE_RESOURCE).get(),
+ Resources::sum(allocation.get().resources));
+ } else if (allocation.get().frameworkId == framework2.id()) {
+ // Framework2 should get one allocation with two slaves.
+ ASSERT_EQ(2u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
+ Resources::sum(allocation.get().resources));
+ } else {
+ // Framework3 should get one allocation with three slaves.
+ ASSERT_EQ(allocation.get().frameworkId, framework3.id());
+ ASSERT_EQ(3u, allocation.get().resources.size());
+ EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
+ Resources::sum(allocation.get().resources));
+ }
+ }
+
+ // Check to ensure that these three allocations sum to the total resources,
+ // this check can ensure there are only three allocations in this case.
+ EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources3);
+ EXPECT_EQ(1u, counts[framework1.id()]);
+ EXPECT_EQ(1u, counts[framework2.id()]);
+ EXPECT_EQ(1u, counts[framework3.id()]);
+
+ Clock::resume();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {