You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/08/08 08:32:55 UTC
git commit: Fixed allocator to do allocations per slave rather than per framework.
Repository: mesos
Updated Branches:
refs/heads/master 781414099 -> d376f05fe
Fixed allocator to do allocations per slave rather than per framework.
Review: https://reviews.apache.org/r/24356
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d376f05f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d376f05f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d376f05f
Branch: refs/heads/master
Commit: d376f05fea7f7432a287a7b87c5d1fe44dae01c4
Parents: 7814140
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 18 14:11:14 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Aug 7 23:32:32 2014 -0700
----------------------------------------------------------------------
src/master/hierarchical_allocator_process.hpp | 79 ++++----
src/tests/allocator_tests.cpp | 201 ++++++++++++++++++---
2 files changed, 220 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d376f05f/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index d81082f..34f8cd6 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -19,6 +19,9 @@
#ifndef __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
#define __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
+#include <algorithm>
+#include <vector>
+
#include <mesos/resources.hpp>
#include <process/delay.hpp>
@@ -675,7 +678,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
- const hashset<SlaveID>& slaveIds)
+ const hashset<SlaveID>& slaveIds_)
{
CHECK(initialized);
@@ -684,57 +687,67 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
return;
}
- if (slaveIds.empty()) {
+ if (slaveIds_.empty()) {
VLOG(1) << "No resources available to allocate!";
return;
}
- foreach (const std::string& role, roleSorter->sort()) {
- foreach (const std::string& frameworkIdValue, sorters[role]->sort()) {
- FrameworkID frameworkId;
- frameworkId.set_value(frameworkIdValue);
+ // Randomize the order in which slaves' resources are allocated.
+ // TODO(vinod): Implement a smarter sorting algorithm.
+ std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
+ std::random_shuffle(slaveIds.begin(), slaveIds.end());
+
+ hashmap<FrameworkID, hashmap<SlaveID, Resources> > offerable;
+ foreach (const SlaveID& slaveId, slaveIds) {
+ // If the slave is not activated or whitelisted, ignore it.
+ if (!slaves[slaveId].activated || !slaves[slaveId].whitelisted) {
+ continue;
+ }
+
+ foreach (const std::string& role, roleSorter->sort()) {
+ foreach (const std::string& frameworkIdValue, sorters[role]->sort()) {
+ FrameworkID frameworkId;
+ frameworkId.set_value(frameworkIdValue);
- Resources allocatedResources;
- hashmap<SlaveID, Resources> offerable;
- foreach (const SlaveID& slaveId, slaveIds) {
Resources unreserved = slaves[slaveId].available.extract("*");
Resources resources = unreserved;
-
if (role != "*") {
resources += slaves[slaveId].available.extract(role);
}
- // Check whether or not this framework filters this slave.
- bool filtered = isFiltered(frameworkId, slaveId, resources);
-
- if (!filtered &&
- slaves[slaveId].activated &&
- slaves[slaveId].whitelisted &&
- allocatable(resources)) {
- VLOG(1)
- << "Offering " << resources << " on slave " << slaveId
- << " to framework " << frameworkId;
+ // If the resources are not allocatable, ignore.
+ if (!allocatable(resources)) {
+ continue;
+ }
- offerable[slaveId] = resources;
+ // If the framework filters these resources, ignore.
+ if (isFiltered(frameworkId, slaveId, resources)) {
+ continue;
+ }
- // Update framework and slave resources.
- slaves[slaveId].available -= resources;
+ VLOG(1)
+ << "Offering " << resources << " on slave " << slaveId
+ << " to framework " << frameworkId;
- // We only count resources not reserved for this role
- // in the share the sorter considers.
- allocatedResources += unreserved;
- }
- }
+ offerable[frameworkId][slaveId] = resources;
- if (!offerable.empty()) {
- sorters[role]->add(allocatedResources);
- sorters[role]->allocated(frameworkIdValue, allocatedResources);
- roleSorter->allocated(role, allocatedResources);
+ // Update slave resources.
+ slaves[slaveId].available -= resources;
- dispatch(master, &Master::offer, frameworkId, offerable);
+ // Update the sorters.
+ // We only count resources not reserved for this role
+ // in the share the sorter considers.
+ sorters[role]->add(unreserved);
+ sorters[role]->allocated(frameworkIdValue, unreserved);
+ roleSorter->allocated(role, unreserved);
}
}
}
+
+ // Now offer the resources to each framework.
+ foreachkey (const FrameworkID& frameworkId, offerable) {
+ dispatch(master, &Master::offer, frameworkId, offerable[frameworkId]);
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d376f05f/src/tests/allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp
index f0226cb..774528a 100644
--- a/src/tests/allocator_tests.cpp
+++ b/src/tests/allocator_tests.cpp
@@ -30,6 +30,8 @@
#include <process/gmock.hpp>
#include <process/pid.hpp>
+#include <stout/some.hpp>
+
#include "master/allocator.hpp"
#include "master/detector.hpp"
#include "master/hierarchical_allocator_process.hpp"
@@ -82,12 +84,12 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess)
EXPECT_CALL(allocator, initialize(_, _, _));
master::Flags masterFlags = CreateMasterFlags();
- masterFlags.roles = Option<string>("role1,role2");
+ masterFlags.roles = Some("role1,role2");
Try<PID<Master> > master = StartMaster(&allocator, masterFlags);
ASSERT_SOME(master);
slave::Flags flags1 = CreateSlaveFlags();
- flags1.resources = Option<string>("cpus:2;mem:1024;disk:0");
+ flags1.resources = Some("cpus:2;mem:1024;disk:0");
EXPECT_CALL(allocator, slaveAdded(_, _, _));
@@ -145,7 +147,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess)
AWAIT_READY(frameworkAdded2);
slave::Flags flags2 = CreateSlaveFlags();
- flags2.resources = Option<string>("cpus:1;mem:512;disk:0");
+ flags2.resources = Some("cpus:1;mem:512;disk:0");
EXPECT_CALL(allocator, slaveAdded(_, _, _));
@@ -172,7 +174,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess)
// framework2 share = 1
slave::Flags flags3 = CreateSlaveFlags();
- flags3.resources = Option<string>("cpus:3;mem:2048;disk:0");
+ flags3.resources = Some("cpus:3;mem:2048;disk:0");
EXPECT_CALL(allocator, slaveAdded(_, _, _));
@@ -220,7 +222,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess)
AWAIT_READY(frameworkAdded3);
slave::Flags flags4 = CreateSlaveFlags();
- flags4.resources = Option<string>("cpus:4;mem:4096;disk:0");
+ flags4.resources = Some("cpus:4;mem:4096;disk:0");
EXPECT_CALL(allocator, slaveAdded(_, _, _));
@@ -270,7 +272,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess)
AWAIT_READY(frameworkAdded4);
slave::Flags flags5 = CreateSlaveFlags();
- flags5.resources = Option<string>("cpus:1;mem:512;disk:0");
+ flags5.resources = Some("cpus:1;mem:512;disk:0");
EXPECT_CALL(allocator, slaveAdded(_, _, _));
@@ -322,6 +324,151 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess)
}
+// This test ensures that allocation is done per slave. This is done
+// by having 2 slaves and 2 frameworks and making sure each framework
+// gets only one slave's resources during an allocation.
+TEST_F(DRFAllocatorTest, PerSlaveAllocation)
+{
+ MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
+
+ EXPECT_CALL(allocator, initialize(_, _, _));
+
+ // Start the master.
+ // NOTE: We set a high allocation interval, so that allocator does
+ // allocations only based on events (framework added, slave added)
+ // but not due to allocation interval. This lets us tightly control
+ // the test expectations.
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.roles = Some("role1,role2");
+ masterFlags.allocation_interval = Days(1);
+ Try<PID<Master> > master = StartMaster(&allocator, masterFlags);
+ ASSERT_SOME(master);
+
+ // Start slave 1.
+ slave::Flags flags1 = CreateSlaveFlags();
+ flags1.resources = Some("cpus:2;mem:1024;disk:0");
+
+ Future<Nothing> slaveAdded1;
+ EXPECT_CALL(allocator, slaveAdded(_, _, _))
+ .WillOnce(DoAll(InvokeSlaveAdded(&allocator),
+ FutureSatisfy(&slaveAdded1)));
+
+ Try<PID<Slave> > slave1 = StartSlave(flags1);
+ ASSERT_SOME(slave1);
+
+ AWAIT_READY(slaveAdded1);
+
+ // Start slave 2.
+ slave::Flags flags2 = CreateSlaveFlags();
+ flags2.resources = Some("cpus:2;mem:1024;disk:0");
+
+ Future<Nothing> slaveAdded2;
+ EXPECT_CALL(allocator, slaveAdded(_, _, _))
+ .WillOnce(DoAll(InvokeSlaveAdded(&allocator),
+ FutureSatisfy(&slaveAdded2)));
+
+ Try<PID<Slave> > slave2 = StartSlave(flags2);
+ ASSERT_SOME(slave2);
+
+ AWAIT_READY(slaveAdded2);
+
+ // Start framework 1.
+ FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo1.set_name("framework1");
+ frameworkInfo1.set_user("user1");
+ frameworkInfo1.set_role("role1");
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, frameworkInfo1, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(allocator, frameworkAdded(_, _, _));
+
+ EXPECT_CALL(sched1, registered(_, _, _));
+
+ Future<Nothing> resourcesRecovered1;
+ Future<Nothing> resourcesRecovered2;
+ EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _))
+ .WillOnce(DoAll(InvokeResourcesRecovered(&allocator),
+ FutureSatisfy(&resourcesRecovered1)))
+ .WillOnce(DoAll(InvokeResourcesRecovered(&allocator),
+ FutureSatisfy(&resourcesRecovered2)));
+
+ // Decline the offers immediately so that resources for both slaves
+ // are eligible for allocation to this and other frameworks.
+ Filters filters;
+ filters.set_refuse_seconds(0);
+ EXPECT_CALL(sched1, resourceOffers(_, _))
+ .WillOnce(DeclineOffers(filters));
+
+ driver1.start();
+
+ // Wait until the resources are returned to the allocator.
+ // NOTE: No allocations will be made after this point until a new
+ // framework registers because
+ // 1) 'resourcesRecovered' does not trigger an allocation and
+ // 2) 'flags.allocation_interval' is set to a very high value.
+ AWAIT_READY(resourcesRecovered1);
+ AWAIT_READY(resourcesRecovered2);
+
+ // Start framework 2.
+ FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo2.set_name("framework2");
+ frameworkInfo2.set_user("user2");
+ frameworkInfo2.set_role("role2");
+
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(
+ &sched2, frameworkInfo2, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(allocator, frameworkAdded(_, _, _));
+
+ EXPECT_CALL(sched2, registered(_, _, _));
+
+ // Offers to framework 1.
+ Future<vector<Offer> > offers1;
+ EXPECT_CALL(sched1, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ // Offers to framework 2.
+ Future<vector<Offer> > offers2;
+ EXPECT_CALL(sched2, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers2));
+
+ driver2.start();
+
+ // Now each framework should receive offers for one slave each.
+ AWAIT_READY(offers1);
+ EXPECT_THAT(offers1.get(), OfferEq(2, 1024));
+
+ AWAIT_READY(offers2);
+ EXPECT_THAT(offers2.get(), OfferEq(2, 1024));
+
+ // Shut everything down.
+ EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _))
+ .WillRepeatedly(DoDefault());
+
+ EXPECT_CALL(allocator, frameworkDeactivated(_))
+ .WillRepeatedly(DoDefault());
+
+ EXPECT_CALL(allocator, frameworkRemoved(_))
+ .WillRepeatedly(DoDefault());
+
+ driver1.stop();
+ driver1.join();
+
+ driver2.stop();
+ driver2.join();
+
+ EXPECT_CALL(allocator, slaveRemoved(_))
+ .WillRepeatedly(DoDefault());
+
+ Shutdown();
+}
+
+
// Helper that simply increments the value by reference.
ACTION_P(Increment, value) { *value += 1; }
@@ -440,7 +587,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources)
EXPECT_CALL(allocator, initialize(_, _, _));
master::Flags masterFlags = CreateMasterFlags();
- masterFlags.roles = Option<string>("role1,role2,role3");
+ masterFlags.roles = Some("role1,role2,role3");
Try<PID<Master> > master = StartMaster(&allocator, masterFlags);
ASSERT_SOME(master);
@@ -455,19 +602,19 @@ TEST_F(ReservationAllocatorTest, ReservedResources)
slave::Flags flags1 = CreateSlaveFlags();
flags1.default_role = "role1";
- flags1.resources = Option<string>("cpus:2;mem:1024;disk:0");
+ flags1.resources = Some("cpus:2;mem:1024;disk:0");
Try<PID<Slave> > slave1 = StartSlave(flags1);
ASSERT_SOME(slave1);
slave::Flags flags2 = CreateSlaveFlags();
flags2.resources =
- Option<string>("cpus(role2):2;mem(role2):1024;cpus:1;mem:1024;disk:0");
+ Some("cpus(role2):2;mem(role2):1024;cpus:1;mem:1024;disk:0");
Try<PID<Slave> > slave2 = StartSlave(flags2);
ASSERT_SOME(slave2);
slave::Flags flags3 = CreateSlaveFlags();
flags3.default_role = "role3";
- flags3.resources = Option<string>("cpus:4;mem:4096;disk:0");
+ flags3.resources = Some("cpus:4;mem:4096;disk:0");
Try<PID<Slave> > slave3 = StartSlave(flags3);
ASSERT_SOME(slave3);
@@ -475,7 +622,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources)
// since there is no framework for role4.
slave::Flags flags4 = CreateSlaveFlags();
flags4.default_role = "role4";
- flags4.resources = Option<string>("cpus:1;mem:1024;disk:0");
+ flags4.resources = Some("cpus:1;mem:1024;disk:0");
Try<PID<Slave> > slave4 = StartSlave(flags4);
ASSERT_SOME(slave4);
@@ -550,7 +697,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources)
slave::Flags flags5 = CreateSlaveFlags();
flags5.default_role = "role1";
- flags5.resources = Option<string>("cpus:1;mem:512;disk:0");
+ flags5.resources = Some("cpus:1;mem:512;disk:0");
EXPECT_CALL(allocator, slaveAdded(_, _, _));
@@ -595,7 +742,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned)
EXPECT_CALL(allocator, initialize(_, _, _));
master::Flags masterFlags = CreateMasterFlags();
- masterFlags.roles = Option<string>("role1,role2");
+ masterFlags.roles = Some("role1,role2");
masterFlags.allocation_interval = Milliseconds(50);
Try<PID<Master> > master = StartMaster(&allocator, masterFlags);
@@ -625,7 +772,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned)
// because there is no framework with role3 and the unreserved
// memory can't be offered without a cpu to go with it.
slave::Flags flags2 = CreateSlaveFlags();
- flags2.resources = Option<string>("cpus(role3):4;mem:1024;disk:0");
+ flags2.resources = Some("cpus(role3):4;mem:1024;disk:0");
Try<PID<Slave> > slave2 = StartSlave(flags2);
ASSERT_SOME(slave2);
@@ -777,7 +924,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
ASSERT_SOME(master);
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:2;mem:1024;disk:0");
+ flags.resources = Some("cpus:2;mem:1024;disk:0");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -835,7 +982,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnused)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags1 = this->CreateSlaveFlags();
- flags1.resources = Option<string>("cpus:2;mem:1024");
+ flags1.resources = Some("cpus:2;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -946,7 +1093,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDispatch)
ASSERT_SOME(master);
slave::Flags flags1 = this->CreateSlaveFlags();
- flags1.resources = Option<string>("cpus:2;mem:1024");
+ flags1.resources = Some("cpus:2;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1079,7 +1226,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailover)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:3;mem:1024");
+ flags.resources = Some("cpus:3;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1223,7 +1370,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited)
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:3;mem:1024");
+ flags.resources = Some("cpus:3;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1358,7 +1505,7 @@ TYPED_TEST(AllocatorTest, SlaveLost)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags1 = this->CreateSlaveFlags();
- flags1.resources = Option<string>("cpus:2;mem:1024");
+ flags1.resources = Some("cpus:2;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1469,7 +1616,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags1 = this->CreateSlaveFlags();
- flags1.resources = Option<string>("cpus:3;mem:1024");
+ flags1.resources = Some("cpus:3;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1518,7 +1665,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
AWAIT_READY(launchTask);
slave::Flags flags2 = this->CreateSlaveFlags();
- flags2.resources = Option<string>("cpus:4;mem:2048");
+ flags2.resources = Some("cpus:4;mem:2048");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1570,7 +1717,7 @@ TYPED_TEST(AllocatorTest, TaskFinished)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:3;mem:1024");
+ flags.resources = Some("cpus:3;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -1690,7 +1837,7 @@ TYPED_TEST(AllocatorTest, WhitelistSlave)
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:2;mem:1024");
+ flags.resources = Some("cpus:2;mem:1024");
Try<string> hostname = os::hostname();
ASSERT_SOME(hostname);
@@ -1773,7 +1920,7 @@ TYPED_TEST(AllocatorTest, RoleTest)
EXPECT_CALL(this->allocator, initialize(_, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
- masterFlags.roles = Option<string>("role2");
+ masterFlags.roles = Some("role2");
Try<PID<Master> > master = this->StartMaster(&this->allocator, masterFlags);
ASSERT_SOME(master);
@@ -1862,7 +2009,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst)
StandaloneMasterDetector slaveDetector(master.get());
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:2;mem:1024");
+ flags.resources = Some("cpus:2;mem:1024");
Try<PID<Slave> > slave = this->StartSlave(&exec, &slaveDetector, flags);
ASSERT_SOME(slave);
@@ -1987,7 +2134,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst)
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
slave::Flags flags = this->CreateSlaveFlags();
- flags.resources = Option<string>("cpus:2;mem:1024");
+ flags.resources = Some("cpus:2;mem:1024");
Try<PID<Slave> > slave = this->StartSlave(&exec, &slaveDetector, flags);
ASSERT_SOME(slave);