You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/02/12 21:19:06 UTC
[1/6] mesos git commit: Refactored the allocator interface to support
general implementations.
Repository: mesos
Updated Branches:
refs/heads/master edf4d2d2a -> b3385f361
Refactored the allocator interface to support general implementations.
Introduced a basic Allocator interfaced. This interface does not
require allocators to be based on libprocess processes. For allocators
that do implement allocation logic via an internal libprocess process,
a special wrapper is provided. Allocator users and tests are updated
to use Allocator type instead of AllocatorProcess.
Review: https://reviews.apache.org/r/29890
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2b51582a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2b51582a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2b51582a
Branch: refs/heads/master
Commit: 2b51582a6f6bd3ae431cc4c16ba098656fbd7ef3
Parents: edf4d2d
Author: Alexander Rukletsov <al...@mesosphere.io>
Authored: Thu Feb 12 11:42:05 2015 -0800
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Thu Feb 12 11:42:05 2015 -0800
----------------------------------------------------------------------
src/local/local.cpp | 9 +-
src/master/allocator.hpp | 195 ++++++++++++++++-----
src/master/hierarchical_allocator_process.hpp | 10 +-
src/master/main.cpp | 6 +-
src/tests/cluster.hpp | 29 +--
src/tests/fault_tolerance_tests.cpp | 4 +-
src/tests/hierarchical_allocator_tests.cpp | 8 +-
src/tests/master_allocator_tests.cpp | 16 +-
src/tests/master_authorization_tests.cpp | 18 +-
src/tests/master_tests.cpp | 12 +-
src/tests/mesos.cpp | 2 +-
src/tests/mesos.hpp | 107 +++--------
src/tests/partition_tests.cpp | 4 +-
src/tests/rate_limiting_tests.cpp | 8 +-
src/tests/resource_offers_tests.cpp | 4 +-
src/tests/slave_recovery_tests.cpp | 5 +-
16 files changed, 231 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 3b128c4..649b915 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -67,9 +67,8 @@ using namespace mesos;
using namespace mesos::log;
using mesos::master::allocator::Allocator;
-using mesos::master::allocator::AllocatorProcess;
using mesos::master::allocator::DRFSorter;
-using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
+using mesos::master::allocator::HierarchicalDRFAllocator;
using mesos::master::Master;
using mesos::master::Registrar;
@@ -96,7 +95,6 @@ namespace mesos {
namespace local {
static Allocator* allocator = NULL;
-static AllocatorProcess* allocatorProcess = NULL;
static Log* log = NULL;
static state::Storage* storage = NULL;
static state::protobuf::State* state = NULL;
@@ -121,13 +119,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
if (_allocator == NULL) {
// Create default allocator, save it for deleting later.
- allocatorProcess = new HierarchicalDRFAllocatorProcess();
- _allocator = allocator = new Allocator(allocatorProcess);
+ _allocator = allocator = new HierarchicalDRFAllocator();
} else {
// TODO(benh): Figure out the behavior of allocator pointer and remove the
// else block.
allocator = NULL;
- allocatorProcess = NULL;
}
files = new Files();
@@ -268,7 +264,6 @@ void shutdown()
process::wait(master->self());
delete master;
delete allocator;
- delete allocatorProcess;
master = NULL;
// TODO(benh): Ugh! Because the isolator calls back into the slave
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index 2149ea4..c45b75e 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -45,8 +45,6 @@ class Master; // Forward declaration.
namespace allocator {
-class AllocatorProcess; // Forward declaration.
-
// Basic model of an allocator: resources are allocated to a framework
// in the form of offers. A framework can refuse some resources in
// offers and run tasks in others. Allocated resources can have offer
@@ -55,17 +53,101 @@ class AllocatorProcess; // Forward declaration.
// be recovered from a framework when tasks finish/fail (or are lost
// due to a slave failure) or when an offer is rescinded.
//
-// NOTE: DO NOT subclass this class when implementing a new allocator.
-// Implement AllocatorProcess (above) instead!
+// This is the public API for resource allocators.
+// TODO(alexr): Document API calls.
class Allocator
{
public:
- // The AllocatorProcess object passed to the constructor is spawned
- // and terminated by the allocator. But it is the responsibility
- // of the caller to de-allocate the object, if necessary.
- explicit Allocator(AllocatorProcess* _process);
+ Allocator() {}
+
+ virtual ~Allocator() {}
+
+ virtual void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles) = 0;
+
+ virtual void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used) = 0;
+
+ virtual void removeFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ // Offers are sent only to activated frameworks.
+ virtual void activateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void deactivateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ // Note that the 'total' resources are passed explicitly because it
+ // includes resources that are dynamically "persisted" on the slave
+ // (e.g. persistent volumes, dynamic reservations, etc).
+ // The slaveInfo resources, on the other hand, correspond directly
+ // to the static --resources flag value on the slave.
+ virtual void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used) = 0;
+
+ virtual void removeSlave(
+ const SlaveID& slaveId) = 0;
+
+ // Offers are sent only for activated slaves.
+ virtual void activateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void deactivateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist) = 0;
+
+ virtual void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests) = 0;
+
+ virtual void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations) = 0;
+
+ // Informs the Allocator to recover resources that are considered
+ // used by the framework.
+ virtual void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters) = 0;
+
+ // Whenever a framework that has filtered resources wants to revive
+ // offers for those resources the master invokes this callback.
+ virtual void reviveOffers(
+ const FrameworkID& frameworkId) = 0;
+};
+
+
+class MesosAllocatorProcess;
+
+// A wrapper for Process-based allocators. It redirects all function
+// invocations to the underlying AllocatorProcess and manages its
+// lifetime. We ensure the template parameter AllocatorProcess
+// implements MesosAllocatorProcess by storing a pointer to it.
+//
+// TODO(alexr): Move this class (together with the implementation)
+// into a separate file.
+template <typename AllocatorProcess>
+class MesosAllocator : public Allocator
+{
+public:
+ MesosAllocator();
- virtual ~Allocator();
+ ~MesosAllocator();
void initialize(
const Flags& flags,
@@ -136,19 +218,20 @@ public:
const FrameworkID& frameworkId);
private:
- Allocator(const Allocator&); // Not copyable.
- Allocator& operator=(const Allocator&); // Not assignable.
+ MesosAllocator(const MesosAllocator&); // Not copyable.
+ MesosAllocator& operator=(const MesosAllocator&); // Not assignable.
- AllocatorProcess* process;
+ MesosAllocatorProcess* process;
};
-class AllocatorProcess : public process::Process<AllocatorProcess>
+// The basic interface for all Process-based allocators.
+class MesosAllocatorProcess : public process::Process<MesosAllocatorProcess>
{
public:
- AllocatorProcess() {}
+ MesosAllocatorProcess() {}
- virtual ~AllocatorProcess() {}
+ virtual ~MesosAllocatorProcess() {}
// Explicitly unhide 'initialize' to silence a compiler warning
// from clang, since we overload below.
@@ -213,21 +296,25 @@ public:
};
-inline Allocator::Allocator(AllocatorProcess* _process)
- : process(_process)
+template <typename AllocatorProcess>
+MesosAllocator<AllocatorProcess>::MesosAllocator()
{
+ process = new AllocatorProcess();
process::spawn(process);
}
-inline Allocator::~Allocator()
+template <typename AllocatorProcess>
+MesosAllocator<AllocatorProcess>::~MesosAllocator()
{
process::terminate(process);
process::wait(process);
+ delete process;
}
-inline void Allocator::initialize(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::initialize(
const Flags& flags,
const lambda::function<
void(const FrameworkID&,
@@ -236,58 +323,63 @@ inline void Allocator::initialize(
{
process::dispatch(
process,
- &AllocatorProcess::initialize,
+ &MesosAllocatorProcess::initialize,
flags,
offerCallback,
roles);
}
-inline void Allocator::addFramework(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::addFramework(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
const Resources& used)
{
process::dispatch(
process,
- &AllocatorProcess::addFramework,
+ &MesosAllocatorProcess::addFramework,
frameworkId,
frameworkInfo,
used);
}
-inline void Allocator::removeFramework(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::removeFramework(
const FrameworkID& frameworkId)
{
process::dispatch(
process,
- &AllocatorProcess::removeFramework,
+ &MesosAllocatorProcess::removeFramework,
frameworkId);
}
-inline void Allocator::activateFramework(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::activateFramework(
const FrameworkID& frameworkId)
{
process::dispatch(
process,
- &AllocatorProcess::activateFramework,
+ &MesosAllocatorProcess::activateFramework,
frameworkId);
}
-inline void Allocator::deactivateFramework(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::deactivateFramework(
const FrameworkID& frameworkId)
{
process::dispatch(
process,
- &AllocatorProcess::deactivateFramework,
+ &MesosAllocatorProcess::deactivateFramework,
frameworkId);
}
-inline void Allocator::addSlave(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::addSlave(
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const Resources& total,
@@ -295,7 +387,7 @@ inline void Allocator::addSlave(
{
process::dispatch(
process,
- &AllocatorProcess::addSlave,
+ &MesosAllocatorProcess::addSlave,
slaveId,
slaveInfo,
total,
@@ -303,70 +395,80 @@ inline void Allocator::addSlave(
}
-inline void Allocator::removeSlave(const SlaveID& slaveId)
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::removeSlave(
+ const SlaveID& slaveId)
{
process::dispatch(
process,
- &AllocatorProcess::removeSlave,
+ &MesosAllocatorProcess::removeSlave,
slaveId);
}
-inline void Allocator::activateSlave(const SlaveID& slaveId)
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::activateSlave(
+ const SlaveID& slaveId)
{
process::dispatch(
process,
- &AllocatorProcess::activateSlave,
+ &MesosAllocatorProcess::activateSlave,
slaveId);
}
-inline void Allocator::deactivateSlave(const SlaveID& slaveId)
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::deactivateSlave(
+ const SlaveID& slaveId)
{
process::dispatch(
process,
- &AllocatorProcess::deactivateSlave,
+ &MesosAllocatorProcess::deactivateSlave,
slaveId);
}
-inline void Allocator::updateWhitelist(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateWhitelist(
const Option<hashset<std::string> >& whitelist)
{
process::dispatch(
process,
- &AllocatorProcess::updateWhitelist,
+ &MesosAllocatorProcess::updateWhitelist,
whitelist);
}
-inline void Allocator::requestResources(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::requestResources(
const FrameworkID& frameworkId,
const std::vector<Request>& requests)
{
process::dispatch(
process,
- &AllocatorProcess::requestResources,
+ &MesosAllocatorProcess::requestResources,
frameworkId,
requests);
}
-inline void Allocator::updateAllocation(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateAllocation(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations)
{
process::dispatch(
process,
- &AllocatorProcess::updateAllocation,
+ &MesosAllocatorProcess::updateAllocation,
frameworkId,
slaveId,
operations);
}
-inline void Allocator::recoverResources(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const Resources& resources,
@@ -374,7 +476,7 @@ inline void Allocator::recoverResources(
{
process::dispatch(
process,
- &AllocatorProcess::recoverResources,
+ &MesosAllocatorProcess::recoverResources,
frameworkId,
slaveId,
resources,
@@ -382,12 +484,13 @@ inline void Allocator::recoverResources(
}
-inline void Allocator::reviveOffers(
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::reviveOffers(
const FrameworkID& frameworkId)
{
process::dispatch(
process,
- &AllocatorProcess::reviveOffers,
+ &MesosAllocatorProcess::reviveOffers,
frameworkId);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 10fa6ec..75e0730 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -55,11 +55,14 @@ class HierarchicalAllocatorProcess;
typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
HierarchicalDRFAllocatorProcess;
+typedef MesosAllocator<HierarchicalDRFAllocatorProcess>
+HierarchicalDRFAllocator;
+
// Implements the basic allocator algorithm - first pick a role by
// some criteria, then pick one of their frameworks to allocate to.
template <typename RoleSorter, typename FrameworkSorter>
-class HierarchicalAllocatorProcess : public AllocatorProcess
+class HierarchicalAllocatorProcess : public MesosAllocatorProcess
{
public:
HierarchicalAllocatorProcess();
@@ -618,8 +621,9 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
// Updated resources allocated to framework (if framework still
// exists, which it might not in the event that we dispatched
- // Master::offer before we received AllocatorProcess::removeFramework
- // or AllocatorProcess::deactivateFramework, in which case we will
+ // Master::offer before we received
+ // MesosAllocatorProcess::removeFramework or
+ // MesosAllocatorProcess::deactivateFramework, in which case we will
// have already recovered all of its resources).
if (frameworks.contains(frameworkId)) {
const std::string& role = frameworks[frameworkId].role;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 1dce7fb..765062f 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -187,10 +187,7 @@ int main(int argc, char** argv)
LOG(INFO) << "Git SHA: " << build::GIT_SHA.get();
}
- allocator::AllocatorProcess* allocatorProcess =
- new allocator::HierarchicalDRFAllocatorProcess();
- allocator::Allocator* allocator =
- new allocator::Allocator(allocatorProcess);
+ allocator::Allocator* allocator = new allocator::HierarchicalDRFAllocator();
Storage* storage = NULL;
Log* log = NULL;
@@ -312,7 +309,6 @@ int main(int argc, char** argv)
process::wait(master->self());
delete master;
delete allocator;
- delete allocatorProcess;
delete registrar;
delete repairer;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 2ea4047..1ea763a 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -98,7 +98,7 @@ public:
// Start a new master with the provided flags and injections.
Try<process::PID<master::Master> > start(
const master::Flags& flags = master::Flags(),
- const Option<master::allocator::AllocatorProcess*>& allocator = None(),
+ const Option<master::allocator::Allocator*>& allocator = None(),
const Option<Authorizer*>& authorizer = None());
// Stops and cleans up a master at the specified PID.
@@ -118,10 +118,10 @@ public:
// Encapsulates a single master's dependencies.
struct Master
{
- Master() : master(NULL) {}
+ Master() : allocator(NULL), createdAllocator(false), master(NULL) {}
- process::Owned<master::allocator::AllocatorProcess> allocatorProcess;
- process::Owned<master::allocator::Allocator> allocator;
+ master::allocator::Allocator* allocator;
+ bool createdAllocator; // Whether we own the allocator.
process::Owned<log::Log> log;
process::Owned<state::Storage> storage;
@@ -243,7 +243,7 @@ inline void Cluster::Masters::shutdown()
inline Try<process::PID<master::Master> > Cluster::Masters::start(
const master::Flags& flags,
- const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
+ const Option<master::allocator::Allocator*>& allocator,
const Option<Authorizer*>& authorizer)
{
// Disallow multiple masters when not using ZooKeeper.
@@ -253,14 +253,13 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
Master master;
- if (allocatorProcess.isNone()) {
- master.allocatorProcess.reset(
- new master::allocator::HierarchicalDRFAllocatorProcess());
- master.allocator.reset(
- new master::allocator::Allocator(master.allocatorProcess.get()));
+ if (allocator.isSome()) {
+ master.allocator = allocator.get();
} else {
- master.allocator.reset(
- new master::allocator::Allocator(allocatorProcess.get()));
+ // If allocator is not provided, fall back to the default one,
+ // managed by Cluster::Masters.
+ master.allocator = new master::allocator::HierarchicalDRFAllocator();
+ master.createdAllocator = true;
}
if (flags.registry == "in_memory") {
@@ -333,7 +332,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
}
master.master = new master::Master(
- master.allocator.get(),
+ master.allocator,
master.registrar.get(),
master.repairer.get(),
&cluster->files,
@@ -397,6 +396,10 @@ inline Try<Nothing> Cluster::Masters::stop(
process::wait(master.master);
delete master.master;
+ if (master.createdAllocator) {
+ delete master.allocator;
+ }
+
masters.erase(pid);
return Nothing();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index dc8efcd..0a477ed 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -614,7 +614,7 @@ TEST_F(FaultToleranceTest, SchedulerReregisterAfterFailoverTimeout)
AWAIT_READY(frameworkId);
Future<Nothing> deactivateFramework = FUTURE_DISPATCH(
- _, &master::allocator::AllocatorProcess::deactivateFramework);
+ _, &master::allocator::MesosAllocatorProcess::deactivateFramework);
Future<Nothing> frameworkFailoverTimeout =
FUTURE_DISPATCH(_, &Master::frameworkFailoverTimeout);
@@ -700,7 +700,7 @@ TEST_F(FaultToleranceTest, SchedulerReregisterAfterUnregistration)
AWAIT_READY(frameworkId);
Future<Nothing> removeFramework = FUTURE_DISPATCH(
- _, &master::allocator::AllocatorProcess::removeFramework);
+ _, &master::allocator::MesosAllocatorProcess::removeFramework);
// Unregister the framework.
driver1.stop();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index df844b5..7b041f0 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -44,8 +44,7 @@ using mesos::master::MIN_CPUS;
using mesos::master::MIN_MEM;
using mesos::master::allocator::Allocator;
-using mesos::master::allocator::AllocatorProcess;
-using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
+using mesos::master::allocator::HierarchicalDRFAllocator;
using process::Clock;
using process::Future;
@@ -67,15 +66,13 @@ class HierarchicalAllocatorTest : public ::testing::Test
{
protected:
HierarchicalAllocatorTest()
- : allocatorProcess(new HierarchicalDRFAllocatorProcess()),
- allocator(new Allocator(allocatorProcess)),
+ : allocator(new HierarchicalDRFAllocator),
nextSlaveId(1),
nextFrameworkId(1) {}
~HierarchicalAllocatorTest()
{
delete allocator;
- delete allocatorProcess;
}
void initialize(
@@ -140,7 +137,6 @@ private:
protected:
master::Flags flags;
- AllocatorProcess* allocatorProcess;
Allocator* allocator;
process::Queue<Allocation> queue;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index b164625..ccbff64 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -46,8 +46,7 @@ using namespace mesos;
using namespace mesos::tests;
using mesos::master::allocator::Allocator;
-using mesos::master::allocator::AllocatorProcess;
-using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
+using mesos::master::allocator::HierarchicalDRFAllocator;
using mesos::master::Master;
@@ -75,15 +74,16 @@ class MasterAllocatorTest : public MesosTest
protected:
void StopAllocator()
{
- process::terminate(allocator.real);
- process::wait(allocator.real);
+ // TODO(alexr): Several tests have been reported flaky if no
+ // explicit stopping of allocation is used. Ensure allocation
+ // is stopped here.
}
- TestAllocatorProcess<T> allocator;
+ TestAllocator<T> allocator;
};
-typedef ::testing::Types<HierarchicalDRFAllocatorProcess> AllocatorTypes;
+typedef ::testing::Types<HierarchicalDRFAllocator> AllocatorTypes;
// Causes all TYPED_TEST(MasterAllocatorTest, ...) to be run for
@@ -1287,7 +1287,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
this->ShutdownMasters();
this->StopAllocator();
- TestAllocatorProcess<TypeParam> allocator2;
+ TestAllocator<TypeParam> allocator2;
EXPECT_CALL(allocator2, initialize(_, _, _));
@@ -1400,7 +1400,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
this->ShutdownMasters();
this->StopAllocator();
- TestAllocatorProcess<TypeParam> allocator2;
+ TestAllocator<TypeParam> allocator2;
EXPECT_CALL(allocator2, initialize(_, _, _));
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 4b8c134..39c54be 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -45,7 +45,7 @@ using namespace mesos::tests;
using mesos::master::Master;
-using mesos::master::allocator::AllocatorProcess;
+using mesos::master::allocator::MesosAllocatorProcess;
using mesos::slave::Slave;
@@ -272,7 +272,7 @@ TEST_F(MasterAuthorizationTest, KillTask)
EXPECT_EQ(TASK_KILLED, status.get().state());
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
// Now complete authorization.
promise.set(true);
@@ -348,7 +348,7 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved)
.WillOnce(FutureArg<1>(&status));
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
// Now complete authorization.
promise.set(true);
@@ -431,7 +431,7 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected)
.Times(AtMost(1));
Future<Nothing> deactivateSlave =
- FUTURE_DISPATCH(_, &AllocatorProcess::deactivateSlave);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::deactivateSlave);
// Now stop the slave.
Stop(slave.get());
@@ -443,7 +443,7 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected)
.WillOnce(FutureArg<1>(&status));
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
// Now complete authorization.
promise.set(true);
@@ -521,7 +521,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved)
AWAIT_READY(authorize);
Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &AllocatorProcess::removeFramework);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
// Now stop the framework.
driver.stop();
@@ -530,7 +530,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved)
AWAIT_READY(removeFramework);
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
// Now complete authorization.
promise.set(true);
@@ -946,7 +946,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration)
Clock::resume();
Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &AllocatorProcess::removeFramework);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
// Now complete authorization.
promise.set(true);
@@ -1008,7 +1008,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration)
AWAIT_READY(authorize2);
Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &AllocatorProcess::removeFramework);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
// Stop the framework.
driver.stop();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index c678527..a5051cb 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -62,7 +62,7 @@ using namespace mesos::tests;
using mesos::master::Master;
-using mesos::master::allocator::AllocatorProcess;
+using mesos::master::allocator::MesosAllocatorProcess;
using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
using mesos::slave::GarbageCollectorProcess;
@@ -1351,7 +1351,7 @@ TEST_F(MasterTest, LaunchAcrossSlavesTest)
combinedOffers.push_back(offers2.get()[0].id());
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
driver.launchTasks(combinedOffers, tasks);
@@ -1444,7 +1444,7 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest)
.WillOnce(FutureArg<1>(&status));
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
driver.launchTasks(combinedOffers, tasks);
@@ -2291,7 +2291,7 @@ TEST_F(MasterTest, OfferTimeout)
.WillOnce(FutureSatisfy(&offerRescinded));
Future<Nothing> recoverResources =
- FUTURE_DISPATCH(_, &AllocatorProcess::recoverResources);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
driver.start();
@@ -2577,7 +2577,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
AWAIT_READY(__statusUpdate2);
Future<Nothing> recoverResources = FUTURE_DISPATCH(
- _, &AllocatorProcess::recoverResources);
+ _, &MesosAllocatorProcess::recoverResources);
// Advance the clock so that the status update manager resends
// TASK_RUNNING update with 'latest_state' as TASK_FINISHED.
@@ -2825,7 +2825,7 @@ TEST_F(MasterTest, SlaveActiveEndpoint)
ASSERT_SOME_EQ(JSON::Boolean(true), status);
Future<Nothing> deactivateSlave =
- FUTURE_DISPATCH(_, &AllocatorProcess::deactivateSlave);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::deactivateSlave);
// Inject a slave exited event at the master causing the master
// to mark the slave as disconnected.
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index ac2dc73..d76ed8c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -188,7 +188,7 @@ Try<PID<master::Master> > MesosTest::StartMaster(
Try<PID<master::Master> > MesosTest::StartMaster(
- master::allocator::AllocatorProcess* allocator,
+ master::allocator::Allocator* allocator,
const Option<master::Flags>& flags)
{
return cluster.masters.start(
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 2b0c90d..266cac3 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -93,7 +93,7 @@ protected:
// Starts a master with the specified allocator process and flags.
virtual Try<process::PID<master::Master> > StartMaster(
- master::allocator::AllocatorProcess* allocator,
+ master::allocator::Allocator* allocator,
const Option<master::Flags>& flags = None());
// Starts a master with the specified authorizer and flags.
@@ -675,15 +675,12 @@ public:
};
-template <typename T = master::allocator::AllocatorProcess>
-class TestAllocatorProcess : public master::allocator::AllocatorProcess
+template <typename T = master::allocator::Allocator>
+class TestAllocator : public master::allocator::Allocator
{
public:
- TestAllocatorProcess()
+ TestAllocator()
{
- // Spawn the underlying allocator process.
- process::spawn(real);
-
// We use 'ON_CALL' and 'WillByDefault' here to specify the
// default actions (call in to the real allocator). This allows
// the tests to leverage the 'DoDefault' action.
@@ -764,11 +761,7 @@ public:
.WillRepeatedly(DoDefault());
}
- ~TestAllocatorProcess()
- {
- process::terminate(real);
- process::wait(real);
- }
+ ~TestAllocator() {}
MOCK_METHOD3(initialize, void(
const master::Flags&,
@@ -838,130 +831,79 @@ public:
ACTION_P(InvokeInitialize, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::initialize,
- arg0,
- arg1,
- arg2);
+ allocator->real.initialize(arg0, arg1, arg2);
}
ACTION_P(InvokeAddFramework, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::addFramework,
- arg0,
- arg1,
- arg2);
+ allocator->real.addFramework(arg0, arg1, arg2);
}
ACTION_P(InvokeRemoveFramework, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::removeFramework, arg0);
+ allocator->real.removeFramework(arg0);
}
ACTION_P(InvokeActivateFramework, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::activateFramework,
- arg0);
+ allocator->real.activateFramework(arg0);
}
ACTION_P(InvokeDeactivateFramework, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::deactivateFramework,
- arg0);
+ allocator->real.deactivateFramework(arg0);
}
ACTION_P(InvokeAddSlave, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::addSlave,
- arg0,
- arg1,
- arg2,
- arg3);
+ allocator->real.addSlave(arg0, arg1, arg2, arg3);
}
ACTION_P(InvokeRemoveSlave, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::removeSlave,
- arg0);
+ allocator->real.removeSlave(arg0);
}
ACTION_P(InvokeActivateSlave, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::activateSlave,
- arg0);
+ allocator->real.activateSlave(arg0);
}
ACTION_P(InvokeDeactivateSlave, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::deactivateSlave,
- arg0);
+ allocator->real.deactivateSlave(arg0);
}
ACTION_P(InvokeUpdateWhitelist, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::updateWhitelist,
- arg0);
+ allocator->real.updateWhitelist(arg0);
}
ACTION_P(InvokeRequestResources, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::requestResources,
- arg0,
- arg1);
+ allocator->real.requestResources(arg0, arg1);
}
ACTION_P(InvokeUpdateAllocation, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::updateAllocation,
- arg0,
- arg1,
- arg2);
+ allocator->real.updateAllocation(arg0, arg1, arg2);
}
ACTION_P(InvokeRecoverResources, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::recoverResources,
- arg0,
- arg1,
- arg2,
- arg3);
+ allocator->real.recoverResources(arg0, arg1, arg2, arg3);
}
@@ -970,22 +912,13 @@ ACTION_P2(InvokeRecoverResourcesWithFilters, allocator, timeout)
Filters filters;
filters.set_refuse_seconds(timeout);
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::recoverResources,
- arg0,
- arg1,
- arg2,
- filters);
+ allocator->real.recoverResources(arg0, arg1, arg2, filters);
}
ACTION_P(InvokeReviveOffers, allocator)
{
- process::dispatch(
- allocator->real,
- &master::allocator::AllocatorProcess::reviveOffers,
- arg0);
+ allocator->real.reviveOffers(arg0);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index bd7940d..eb16a58 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -42,7 +42,7 @@ using namespace mesos::tests;
using mesos::master::Master;
-using mesos::master::allocator::AllocatorProcess;
+using mesos::master::allocator::MesosAllocatorProcess;
using mesos::slave::Slave;
@@ -444,7 +444,7 @@ TEST_F(PartitionTest, OneWayPartitionMasterToSlave)
AWAIT_READY(ping);
Future<Nothing> deactivateSlave =
- FUTURE_DISPATCH(_, &AllocatorProcess::deactivateSlave);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::deactivateSlave);
// Inject a slave exited event at the master causing the master
// to mark the slave as disconnected. The slave should not notice
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 8114c79..718d950 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -36,7 +36,7 @@ using namespace mesos::tests;
using mesos::master::Master;
-using mesos::master::allocator::AllocatorProcess;
+using mesos::master::allocator::MesosAllocatorProcess;
using process::metrics::internal::MetricsProcess;
@@ -189,7 +189,7 @@ TEST_F(RateLimitingTest, NoRateLimiting)
}
Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &AllocatorProcess::removeFramework);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
driver->stop();
driver->join();
@@ -568,7 +568,7 @@ TEST_F(RateLimitingTest, DifferentPrincipalFrameworks)
// 3. Remove a framework and its message counters are deleted while
// the other framework's counters stay.
Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &AllocatorProcess::removeFramework);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
driver1->stop();
driver1->join();
@@ -735,7 +735,7 @@ TEST_F(RateLimitingTest, SamePrincipalFrameworks)
AWAIT_READY(duplicateFrameworkRegisteredMessage2);
Future<Nothing> removeFramework =
- FUTURE_DISPATCH(_, &AllocatorProcess::removeFramework);
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework);
driver1->stop();
driver1->join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index b371153..6f9a435 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -36,8 +36,6 @@
using namespace mesos;
using namespace mesos::tests;
-using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
-
using mesos::master::Master;
using mesos::slave::Slave;
@@ -883,7 +881,7 @@ TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
TEST_F(ResourceOffersTest, Request)
{
- TestAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
+ TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator;
EXPECT_CALL(allocator, initialize(_, _, _))
.Times(1);
http://git-wip-us.apache.org/repos/asf/mesos/blob/2b51582a/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index bd2bdef..8210c52 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -66,10 +66,7 @@ using namespace process;
using google::protobuf::RepeatedPtrField;
-using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
-
using mesos::master::Master;
-
using mesos::slave::Containerizer;
using mesos::slave::Fetcher;
using mesos::slave::GarbageCollectorProcess;
@@ -2257,7 +2254,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
// using an explicit executor.
TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
{
- TestAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
+ TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator;
EXPECT_CALL(allocator, initialize(_, _, _));
[6/6] mesos git commit: Extracted MesosAllocator into a separate file.
Posted by nn...@apache.org.
Extracted MesosAllocator into a separate file.
Review: https://reviews.apache.org/r/29931
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3385f36
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3385f36
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3385f36
Branch: refs/heads/master
Commit: b3385f3619b53d83004688984b8f4733eb5c5e12
Parents: 336e4e2
Author: Alexander Rukletsov <al...@mesosphere.io>
Authored: Thu Feb 12 11:44:58 2015 -0800
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Thu Feb 12 11:44:58 2015 -0800
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/master/allocator/allocator.hpp | 374 +---------------------
src/master/allocator/mesos/allocator.hpp | 383 +++++++++++++++++++++++
src/master/allocator/mesos/hierarchical.hpp | 2 +-
src/master/master.cpp | 2 +-
5 files changed, 390 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3385f36/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index f9efaaa..05ee76d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -509,6 +509,7 @@ libmesos_no_3rdparty_la_SOURCES += \
master/registrar.hpp \
master/validation.hpp \
master/allocator/allocator.hpp \
+ master/allocator/mesos/allocator.hpp \
master/allocator/mesos/hierarchical.hpp \
master/allocator/sorter/drf/sorter.hpp \
master/allocator/sorter/sorter.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3385f36/src/master/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/allocator.hpp b/src/master/allocator/allocator.hpp
index b9d85cc..c2461a3 100644
--- a/src/master/allocator/allocator.hpp
+++ b/src/master/allocator/allocator.hpp
@@ -24,9 +24,6 @@
#include <mesos/resources.hpp>
-#include <process/dispatch.hpp>
-#include <process/process.hpp>
-
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
@@ -79,10 +76,10 @@ public:
const FrameworkID& frameworkId) = 0;
// Note that the 'total' resources are passed explicitly because it
- // includes resources that are dynamically "persisted" on the slave
- // (e.g. persistent volumes, dynamic reservations, etc).
- // The slaveInfo resources, on the other hand, correspond directly
- // to the static --resources flag value on the slave.
+ // includes resources that are dynamically "checkpointed" on the
+ // slave (e.g. persistent volumes, dynamic reservations, etc). The
+ // slaveInfo resources, on the other hand, correspond directly to
+ // the static --resources flag value on the slave.
virtual void addSlave(
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
@@ -125,369 +122,6 @@ public:
const FrameworkID& frameworkId) = 0;
};
-
-class MesosAllocatorProcess;
-
-// A wrapper for Process-based allocators. It redirects all function
-// invocations to the underlying AllocatorProcess and manages its
-// lifetime. We ensure the template parameter AllocatorProcess
-// implements MesosAllocatorProcess by storing a pointer to it.
-//
-// TODO(alexr): Move this class (together with the implementation)
-// into a separate file.
-template <typename AllocatorProcess>
-class MesosAllocator : public Allocator
-{
-public:
- MesosAllocator();
-
- ~MesosAllocator();
-
- void initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles);
-
- void addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used);
-
- void removeFramework(
- const FrameworkID& frameworkId);
-
- // Offers are sent only to activated frameworks.
- void activateFramework(
- const FrameworkID& frameworkId);
-
- void deactivateFramework(
- const FrameworkID& frameworkId);
-
- // Note that the 'total' resources are passed explicitly because it
- // includes resources that are dynamically "checkpointed" on the
- // slave (e.g. persistent volumes, dynamic reservations, etc). The
- // slaveInfo resources, on the other hand, correspond directly to
- // the static --resources flag value on the slave.
- void addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used);
-
- void removeSlave(
- const SlaveID& slaveId);
-
- // Offers are sent only for activated slaves.
- void activateSlave(
- const SlaveID& slaveId);
-
- void deactivateSlave(
- const SlaveID& slaveId);
-
- void updateWhitelist(
- const Option<hashset<std::string> >& whitelist);
-
- void requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests);
-
- void updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations);
-
- // Informs the allocator to recover resources that are considered
- // used by the framework.
- void recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters);
-
- // Whenever a framework that has filtered resources wants to revive
- // offers for those resources the master invokes this callback.
- void reviveOffers(
- const FrameworkID& frameworkId);
-
-private:
- MesosAllocator(const MesosAllocator&); // Not copyable.
- MesosAllocator& operator=(const MesosAllocator&); // Not assignable.
-
- MesosAllocatorProcess* process;
-};
-
-
-// The basic interface for all Process-based allocators.
-class MesosAllocatorProcess : public process::Process<MesosAllocatorProcess>
-{
-public:
- MesosAllocatorProcess() {}
-
- virtual ~MesosAllocatorProcess() {}
-
- // Explicitly unhide 'initialize' to silence a compiler warning
- // from clang, since we overload below.
- using process::ProcessBase::initialize;
-
- virtual void initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles) = 0;
-
- virtual void addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used) = 0;
-
- virtual void removeFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void activateFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void deactivateFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used) = 0;
-
- virtual void removeSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void activateSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void deactivateSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void updateWhitelist(
- const Option<hashset<std::string> >& whitelist) = 0;
-
- virtual void requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests) = 0;
-
- virtual void updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations) = 0;
-
- virtual void recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters) = 0;
-
- virtual void reviveOffers(
- const FrameworkID& frameworkId) = 0;
-};
-
-
-template <typename AllocatorProcess>
-MesosAllocator<AllocatorProcess>::MesosAllocator()
-{
- process = new AllocatorProcess();
- process::spawn(process);
-}
-
-
-template <typename AllocatorProcess>
-MesosAllocator<AllocatorProcess>::~MesosAllocator()
-{
- process::terminate(process);
- process::wait(process);
- delete process;
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::initialize,
- flags,
- offerCallback,
- roles);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::addFramework,
- frameworkId,
- frameworkInfo,
- used);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::removeFramework(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::removeFramework,
- frameworkId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::activateFramework(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::activateFramework,
- frameworkId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::deactivateFramework(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::deactivateFramework,
- frameworkId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::addSlave,
- slaveId,
- slaveInfo,
- total,
- used);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::removeSlave(
- const SlaveID& slaveId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::removeSlave,
- slaveId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::activateSlave(
- const SlaveID& slaveId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::activateSlave,
- slaveId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::deactivateSlave(
- const SlaveID& slaveId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::deactivateSlave,
- slaveId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::updateWhitelist(
- const Option<hashset<std::string> >& whitelist)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::updateWhitelist,
- whitelist);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::requestResources,
- frameworkId,
- requests);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::updateAllocation,
- frameworkId,
- slaveId,
- operations);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::recoverResources,
- frameworkId,
- slaveId,
- resources,
- filters);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::reviveOffers(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::reviveOffers,
- frameworkId);
-}
-
} // namespace allocator {
} // namespace master {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3385f36/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
new file mode 100644
index 0000000..99b40a0
--- /dev/null
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_ALLOCATOR_MESOS_ALLOCATOR_HPP__
+#define __MASTER_ALLOCATOR_MESOS_ALLOCATOR_HPP__
+
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
+#include "master/allocator/allocator.hpp"
+
+namespace mesos {
+namespace master {
+namespace allocator {
+
+class MesosAllocatorProcess;
+
+// A wrapper for Process-based allocators. It redirects all function
+// invocations to the underlying AllocatorProcess and manages its
+// lifetime. We ensure the template parameter AllocatorProcess
+// implements MesosAllocatorProcess by storing a pointer to it.
+template <typename AllocatorProcess>
+class MesosAllocator : public Allocator
+{
+public:
+ MesosAllocator();
+
+ ~MesosAllocator();
+
+ void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles);
+
+ void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used);
+
+ void removeFramework(
+ const FrameworkID& frameworkId);
+
+ void activateFramework(
+ const FrameworkID& frameworkId);
+
+ void deactivateFramework(
+ const FrameworkID& frameworkId);
+
+ void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used);
+
+ void removeSlave(
+ const SlaveID& slaveId);
+
+ void activateSlave(
+ const SlaveID& slaveId);
+
+ void deactivateSlave(
+ const SlaveID& slaveId);
+
+ void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist);
+
+ void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests);
+
+ void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations);
+
+ void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters);
+
+ void reviveOffers(
+ const FrameworkID& frameworkId);
+
+private:
+ MesosAllocator(const MesosAllocator&); // Not copyable.
+ MesosAllocator& operator=(const MesosAllocator&); // Not assignable.
+
+ MesosAllocatorProcess* process;
+};
+
+
+// The basic interface for all Process-based allocators.
+class MesosAllocatorProcess : public process::Process<MesosAllocatorProcess>
+{
+public:
+ MesosAllocatorProcess() {}
+
+ virtual ~MesosAllocatorProcess() {}
+
+ // Explicitly unhide 'initialize' to silence a compiler warning
+ // from clang, since we overload below.
+ using process::ProcessBase::initialize;
+
+ virtual void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles) = 0;
+
+ virtual void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used) = 0;
+
+ virtual void removeFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void activateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void deactivateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used) = 0;
+
+ virtual void removeSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void activateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void deactivateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist) = 0;
+
+ virtual void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests) = 0;
+
+ virtual void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations) = 0;
+
+ virtual void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters) = 0;
+
+ virtual void reviveOffers(
+ const FrameworkID& frameworkId) = 0;
+};
+
+
+template <typename AllocatorProcess>
+MesosAllocator<AllocatorProcess>::MesosAllocator()
+{
+ process = new AllocatorProcess();
+ process::spawn(process);
+}
+
+
+template <typename AllocatorProcess>
+MesosAllocator<AllocatorProcess>::~MesosAllocator()
+{
+ process::terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::initialize,
+ flags,
+ offerCallback,
+ roles);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::addFramework,
+ frameworkId,
+ frameworkInfo,
+ used);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::removeFramework(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::removeFramework,
+ frameworkId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::activateFramework(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::activateFramework,
+ frameworkId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::deactivateFramework(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::deactivateFramework,
+ frameworkId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::addSlave,
+ slaveId,
+ slaveInfo,
+ total,
+ used);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::removeSlave(
+ const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::removeSlave,
+ slaveId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::activateSlave(
+ const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::activateSlave,
+ slaveId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::deactivateSlave(
+ const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::deactivateSlave,
+ slaveId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateWhitelist(
+ const Option<hashset<std::string> >& whitelist)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateWhitelist,
+ whitelist);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::requestResources,
+ frameworkId,
+ requests);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateAllocation,
+ frameworkId,
+ slaveId,
+ operations);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::recoverResources,
+ frameworkId,
+ slaveId,
+ resources,
+ filters);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::reviveOffers(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::reviveOffers,
+ frameworkId);
+}
+
+} // namespace allocator {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_ALLOCATOR_MESOS_ALLOCATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3385f36/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 8ddf4f9..2680d62 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -35,7 +35,7 @@
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
-#include "master/allocator/allocator.hpp"
+#include "master/allocator/mesos/allocator.hpp"
#include "master/allocator/sorter/drf/sorter.hpp"
namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3385f36/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a567a34..f10a3cf 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -583,7 +583,7 @@ void Master::initialize()
defer(self(), &Master::offer, lambda::_1, lambda::_2),
roleInfos);
- // Parse the whitelist. Passing allocator::updateWhitelist()
+ // Parse the whitelist. Passing Allocator::updateWhitelist()
// callback is safe because we shut down the whitelistWatcher in
// Master::finalize(), while allocator lifetime is greater than
// masters. Therefore there is no risk of calling into an allocator
[5/6] mesos git commit: Cleaned up includes in allocation sources.
Posted by nn...@apache.org.
Cleaned up includes in allocation sources.
Review: https://reviews.apache.org/r/29929
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/336e4e2e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/336e4e2e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/336e4e2e
Branch: refs/heads/master
Commit: 336e4e2e6b1cee966f5b72bc23341c54e4813cc9
Parents: be6246a
Author: Alexander Rukletsov <al...@mesosphere.io>
Authored: Thu Feb 12 11:44:38 2015 -0800
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Thu Feb 12 11:44:38 2015 -0800
----------------------------------------------------------------------
src/local/local.cpp | 1 -
src/master/allocator/allocator.hpp | 6 ------
src/master/allocator/mesos/hierarchical.hpp | 3 +--
src/master/main.cpp | 1 -
src/tests/master_tests.cpp | 1 -
5 files changed, 1 insertion(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/336e4e2e/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 4b4ec99..1ef04b8 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -68,7 +68,6 @@ using namespace mesos;
using namespace mesos::log;
using mesos::master::allocator::Allocator;
-using mesos::master::allocator::DRFSorter;
using mesos::master::allocator::HierarchicalDRFAllocator;
using mesos::master::Master;
http://git-wip-us.apache.org/repos/asf/mesos/blob/336e4e2e/src/master/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/allocator.hpp b/src/master/allocator/allocator.hpp
index 7b857f0..b9d85cc 100644
--- a/src/master/allocator/allocator.hpp
+++ b/src/master/allocator/allocator.hpp
@@ -24,9 +24,7 @@
#include <mesos/resources.hpp>
-#include <process/future.hpp>
#include <process/dispatch.hpp>
-#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/hashmap.hpp>
@@ -36,13 +34,9 @@
#include "master/flags.hpp"
-#include "messages/messages.hpp"
-
namespace mesos {
namespace master {
-class Master; // Forward declaration.
-
namespace allocator {
// Basic model of an allocator: resources are allocated to a framework
http://git-wip-us.apache.org/repos/asf/mesos/blob/336e4e2e/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index cf2e369..8ddf4f9 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -23,6 +23,7 @@
#include <vector>
#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
@@ -34,8 +35,6 @@
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
-#include "master/master.hpp"
-
#include "master/allocator/allocator.hpp"
#include "master/allocator/sorter/drf/sorter.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/336e4e2e/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 63408f4..a1efd2b 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -54,7 +54,6 @@
#include "master/allocator/allocator.hpp"
#include "master/allocator/mesos/hierarchical.hpp"
-#include "master/allocator/sorter/drf/sorter.hpp"
#include "module/manager.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/336e4e2e/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index b821038..e69a7fb 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -64,7 +64,6 @@ using namespace mesos::tests;
using mesos::master::Master;
using mesos::master::allocator::MesosAllocatorProcess;
-using mesos::master::allocator::HierarchicalDRFAllocatorProcess;
using mesos::slave::GarbageCollectorProcess;
using mesos::slave::Slave;
[2/6] mesos git commit: Moved allocation related sources into a
separate directory.
Posted by nn...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/drf_sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.hpp b/src/master/drf_sorter.hpp
deleted file mode 100644
index 8a16d28..0000000
--- a/src/master/drf_sorter.hpp
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __DRF_SORTER_HPP__
-#define __DRF_SORTER_HPP__
-
-#include <set>
-#include <string>
-
-#include <mesos/resources.hpp>
-
-#include <stout/hashmap.hpp>
-
-#include "master/sorter.hpp"
-
-
-namespace mesos {
-namespace master {
-namespace allocator {
-
-struct Client
-{
- Client(const std::string& _name, double _share, uint64_t _allocations)
- : name(_name), share(_share), allocations(_allocations) {}
-
- std::string name;
- double share;
-
- // We store the number of times this client has been chosen for
- // allocation so that we can fairly share the resources across
- // clients that have the same share. Note that this information is
- // not persisted across master failovers, but since the point is to
- // equalize the 'allocations' across clients of the same 'share'
- // having allocations restart at 0 after a master failover should be
- // sufficient (famous last words.)
- uint64_t allocations;
-};
-
-
-struct DRFComparator
-{
- virtual ~DRFComparator() {}
- virtual bool operator () (const Client& client1, const Client& client2);
-};
-
-
-class DRFSorter : public Sorter
-{
-public:
- virtual ~DRFSorter() {}
-
- virtual void add(const std::string& name, double weight = 1);
-
- virtual void remove(const std::string& name);
-
- virtual void activate(const std::string& name);
-
- virtual void deactivate(const std::string& name);
-
- virtual void allocated(const std::string& name,
- const Resources& resources);
-
- virtual void update(const std::string& name,
- const Resources& oldAllocation,
- const Resources& newAllocation);
-
- virtual void unallocated(const std::string& name,
- const Resources& resources);
-
- virtual Resources allocation(const std::string& name);
-
- virtual void add(const Resources& resources);
-
- virtual void remove(const Resources& resources);
-
- virtual std::list<std::string> sort();
-
- virtual bool contains(const std::string& name);
-
- virtual int count();
-
-private:
- // Recalculates the share for the client and moves
- // it in 'clients' accordingly.
- void update(const std::string& name);
-
- // Returns the dominant resource share for the client.
- double calculateShare(const std::string& name);
-
- // Returns an iterator to the specified client, if
- // it exists in this Sorter.
- std::set<Client, DRFComparator>::iterator find(const std::string& name);
-
- // If true, start() will recalculate all shares.
- bool dirty;
-
- // A set of Clients (names and shares) sorted by share.
- std::set<Client, DRFComparator> clients;
-
- // Maps client names to the resources they have been allocated.
- hashmap<std::string, Resources> allocations;
-
- // Maps client names to the weights that should be applied to their shares.
- hashmap<std::string, double> weights;
-
- // Total resources.
- Resources resources;
-};
-
-} // namespace allocator {
-} // namespace master {
-} // namespace mesos {
-
-#endif // __DRF_SORTER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
deleted file mode 100644
index 75e0730..0000000
--- a/src/master/hierarchical_allocator_process.hpp
+++ /dev/null
@@ -1,922 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
-#define __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
-
-#include <algorithm>
-#include <vector>
-
-#include <mesos/resources.hpp>
-
-#include <process/delay.hpp>
-#include <process/id.hpp>
-#include <process/timeout.hpp>
-
-#include <stout/check.hpp>
-#include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/stopwatch.hpp>
-#include <stout/stringify.hpp>
-
-#include "master/allocator.hpp"
-#include "master/drf_sorter.hpp"
-#include "master/master.hpp"
-#include "master/sorter.hpp"
-
-namespace mesos {
-namespace master {
-namespace allocator {
-
-// Forward declarations.
-class Filter;
-
-
-// We forward declare the hierarchical allocator process so that we
-// can typedef an instantiation of it with DRF sorters.
-template <typename RoleSorter, typename FrameworkSorter>
-class HierarchicalAllocatorProcess;
-
-typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
-HierarchicalDRFAllocatorProcess;
-
-typedef MesosAllocator<HierarchicalDRFAllocatorProcess>
-HierarchicalDRFAllocator;
-
-
-// Implements the basic allocator algorithm - first pick a role by
-// some criteria, then pick one of their frameworks to allocate to.
-template <typename RoleSorter, typename FrameworkSorter>
-class HierarchicalAllocatorProcess : public MesosAllocatorProcess
-{
-public:
- HierarchicalAllocatorProcess();
-
- virtual ~HierarchicalAllocatorProcess();
-
- process::PID<HierarchicalAllocatorProcess> self();
-
- void initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles);
-
- void addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used);
-
- void removeFramework(
- const FrameworkID& frameworkId);
-
- void activateFramework(
- const FrameworkID& frameworkId);
-
- void deactivateFramework(
- const FrameworkID& frameworkId);
-
- void addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used);
-
- void removeSlave(
- const SlaveID& slaveId);
-
- void deactivateSlave(
- const SlaveID& slaveId);
-
- void activateSlave(
- const SlaveID& slaveId);
-
- void updateWhitelist(
- const Option<hashset<std::string> >& whitelist);
-
- void requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests);
-
- void updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations);
-
- void recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters);
-
- void reviveOffers(
- const FrameworkID& frameworkId);
-
-protected:
- // Useful typedefs for dispatch/delay/defer to self()/this.
- typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> Self;
- typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> This;
-
- // Callback for doing batch allocations.
- void batch();
-
- // Allocate any allocatable resources.
- void allocate();
-
- // Allocate resources just from the specified slave.
- void allocate(const SlaveID& slaveId);
-
- // Allocate resources from the specified slaves.
- void allocate(const hashset<SlaveID>& slaveIds);
-
- // Remove a filter for the specified framework.
- void expire(const FrameworkID& frameworkId, Filter* filter);
-
- // Checks whether the slave is whitelisted.
- bool isWhitelisted(const SlaveID& slaveId);
-
- // Returns true if there is a filter for this framework
- // on this slave.
- bool isFiltered(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources);
-
- bool allocatable(const Resources& resources);
-
- bool initialized;
-
- Flags flags;
-
- lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)> offerCallback;
-
- struct Framework
- {
- std::string role;
- bool checkpoint; // Whether the framework desires checkpointing.
-
- hashset<Filter*> filters; // Active filters for the framework.
- };
-
- hashmap<FrameworkID, Framework> frameworks;
-
- struct Slave
- {
- Resources total;
- Resources available;
-
- bool activated; // Whether to offer resources.
- bool checkpoint; // Whether slave supports checkpointing.
-
- std::string hostname;
- };
-
- hashmap<SlaveID, Slave> slaves;
-
- hashmap<std::string, RoleInfo> roles;
-
- // Slaves to send offers for.
- Option<hashset<std::string> > whitelist;
-
- // There are two levels of sorting, hence "hierarchical".
- // Level 1 sorts across roles:
- // Reserved resources are excluded from fairness calculation,
- // since they are forcibly pinned to a role.
- // Level 2 sorts across frameworks within a particular role:
- // Both reserved resources and unreserved resources are used
- // in the fairness calculation. This is because reserved
- // resources can be allocated to any framework in the role.
- RoleSorter* roleSorter;
- hashmap<std::string, FrameworkSorter*> frameworkSorters;
-};
-
-
-// Used to represent "filters" for resources unused in offers.
-class Filter
-{
-public:
- virtual ~Filter() {}
-
- virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
-};
-
-
-class RefusedFilter: public Filter
-{
-public:
- RefusedFilter(
- const SlaveID& _slaveId,
- const Resources& _resources,
- const process::Timeout& _timeout)
- : slaveId(_slaveId), resources(_resources), timeout(_timeout) {}
-
- virtual bool filter(const SlaveID& _slaveId, const Resources& _resources)
- {
- return slaveId == _slaveId &&
- resources.contains(_resources) && // Refused resources are superset.
- timeout.remaining() > Seconds(0);
- }
-
- const SlaveID slaveId;
- const Resources resources;
- const process::Timeout timeout;
-};
-
-
-template <class RoleSorter, class FrameworkSorter>
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::HierarchicalAllocatorProcess() // NOLINT(whitespace/line_length)
- : ProcessBase(process::ID::generate("hierarchical-allocator")),
- initialized(false) {}
-
-
-template <class RoleSorter, class FrameworkSorter>
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::~HierarchicalAllocatorProcess() // NOLINT(whitespace/line_length)
-{}
-
-
-template <class RoleSorter, class FrameworkSorter>
-process::PID<HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> >
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::self()
-{
- return process::PID<HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> >(this); // NOLINT(whitespace/line_length)
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
- const Flags& _flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& _offerCallback,
- const hashmap<std::string, RoleInfo>& _roles)
-{
- flags = _flags;
- offerCallback = _offerCallback;
- roles = _roles;
- initialized = true;
-
- roleSorter = new RoleSorter();
- foreachpair (const std::string& name, const RoleInfo& roleInfo, roles) {
- roleSorter->add(name, roleInfo.weight());
- frameworkSorters[name] = new FrameworkSorter();
- }
-
- if (roleSorter->count() == 0) {
- LOG(ERROR) << "No roles specified, cannot allocate resources!";
- }
-
- VLOG(1) << "Initialized hierarchical allocator process";
-
- delay(flags.allocation_interval, self(), &Self::batch);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used)
-{
- CHECK(initialized);
-
- const std::string& role = frameworkInfo.role();
-
- CHECK(roles.contains(role));
-
- CHECK(!frameworkSorters[role]->contains(frameworkId.value()));
- frameworkSorters[role]->add(frameworkId.value());
-
- // TODO(bmahler): Validate that the reserved resources have the
- // framework's role.
-
- // Update the allocation to this framework.
- roleSorter->allocated(role, used.unreserved());
- frameworkSorters[role]->add(used);
- frameworkSorters[role]->allocated(frameworkId.value(), used);
-
- frameworks[frameworkId] = Framework();
- frameworks[frameworkId].role = frameworkInfo.role();
- frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint();
-
- LOG(INFO) << "Added framework " << frameworkId;
-
- allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
- const std::string& role = frameworks[frameworkId].role;
-
- // Might not be in 'frameworkSorters[role]' because it was previously
- // deactivated and never re-added.
- if (frameworkSorters[role]->contains(frameworkId.value())) {
- Resources allocation =
- frameworkSorters[role]->allocation(frameworkId.value());
-
- roleSorter->unallocated(role, allocation.unreserved());
- frameworkSorters[role]->remove(allocation);
- frameworkSorters[role]->remove(frameworkId.value());
- }
-
- // Do not delete the filters contained in this
- // framework's 'filters' hashset yet, see comments in
- // HierarchicalAllocatorProcess::reviveOffers and
- // HierarchicalAllocatorProcess::expire.
- frameworks.erase(frameworkId);
-
- LOG(INFO) << "Removed framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateFramework(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
- const std::string& role = frameworks[frameworkId].role;
-
- frameworkSorters[role]->activate(frameworkId.value());
-
- LOG(INFO) << "Activated framework " << frameworkId;
-
- allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- CHECK(frameworks.contains(frameworkId));
- const std::string& role = frameworks[frameworkId].role;
-
- frameworkSorters[role]->deactivate(frameworkId.value());
-
- // Note that the Sorter *does not* remove the resources allocated
- // to this framework. For now, this is important because if the
- // framework fails over and is activated, we still want a record
- // of the resources that it is using. We might be able to collapse
- // the added/removed and activated/deactivated in the future.
-
- // Do not delete the filters contained in this
- // framework's 'filters' hashset yet, see comments in
- // HierarchicalAllocatorProcess::reviveOffers and
- // HierarchicalAllocatorProcess::expire.
- frameworks[frameworkId].filters.clear();
-
- LOG(INFO) << "Deactivated framework " << frameworkId;
-}
-
-
-
-// TODO(bmahler): Generalize this.
-template <typename Iterable>
-Resources sum(const Iterable& resources)
-{
- Resources total;
- foreach (const Resources& r, resources) {
- total += r;
- }
- return total;
-}
-
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used)
-{
- CHECK(initialized);
- CHECK(!slaves.contains(slaveId));
-
- roleSorter->add(total.unreserved());
-
- foreachpair (const FrameworkID& frameworkId,
- const Resources& allocated,
- used) {
- if (frameworks.contains(frameworkId)) {
- const std::string& role = frameworks[frameworkId].role;
-
- // TODO(bmahler): Validate that the reserved resources have the
- // framework's role.
-
- roleSorter->allocated(role, allocated.unreserved());
- frameworkSorters[role]->add(allocated);
- frameworkSorters[role]->allocated(frameworkId.value(), allocated);
- }
- }
-
- slaves[slaveId] = Slave();
- slaves[slaveId].total = total;
- slaves[slaveId].available = total - sum(used.values());
- slaves[slaveId].activated = true;
- slaves[slaveId].checkpoint = slaveInfo.checkpoint();
- slaves[slaveId].hostname = slaveInfo.hostname();
-
- LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname
- << ") with " << slaves[slaveId].total
- << " (and " << slaves[slaveId].available << " available)";
-
- allocate(slaveId);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave(
- const SlaveID& slaveId)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- // TODO(bmahler): Per MESOS-621, this should remove the allocations
- // that any frameworks have on this slave. Otherwise the caller may
- // "leak" allocated resources accidentally if they forget to recover
- // all the resources. Fixing this would require more information
- // than what we currently track in the allocator.
-
- roleSorter->remove(slaves[slaveId].total.unreserved());
-
- slaves.erase(slaveId);
-
- // Note that we DO NOT actually delete any filters associated with
- // this slave, that will occur when the delayed
- // HierarchicalAllocatorProcess::expire gets invoked (or the framework
- // that applied the filters gets removed).
-
- LOG(INFO) << "Removed slave " << slaveId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateSlave(
- const SlaveID& slaveId)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- slaves[slaveId].activated = true;
-
- LOG(INFO)<< "Slave " << slaveId << " reactivated";
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateSlave(
- const SlaveID& slaveId)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
-
- slaves[slaveId].activated = false;
-
- LOG(INFO) << "Slave " << slaveId << " deactivated";
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateWhitelist(
- const Option<hashset<std::string> >& _whitelist)
-{
- CHECK(initialized);
-
- whitelist = _whitelist;
-
- if (whitelist.isSome()) {
- LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get());
-
- if (whitelist.get().empty()) {
- LOG(WARNING) << "Whitelist is empty, no offers will be made!";
- }
- } else {
- LOG(INFO) << "Advertising offers for all slaves";
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests)
-{
- CHECK(initialized);
-
- LOG(INFO) << "Received resource request from framework " << frameworkId;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations)
-{
- CHECK(initialized);
- CHECK(slaves.contains(slaveId));
- CHECK(frameworks.contains(frameworkId));
-
- // The total resources on the slave are composed of both allocated
- // and available resources:
- //
- // total = available + allocated
- //
- // Here we apply offer operations to the allocated resources, which
- // in turns leads to an update of the total. The available resources
- // remain unchanged.
-
- FrameworkSorter* frameworkSorter =
- frameworkSorters[frameworks[frameworkId].role];
-
- Resources allocation = frameworkSorter->allocation(frameworkId.value());
-
- // Update the allocated resources.
- Try<Resources> updatedAllocation = allocation.apply(operations);
- CHECK_SOME(updatedAllocation);
-
- frameworkSorter->update(
- frameworkId.value(),
- allocation,
- updatedAllocation.get());
-
- roleSorter->update(
- frameworks[frameworkId].role,
- allocation.unreserved(),
- updatedAllocation.get().unreserved());
-
- // Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
- CHECK_SOME(updatedTotal);
-
- slaves[slaveId].total = updatedTotal.get();
-
- // TODO(bmahler): Validate that the available resources are
- // unaffected. This requires augmenting the sorters with
- // SlaveIDs for allocations, so that we can do:
- //
- // CHECK_EQ(slaves[slaveId].total - updatedAllocation,
- // slaves[slaveId].available);
-
- // TODO(jieyu): Do not log if there is no update.
- LOG(INFO) << "Updated allocation of framework " << frameworkId
- << " on slave " << slaveId
- << " from " << allocation << " to " << updatedAllocation.get();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters)
-{
- CHECK(initialized);
-
- if (resources.empty()) {
- return;
- }
-
- // Updated resources allocated to framework (if framework still
- // exists, which it might not in the event that we dispatched
- // Master::offer before we received
- // MesosAllocatorProcess::removeFramework or
- // MesosAllocatorProcess::deactivateFramework, in which case we will
- // have already recovered all of its resources).
- if (frameworks.contains(frameworkId)) {
- const std::string& role = frameworks[frameworkId].role;
-
- CHECK(frameworkSorters.contains(role));
-
- if (frameworkSorters[role]->contains(frameworkId.value())) {
- frameworkSorters[role]->unallocated(frameworkId.value(), resources);
- frameworkSorters[role]->remove(resources);
- roleSorter->unallocated(role, resources.unreserved());
- }
- }
-
- // Update resources allocatable on slave (if slave still exists,
- // which it might not in the event that we dispatched Master::offer
- // before we received Allocator::removeSlave).
- if (slaves.contains(slaveId)) {
- slaves[slaveId].available += resources;
-
- LOG(INFO) << "Recovered " << resources
- << " (total allocatable: " << slaves[slaveId].available
- << ") on slave " << slaveId
- << " from framework " << frameworkId;
- }
-
- // No need to install the filter if 'filters' is none.
- if (filters.isNone()) {
- return;
- }
-
- // No need to install the filter if slave/framework does not exist.
- if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
- return;
- }
-
- // Create a refused resources filter.
- Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
-
- if (seconds.isError()) {
- LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
- << "the refused resources filter because the input value "
- << "is invalid: " << seconds.error();
-
- seconds = Duration::create(Filters().refuse_seconds());
- } else if (seconds.get() < Duration::zero()) {
- LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
- << "the refused resources filter because the input value "
- << "is negative";
-
- seconds = Duration::create(Filters().refuse_seconds());
- }
-
- CHECK_SOME(seconds);
-
- if (seconds.get() != Duration::zero()) {
- VLOG(1) << "Framework " << frameworkId
- << " filtered slave " << slaveId
- << " for " << seconds.get();
-
- // Create a new filter and delay its expiration.
- Filter* filter = new RefusedFilter(
- slaveId,
- resources,
- process::Timeout::in(seconds.get()));
-
- frameworks[frameworkId].filters.insert(filter);
-
- delay(seconds.get(), self(), &Self::expire, frameworkId, filter);
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
- const FrameworkID& frameworkId)
-{
- CHECK(initialized);
-
- frameworks[frameworkId].filters.clear();
-
- // We delete each actual Filter when
- // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
- // Filter here it's possible that the same Filter (i.e., same
- // address) could get reused and HierarchicalAllocatorProcess::expire
- // would expire that filter too soon. Note that this only works
- // right now because ALL Filter types "expire".
-
- LOG(INFO) << "Removed filters for framework " << frameworkId;
-
- allocate();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::batch()
-{
- allocate();
- delay(flags.allocation_interval, self(), &Self::batch);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate()
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- allocate(slaves.keys());
-
- VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
- << stopwatch.elapsed();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
- const SlaveID& slaveId)
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- // TODO(bmahler): Add initializer list constructor for hashset.
- hashset<SlaveID> slaves;
- slaves.insert(slaveId);
- allocate(slaves);
-
- VLOG(1) << "Performed allocation for slave " << slaveId << " in "
- << stopwatch.elapsed();
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
- const hashset<SlaveID>& slaveIds_)
-{
- if (roleSorter->count() == 0) {
- LOG(ERROR) << "No roles specified, cannot allocate resources!";
- return;
- }
-
- // Compute the offerable resources, per framework:
- // (1) For reserved resources on the slave, allocate these to a
- // framework having the corresponding role.
- // (2) For unreserved resources on the slave, allocate these
- // to a framework of any role.
- hashmap<FrameworkID, hashmap<SlaveID, Resources> > offerable;
-
- // Randomize the order in which slaves' resources are allocated.
- // TODO(vinod): Implement a smarter sorting algorithm.
- std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
- std::random_shuffle(slaveIds.begin(), slaveIds.end());
-
- foreach (const SlaveID& slaveId, slaveIds) {
- // Don't send offers for non-whitelisted and deactivated slaves.
- if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) {
- continue;
- }
-
- foreach (const std::string& role, roleSorter->sort()) {
- foreach (const std::string& frameworkId_,
- frameworkSorters[role]->sort()) {
- FrameworkID frameworkId;
- frameworkId.set_value(frameworkId_);
-
- Resources unreserved = slaves[slaveId].available.unreserved();
- Resources resources = unreserved;
- if (role != "*") {
- resources += slaves[slaveId].available.reserved(role);
- }
-
- // If the resources are not allocatable, ignore.
- if (!allocatable(resources)) {
- continue;
- }
-
- // If the framework filters these resources, ignore.
- if (isFiltered(frameworkId, slaveId, resources)) {
- continue;
- }
-
- VLOG(2) << "Allocating " << resources << " on slave " << slaveId
- << " to framework " << frameworkId;
-
- // Note that we perform "coarse-grained" allocation,
- // meaning that we always allocate the entire remaining
- // slave resources to a single framework.
- offerable[frameworkId][slaveId] = resources;
- slaves[slaveId].available -= resources;
-
- // Reserved resources are only accounted for in the framework
- // sorter, since the reserved resources are not shared across
- // roles.
- frameworkSorters[role]->add(resources);
- frameworkSorters[role]->allocated(frameworkId_, resources);
- roleSorter->allocated(role, unreserved);
- }
- }
- }
-
- if (offerable.empty()) {
- VLOG(1) << "No resources available to allocate!";
- } else {
- // Now offer the resources to each framework.
- foreachkey (const FrameworkID& frameworkId, offerable) {
- offerCallback(frameworkId, offerable[frameworkId]);
- }
- }
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-void
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
- const FrameworkID& frameworkId,
- Filter* filter)
-{
- // The filter might have already been removed (e.g., if the
- // framework no longer exists or in
- // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
- // keep the address from getting reused possibly causing premature
- // expiration).
- if (frameworks.contains(frameworkId) &&
- frameworks[frameworkId].filters.contains(filter)) {
- frameworks[frameworkId].filters.erase(filter);
- }
-
- delete filter;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
- const SlaveID& slaveId)
-{
- CHECK(slaves.contains(slaveId));
-
- return whitelist.isNone() ||
- whitelist.get().contains(slaves[slaveId].hostname);
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources)
-{
- CHECK(frameworks.contains(frameworkId));
- CHECK(slaves.contains(slaveId));
-
- // Do not offer a non-checkpointing slave's resources to a checkpointing
- // framework. This is a short term fix until the following is resolved:
- // https://issues.apache.org/jira/browse/MESOS-444.
- if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
- VLOG(1) << "Filtered " << resources
- << " on non-checkpointing slave " << slaveId
- << " for checkpointing framework " << frameworkId;
- return true;
- }
-
- foreach (Filter* filter, frameworks[frameworkId].filters) {
- if (filter->filter(slaveId, resources)) {
- VLOG(1) << "Filtered " << resources
- << " on slave " << slaveId
- << " for framework " << frameworkId;
- return true;
- }
- }
- return false;
-}
-
-
-template <class RoleSorter, class FrameworkSorter>
-bool
-HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
- const Resources& resources)
-{
- Option<double> cpus = resources.cpus();
- Option<Bytes> mem = resources.mem();
-
- return (cpus.isSome() && cpus.get() >= MIN_CPUS) ||
- (mem.isSome() && mem.get() >= MIN_MEM);
-}
-
-} // namespace allocator {
-} // namespace master {
-} // namespace mesos {
-
-#endif // __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 765062f..63408f4 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -46,15 +46,16 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"
-#include "master/allocator.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
-#include "master/drf_sorter.hpp"
-#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/repairer.hpp"
+#include "master/allocator/allocator.hpp"
+#include "master/allocator/mesos/hierarchical.hpp"
+#include "master/allocator/sorter/drf/sorter.hpp"
+
#include "module/manager.hpp"
#include "state/in_memory.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9e75b6c..a567a34 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -73,10 +73,11 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"
-#include "master/allocator.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "module/manager.hpp"
#include "watcher/whitelist_watcher.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/sorter.hpp b/src/master/sorter.hpp
deleted file mode 100644
index 1561201..0000000
--- a/src/master/sorter.hpp
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __SORTER_HPP__
-#define __SORTER_HPP__
-
-#include <list>
-#include <string>
-
-#include <mesos/resources.hpp>
-
-namespace mesos {
-namespace master {
-namespace allocator {
-
-// Sorters implement the logic for determining the
-// order in which users or frameworks should receive
-// resource allocations.
-//
-// TODO(bmahler): The total and allocated resources are currently
-// aggregated across slaves, which only works for scalar resources.
-// Also, persistent disks are a bit tricky because there will be
-// duplicated persistence IDs within the resources. Consider storing
-// maps keyed off of the slave ID to fix these issues.
-//
-// TODO(bmahler): Templatize this on Client, so that callers can
-// don't need to do string conversion, e.g. FrameworkID, string role,
-// etc.
-class Sorter
-{
-public:
- virtual ~Sorter() {}
-
- // Adds a client to allocate resources to. A client
- // may be a user or a framework.
- virtual void add(const std::string& client, double weight = 1) = 0;
-
- // Removes a client.
- virtual void remove(const std::string& client) = 0;
-
- // Readds a client to the sort after deactivate.
- virtual void activate(const std::string& client) = 0;
-
- // Removes a client from the sort, so it won't get allocated to.
- virtual void deactivate(const std::string& client) = 0;
-
- // Specify that resources have been allocated to the given client.
- virtual void allocated(const std::string& client,
- const Resources& resources) = 0;
-
- // Updates a portion of the allocation for the client, in order to
- // augment the resources with additional metadata (e.g., volumes)
- // This means that the new allocation must not affect the static
- // roles, or the overall quantities of resources!
- virtual void update(const std::string& client,
- const Resources& oldAllocation,
- const Resources& newAllocation) = 0;
-
- // Specify that resources have been unallocated from the given client.
- virtual void unallocated(const std::string& client,
- const Resources& resources) = 0;
-
- // Returns the resources that have been allocated to this client.
- virtual Resources allocation(const std::string& client) = 0;
-
- // Add resources to the total pool of resources this
- // Sorter should consider.
- virtual void add(const Resources& resources) = 0;
-
- // Remove resources from the total pool.
- virtual void remove(const Resources& resources) = 0;
-
- // Returns a list of all clients, in the order that they
- // should be allocated to, according to this Sorter's policy.
- virtual std::list<std::string> sort() = 0;
-
- // Returns true if this Sorter contains the specified client,
- // either active or deactivated.
- virtual bool contains(const std::string& client) = 0;
-
- // Returns the number of clients this Sorter contains,
- // either active or deactivated.
- virtual int count() = 0;
-};
-
-} // namespace allocator {
-} // namespace master {
-} // namespace mesos {
-
-#endif // __SORTER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 1ea763a..ccd68db 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -52,15 +52,16 @@
#include "log/tool/initialize.hpp"
-#include "master/allocator.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
-#include "master/hierarchical_allocator_process.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/repairer.hpp"
+#include "master/allocator/allocator.hpp"
+#include "master/allocator/mesos/hierarchical.hpp"
+
#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 0a477ed..efa5c57 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -39,9 +39,10 @@
#include "common/protobuf_utils.hpp"
-#include "master/allocator.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "sched/constants.hpp"
#include "slave/constants.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 7b041f0..eeecfb6 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -33,10 +33,11 @@
#include <stout/hashset.hpp>
#include <stout/utils.hpp>
-#include "master/allocator.hpp"
#include "master/constants.hpp"
#include "master/flags.hpp"
-#include "master/hierarchical_allocator_process.hpp"
+
+#include "master/allocator/allocator.hpp"
+#include "master/allocator/mesos/hierarchical.hpp"
using namespace mesos;
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index ccbff64..648a7a1 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -33,12 +33,13 @@
#include <stout/some.hpp>
#include <stout/strings.hpp>
-#include "master/allocator.hpp"
#include "master/constants.hpp"
#include "master/detector.hpp"
-#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+#include "master/allocator/mesos/hierarchical.hpp"
+
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 39c54be..fadbd38 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -31,9 +31,10 @@
#include <stout/gtest.hpp>
#include <stout/try.hpp>
-#include "master/allocator.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "messages/messages.hpp"
#include "slave/slave.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 9f9ee6c..5156898 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -32,9 +32,10 @@
#include "common/protobuf_utils.hpp"
-#include "master/allocator.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "slave/slave.hpp"
#include "tests/containerizer.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index a5051cb..b821038 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -43,10 +43,11 @@
#include <stout/strings.hpp>
#include <stout/try.hpp>
-#include "master/allocator.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "slave/constants.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 266cac3..60c7004 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -45,10 +45,11 @@
#include "messages/messages.hpp" // For google::protobuf::Message.
-#include "master/allocator.hpp"
#include "master/detector.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "slave/slave.hpp"
#include "slave/containerizer/containerizer.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 718d950..784ac76 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -25,10 +25,11 @@
#include <process/metrics/metrics.hpp>
-#include "master/allocator.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
+#include "master/allocator/allocator.hpp"
+
#include "tests/mesos.hpp"
using namespace mesos;
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 6f9a435..1ba73ba 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -26,9 +26,10 @@
#include <stout/strings.hpp>
#include <stout/uuid.hpp>
-#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
+#include "master/allocator/mesos/hierarchical.hpp"
+
#include "slave/slave.hpp"
#include "tests/mesos.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index c6e5db8..7ba32b7 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -28,7 +28,7 @@
#include <stout/gtest.hpp>
-#include "master/drf_sorter.hpp"
+#include "master/allocator/sorter/drf/sorter.hpp"
using namespace mesos;
[3/6] mesos git commit: Moved allocation related sources into a
separate directory.
Posted by nn...@apache.org.
Moved allocation related sources into a separate directory.
Review: https://reviews.apache.org/r/29925
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3093759f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3093759f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3093759f
Branch: refs/heads/master
Commit: 3093759fb978b9131d05510f7f095203d2a383fc
Parents: 2b51582
Author: Alexander Rukletsov <al...@mesosphere.io>
Authored: Thu Feb 12 11:44:06 2015 -0800
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Thu Feb 12 11:44:06 2015 -0800
----------------------------------------------------------------------
src/Makefile.am | 10 +-
src/local/local.cpp | 7 +-
src/master/allocator.hpp | 501 ----------
src/master/allocator/allocator.hpp | 501 ++++++++++
src/master/allocator/mesos/hierarchical.hpp | 922 +++++++++++++++++++
src/master/allocator/sorter/drf/sorter.cpp | 293 ++++++
src/master/allocator/sorter/drf/sorter.hpp | 129 +++
src/master/allocator/sorter/sorter.hpp | 105 +++
src/master/drf_sorter.cpp | 293 ------
src/master/drf_sorter.hpp | 129 ---
src/master/hierarchical_allocator_process.hpp | 922 -------------------
src/master/main.cpp | 7 +-
src/master/master.cpp | 3 +-
src/master/sorter.hpp | 105 ---
src/tests/cluster.hpp | 5 +-
src/tests/fault_tolerance_tests.cpp | 3 +-
src/tests/hierarchical_allocator_tests.cpp | 5 +-
src/tests/master_allocator_tests.cpp | 5 +-
src/tests/master_authorization_tests.cpp | 3 +-
src/tests/master_slave_reconciliation_tests.cpp | 3 +-
src/tests/master_tests.cpp | 3 +-
src/tests/mesos.hpp | 3 +-
src/tests/rate_limiting_tests.cpp | 3 +-
src/tests/resource_offers_tests.cpp | 3 +-
src/tests/sorter_tests.cpp | 2 +-
25 files changed, 1989 insertions(+), 1976 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 39ce858..f9efaaa 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -309,7 +309,6 @@ libmesos_no_3rdparty_la_SOURCES = \
master/contender.cpp \
master/constants.cpp \
master/detector.cpp \
- master/drf_sorter.cpp \
master/http.cpp \
master/master.cpp \
master/metrics.cpp \
@@ -318,6 +317,7 @@ libmesos_no_3rdparty_la_SOURCES = \
master/registrar.cpp \
master/repairer.cpp \
master/validation.cpp \
+ master/allocator/sorter/drf/sorter.cpp \
module/manager.cpp \
sched/constants.cpp \
sched/sched.cpp \
@@ -499,19 +499,19 @@ libmesos_no_3rdparty_la_SOURCES += \
local/local.hpp \
logging/flags.hpp \
logging/logging.hpp \
- master/allocator.hpp \
master/contender.hpp \
master/constants.hpp \
master/detector.hpp \
- master/drf_sorter.hpp \
master/flags.hpp \
- master/hierarchical_allocator_process.hpp \
master/master.hpp \
master/metrics.hpp \
master/repairer.hpp \
master/registrar.hpp \
- master/sorter.hpp \
master/validation.hpp \
+ master/allocator/allocator.hpp \
+ master/allocator/mesos/hierarchical.hpp \
+ master/allocator/sorter/drf/sorter.hpp \
+ master/allocator/sorter/sorter.hpp \
messages/messages.hpp \
module/manager.hpp \
sched/constants.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 649b915..4b4ec99 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -40,15 +40,16 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"
-#include "master/allocator.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
-#include "master/drf_sorter.hpp"
-#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/repairer.hpp"
+#include "master/allocator/allocator.hpp"
+#include "master/allocator/mesos/hierarchical.hpp"
+#include "master/allocator/sorter/drf/sorter.hpp"
+
#include "module/manager.hpp"
#include "slave/gc.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
deleted file mode 100644
index c45b75e..0000000
--- a/src/master/allocator.hpp
+++ /dev/null
@@ -1,501 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __ALLOCATOR_HPP__
-#define __ALLOCATOR_HPP__
-
-#include <string>
-#include <vector>
-
-#include <mesos/resources.hpp>
-
-#include <process/future.hpp>
-#include <process/dispatch.hpp>
-#include <process/pid.hpp>
-#include <process/process.hpp>
-
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-
-#include "master/flags.hpp"
-
-#include "messages/messages.hpp"
-
-namespace mesos {
-namespace master {
-
-class Master; // Forward declaration.
-
-namespace allocator {
-
-// Basic model of an allocator: resources are allocated to a framework
-// in the form of offers. A framework can refuse some resources in
-// offers and run tasks in others. Allocated resources can have offer
-// operations applied to them in order for frameworks to alter the
-// resource metadata (e.g. creating persistent volumes). Resources can
-// be recovered from a framework when tasks finish/fail (or are lost
-// due to a slave failure) or when an offer is rescinded.
-//
-// This is the public API for resource allocators.
-// TODO(alexr): Document API calls.
-class Allocator
-{
-public:
- Allocator() {}
-
- virtual ~Allocator() {}
-
- virtual void initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles) = 0;
-
- virtual void addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used) = 0;
-
- virtual void removeFramework(
- const FrameworkID& frameworkId) = 0;
-
- // Offers are sent only to activated frameworks.
- virtual void activateFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void deactivateFramework(
- const FrameworkID& frameworkId) = 0;
-
- // Note that the 'total' resources are passed explicitly because it
- // includes resources that are dynamically "persisted" on the slave
- // (e.g. persistent volumes, dynamic reservations, etc).
- // The slaveInfo resources, on the other hand, correspond directly
- // to the static --resources flag value on the slave.
- virtual void addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used) = 0;
-
- virtual void removeSlave(
- const SlaveID& slaveId) = 0;
-
- // Offers are sent only for activated slaves.
- virtual void activateSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void deactivateSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void updateWhitelist(
- const Option<hashset<std::string> >& whitelist) = 0;
-
- virtual void requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests) = 0;
-
- virtual void updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations) = 0;
-
- // Informs the Allocator to recover resources that are considered
- // used by the framework.
- virtual void recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters) = 0;
-
- // Whenever a framework that has filtered resources wants to revive
- // offers for those resources the master invokes this callback.
- virtual void reviveOffers(
- const FrameworkID& frameworkId) = 0;
-};
-
-
-class MesosAllocatorProcess;
-
-// A wrapper for Process-based allocators. It redirects all function
-// invocations to the underlying AllocatorProcess and manages its
-// lifetime. We ensure the template parameter AllocatorProcess
-// implements MesosAllocatorProcess by storing a pointer to it.
-//
-// TODO(alexr): Move this class (together with the implementation)
-// into a separate file.
-template <typename AllocatorProcess>
-class MesosAllocator : public Allocator
-{
-public:
- MesosAllocator();
-
- ~MesosAllocator();
-
- void initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles);
-
- void addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used);
-
- void removeFramework(
- const FrameworkID& frameworkId);
-
- // Offers are sent only to activated frameworks.
- void activateFramework(
- const FrameworkID& frameworkId);
-
- void deactivateFramework(
- const FrameworkID& frameworkId);
-
- // Note that the 'total' resources are passed explicitly because it
- // includes resources that are dynamically "checkpointed" on the
- // slave (e.g. persistent volumes, dynamic reservations, etc). The
- // slaveInfo resources, on the other hand, correspond directly to
- // the static --resources flag value on the slave.
- void addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used);
-
- void removeSlave(
- const SlaveID& slaveId);
-
- // Offers are sent only for activated slaves.
- void activateSlave(
- const SlaveID& slaveId);
-
- void deactivateSlave(
- const SlaveID& slaveId);
-
- void updateWhitelist(
- const Option<hashset<std::string> >& whitelist);
-
- void requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests);
-
- void updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations);
-
- // Informs the allocator to recover resources that are considered
- // used by the framework.
- void recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters);
-
- // Whenever a framework that has filtered resources wants to revive
- // offers for those resources the master invokes this callback.
- void reviveOffers(
- const FrameworkID& frameworkId);
-
-private:
- MesosAllocator(const MesosAllocator&); // Not copyable.
- MesosAllocator& operator=(const MesosAllocator&); // Not assignable.
-
- MesosAllocatorProcess* process;
-};
-
-
-// The basic interface for all Process-based allocators.
-class MesosAllocatorProcess : public process::Process<MesosAllocatorProcess>
-{
-public:
- MesosAllocatorProcess() {}
-
- virtual ~MesosAllocatorProcess() {}
-
- // Explicitly unhide 'initialize' to silence a compiler warning
- // from clang, since we overload below.
- using process::ProcessBase::initialize;
-
- virtual void initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles) = 0;
-
- virtual void addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used) = 0;
-
- virtual void removeFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void activateFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void deactivateFramework(
- const FrameworkID& frameworkId) = 0;
-
- virtual void addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used) = 0;
-
- virtual void removeSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void activateSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void deactivateSlave(
- const SlaveID& slaveId) = 0;
-
- virtual void updateWhitelist(
- const Option<hashset<std::string> >& whitelist) = 0;
-
- virtual void requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests) = 0;
-
- virtual void updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations) = 0;
-
- virtual void recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters) = 0;
-
- virtual void reviveOffers(
- const FrameworkID& frameworkId) = 0;
-};
-
-
-template <typename AllocatorProcess>
-MesosAllocator<AllocatorProcess>::MesosAllocator()
-{
- process = new AllocatorProcess();
- process::spawn(process);
-}
-
-
-template <typename AllocatorProcess>
-MesosAllocator<AllocatorProcess>::~MesosAllocator()
-{
- process::terminate(process);
- process::wait(process);
- delete process;
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::initialize(
- const Flags& flags,
- const lambda::function<
- void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>& offerCallback,
- const hashmap<std::string, RoleInfo>& roles)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::initialize,
- flags,
- offerCallback,
- roles);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::addFramework(
- const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::addFramework,
- frameworkId,
- frameworkInfo,
- used);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::removeFramework(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::removeFramework,
- frameworkId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::activateFramework(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::activateFramework,
- frameworkId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::deactivateFramework(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::deactivateFramework,
- frameworkId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::addSlave(
- const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const Resources& total,
- const hashmap<FrameworkID, Resources>& used)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::addSlave,
- slaveId,
- slaveInfo,
- total,
- used);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::removeSlave(
- const SlaveID& slaveId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::removeSlave,
- slaveId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::activateSlave(
- const SlaveID& slaveId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::activateSlave,
- slaveId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::deactivateSlave(
- const SlaveID& slaveId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::deactivateSlave,
- slaveId);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::updateWhitelist(
- const Option<hashset<std::string> >& whitelist)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::updateWhitelist,
- whitelist);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::requestResources(
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::requestResources,
- frameworkId,
- requests);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::updateAllocation(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const std::vector<Offer::Operation>& operations)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::updateAllocation,
- frameworkId,
- slaveId,
- operations);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::recoverResources(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources,
- const Option<Filters>& filters)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::recoverResources,
- frameworkId,
- slaveId,
- resources,
- filters);
-}
-
-
-template <typename AllocatorProcess>
-inline void MesosAllocator<AllocatorProcess>::reviveOffers(
- const FrameworkID& frameworkId)
-{
- process::dispatch(
- process,
- &MesosAllocatorProcess::reviveOffers,
- frameworkId);
-}
-
-} // namespace allocator {
-} // namespace master {
-} // namespace mesos {
-
-#endif // __ALLOCATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/allocator.hpp b/src/master/allocator/allocator.hpp
new file mode 100644
index 0000000..7b857f0
--- /dev/null
+++ b/src/master/allocator/allocator.hpp
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_ALLOCATOR_ALLOCATOR_HPP__
+#define __MASTER_ALLOCATOR_ALLOCATOR_HPP__
+
+#include <string>
+#include <vector>
+
+#include <mesos/resources.hpp>
+
+#include <process/future.hpp>
+#include <process/dispatch.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+
+#include "master/flags.hpp"
+
+#include "messages/messages.hpp"
+
+namespace mesos {
+namespace master {
+
+class Master; // Forward declaration.
+
+namespace allocator {
+
+// Basic model of an allocator: resources are allocated to a framework
+// in the form of offers. A framework can refuse some resources in
+// offers and run tasks in others. Allocated resources can have offer
+// operations applied to them in order for frameworks to alter the
+// resource metadata (e.g. creating persistent volumes). Resources can
+// be recovered from a framework when tasks finish/fail (or are lost
+// due to a slave failure) or when an offer is rescinded.
+//
+// This is the public API for resource allocators.
+// TODO(alexr): Document API calls.
+class Allocator
+{
+public:
+ Allocator() {}
+
+ virtual ~Allocator() {}
+
+ virtual void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles) = 0;
+
+ virtual void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used) = 0;
+
+ virtual void removeFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ // Offers are sent only to activated frameworks.
+ virtual void activateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void deactivateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ // Note that the 'total' resources are passed explicitly because it
+ // includes resources that are dynamically "persisted" on the slave
+ // (e.g. persistent volumes, dynamic reservations, etc).
+ // The slaveInfo resources, on the other hand, correspond directly
+ // to the static --resources flag value on the slave.
+ virtual void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used) = 0;
+
+ virtual void removeSlave(
+ const SlaveID& slaveId) = 0;
+
+ // Offers are sent only for activated slaves.
+ virtual void activateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void deactivateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist) = 0;
+
+ virtual void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests) = 0;
+
+ virtual void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations) = 0;
+
+ // Informs the Allocator to recover resources that are considered
+ // used by the framework.
+ virtual void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters) = 0;
+
+ // Whenever a framework that has filtered resources wants to revive
+ // offers for those resources the master invokes this callback.
+ virtual void reviveOffers(
+ const FrameworkID& frameworkId) = 0;
+};
+
+
+class MesosAllocatorProcess;
+
+// A wrapper for Process-based allocators. It redirects all function
+// invocations to the underlying AllocatorProcess and manages its
+// lifetime. We ensure the template parameter AllocatorProcess
+// implements MesosAllocatorProcess by storing a pointer to it.
+//
+// TODO(alexr): Move this class (together with the implementation)
+// into a separate file.
+template <typename AllocatorProcess>
+class MesosAllocator : public Allocator
+{
+public:
+ MesosAllocator();
+
+ ~MesosAllocator();
+
+ void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles);
+
+ void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used);
+
+ void removeFramework(
+ const FrameworkID& frameworkId);
+
+ // Offers are sent only to activated frameworks.
+ void activateFramework(
+ const FrameworkID& frameworkId);
+
+ void deactivateFramework(
+ const FrameworkID& frameworkId);
+
+ // Note that the 'total' resources are passed explicitly because it
+ // includes resources that are dynamically "checkpointed" on the
+ // slave (e.g. persistent volumes, dynamic reservations, etc). The
+ // slaveInfo resources, on the other hand, correspond directly to
+ // the static --resources flag value on the slave.
+ void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used);
+
+ void removeSlave(
+ const SlaveID& slaveId);
+
+ // Offers are sent only for activated slaves.
+ void activateSlave(
+ const SlaveID& slaveId);
+
+ void deactivateSlave(
+ const SlaveID& slaveId);
+
+ void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist);
+
+ void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests);
+
+ void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations);
+
+ // Informs the allocator to recover resources that are considered
+ // used by the framework.
+ void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters);
+
+ // Whenever a framework that has filtered resources wants to revive
+ // offers for those resources the master invokes this callback.
+ void reviveOffers(
+ const FrameworkID& frameworkId);
+
+private:
+ MesosAllocator(const MesosAllocator&); // Not copyable.
+ MesosAllocator& operator=(const MesosAllocator&); // Not assignable.
+
+ MesosAllocatorProcess* process;
+};
+
+
+// The basic interface for all Process-based allocators.
+class MesosAllocatorProcess : public process::Process<MesosAllocatorProcess>
+{
+public:
+ MesosAllocatorProcess() {}
+
+ virtual ~MesosAllocatorProcess() {}
+
+ // Explicitly unhide 'initialize' to silence a compiler warning
+ // from clang, since we overload below.
+ using process::ProcessBase::initialize;
+
+ virtual void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles) = 0;
+
+ virtual void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used) = 0;
+
+ virtual void removeFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void activateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void deactivateFramework(
+ const FrameworkID& frameworkId) = 0;
+
+ virtual void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used) = 0;
+
+ virtual void removeSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void activateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void deactivateSlave(
+ const SlaveID& slaveId) = 0;
+
+ virtual void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist) = 0;
+
+ virtual void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests) = 0;
+
+ virtual void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations) = 0;
+
+ virtual void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters) = 0;
+
+ virtual void reviveOffers(
+ const FrameworkID& frameworkId) = 0;
+};
+
+
+template <typename AllocatorProcess>
+MesosAllocator<AllocatorProcess>::MesosAllocator()
+{
+ process = new AllocatorProcess();
+ process::spawn(process);
+}
+
+
+template <typename AllocatorProcess>
+MesosAllocator<AllocatorProcess>::~MesosAllocator()
+{
+ process::terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::initialize,
+ flags,
+ offerCallback,
+ roles);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::addFramework,
+ frameworkId,
+ frameworkInfo,
+ used);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::removeFramework(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::removeFramework,
+ frameworkId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::activateFramework(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::activateFramework,
+ frameworkId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::deactivateFramework(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::deactivateFramework,
+ frameworkId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::addSlave,
+ slaveId,
+ slaveInfo,
+ total,
+ used);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::removeSlave(
+ const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::removeSlave,
+ slaveId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::activateSlave(
+ const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::activateSlave,
+ slaveId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::deactivateSlave(
+ const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::deactivateSlave,
+ slaveId);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateWhitelist(
+ const Option<hashset<std::string> >& whitelist)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateWhitelist,
+ whitelist);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::requestResources,
+ frameworkId,
+ requests);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateAllocation,
+ frameworkId,
+ slaveId,
+ operations);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::recoverResources,
+ frameworkId,
+ slaveId,
+ resources,
+ filters);
+}
+
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::reviveOffers(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &MesosAllocatorProcess::reviveOffers,
+ frameworkId);
+}
+
+} // namespace allocator {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_ALLOCATOR_ALLOCATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
new file mode 100644
index 0000000..cf2e369
--- /dev/null
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -0,0 +1,922 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
+#define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
+
+#include <algorithm>
+#include <vector>
+
+#include <mesos/resources.hpp>
+
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/stopwatch.hpp>
+#include <stout/stringify.hpp>
+
+#include "master/master.hpp"
+
+#include "master/allocator/allocator.hpp"
+#include "master/allocator/sorter/drf/sorter.hpp"
+
+namespace mesos {
+namespace master {
+namespace allocator {
+
+// Forward declarations.
+class Filter;
+
+
+// We forward declare the hierarchical allocator process so that we
+// can typedef an instantiation of it with DRF sorters.
+template <typename RoleSorter, typename FrameworkSorter>
+class HierarchicalAllocatorProcess;
+
+typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
+HierarchicalDRFAllocatorProcess;
+
+typedef MesosAllocator<HierarchicalDRFAllocatorProcess>
+HierarchicalDRFAllocator;
+
+
+// Implements the basic allocator algorithm - first pick a role by
+// some criteria, then pick one of their frameworks to allocate to.
+template <typename RoleSorter, typename FrameworkSorter>
+class HierarchicalAllocatorProcess : public MesosAllocatorProcess
+{
+public:
+ HierarchicalAllocatorProcess();
+
+ virtual ~HierarchicalAllocatorProcess();
+
+ process::PID<HierarchicalAllocatorProcess> self();
+
+ void initialize(
+ const Flags& flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const hashmap<std::string, RoleInfo>& roles);
+
+ void addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used);
+
+ void removeFramework(
+ const FrameworkID& frameworkId);
+
+ void activateFramework(
+ const FrameworkID& frameworkId);
+
+ void deactivateFramework(
+ const FrameworkID& frameworkId);
+
+ void addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used);
+
+ void removeSlave(
+ const SlaveID& slaveId);
+
+ void deactivateSlave(
+ const SlaveID& slaveId);
+
+ void activateSlave(
+ const SlaveID& slaveId);
+
+ void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist);
+
+ void requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests);
+
+ void updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations);
+
+ void recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters);
+
+ void reviveOffers(
+ const FrameworkID& frameworkId);
+
+protected:
+ // Useful typedefs for dispatch/delay/defer to self()/this.
+ typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> Self;
+ typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> This;
+
+ // Callback for doing batch allocations.
+ void batch();
+
+ // Allocate any allocatable resources.
+ void allocate();
+
+ // Allocate resources just from the specified slave.
+ void allocate(const SlaveID& slaveId);
+
+ // Allocate resources from the specified slaves.
+ void allocate(const hashset<SlaveID>& slaveIds);
+
+ // Remove a filter for the specified framework.
+ void expire(const FrameworkID& frameworkId, Filter* filter);
+
+ // Checks whether the slave is whitelisted.
+ bool isWhitelisted(const SlaveID& slaveId);
+
+ // Returns true if there is a filter for this framework
+ // on this slave.
+ bool isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources);
+
+ bool allocatable(const Resources& resources);
+
+ bool initialized;
+
+ Flags flags;
+
+ lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)> offerCallback;
+
+ struct Framework
+ {
+ std::string role;
+ bool checkpoint; // Whether the framework desires checkpointing.
+
+ hashset<Filter*> filters; // Active filters for the framework.
+ };
+
+ hashmap<FrameworkID, Framework> frameworks;
+
+ struct Slave
+ {
+ Resources total;
+ Resources available;
+
+ bool activated; // Whether to offer resources.
+ bool checkpoint; // Whether slave supports checkpointing.
+
+ std::string hostname;
+ };
+
+ hashmap<SlaveID, Slave> slaves;
+
+ hashmap<std::string, RoleInfo> roles;
+
+ // Slaves to send offers for.
+ Option<hashset<std::string> > whitelist;
+
+ // There are two levels of sorting, hence "hierarchical".
+ // Level 1 sorts across roles:
+ // Reserved resources are excluded from fairness calculation,
+ // since they are forcibly pinned to a role.
+ // Level 2 sorts across frameworks within a particular role:
+ // Both reserved resources and unreserved resources are used
+ // in the fairness calculation. This is because reserved
+ // resources can be allocated to any framework in the role.
+ RoleSorter* roleSorter;
+ hashmap<std::string, FrameworkSorter*> frameworkSorters;
+};
+
+
+// Used to represent "filters" for resources unused in offers.
+class Filter
+{
+public:
+ virtual ~Filter() {}
+
+ virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
+};
+
+
+class RefusedFilter: public Filter
+{
+public:
+ RefusedFilter(
+ const SlaveID& _slaveId,
+ const Resources& _resources,
+ const process::Timeout& _timeout)
+ : slaveId(_slaveId), resources(_resources), timeout(_timeout) {}
+
+ virtual bool filter(const SlaveID& _slaveId, const Resources& _resources)
+ {
+ return slaveId == _slaveId &&
+ resources.contains(_resources) && // Refused resources are superset.
+ timeout.remaining() > Seconds(0);
+ }
+
+ const SlaveID slaveId;
+ const Resources resources;
+ const process::Timeout timeout;
+};
+
+
+template <class RoleSorter, class FrameworkSorter>
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::HierarchicalAllocatorProcess() // NOLINT(whitespace/line_length)
+ : ProcessBase(process::ID::generate("hierarchical-allocator")),
+ initialized(false) {}
+
+
+template <class RoleSorter, class FrameworkSorter>
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::~HierarchicalAllocatorProcess() // NOLINT(whitespace/line_length)
+{}
+
+
+template <class RoleSorter, class FrameworkSorter>
+process::PID<HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> >
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::self()
+{
+ return process::PID<HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> >(this); // NOLINT(whitespace/line_length)
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
+ const Flags& _flags,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, Resources>&)>& _offerCallback,
+ const hashmap<std::string, RoleInfo>& _roles)
+{
+ flags = _flags;
+ offerCallback = _offerCallback;
+ roles = _roles;
+ initialized = true;
+
+ roleSorter = new RoleSorter();
+ foreachpair (const std::string& name, const RoleInfo& roleInfo, roles) {
+ roleSorter->add(name, roleInfo.weight());
+ frameworkSorters[name] = new FrameworkSorter();
+ }
+
+ if (roleSorter->count() == 0) {
+ LOG(ERROR) << "No roles specified, cannot allocate resources!";
+ }
+
+ VLOG(1) << "Initialized hierarchical allocator process";
+
+ delay(flags.allocation_interval, self(), &Self::batch);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used)
+{
+ CHECK(initialized);
+
+ const std::string& role = frameworkInfo.role();
+
+ CHECK(roles.contains(role));
+
+ CHECK(!frameworkSorters[role]->contains(frameworkId.value()));
+ frameworkSorters[role]->add(frameworkId.value());
+
+ // TODO(bmahler): Validate that the reserved resources have the
+ // framework's role.
+
+ // Update the allocation to this framework.
+ roleSorter->allocated(role, used.unreserved());
+ frameworkSorters[role]->add(used);
+ frameworkSorters[role]->allocated(frameworkId.value(), used);
+
+ frameworks[frameworkId] = Framework();
+ frameworks[frameworkId].role = frameworkInfo.role();
+ frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint();
+
+ LOG(INFO) << "Added framework " << frameworkId;
+
+ allocate();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
+ const FrameworkID& frameworkId)
+{
+ CHECK(initialized);
+
+ CHECK(frameworks.contains(frameworkId));
+ const std::string& role = frameworks[frameworkId].role;
+
+ // Might not be in 'frameworkSorters[role]' because it was previously
+ // deactivated and never re-added.
+ if (frameworkSorters[role]->contains(frameworkId.value())) {
+ Resources allocation =
+ frameworkSorters[role]->allocation(frameworkId.value());
+
+ roleSorter->unallocated(role, allocation.unreserved());
+ frameworkSorters[role]->remove(allocation);
+ frameworkSorters[role]->remove(frameworkId.value());
+ }
+
+ // Do not delete the filters contained in this
+ // framework's 'filters' hashset yet, see comments in
+ // HierarchicalAllocatorProcess::reviveOffers and
+ // HierarchicalAllocatorProcess::expire.
+ frameworks.erase(frameworkId);
+
+ LOG(INFO) << "Removed framework " << frameworkId;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateFramework(
+ const FrameworkID& frameworkId)
+{
+ CHECK(initialized);
+
+ CHECK(frameworks.contains(frameworkId));
+ const std::string& role = frameworks[frameworkId].role;
+
+ frameworkSorters[role]->activate(frameworkId.value());
+
+ LOG(INFO) << "Activated framework " << frameworkId;
+
+ allocate();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
+ const FrameworkID& frameworkId)
+{
+ CHECK(initialized);
+
+ CHECK(frameworks.contains(frameworkId));
+ const std::string& role = frameworks[frameworkId].role;
+
+ frameworkSorters[role]->deactivate(frameworkId.value());
+
+ // Note that the Sorter *does not* remove the resources allocated
+ // to this framework. For now, this is important because if the
+ // framework fails over and is activated, we still want a record
+ // of the resources that it is using. We might be able to collapse
+ // the added/removed and activated/deactivated in the future.
+
+ // Do not delete the filters contained in this
+ // framework's 'filters' hashset yet, see comments in
+ // HierarchicalAllocatorProcess::reviveOffers and
+ // HierarchicalAllocatorProcess::expire.
+ frameworks[frameworkId].filters.clear();
+
+ LOG(INFO) << "Deactivated framework " << frameworkId;
+}
+
+
+
+// TODO(bmahler): Generalize this.
+template <typename Iterable>
+Resources sum(const Iterable& resources)
+{
+ Resources total;
+ foreach (const Resources& r, resources) {
+ total += r;
+ }
+ return total;
+}
+
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const Resources& total,
+ const hashmap<FrameworkID, Resources>& used)
+{
+ CHECK(initialized);
+ CHECK(!slaves.contains(slaveId));
+
+ roleSorter->add(total.unreserved());
+
+ foreachpair (const FrameworkID& frameworkId,
+ const Resources& allocated,
+ used) {
+ if (frameworks.contains(frameworkId)) {
+ const std::string& role = frameworks[frameworkId].role;
+
+ // TODO(bmahler): Validate that the reserved resources have the
+ // framework's role.
+
+ roleSorter->allocated(role, allocated.unreserved());
+ frameworkSorters[role]->add(allocated);
+ frameworkSorters[role]->allocated(frameworkId.value(), allocated);
+ }
+ }
+
+ slaves[slaveId] = Slave();
+ slaves[slaveId].total = total;
+ slaves[slaveId].available = total - sum(used.values());
+ slaves[slaveId].activated = true;
+ slaves[slaveId].checkpoint = slaveInfo.checkpoint();
+ slaves[slaveId].hostname = slaveInfo.hostname();
+
+ LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname
+ << ") with " << slaves[slaveId].total
+ << " (and " << slaves[slaveId].available << " available)";
+
+ allocate(slaveId);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave(
+ const SlaveID& slaveId)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+
+ // TODO(bmahler): Per MESOS-621, this should remove the allocations
+ // that any frameworks have on this slave. Otherwise the caller may
+ // "leak" allocated resources accidentally if they forget to recover
+ // all the resources. Fixing this would require more information
+ // than what we currently track in the allocator.
+
+ roleSorter->remove(slaves[slaveId].total.unreserved());
+
+ slaves.erase(slaveId);
+
+ // Note that we DO NOT actually delete any filters associated with
+ // this slave, that will occur when the delayed
+ // HierarchicalAllocatorProcess::expire gets invoked (or the framework
+ // that applied the filters gets removed).
+
+ LOG(INFO) << "Removed slave " << slaveId;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateSlave(
+ const SlaveID& slaveId)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+
+ slaves[slaveId].activated = true;
+
+ LOG(INFO)<< "Slave " << slaveId << " reactivated";
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateSlave(
+ const SlaveID& slaveId)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+
+ slaves[slaveId].activated = false;
+
+ LOG(INFO) << "Slave " << slaveId << " deactivated";
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateWhitelist(
+ const Option<hashset<std::string> >& _whitelist)
+{
+ CHECK(initialized);
+
+ whitelist = _whitelist;
+
+ if (whitelist.isSome()) {
+ LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get());
+
+ if (whitelist.get().empty()) {
+ LOG(WARNING) << "Whitelist is empty, no offers will be made!";
+ }
+ } else {
+ LOG(INFO) << "Advertising offers for all slaves";
+ }
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests)
+{
+ CHECK(initialized);
+
+ LOG(INFO) << "Received resource request from framework " << frameworkId;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+ CHECK(frameworks.contains(frameworkId));
+
+ // The total resources on the slave are composed of both allocated
+ // and available resources:
+ //
+ // total = available + allocated
+ //
+ // Here we apply offer operations to the allocated resources, which
+ // in turns leads to an update of the total. The available resources
+ // remain unchanged.
+
+ FrameworkSorter* frameworkSorter =
+ frameworkSorters[frameworks[frameworkId].role];
+
+ Resources allocation = frameworkSorter->allocation(frameworkId.value());
+
+ // Update the allocated resources.
+ Try<Resources> updatedAllocation = allocation.apply(operations);
+ CHECK_SOME(updatedAllocation);
+
+ frameworkSorter->update(
+ frameworkId.value(),
+ allocation,
+ updatedAllocation.get());
+
+ roleSorter->update(
+ frameworks[frameworkId].role,
+ allocation.unreserved(),
+ updatedAllocation.get().unreserved());
+
+ // Update the total resources.
+ Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+ CHECK_SOME(updatedTotal);
+
+ slaves[slaveId].total = updatedTotal.get();
+
+ // TODO(bmahler): Validate that the available resources are
+ // unaffected. This requires augmenting the sorters with
+ // SlaveIDs for allocations, so that we can do:
+ //
+ // CHECK_EQ(slaves[slaveId].total - updatedAllocation,
+ // slaves[slaveId].available);
+
+ // TODO(jieyu): Do not log if there is no update.
+ LOG(INFO) << "Updated allocation of framework " << frameworkId
+ << " on slave " << slaveId
+ << " from " << allocation << " to " << updatedAllocation.get();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters)
+{
+ CHECK(initialized);
+
+ if (resources.empty()) {
+ return;
+ }
+
+ // Updated resources allocated to framework (if framework still
+ // exists, which it might not in the event that we dispatched
+ // Master::offer before we received
+ // MesosAllocatorProcess::removeFramework or
+ // MesosAllocatorProcess::deactivateFramework, in which case we will
+ // have already recovered all of its resources).
+ if (frameworks.contains(frameworkId)) {
+ const std::string& role = frameworks[frameworkId].role;
+
+ CHECK(frameworkSorters.contains(role));
+
+ if (frameworkSorters[role]->contains(frameworkId.value())) {
+ frameworkSorters[role]->unallocated(frameworkId.value(), resources);
+ frameworkSorters[role]->remove(resources);
+ roleSorter->unallocated(role, resources.unreserved());
+ }
+ }
+
+ // Update resources allocatable on slave (if slave still exists,
+ // which it might not in the event that we dispatched Master::offer
+ // before we received Allocator::removeSlave).
+ if (slaves.contains(slaveId)) {
+ slaves[slaveId].available += resources;
+
+ LOG(INFO) << "Recovered " << resources
+ << " (total allocatable: " << slaves[slaveId].available
+ << ") on slave " << slaveId
+ << " from framework " << frameworkId;
+ }
+
+ // No need to install the filter if 'filters' is none.
+ if (filters.isNone()) {
+ return;
+ }
+
+ // No need to install the filter if slave/framework does not exist.
+ if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
+ return;
+ }
+
+ // Create a refused resources filter.
+ Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+ if (seconds.isError()) {
+ LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+ << "the refused resources filter because the input value "
+ << "is invalid: " << seconds.error();
+
+ seconds = Duration::create(Filters().refuse_seconds());
+ } else if (seconds.get() < Duration::zero()) {
+ LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+ << "the refused resources filter because the input value "
+ << "is negative";
+
+ seconds = Duration::create(Filters().refuse_seconds());
+ }
+
+ CHECK_SOME(seconds);
+
+ if (seconds.get() != Duration::zero()) {
+ VLOG(1) << "Framework " << frameworkId
+ << " filtered slave " << slaveId
+ << " for " << seconds.get();
+
+ // Create a new filter and delay its expiration.
+ Filter* filter = new RefusedFilter(
+ slaveId,
+ resources,
+ process::Timeout::in(seconds.get()));
+
+ frameworks[frameworkId].filters.insert(filter);
+
+ delay(seconds.get(), self(), &Self::expire, frameworkId, filter);
+ }
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
+ const FrameworkID& frameworkId)
+{
+ CHECK(initialized);
+
+ frameworks[frameworkId].filters.clear();
+
+ // We delete each actual Filter when
+ // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
+ // Filter here it's possible that the same Filter (i.e., same
+ // address) could get reused and HierarchicalAllocatorProcess::expire
+ // would expire that filter too soon. Note that this only works
+ // right now because ALL Filter types "expire".
+
+ LOG(INFO) << "Removed filters for framework " << frameworkId;
+
+ allocate();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::batch()
+{
+ allocate();
+ delay(flags.allocation_interval, self(), &Self::batch);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate()
+{
+ Stopwatch stopwatch;
+ stopwatch.start();
+
+ allocate(slaves.keys());
+
+ VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
+ << stopwatch.elapsed();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
+ const SlaveID& slaveId)
+{
+ Stopwatch stopwatch;
+ stopwatch.start();
+
+ // TODO(bmahler): Add initializer list constructor for hashset.
+ hashset<SlaveID> slaves;
+ slaves.insert(slaveId);
+ allocate(slaves);
+
+ VLOG(1) << "Performed allocation for slave " << slaveId << " in "
+ << stopwatch.elapsed();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
+ const hashset<SlaveID>& slaveIds_)
+{
+ if (roleSorter->count() == 0) {
+ LOG(ERROR) << "No roles specified, cannot allocate resources!";
+ return;
+ }
+
+ // Compute the offerable resources, per framework:
+ // (1) For reserved resources on the slave, allocate these to a
+ // framework having the corresponding role.
+ // (2) For unreserved resources on the slave, allocate these
+ // to a framework of any role.
+ hashmap<FrameworkID, hashmap<SlaveID, Resources> > offerable;
+
+ // Randomize the order in which slaves' resources are allocated.
+ // TODO(vinod): Implement a smarter sorting algorithm.
+ std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
+ std::random_shuffle(slaveIds.begin(), slaveIds.end());
+
+ foreach (const SlaveID& slaveId, slaveIds) {
+ // Don't send offers for non-whitelisted and deactivated slaves.
+ if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) {
+ continue;
+ }
+
+ foreach (const std::string& role, roleSorter->sort()) {
+ foreach (const std::string& frameworkId_,
+ frameworkSorters[role]->sort()) {
+ FrameworkID frameworkId;
+ frameworkId.set_value(frameworkId_);
+
+ Resources unreserved = slaves[slaveId].available.unreserved();
+ Resources resources = unreserved;
+ if (role != "*") {
+ resources += slaves[slaveId].available.reserved(role);
+ }
+
+ // If the resources are not allocatable, ignore.
+ if (!allocatable(resources)) {
+ continue;
+ }
+
+ // If the framework filters these resources, ignore.
+ if (isFiltered(frameworkId, slaveId, resources)) {
+ continue;
+ }
+
+ VLOG(2) << "Allocating " << resources << " on slave " << slaveId
+ << " to framework " << frameworkId;
+
+ // Note that we perform "coarse-grained" allocation,
+ // meaning that we always allocate the entire remaining
+ // slave resources to a single framework.
+ offerable[frameworkId][slaveId] = resources;
+ slaves[slaveId].available -= resources;
+
+ // Reserved resources are only accounted for in the framework
+ // sorter, since the reserved resources are not shared across
+ // roles.
+ frameworkSorters[role]->add(resources);
+ frameworkSorters[role]->allocated(frameworkId_, resources);
+ roleSorter->allocated(role, unreserved);
+ }
+ }
+ }
+
+ if (offerable.empty()) {
+ VLOG(1) << "No resources available to allocate!";
+ } else {
+ // Now offer the resources to each framework.
+ foreachkey (const FrameworkID& frameworkId, offerable) {
+ offerCallback(frameworkId, offerable[frameworkId]);
+ }
+ }
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
+ const FrameworkID& frameworkId,
+ Filter* filter)
+{
+ // The filter might have already been removed (e.g., if the
+ // framework no longer exists or in
+ // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
+ // keep the address from getting reused possibly causing premature
+ // expiration).
+ if (frameworks.contains(frameworkId) &&
+ frameworks[frameworkId].filters.contains(filter)) {
+ frameworks[frameworkId].filters.erase(filter);
+ }
+
+ delete filter;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+bool
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
+ const SlaveID& slaveId)
+{
+ CHECK(slaves.contains(slaveId));
+
+ return whitelist.isNone() ||
+ whitelist.get().contains(slaves[slaveId].hostname);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+bool
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources)
+{
+ CHECK(frameworks.contains(frameworkId));
+ CHECK(slaves.contains(slaveId));
+
+ // Do not offer a non-checkpointing slave's resources to a checkpointing
+ // framework. This is a short term fix until the following is resolved:
+ // https://issues.apache.org/jira/browse/MESOS-444.
+ if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
+ VLOG(1) << "Filtered " << resources
+ << " on non-checkpointing slave " << slaveId
+ << " for checkpointing framework " << frameworkId;
+ return true;
+ }
+
+ foreach (Filter* filter, frameworks[frameworkId].filters) {
+ if (filter->filter(slaveId, resources)) {
+ VLOG(1) << "Filtered " << resources
+ << " on slave " << slaveId
+ << " for framework " << frameworkId;
+ return true;
+ }
+ }
+ return false;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+bool
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
+ const Resources& resources)
+{
+ Option<double> cpus = resources.cpus();
+ Option<Bytes> mem = resources.mem();
+
+ return (cpus.isSome() && cpus.get() >= MIN_CPUS) ||
+ (mem.isSome() && mem.get() >= MIN_MEM);
+}
+
+} // namespace allocator {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
new file mode 100644
index 0000000..7baee45
--- /dev/null
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "logging/logging.hpp"
+
+#include "master/allocator/sorter/drf/sorter.hpp"
+
+using std::list;
+using std::set;
+using std::string;
+
+
+namespace mesos {
+namespace master {
+namespace allocator {
+
+bool DRFComparator::operator () (const Client& client1, const Client& client2)
+{
+ if (client1.share == client2.share) {
+ if (client1.allocations == client2.allocations) {
+ return client1.name < client2.name;
+ }
+ return client1.allocations < client2.allocations;
+ }
+ return client1.share < client2.share;
+}
+
+
+void DRFSorter::add(const string& name, double weight)
+{
+ Client client(name, 0, 0);
+ clients.insert(client);
+
+ allocations[name] = Resources();
+ weights[name] = weight;
+}
+
+
+void DRFSorter::remove(const string& name)
+{
+ set<Client, DRFComparator>::iterator it = find(name);
+
+ if (it != clients.end()) {
+ clients.erase(it);
+ }
+
+ allocations.erase(name);
+ weights.erase(name);
+}
+
+
+void DRFSorter::activate(const string& name)
+{
+ CHECK(allocations.contains(name));
+
+ Client client(name, calculateShare(name), 0);
+ clients.insert(client);
+}
+
+
+void DRFSorter::deactivate(const string& name)
+{
+ set<Client, DRFComparator>::iterator it = find(name);
+
+ if (it != clients.end()) {
+ // TODO(benh): Removing the client is an unfortuante strategy
+ // because we lose information such as the number of allocations
+ // for this client which means the fairness can be gamed by a
+ // framework disconnecting and reconnecting.
+ clients.erase(it);
+ }
+}
+
+
+void DRFSorter::allocated(
+ const string& name,
+ const Resources& resources)
+{
+ set<Client, DRFComparator>::iterator it = find(name);
+
+ if (it != clients.end()) { // TODO(benh): This should really be a CHECK.
+ // TODO(benh): Refactor 'update' to be able to reuse it here.
+ Client client(*it);
+
+ // Update the 'allocations' to reflect the allocator decision.
+ client.allocations++;
+
+ // Remove and reinsert it to update the ordering appropriately.
+ clients.erase(it);
+ clients.insert(client);
+ }
+
+ allocations[name] += resources;
+
+ // If the total resources have changed, we're going to
+ // recalculate all the shares, so don't bother just
+ // updating this client.
+ if (!dirty) {
+ update(name);
+ }
+}
+
+
+void DRFSorter::update(
+ const string& name,
+ const Resources& oldAllocation,
+ const Resources& newAllocation)
+{
+ CHECK(contains(name));
+
+ // TODO(bmahler): Check invariants between old and new allocations.
+ // Namely, the roles and quantities of resources should be the same!
+ // Otherwise, we need to ensure we re-calculate the shares, as
+ // is being currently done, for safety.
+
+ CHECK(resources.contains(oldAllocation));
+
+ resources -= oldAllocation;
+ resources += newAllocation;
+
+ CHECK(allocations[name].contains(oldAllocation));
+
+ allocations[name] -= oldAllocation;
+ allocations[name] += newAllocation;
+
+ // Just assume the total has changed, per the TODO above.
+ dirty = true;
+}
+
+
+Resources DRFSorter::allocation(
+ const string& name)
+{
+ return allocations[name];
+}
+
+
+void DRFSorter::unallocated(
+ const string& name,
+ const Resources& resources)
+{
+ allocations[name] -= resources;
+
+ if (!dirty) {
+ update(name);
+ }
+}
+
+
+void DRFSorter::add(const Resources& _resources)
+{
+ resources += _resources;
+
+ // We have to recalculate all shares when the total resources
+ // change, but we put it off until sort is called
+ // so that if something else changes before the next allocation
+ // we don't recalculate everything twice.
+ dirty = true;
+}
+
+
+void DRFSorter::remove(const Resources& _resources)
+{
+ resources -= _resources;
+ dirty = true;
+}
+
+
+list<string> DRFSorter::sort()
+{
+ if (dirty) {
+ set<Client, DRFComparator> temp;
+
+ set<Client, DRFComparator>::iterator it;
+ for (it = clients.begin(); it != clients.end(); it++) {
+ Client client(*it);
+
+ // Update the 'share' to get proper sorting.
+ client.share = calculateShare(client.name);
+
+ temp.insert(client);
+ }
+
+ clients = temp;
+ }
+
+ list<string> result;
+
+ set<Client, DRFComparator>::iterator it;
+ for (it = clients.begin(); it != clients.end(); it++) {
+ result.push_back((*it).name);
+ }
+
+ return result;
+}
+
+
+bool DRFSorter::contains(const string& name)
+{
+ return allocations.contains(name);
+}
+
+
+int DRFSorter::count()
+{
+ return allocations.size();
+}
+
+
+void DRFSorter::update(const string& name)
+{
+ set<Client, DRFComparator>::iterator it = find(name);
+
+ if (it != clients.end()) {
+ Client client(*it);
+
+ // Update the 'share' to get proper sorting.
+ client.share = calculateShare(client.name);
+
+ // Remove and reinsert it to update the ordering appropriately.
+ clients.erase(it);
+ clients.insert(client);
+ }
+}
+
+
+double DRFSorter::calculateShare(const string& name)
+{
+ double share = 0;
+
+ // TODO(benh): This implementation of "dominant resource fairness"
+ // currently does not take into account resources that are not
+ // scalars.
+
+ // Scalar resources may be spread across multiple 'Resource'
+ // objects. E.g. persistent volumes. So we first collect the names
+ // of the scalar resources, before computing the totals.
+ hashset<string> scalars;
+ foreach (const Resource& resource, resources) {
+ if (resource.type() == Value::SCALAR) {
+ scalars.insert(resource.name());
+ }
+ }
+
+ foreach (const string& scalar, scalars) {
+ Option<Value::Scalar> total = resources.get<Value::Scalar>(scalar);
+
+ if (total.isSome() && total.get().value() > 0) {
+ Option<Value::Scalar> allocation =
+ allocations[name].get<Value::Scalar>(scalar);
+
+ if (allocation.isNone()) {
+ allocation = Value::Scalar();
+ }
+
+ share = std::max(share, allocation.get().value() / total.get().value());
+ }
+ }
+
+ return share / weights[name];
+}
+
+
+set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
+{
+ set<Client, DRFComparator>::iterator it;
+ for (it = clients.begin(); it != clients.end(); it++) {
+ if (name == (*it).name) {
+ break;
+ }
+ }
+
+ return it;
+}
+
+} // namespace allocator {
+} // namespace master {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
new file mode 100644
index 0000000..966fe03
--- /dev/null
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_ALLOCATOR_SORTER_DRF_SORTER_HPP__
+#define __MASTER_ALLOCATOR_SORTER_DRF_SORTER_HPP__
+
+#include <set>
+#include <string>
+
+#include <mesos/resources.hpp>
+
+#include <stout/hashmap.hpp>
+
+#include "master/allocator/sorter/sorter.hpp"
+
+
+namespace mesos {
+namespace master {
+namespace allocator {
+
+struct Client
+{
+ Client(const std::string& _name, double _share, uint64_t _allocations)
+ : name(_name), share(_share), allocations(_allocations) {}
+
+ std::string name;
+ double share;
+
+ // We store the number of times this client has been chosen for
+ // allocation so that we can fairly share the resources across
+ // clients that have the same share. Note that this information is
+ // not persisted across master failovers, but since the point is to
+ // equalize the 'allocations' across clients of the same 'share'
+ // having allocations restart at 0 after a master failover should be
+ // sufficient (famous last words.)
+ uint64_t allocations;
+};
+
+
+struct DRFComparator
+{
+ virtual ~DRFComparator() {}
+ virtual bool operator () (const Client& client1, const Client& client2);
+};
+
+
+class DRFSorter : public Sorter
+{
+public:
+ virtual ~DRFSorter() {}
+
+ virtual void add(const std::string& name, double weight = 1);
+
+ virtual void remove(const std::string& name);
+
+ virtual void activate(const std::string& name);
+
+ virtual void deactivate(const std::string& name);
+
+ virtual void allocated(const std::string& name,
+ const Resources& resources);
+
+ virtual void update(const std::string& name,
+ const Resources& oldAllocation,
+ const Resources& newAllocation);
+
+ virtual void unallocated(const std::string& name,
+ const Resources& resources);
+
+ virtual Resources allocation(const std::string& name);
+
+ virtual void add(const Resources& resources);
+
+ virtual void remove(const Resources& resources);
+
+ virtual std::list<std::string> sort();
+
+ virtual bool contains(const std::string& name);
+
+ virtual int count();
+
+private:
+ // Recalculates the share for the client and moves
+ // it in 'clients' accordingly.
+ void update(const std::string& name);
+
+ // Returns the dominant resource share for the client.
+ double calculateShare(const std::string& name);
+
+ // Returns an iterator to the specified client, if
+ // it exists in this Sorter.
+ std::set<Client, DRFComparator>::iterator find(const std::string& name);
+
+ // If true, start() will recalculate all shares.
+ bool dirty;
+
+ // A set of Clients (names and shares) sorted by share.
+ std::set<Client, DRFComparator> clients;
+
+ // Maps client names to the resources they have been allocated.
+ hashmap<std::string, Resources> allocations;
+
+ // Maps client names to the weights that should be applied to their shares.
+ hashmap<std::string, double> weights;
+
+ // Total resources.
+ Resources resources;
+};
+
+} // namespace allocator {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_ALLOCATOR_SORTER_DRF_SORTER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
new file mode 100644
index 0000000..8cd70dd
--- /dev/null
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_ALLOCATOR_SORTER_SORTER_HPP__
+#define __MASTER_ALLOCATOR_SORTER_SORTER_HPP__
+
+#include <list>
+#include <string>
+
+#include <mesos/resources.hpp>
+
+namespace mesos {
+namespace master {
+namespace allocator {
+
+// Sorters implement the logic for determining the
+// order in which users or frameworks should receive
+// resource allocations.
+//
+// TODO(bmahler): The total and allocated resources are currently
+// aggregated across slaves, which only works for scalar resources.
+// Also, persistent disks are a bit tricky because there will be
+// duplicated persistence IDs within the resources. Consider storing
+// maps keyed off of the slave ID to fix these issues.
+//
+// TODO(bmahler): Templatize this on Client, so that callers can
+// don't need to do string conversion, e.g. FrameworkID, string role,
+// etc.
+class Sorter
+{
+public:
+ virtual ~Sorter() {}
+
+ // Adds a client to allocate resources to. A client
+ // may be a user or a framework.
+ virtual void add(const std::string& client, double weight = 1) = 0;
+
+ // Removes a client.
+ virtual void remove(const std::string& client) = 0;
+
+ // Readds a client to the sort after deactivate.
+ virtual void activate(const std::string& client) = 0;
+
+ // Removes a client from the sort, so it won't get allocated to.
+ virtual void deactivate(const std::string& client) = 0;
+
+ // Specify that resources have been allocated to the given client.
+ virtual void allocated(const std::string& client,
+ const Resources& resources) = 0;
+
+ // Updates a portion of the allocation for the client, in order to
+ // augment the resources with additional metadata (e.g., volumes)
+ // This means that the new allocation must not affect the static
+ // roles, or the overall quantities of resources!
+ virtual void update(const std::string& client,
+ const Resources& oldAllocation,
+ const Resources& newAllocation) = 0;
+
+ // Specify that resources have been unallocated from the given client.
+ virtual void unallocated(const std::string& client,
+ const Resources& resources) = 0;
+
+ // Returns the resources that have been allocated to this client.
+ virtual Resources allocation(const std::string& client) = 0;
+
+ // Add resources to the total pool of resources this
+ // Sorter should consider.
+ virtual void add(const Resources& resources) = 0;
+
+ // Remove resources from the total pool.
+ virtual void remove(const Resources& resources) = 0;
+
+ // Returns a list of all clients, in the order that they
+ // should be allocated to, according to this Sorter's policy.
+ virtual std::list<std::string> sort() = 0;
+
+ // Returns true if this Sorter contains the specified client,
+ // either active or deactivated.
+ virtual bool contains(const std::string& client) = 0;
+
+ // Returns the number of clients this Sorter contains,
+ // either active or deactivated.
+ virtual int count() = 0;
+};
+
+} // namespace allocator {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_ALLOCATOR_SORTER_SORTER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3093759f/src/master/drf_sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.cpp b/src/master/drf_sorter.cpp
deleted file mode 100644
index 0e4974c..0000000
--- a/src/master/drf_sorter.cpp
+++ /dev/null
@@ -1,293 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "logging/logging.hpp"
-
-#include "master/drf_sorter.hpp"
-
-using std::list;
-using std::set;
-using std::string;
-
-
-namespace mesos {
-namespace master {
-namespace allocator {
-
-bool DRFComparator::operator () (const Client& client1, const Client& client2)
-{
- if (client1.share == client2.share) {
- if (client1.allocations == client2.allocations) {
- return client1.name < client2.name;
- }
- return client1.allocations < client2.allocations;
- }
- return client1.share < client2.share;
-}
-
-
-void DRFSorter::add(const string& name, double weight)
-{
- Client client(name, 0, 0);
- clients.insert(client);
-
- allocations[name] = Resources();
- weights[name] = weight;
-}
-
-
-void DRFSorter::remove(const string& name)
-{
- set<Client, DRFComparator>::iterator it = find(name);
-
- if (it != clients.end()) {
- clients.erase(it);
- }
-
- allocations.erase(name);
- weights.erase(name);
-}
-
-
-void DRFSorter::activate(const string& name)
-{
- CHECK(allocations.contains(name));
-
- Client client(name, calculateShare(name), 0);
- clients.insert(client);
-}
-
-
-void DRFSorter::deactivate(const string& name)
-{
- set<Client, DRFComparator>::iterator it = find(name);
-
- if (it != clients.end()) {
- // TODO(benh): Removing the client is an unfortuante strategy
- // because we lose information such as the number of allocations
- // for this client which means the fairness can be gamed by a
- // framework disconnecting and reconnecting.
- clients.erase(it);
- }
-}
-
-
-void DRFSorter::allocated(
- const string& name,
- const Resources& resources)
-{
- set<Client, DRFComparator>::iterator it = find(name);
-
- if (it != clients.end()) { // TODO(benh): This should really be a CHECK.
- // TODO(benh): Refactor 'update' to be able to reuse it here.
- Client client(*it);
-
- // Update the 'allocations' to reflect the allocator decision.
- client.allocations++;
-
- // Remove and reinsert it to update the ordering appropriately.
- clients.erase(it);
- clients.insert(client);
- }
-
- allocations[name] += resources;
-
- // If the total resources have changed, we're going to
- // recalculate all the shares, so don't bother just
- // updating this client.
- if (!dirty) {
- update(name);
- }
-}
-
-
-void DRFSorter::update(
- const string& name,
- const Resources& oldAllocation,
- const Resources& newAllocation)
-{
- CHECK(contains(name));
-
- // TODO(bmahler): Check invariants between old and new allocations.
- // Namely, the roles and quantities of resources should be the same!
- // Otherwise, we need to ensure we re-calculate the shares, as
- // is being currently done, for safety.
-
- CHECK(resources.contains(oldAllocation));
-
- resources -= oldAllocation;
- resources += newAllocation;
-
- CHECK(allocations[name].contains(oldAllocation));
-
- allocations[name] -= oldAllocation;
- allocations[name] += newAllocation;
-
- // Just assume the total has changed, per the TODO above.
- dirty = true;
-}
-
-
-Resources DRFSorter::allocation(
- const string& name)
-{
- return allocations[name];
-}
-
-
-void DRFSorter::unallocated(
- const string& name,
- const Resources& resources)
-{
- allocations[name] -= resources;
-
- if (!dirty) {
- update(name);
- }
-}
-
-
-void DRFSorter::add(const Resources& _resources)
-{
- resources += _resources;
-
- // We have to recalculate all shares when the total resources
- // change, but we put it off until sort is called
- // so that if something else changes before the next allocation
- // we don't recalculate everything twice.
- dirty = true;
-}
-
-
-void DRFSorter::remove(const Resources& _resources)
-{
- resources -= _resources;
- dirty = true;
-}
-
-
-list<string> DRFSorter::sort()
-{
- if (dirty) {
- set<Client, DRFComparator> temp;
-
- set<Client, DRFComparator>::iterator it;
- for (it = clients.begin(); it != clients.end(); it++) {
- Client client(*it);
-
- // Update the 'share' to get proper sorting.
- client.share = calculateShare(client.name);
-
- temp.insert(client);
- }
-
- clients = temp;
- }
-
- list<string> result;
-
- set<Client, DRFComparator>::iterator it;
- for (it = clients.begin(); it != clients.end(); it++) {
- result.push_back((*it).name);
- }
-
- return result;
-}
-
-
-bool DRFSorter::contains(const string& name)
-{
- return allocations.contains(name);
-}
-
-
-int DRFSorter::count()
-{
- return allocations.size();
-}
-
-
-void DRFSorter::update(const string& name)
-{
- set<Client, DRFComparator>::iterator it = find(name);
-
- if (it != clients.end()) {
- Client client(*it);
-
- // Update the 'share' to get proper sorting.
- client.share = calculateShare(client.name);
-
- // Remove and reinsert it to update the ordering appropriately.
- clients.erase(it);
- clients.insert(client);
- }
-}
-
-
-double DRFSorter::calculateShare(const string& name)
-{
- double share = 0;
-
- // TODO(benh): This implementation of "dominant resource fairness"
- // currently does not take into account resources that are not
- // scalars.
-
- // Scalar resources may be spread across multiple 'Resource'
- // objects. E.g. persistent volumes. So we first collect the names
- // of the scalar resources, before computing the totals.
- hashset<string> scalars;
- foreach (const Resource& resource, resources) {
- if (resource.type() == Value::SCALAR) {
- scalars.insert(resource.name());
- }
- }
-
- foreach (const string& scalar, scalars) {
- Option<Value::Scalar> total = resources.get<Value::Scalar>(scalar);
-
- if (total.isSome() && total.get().value() > 0) {
- Option<Value::Scalar> allocation =
- allocations[name].get<Value::Scalar>(scalar);
-
- if (allocation.isNone()) {
- allocation = Value::Scalar();
- }
-
- share = std::max(share, allocation.get().value() / total.get().value());
- }
- }
-
- return share / weights[name];
-}
-
-
-set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
-{
- set<Client, DRFComparator>::iterator it;
- for (it = clients.begin(); it != clients.end(); it++) {
- if (name == (*it).name) {
- break;
- }
- }
-
- return it;
-}
-
-} // namespace allocator {
-} // namespace master {
-} // namespace mesos {
[4/6] mesos git commit: Removed unnecessary lifecycle method from
MasterAllocatorTest.
Posted by nn...@apache.org.
Removed unnecessary lifecycle method from MasterAllocatorTest.
TestAllocator does not cause GMOCK warnings for unused method calls
any more. Hence there is no need for stopping allocation explicitly.
Review: https://reviews.apache.org/r/29927
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/be6246a1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/be6246a1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/be6246a1
Branch: refs/heads/master
Commit: be6246a11276074dfb60a892a890b80cb7cd37cd
Parents: 3093759
Author: Alexander Rukletsov <al...@mesosphere.io>
Authored: Thu Feb 12 11:44:16 2015 -0800
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Thu Feb 12 11:44:16 2015 -0800
----------------------------------------------------------------------
src/tests/master_allocator_tests.cpp | 9 ---------
1 file changed, 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/be6246a1/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index 648a7a1..df9ac24 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -73,13 +73,6 @@ template <typename T>
class MasterAllocatorTest : public MesosTest
{
protected:
- void StopAllocator()
- {
- // TODO(alexr): Several tests have been reported flaky if no
- // explicit stopping of allocation is used. Ensure allocation
- // is stopped here.
- }
-
TestAllocator<T> allocator;
};
@@ -1286,7 +1279,6 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
.WillRepeatedly(DoDefault());
this->ShutdownMasters();
- this->StopAllocator();
TestAllocator<TypeParam> allocator2;
@@ -1399,7 +1391,6 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
.WillRepeatedly(DoDefault());
this->ShutdownMasters();
- this->StopAllocator();
TestAllocator<TypeParam> allocator2;