You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/11/08 06:51:51 UTC
svn commit: r1406932 [1/2] - in /incubator/mesos/trunk/src: ./ local/
master/ sched/ tests/
Author: benh
Date: Thu Nov 8 05:51:50 2012
New Revision: 1406932
URL: http://svn.apache.org/viewvc?rev=1406932&view=rev
Log:
Created an Allocator facade that wraps AllocatorProcess. Also improved
MockAllocatorProcess.
From: Vinod Kone <vi...@gmail.com>
Review: https://reviews.apache.org/r/7720
Removed:
incubator/mesos/trunk/src/master/allocator.cpp
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/local/local.cpp
incubator/mesos/trunk/src/local/local.hpp
incubator/mesos/trunk/src/master/allocator.hpp
incubator/mesos/trunk/src/master/drf_sorter.cpp
incubator/mesos/trunk/src/master/drf_sorter.hpp
incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
incubator/mesos/trunk/src/master/main.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/master/sorter.hpp
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/tests/allocator_tests.cpp
incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
incubator/mesos/trunk/src/tests/gc_tests.cpp
incubator/mesos/trunk/src/tests/master_detector_tests.cpp
incubator/mesos/trunk/src/tests/master_tests.cpp
incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
incubator/mesos/trunk/src/tests/utils.hpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Thu Nov 8 05:51:50 2012
@@ -152,7 +152,7 @@ noinst_LTLIBRARIES += libmesos_no_third_
nodist_libmesos_no_third_party_la_SOURCES = $(CXX_PROTOS) $(MESSAGES_PROTOS)
libmesos_no_third_party_la_SOURCES = sched/sched.cpp local/local.cpp \
- master/allocator.cpp master/drf_sorter.cpp \
+ master/drf_sorter.cpp \
master/frameworks_manager.cpp master/http.cpp master/master.cpp \
master/slaves_manager.cpp slave/gc.cpp slave/state.cpp \
slave/slave.cpp slave/http.cpp slave/isolation_module.cpp \
Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Thu Nov 8 05:51:50 2012
@@ -33,6 +33,9 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"
+#include "master/allocator.hpp"
+#include "master/drf_sorter.hpp"
+#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "slave/process_based_isolation_module.hpp"
@@ -40,7 +43,10 @@
using namespace mesos::internal;
+using mesos::internal::master::Allocator;
using mesos::internal::master::AllocatorProcess;
+using mesos::internal::master::DRFSorter;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
@@ -60,19 +66,20 @@ namespace mesos {
namespace internal {
namespace local {
-static AllocatorProcess* allocator = NULL;
+static Allocator* allocator = NULL;
+static AllocatorProcess* allocatorProcess = NULL;
static Master* master = NULL;
static map<IsolationModule*, Slave*> slaves;
static MasterDetector* detector = NULL;
static Files* files = NULL;
-
-PID<Master> launch(int numSlaves,
- double cpus,
- uint64_t mem,
- uint64_t disk,
- bool quiet,
- AllocatorProcess* _allocator)
+PID<Master> launch(
+ int numSlaves,
+ double cpus,
+ uint64_t mem,
+ uint64_t disk,
+ bool quiet,
+ Allocator* _allocator)
{
Configuration configuration;
configuration.set("slaves", "*");
@@ -87,8 +94,7 @@ PID<Master> launch(int numSlaves,
}
-PID<Master> launch(const Configuration& configuration,
- AllocatorProcess* _allocator)
+PID<Master> launch(const Configuration& configuration, Allocator* _allocator)
{
int numSlaves = configuration.get<int>("num_slaves", 1);
@@ -98,11 +104,13 @@ PID<Master> launch(const Configuration&
if (_allocator == NULL) {
// Create default allocator, save it for deleting later.
- _allocator = allocator = AllocatorProcess::create("drf", "drf");
+ allocatorProcess = new HierarchicalDRFAllocatorProcess();
+ _allocator = allocator = new Allocator(allocatorProcess);
} else {
// TODO(benh): Figure out the behavior of allocator pointer and remove the
// else block.
allocator = NULL;
+ allocatorProcess = NULL;
}
files = new Files();
@@ -142,6 +150,7 @@ void shutdown()
process::wait(master->self());
delete master;
delete allocator;
+ delete allocatorProcess;
master = NULL;
// TODO(benh): Ugh! Because the isolation module calls back into the
Modified: incubator/mesos/trunk/src/local/local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.hpp (original)
+++ incubator/mesos/trunk/src/local/local.hpp Thu Nov 8 05:51:50 2012
@@ -21,15 +21,22 @@
#include <process/process.hpp>
-#include "configurator/configuration.hpp"
-
-#include "master/allocator.hpp"
-#include "master/master.hpp"
-
namespace mesos {
namespace internal {
+
+// Forward declarations.
+namespace master {
+
+class Allocator;
+class Master;
+
+} // namespace master {
+
+class Configuration;
+
namespace local {
+
// Launch a local cluster with a given number of slaves and given numbers
// of CPUs and memory per slave. Additionally one can also toggle whether
// to initialize Google Logging and whether to log quietly.
@@ -38,12 +45,12 @@ process::PID<master::Master> launch(int
uint64_t mem,
uint64_t disk,
bool quiet,
- master::AllocatorProcess* _allocator = NULL);
+ master::Allocator* _allocator = NULL);
// Launch a local cluster with a given configuration.
process::PID<master::Master> launch(const Configuration& configuration,
- master::AllocatorProcess* _allocator = NULL);
+ master::Allocator* _allocator = NULL);
void shutdown();
Modified: incubator/mesos/trunk/src/master/allocator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/allocator.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/allocator.hpp (original)
+++ incubator/mesos/trunk/src/master/allocator.hpp Thu Nov 8 05:51:50 2012
@@ -19,47 +19,69 @@
#ifndef __ALLOCATOR_HPP__
#define __ALLOCATOR_HPP__
+#include <string>
+
+#include <process/future.hpp>
+#include <process/dispatch.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/option.hpp>
#include "common/resources.hpp"
#include "master/flags.hpp"
-#include "master/master.hpp"
+#include "messages/messages.hpp"
namespace mesos {
namespace internal {
namespace master {
+class Master; // Forward declaration.
+
+
// Basic model of an allocator: resources are allocated to a framework
// in the form of offers. A framework can refuse some resources in
// offers and run tasks in others. Resources can be recovered from a
// framework when tasks finish/fail (or are lost due to a slave
// failure) or when an offer is rescinded.
-
-class AllocatorProcess : public process::Process<AllocatorProcess> {
+// NOTE: New Allocators should implement this interface.
+class AllocatorProcess : public process::Process<AllocatorProcess>
+{
public:
+ AllocatorProcess() {}
+
virtual ~AllocatorProcess() {}
- virtual void initialize(const Flags& flags,
- const process::PID<Master>& master) = 0;
+ virtual void initialize(
+ const Flags& flags,
+ const process::PID<Master>& master) = 0;
- virtual void frameworkAdded(const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used) = 0;
+ virtual void frameworkAdded(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used) = 0;
- virtual void frameworkRemoved(const FrameworkID& frameworkId) = 0;
+ virtual void frameworkRemoved(
+ const FrameworkID& frameworkId) = 0;
- virtual void frameworkActivated(const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo) = 0;
+ virtual void frameworkActivated(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo) = 0;
- virtual void frameworkDeactivated(const FrameworkID& frameworkId) = 0;
+ virtual void frameworkDeactivated(
+ const FrameworkID& frameworkId) = 0;
- virtual void slaveAdded(const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const hashmap<FrameworkID, Resources>& used) = 0;
+ virtual void slaveAdded(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const hashmap<FrameworkID, Resources>& used) = 0;
- virtual void slaveRemoved(const SlaveID& slaveId) = 0;
+ virtual void slaveRemoved(
+ const SlaveID& slaveId) = 0;
virtual void updateWhitelist(
const Option<hashset<std::string> >& whitelist) = 0;
@@ -86,12 +108,236 @@ public:
// Whenever a framework that has filtered resources wants to revive
// offers for those resources the master invokes this callback.
- virtual void offersRevived(const FrameworkID& frameworkId) = 0;
+ virtual void offersRevived(
+ const FrameworkID& frameworkId) = 0;
+};
+
+
+// This is a wrapper around the AllocatorProcess interface.
+// NOTE: DO NOT subclass this class when implementing a new allocator.
+// Implement AllocatorProcess (above) instead!
+class Allocator
+{
+public:
+ // The AllocatorProcess object passed to the constructor is
+ // spawned and terminated by the allocator. But it is the responsibility
+ // of the caller to de-allocate the object, if necessary.
+ Allocator(AllocatorProcess* _process);
+
+ virtual ~Allocator();
+
+ void initialize(
+ const Flags& flags,
+ const process::PID<Master>& master);
+
+ void frameworkAdded(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used);
+
+ void frameworkRemoved(
+ const FrameworkID& frameworkId);
+
+ void frameworkActivated(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo);
- static AllocatorProcess* create(const std::string& userSorterType,
- const std::string& frameworkSorterType);
+ void frameworkDeactivated(
+ const FrameworkID& frameworkId);
+
+ void slaveAdded(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const hashmap<FrameworkID, Resources>& used);
+
+ void slaveRemoved(
+ const SlaveID& slaveId);
+
+ void updateWhitelist(
+ const Option<hashset<std::string> >& whitelist);
+
+ void resourcesRequested(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests);
+
+ void resourcesUnused(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters);
+
+ void resourcesRecovered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources);
+
+ void offersRevived(
+ const FrameworkID& frameworkId);
+
+private:
+ Allocator(const Allocator&); // Not copyable.
+ Allocator& operator=(const Allocator&); // Not assignable.
+
+ AllocatorProcess* process;
};
+
+inline Allocator::Allocator(AllocatorProcess* _process)
+ : process(_process)
+{
+ process::spawn(process);
+}
+
+
+inline Allocator::~Allocator()
+{
+ process::terminate(process);
+ process::wait(process);
+}
+
+
+inline void Allocator::initialize(
+ const Flags& flags,
+ const process::PID<Master>& master)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::initialize,
+ flags,
+ master);
+}
+
+
+inline void Allocator::frameworkAdded(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::frameworkAdded,
+ frameworkId,
+ frameworkInfo,
+ used);
+}
+
+
+inline void Allocator::frameworkRemoved(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::frameworkRemoved,
+ frameworkId);
+}
+
+
+inline void Allocator::frameworkActivated(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::frameworkActivated,
+ frameworkId,
+ frameworkInfo);
+}
+
+
+inline void Allocator::frameworkDeactivated(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::frameworkDeactivated,
+ frameworkId);
+}
+
+
+inline void Allocator::slaveAdded(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const hashmap<FrameworkID, Resources>& used)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::slaveAdded,
+ slaveId,
+ slaveInfo,
+ used);
+}
+
+
+inline void Allocator::slaveRemoved(const SlaveID& slaveId)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::slaveRemoved,
+ slaveId);
+}
+
+
+inline void Allocator::updateWhitelist(
+ const Option<hashset<std::string> >& whitelist)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::updateWhitelist,
+ whitelist);
+}
+
+
+inline void Allocator::resourcesRequested(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::resourcesRequested,
+ frameworkId,
+ requests);
+}
+
+
+inline void Allocator::resourcesUnused(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::resourcesUnused,
+ frameworkId,
+ slaveId,
+ resources,
+ filters);
+}
+
+
+inline void Allocator::resourcesRecovered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::resourcesRecovered,
+ frameworkId,
+ slaveId,
+ resources);
+}
+
+
+inline void Allocator::offersRevived(
+ const FrameworkID& frameworkId)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::offersRevived,
+ frameworkId);
+}
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
Modified: incubator/mesos/trunk/src/master/drf_sorter.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.cpp (original)
+++ incubator/mesos/trunk/src/master/drf_sorter.cpp Thu Nov 8 05:51:50 2012
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include "logging/logging.hpp"
+
#include "master/drf_sorter.hpp"
using std::list;
Modified: incubator/mesos/trunk/src/master/drf_sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.hpp (original)
+++ incubator/mesos/trunk/src/master/drf_sorter.hpp Thu Nov 8 05:51:50 2012
@@ -19,6 +19,13 @@
#ifndef __DRF_SORTER_HPP__
#define __DRF_SORTER_HPP__
+#include <set>
+#include <string>
+
+#include <stout/hashmap.hpp>
+
+#include "common/resources.hpp"
+
#include "master/sorter.hpp"
@@ -58,10 +65,10 @@ public:
virtual void deactivate(const std::string& name);
virtual void allocated(const std::string& name,
- const Resources& resources);
+ const Resources& resources);
virtual void unallocated(const std::string& name,
- const Resources& resources);
+ const Resources& resources);
virtual Resources allocation(const std::string& name);
Modified: incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp (original)
+++ incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp Thu Nov 8 05:51:50 2012
@@ -21,7 +21,6 @@
#include <process/delay.hpp>
#include <process/timeout.hpp>
-#include <process/timer.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
@@ -33,50 +32,62 @@
#include "master/master.hpp"
#include "master/sorter.hpp"
-
namespace mesos {
namespace internal {
namespace master {
// Forward declarations.
+class DRFSorter;
class Filter;
-// Implements the basic allocator algorithm - first pick
-// a user by some criteria, then pick one of their
-// frameworks to allocate to.
-template <class UserSorter, class FrameworkSorter>
+// We forward declare the hierarchical allocator process so that we
+// can typedef an instantiation of it with DRF sorters.
+template <typename UserSorter, typename FrameworkSorter>
+class HierarchicalAllocatorProcess;
+
+typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
+HierarchicalDRFAllocatorProcess;
+
+
+// Implements the basic allocator algorithm - first pick a user by
+// some criteria, then pick one of their frameworks to allocate to.
+template <typename UserSorter, typename FrameworkSorter>
class HierarchicalAllocatorProcess : public AllocatorProcess
{
public:
- HierarchicalAllocatorProcess() : initialized(false) {}
+ HierarchicalAllocatorProcess();
- virtual ~HierarchicalAllocatorProcess() {}
+ virtual ~HierarchicalAllocatorProcess();
- process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> > self()
- {
- return process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >(this);
- }
+ process::PID<HierarchicalAllocatorProcess> self();
- void initialize(const Flags& flags,
- const process::PID<Master>& _master);
+ void initialize(
+ const Flags& flags,
+ const process::PID<Master>& _master);
- void frameworkAdded(const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo,
- const Resources& used);
+ void frameworkAdded(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const Resources& used);
- void frameworkRemoved(const FrameworkID& frameworkId);
+ void frameworkRemoved(
+ const FrameworkID& frameworkId);
- void frameworkActivated(const FrameworkID& frameworkId,
- const FrameworkInfo& frameworkInfo);
+ void frameworkActivated(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo);
- void frameworkDeactivated(const FrameworkID& frameworkId);
+ void frameworkDeactivated(
+ const FrameworkID& frameworkId);
- void slaveAdded(const SlaveID& slaveId,
- const SlaveInfo& slaveInfo,
- const hashmap<FrameworkID, Resources>& used);
+ void slaveAdded(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const hashmap<FrameworkID, Resources>& used);
- void slaveRemoved(const SlaveID& slaveId);
+ void slaveRemoved(
+ const SlaveID& slaveId);
void updateWhitelist(
const Option<hashset<std::string> >& whitelist);
@@ -96,9 +107,14 @@ public:
const SlaveID& slaveId,
const Resources& resources);
- void offersRevived(const FrameworkID& frameworkId);
+ void offersRevived(
+ const FrameworkID& frameworkId);
protected:
+ // Useful typedefs for dispatch/delay/defer to self()/this.
+ typedef HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> Self;
+ typedef HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> This;
+
// Callback for doing batch allocations.
void batch();
@@ -119,9 +135,10 @@ protected:
// Returns true if there is a filter for this framework
// on this slave.
- bool isFiltered(const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& resources);
+ bool isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources);
bool initialized;
@@ -157,25 +174,25 @@ class Filter
{
public:
virtual ~Filter() {}
+
virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
};
-class RefusedFilter : public Filter
+class RefusedFilter: public Filter
{
public:
- RefusedFilter(const SlaveID& _slaveId,
- const Resources& _resources,
- const Timeout& _timeout)
- : slaveId(_slaveId),
- resources(_resources),
- timeout(_timeout) {}
+ RefusedFilter(
+ const SlaveID& _slaveId,
+ const Resources& _resources,
+ const Timeout& _timeout)
+ : slaveId(_slaveId), resources(_resources), timeout(_timeout) {}
virtual bool filter(const SlaveID& slaveId, const Resources& resources)
{
return slaveId == this->slaveId &&
- resources <= this->resources && // Refused resources are superset.
- timeout.remaining() > Seconds(0);
+ resources <= this->resources && // Refused resources are superset.
+ timeout.remaining() > Seconds(0);
}
const SlaveID slaveId;
@@ -185,7 +202,27 @@ public:
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::initialize(
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::HierarchicalAllocatorProcess()
+ : initialized(false) {}
+
+
+template <class UserSorter, class FrameworkSorter>
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::~HierarchicalAllocatorProcess()
+{}
+
+
+template <class UserSorter, class FrameworkSorter>
+process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::self()
+{
+ return
+ process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >(this);
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::initialize(
const Flags& _flags,
const process::PID<Master>& _master)
{
@@ -194,20 +231,23 @@ void HierarchicalAllocatorProcess<UserSo
initialized = true;
userSorter = new UserSorter();
- delay(flags.allocation_interval, self(),
- &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+ VLOG(1) << "Initializing hierarchical allocator process "
+ << "with master : " << master;
+
+ delay(flags.allocation_interval, self(), &Self::batch);
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
const Resources& used)
{
CHECK(initialized);
- std::string user = frameworkInfo.user();
+ std::string user = frameworkInfo.user();
if (!userSorter->contains(user)) {
userSorter->add(user);
sorters[user] = new FrameworkSorter();
@@ -230,7 +270,9 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(const FrameworkID& frameworkId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
+ const FrameworkID& frameworkId)
{
CHECK(initialized);
@@ -270,7 +312,8 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo)
{
@@ -286,7 +329,9 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(const FrameworkID& frameworkId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(
+ const FrameworkID& frameworkId)
{
CHECK(initialized);
@@ -314,7 +359,8 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const hashmap<FrameworkID, Resources>& used)
@@ -329,7 +375,9 @@ void HierarchicalAllocatorProcess<UserSo
Resources unused = slaveInfo.resources();
- foreachpair (const FrameworkID& frameworkId, const Resources& resources, used) {
+ foreachpair (const FrameworkID& frameworkId,
+ const Resources& resources,
+ used) {
if (users.contains(frameworkId)) {
const std::string& user = users[frameworkId];
sorters[user]->add(resources);
@@ -343,15 +391,17 @@ void HierarchicalAllocatorProcess<UserSo
allocatable[slaveId] = unused;
LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
- << ") with " << slaveInfo.resources()
- << " (and " << unused << " available)";
+ << ") with " << slaveInfo.resources() << " (and " << unused
+ << " available)";
allocate(slaveId);
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(const SlaveID& slaveId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(
+ const SlaveID& slaveId)
{
CHECK(initialized);
@@ -373,7 +423,8 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
const Option<hashset<std::string> >& _whitelist)
{
CHECK(initialized);
@@ -390,7 +441,8 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRequested(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRequested(
const FrameworkID& frameworkId,
const std::vector<Request>& requests)
{
@@ -401,7 +453,8 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const Resources& resources,
@@ -440,8 +493,8 @@ void HierarchicalAllocatorProcess<UserSo
if (seconds != Seconds(0)) {
LOG(INFO) << "Framework " << frameworkId
- << " filtered slave " << slaveId
- << " for " << seconds;
+ << " filtered slave " << slaveId
+ << " for " << seconds;
// Create a new filter and delay its expiration.
mesos::internal::master::Filter* filter =
@@ -449,16 +502,14 @@ void HierarchicalAllocatorProcess<UserSo
this->filters.put(frameworkId, filter);
- delay(seconds, self(),
- &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire,
- frameworkId,
- filter);
+ delay(seconds, self(), &Self::expire, frameworkId, filter);
}
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const Resources& resources)
@@ -491,13 +542,14 @@ void HierarchicalAllocatorProcess<UserSo
VLOG(1) << "Recovered " << resources.allocatable()
<< " on slave " << slaveId
<< " from framework " << frameworkId;
-
}
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(const FrameworkID& frameworkId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(
+ const FrameworkID& frameworkId)
{
CHECK(initialized);
@@ -521,17 +573,18 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch()
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch()
{
CHECK(initialized);
allocate();
- delay(flags.allocation_interval, self(),
- &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+ delay(flags.allocation_interval, self(), &Self::batch);
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate()
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate()
{
CHECK(initialized);
@@ -540,13 +593,15 @@ void HierarchicalAllocatorProcess<UserSo
allocate(slaves.keys());
- LOG(INFO) << "Performed allocation for " << slaves.size()
- << " slaves in " << stopwatch.elapsed();
+ LOG(INFO) << "Performed allocation for " << slaves.size() << " slaves in "
+ << stopwatch.elapsed();
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const SlaveID& slaveId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
+ const SlaveID& slaveId)
{
CHECK(initialized);
@@ -558,13 +613,15 @@ void HierarchicalAllocatorProcess<UserSo
allocate(slaveIds);
- LOG(INFO) << "Performed allocation for slave " << slaveId
- << " in " << stopwatch.elapsed();
+ LOG(INFO) << "Performed allocation for slave " << slaveId << " in "
+ << stopwatch.elapsed();
}
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const hashset<SlaveID>& slaveIds)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
+ const hashset<SlaveID>& slaveIds)
{
CHECK(initialized);
@@ -597,9 +654,9 @@ void HierarchicalAllocatorProcess<UserSo
Value::Scalar mem = resources.get("mem", none);
if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
- VLOG(1) << "Found available resources: " << resources
- << " on slave " << slaveId;
- available[slaveId] = resources;
+ VLOG(1) << "Found available resources: " << resources
+ << " on slave " << slaveId;
+ available[slaveId] = resources;
}
}
}
@@ -616,32 +673,35 @@ void HierarchicalAllocatorProcess<UserSo
Resources allocatedResources;
hashmap<SlaveID, Resources> offerable;
- foreachpair (const SlaveID& slaveId, const Resources& resources, available) {
- // Check whether or not this framework filters this slave.
- bool filtered = isFiltered(frameworkId, slaveId, resources);
-
- if (!filtered) {
- VLOG(1) << "Offering " << resources
- << " on slave " << slaveId
- << " to framework " << frameworkId;
- offerable[slaveId] = resources;
-
- // Update framework and slave resources.
- allocatable[slaveId] -= resources;
- allocatedResources += resources;
- }
+ foreachpair (const SlaveID& slaveId,
+ const Resources& resources,
+ available) {
+ // Check whether or not this framework filters this slave.
+ bool filtered = isFiltered(frameworkId, slaveId, resources);
+
+ if (!filtered) {
+ VLOG(1)
+ << "Offering " << resources << " on slave " << slaveId
+ << " to framework " << frameworkId;
+
+ offerable[slaveId] = resources;
+
+ // Update framework and slave resources.
+ allocatable[slaveId] -= resources;
+ allocatedResources += resources;
+ }
}
if (offerable.size() > 0) {
- foreachkey (const SlaveID& slaveId, offerable) {
- available.erase(slaveId);
- }
-
- sorters[user]->add(allocatedResources);
- sorters[user]->allocated(frameworkIdValue, allocatedResources);
- userSorter->allocated(user, allocatedResources);
+ foreachkey (const SlaveID& slaveId, offerable) {
+ available.erase(slaveId);
+ }
+
+ sorters[user]->add(allocatedResources);
+ sorters[user]->allocated(frameworkIdValue, allocatedResources);
+ userSorter->allocated(user, allocatedResources);
- dispatch(master, &Master::offer, frameworkId, offerable);
+ dispatch(master, &Master::offer, frameworkId, offerable);
}
}
}
@@ -649,7 +709,8 @@ void HierarchicalAllocatorProcess<UserSo
template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
const FrameworkID& frameworkId,
Filter* filter)
{
@@ -658,17 +719,16 @@ void HierarchicalAllocatorProcess<UserSo
// HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
// keep the address from getting reused possibly causing premature
// expiration).
- if (users.contains(frameworkId) &&
- filters.contains(frameworkId, filter)) {
+ if (users.contains(frameworkId) && filters.contains(frameworkId, filter)) {
filters.remove(frameworkId, filter);
}
-
delete filter;
}
template <class UserSorter, class FrameworkSorter>
-bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isWhitelisted(
+bool
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isWhitelisted(
const SlaveID& slaveId)
{
CHECK(initialized);
@@ -676,12 +736,13 @@ bool HierarchicalAllocatorProcess<UserSo
CHECK(slaves.contains(slaveId));
return whitelist.isNone() ||
- whitelist.get().contains(slaves[slaveId].hostname());
+ whitelist.get().contains(slaves[slaveId].hostname());
}
template <class UserSorter, class FrameworkSorter>
-bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
+bool
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const Resources& resources)
@@ -690,8 +751,8 @@ bool HierarchicalAllocatorProcess<UserSo
foreach (Filter* filter, filters.get(frameworkId)) {
if (filter->filter(slaveId, resources)) {
VLOG(1) << "Filtered " << resources
- << " on slave " << slaveId
- << " for framework " << frameworkId;
+ << " on slave " << slaveId
+ << " for framework " << frameworkId;
filtered = true;
break;
}
Modified: incubator/mesos/trunk/src/master/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/main.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/main.cpp (original)
+++ incubator/mesos/trunk/src/master/main.cpp Thu Nov 8 05:51:50 2012
@@ -33,6 +33,8 @@
#include "logging/logging.hpp"
#include "master/allocator.hpp"
+#include "master/drf_sorter.hpp"
+#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
using namespace mesos::internal;
@@ -114,8 +116,9 @@ int main(int argc, char** argv)
LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
LOG(INFO) << "Starting Mesos master";
- AllocatorProcess* allocator = AllocatorProcess::create(flags.user_sorter,
- flags.framework_sorter);
+ AllocatorProcess* allocatorProcess = new HierarchicalDRFAllocatorProcess();
+ Allocator* allocator = new Allocator(allocatorProcess);
+
Files files;
Master* master = new Master(allocator, &files, flags);
process::spawn(master);
@@ -129,6 +132,7 @@ int main(int argc, char** argv)
process::wait(master->self());
delete master;
delete allocator;
+ delete allocatorProcess;
MasterDetector::destroy(detector.get());
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Thu Nov 8 05:51:50 2012
@@ -62,7 +62,7 @@ namespace master {
class WhitelistWatcher : public Process<WhitelistWatcher> {
public:
- WhitelistWatcher(const string& _path, AllocatorProcess* _allocator)
+ WhitelistWatcher(const string& _path, Allocator* _allocator)
: path(_path), allocator(_allocator) {}
protected:
@@ -106,7 +106,7 @@ protected:
// Send the whitelist to allocator, if necessary.
if (whitelist != lastWhitelist) {
- dispatch(allocator, &AllocatorProcess::updateWhitelist, whitelist);
+ allocator->updateWhitelist(whitelist);
}
// Check again.
@@ -116,7 +116,7 @@ protected:
private:
const string path;
- AllocatorProcess* allocator;
+ Allocator* allocator;
Option<hashset<string> > lastWhitelist;
};
@@ -257,14 +257,14 @@ struct SlaveReregistrar
};
-Master::Master(AllocatorProcess* _allocator, Files* _files)
+Master::Master(Allocator* _allocator, Files* _files)
: ProcessBase("master"),
flags(),
allocator(_allocator),
files(_files) {}
-Master::Master(AllocatorProcess* _allocator,
+Master::Master(Allocator* _allocator,
Files* _files,
const flags::Flags<logging::Flags, master::Flags>& _flags)
: ProcessBase("master"),
@@ -322,9 +322,8 @@ void Master::initialize()
slavesManager = new SlavesManager(flags, self());
spawn(slavesManager);
- // Spawn the allocator.
- spawn(allocator);
- dispatch(allocator, &AllocatorProcess::initialize, flags, self());
+ // Initialize the allocator.
+ allocator->initialize(flags, self());
// Parse the white list
whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
@@ -474,9 +473,6 @@ void Master::finalize()
foreachvalue (Slave* slave, slaves) {
send(slave->pid, ShutdownMessage());
}
-
- terminate(allocator);
- wait(allocator);
}
@@ -492,7 +488,7 @@ void Master::exited(const UPID& pid)
framework->active = false;
// Tell the allocator to stop allocating resources to this framework.
- dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
+ allocator->frameworkDeactivated(framework->id);
Seconds failoverTimeout(framework->info.failover_timeout());
@@ -506,10 +502,9 @@ void Master::exited(const UPID& pid)
// Remove the framework's offers.
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- offer->framework_id(),
- offer->slave_id(),
- Resources(offer->resources()));
+ allocator->resourcesRecovered(offer->framework_id(),
+ offer->slave_id(),
+ Resources(offer->resources()));
removeOffer(offer);
}
return;
@@ -667,8 +662,9 @@ void Master::reregisterFramework(const F
// replied to the offers but the driver might have dropped
// those messages since it wasn't connected to the master.
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- offer->framework_id(), offer->slave_id(), offer->resources());
+ allocator->resourcesRecovered(offer->framework_id(),
+ offer->slave_id(),
+ offer->resources());
removeOffer(offer);
}
@@ -763,7 +759,7 @@ void Master::deactivateFramework(const F
void Master::resourceRequest(const FrameworkID& frameworkId,
const vector<Request>& requests)
{
- dispatch(allocator, &AllocatorProcess::resourcesRequested, frameworkId, requests);
+ allocator->resourcesRequested(frameworkId, requests);
}
@@ -813,7 +809,7 @@ void Master::reviveOffers(const Framewor
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
LOG(INFO) << "Reviving offers for framework " << framework->id;
- dispatch(allocator, &AllocatorProcess::offersRevived, framework->id);
+ allocator->offersRevived(framework->id);
}
}
@@ -1128,10 +1124,9 @@ void Master::exitedExecutor(const SlaveI
<< " (" << slave->info.hostname() << ")"
<< " exited with status " << status;
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- frameworkId,
- slaveId,
- Resources(executor.resources()));
+ allocator->resourcesRecovered(frameworkId,
+ slaveId,
+ Resources(executor.resources()));
// Remove executor from slave and framework.
slave->removeExecutor(frameworkId, executorId);
@@ -1210,8 +1205,7 @@ void Master::offer(const FrameworkID& fr
<< " has terminated or is inactive";
foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- frameworkId, slaveId, offered);
+ allocator->resourcesRecovered(frameworkId, slaveId, offered);
}
return;
}
@@ -1226,8 +1220,7 @@ void Master::offer(const FrameworkID& fr
<< frameworkId << " because slave " << slaveId
<< " is not valid";
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- frameworkId, slaveId, offered);
+ allocator->resourcesRecovered(frameworkId, slaveId, offered);
continue;
}
@@ -1527,11 +1520,10 @@ void Master::processTasks(Offer* offer,
if (unusedResources.allocatable().size() > 0) {
// Tell the allocator about the unused (e.g., refused) resources.
- dispatch(allocator, &AllocatorProcess::resourcesUnused,
- offer->framework_id(),
- offer->slave_id(),
- unusedResources,
- filters);
+ allocator->resourcesUnused(offer->framework_id(),
+ offer->slave_id(),
+ unusedResources,
+ filters);
}
removeOffer(offer);
@@ -1584,6 +1576,7 @@ Resources Master::launchTask(const TaskI
// Tell the slave to launch the task!
LOG(INFO) << "Launching task " << task.task_id()
+ << " of framework " << framework->id
<< " with resources " << task.resources() << " on slave "
<< slave->id << " (" << slave->info.hostname() << ")";
@@ -1613,8 +1606,9 @@ void Master::addFramework(Framework* fra
message.mutable_master_info()->MergeFrom(info);
send(framework->pid, message);
- dispatch(allocator, &AllocatorProcess::frameworkAdded,
- framework->id, framework->info, framework->resources);
+ allocator->frameworkAdded(framework->id,
+ framework->info,
+ framework->resources);
}
@@ -1638,8 +1632,7 @@ void Master::failoverFramework(Framework
// Make sure we can get offers again.
if (!framework->active) {
framework->active = true;
- dispatch(allocator, &AllocatorProcess::frameworkActivated,
- framework->id, framework->info);
+ allocator->frameworkActivated(framework->id, framework->info);
}
framework->reregisteredTime = Clock::now();
@@ -1657,10 +1650,9 @@ void Master::failoverFramework(Framework
// these resources to this framework if it wants.
// TODO(benh): Consider just reoffering these to
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- offer->framework_id(),
- offer->slave_id(),
- Resources(offer->resources()));
+ allocator->resourcesRecovered(offer->framework_id(),
+ offer->slave_id(),
+ Resources(offer->resources()));
removeOffer(offer);
}
}
@@ -1670,7 +1662,7 @@ void Master::removeFramework(Framework*
{
if (framework->active) {
// Tell the allocator to stop allocating resources to this framework.
- dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
+ allocator->frameworkDeactivated(framework->id);
}
// Tell slaves to shutdown the framework.
@@ -1691,10 +1683,9 @@ void Master::removeFramework(Framework*
// Remove the framework's offers (if they weren't removed before).
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- offer->framework_id(),
- offer->slave_id(),
- Resources(offer->resources()));
+ allocator->resourcesRecovered(offer->framework_id(),
+ offer->slave_id(),
+ Resources(offer->resources()));
removeOffer(offer);
}
@@ -1705,10 +1696,9 @@ void Master::removeFramework(Framework*
foreachpair (const ExecutorID& executorId,
const ExecutorInfo& executorInfo,
framework->executors[slaveId]) {
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- framework->id,
- slave->id,
- executorInfo.resources());
+ allocator->resourcesRecovered(framework->id,
+ slave->id,
+ executorInfo.resources());
slave->removeExecutor(framework->id, executorId);
}
}
@@ -1728,7 +1718,7 @@ void Master::removeFramework(Framework*
// Delete it.
frameworks.erase(framework->id);
- dispatch(allocator, &AllocatorProcess::frameworkRemoved, framework->id);
+ allocator->frameworkRemoved(framework->id);
delete framework;
}
@@ -1767,8 +1757,9 @@ void Master::addSlave(Slave* slave, bool
spawn(slave->observer);
if (!reregister) {
- dispatch(allocator, &AllocatorProcess::slaveAdded,
- slave->id, slave->info, hashmap<FrameworkID, Resources>());
+ allocator->slaveAdded(slave->id,
+ slave->info,
+ hashmap<FrameworkID, Resources>());
}
}
@@ -1837,8 +1828,7 @@ void Master::readdSlave(Slave* slave,
resources[task.framework_id()] += task.resources();
}
- dispatch(allocator, &AllocatorProcess::slaveAdded,
- slave->id, slave->info, resources);
+ allocator->slaveAdded(slave->id, slave->info, resources);
}
@@ -1912,7 +1902,7 @@ void Master::removeSlave(Slave* slave)
// Delete it.
slaves.erase(slave->id);
- dispatch(allocator, &AllocatorProcess::slaveRemoved, slave->id);
+ allocator->slaveRemoved(slave->id);
delete slave;
}
@@ -1931,10 +1921,9 @@ void Master::removeTask(Task* task)
slave->removeTask(task);
// Tell the allocator about the recovered resources.
- dispatch(allocator, &AllocatorProcess::resourcesRecovered,
- task->framework_id(),
- task->slave_id(),
- Resources(task->resources()));
+ allocator->resourcesRecovered(task->framework_id(),
+ task->slave_id(),
+ Resources(task->resources()));
delete task;
}
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Thu Nov 8 05:51:50 2012
@@ -56,20 +56,21 @@ namespace master {
using namespace process; // Included to make code easier to read.
-// Some forward declarations.
-class AllocatorProcess;
+// Forward declarations.
+class Allocator;
class SlavesManager;
-struct Framework;
-struct Slave;
class SlaveObserver;
class WhitelistWatcher;
+struct Framework;
+struct Slave;
+
class Master : public ProtobufProcess<Master>
{
public:
- Master(AllocatorProcess* allocator, Files* files);
- Master(AllocatorProcess* allocator,
+ Master(Allocator* allocator, Files* files);
+ Master(Allocator* allocator,
Files* files,
const flags::Flags<logging::Flags, master::Flags>& flags);
@@ -208,7 +209,7 @@ private:
bool elected;
- AllocatorProcess* allocator;
+ Allocator* allocator;
SlavesManager* slavesManager;
WhitelistWatcher* whitelistWatcher;
Files* files;
Modified: incubator/mesos/trunk/src/master/sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/sorter.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/sorter.hpp (original)
+++ incubator/mesos/trunk/src/master/sorter.hpp Thu Nov 8 05:51:50 2012
@@ -19,7 +19,7 @@
#ifndef __SORTER_HPP__
#define __SORTER_HPP__
-#include "master/master.hpp"
+#include <list>
namespace mesos {
@@ -44,22 +44,18 @@ public:
// Readds a client to the sort after deactivate.
virtual void activate(const std::string& client) = 0;
- // Removes a client from the sort, so it won't get
- // allocated to.
+ // Removes a client from the sort, so it won't get allocated to.
virtual void deactivate(const std::string& client) = 0;
- // Specify that resources have been allocated to the
- // given client.
+ // Specify that resources have been allocated to the given client.
virtual void allocated(const std::string& client,
- const Resources& resources) = 0;
+ const Resources& resources) = 0;
- // Specify that resources have been unallocated from
- // the given client.
+ // Specify that resources have been unallocated from the given client.
virtual void unallocated(const std::string& client,
- const Resources& resources) = 0;
+ const Resources& resources) = 0;
- // Returns the resources that have been allocated to
- // this client.
+ // Returns the resources that have been allocated to this client.
virtual Resources allocation(const std::string& client) = 0;
// Add resources to the total pool of resources this
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Thu Nov 8 05:51:50 2012
@@ -418,7 +418,7 @@ protected:
void stop(bool failover)
{
- VLOG(1) << "Stopping the framework";
+ VLOG(1) << "Stopping framework '" << framework.id() << "'";
// Whether or not we send an unregister message, we want to
// terminate this process.
@@ -442,7 +442,7 @@ protected:
// SchedulerProcess::stop.
void abort()
{
- VLOG(1) << "Aborting the framework";
+ VLOG(1) << "Aborting framework '" << framework.id() << "'";
aborted = true;
Modified: incubator/mesos/trunk/src/tests/allocator_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_tests.cpp Thu Nov 8 05:51:50 2012
@@ -20,11 +20,15 @@
#include <mesos/scheduler.hpp>
+#include <process/clock.hpp>
+#include <process/pid.hpp>
+
#include "configurator/configuration.hpp"
#include "detector/detector.hpp"
#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "slave/process_based_isolation_module.hpp"
@@ -36,12 +40,14 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
-using mesos::internal::master::AllocatorProcess;
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::ProcessBasedIsolationModule;
using mesos::internal::slave::Slave;
+using process::Clock;
using process::PID;
using std::map;
@@ -91,55 +97,56 @@ TEST(AllocatorTest, DRFAllocatorProcess)
frameworkInfo3.set_user("user1");
FrameworkID frameworkId3;
- MockAllocator<TestAllocatorProcess > a;
+ MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
- EXPECT_CALL(a, initialize(_, _));
+ EXPECT_CALL(allocator, initialize(_, _));
- EXPECT_CALL(a, frameworkAdded(_, Eq(frameworkInfo1), _))
- .WillOnce(DoAll(InvokeFrameworkAdded(&a),
- SaveArg<0>(&frameworkId1)));
+ EXPECT_CALL(allocator, frameworkAdded(_, Eq(frameworkInfo1), _))
+ .WillOnce(DoAll(InvokeFrameworkAdded(&allocator),
+ SaveArg<0>(&frameworkId1)));
trigger framework2Added;
- EXPECT_CALL(a, frameworkAdded(_, Eq(frameworkInfo2), _))
- .WillOnce(DoAll(InvokeFrameworkAdded(&a),
- SaveArg<0>(&frameworkId2),
- Trigger(&framework2Added)));
+ EXPECT_CALL(allocator, frameworkAdded(_, Eq(frameworkInfo2), _))
+ .WillOnce(DoAll(InvokeFrameworkAdded(&allocator),
+ SaveArg<0>(&frameworkId2),
+ Trigger(&framework2Added)));
trigger framework3Added;
- EXPECT_CALL(a, frameworkAdded(_, Eq(frameworkInfo3), _))
- .WillOnce(DoAll(InvokeFrameworkAdded(&a),
- SaveArg<0>(&frameworkId3),
- Trigger(&framework3Added)));
+ EXPECT_CALL(allocator, frameworkAdded(_, Eq(frameworkInfo3), _))
+ .WillOnce(DoAll(InvokeFrameworkAdded(&allocator),
+ SaveArg<0>(&frameworkId3),
+ Trigger(&framework3Added)));
- EXPECT_CALL(a, frameworkDeactivated(_))
+ EXPECT_CALL(allocator, frameworkDeactivated(_))
.Times(3);
- EXPECT_CALL(a, frameworkRemoved(Eq(ByRef(frameworkId1))));
+ EXPECT_CALL(allocator, frameworkRemoved(Eq(ByRef(frameworkId1))));
- EXPECT_CALL(a, frameworkRemoved(Eq(ByRef(frameworkId2))));
+ EXPECT_CALL(allocator, frameworkRemoved(Eq(ByRef(frameworkId2))));
trigger lastFrameworkRemoved;
- EXPECT_CALL(a, frameworkRemoved(Eq(ByRef(frameworkId3))))
+ EXPECT_CALL(allocator, frameworkRemoved(Eq(ByRef(frameworkId3))))
.WillOnce(Trigger(&lastFrameworkRemoved));
SlaveID slaveId4;
- EXPECT_CALL(a, slaveAdded(_, _, _))
+ EXPECT_CALL(allocator, slaveAdded(_, _, _))
.WillOnce(DoDefault())
.WillOnce(DoDefault())
.WillOnce(DoDefault())
- .WillOnce(DoAll(InvokeSlaveAdded(&a),
- SaveArg<0>(&slaveId4)));
+ .WillOnce(DoAll(InvokeSlaveAdded(&allocator),
+ SaveArg<0>(&slaveId4)));
- EXPECT_CALL(a, slaveRemoved(_))
+ EXPECT_CALL(allocator, slaveRemoved(_))
.Times(3);
trigger lastSlaveRemoved;
- EXPECT_CALL(a, slaveRemoved(Eq(ByRef(slaveId4))))
+ EXPECT_CALL(allocator, slaveRemoved(Eq(ByRef(slaveId4))))
.WillOnce(Trigger(&lastSlaveRemoved));
- EXPECT_CALL(a, resourcesRecovered(_, _, _))
+ EXPECT_CALL(allocator, resourcesRecovered(_, _, _))
.WillRepeatedly(DoDefault());
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(m);
@@ -160,7 +167,7 @@ TEST(AllocatorTest, DRFAllocatorProcess)
trigger resourceOfferTrigger;
EXPECT_CALL(sched1, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers1),
- Trigger(&resourceOfferTrigger)))
+ Trigger(&resourceOfferTrigger)))
.WillRepeatedly(Return());
driver1.start();
@@ -178,9 +185,9 @@ TEST(AllocatorTest, DRFAllocatorProcess)
trigger resourceOfferTrigger2, resourceOfferTrigger3;
EXPECT_CALL(sched2, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers2),
- Trigger(&resourceOfferTrigger2)))
+ Trigger(&resourceOfferTrigger2)))
.WillOnce(DoAll(SaveArg<1>(&offers3),
- Trigger(&resourceOfferTrigger3)))
+ Trigger(&resourceOfferTrigger3)))
.WillRepeatedly(Return());
driver2.start();
@@ -215,7 +222,7 @@ TEST(AllocatorTest, DRFAllocatorProcess)
trigger resourceOfferTrigger4;
EXPECT_CALL(sched3, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers4),
- Trigger(&resourceOfferTrigger4)));
+ Trigger(&resourceOfferTrigger4)));
driver3.start();
@@ -261,16 +268,16 @@ class AllocatorTest : public ::testing::
protected:
virtual void SetUp()
{
- process::spawn(allocator.real);
+ a = new Allocator(&allocator);
}
virtual void TearDown()
{
- process::terminate(allocator.real);
- process::wait(allocator.real);
+ delete a;
}
- MockAllocator<T> allocator;
+ MockAllocatorProcess<T> allocator;
+ Allocator* a;
};
@@ -301,7 +308,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _));
Files files;
- Master m(&this->allocator, &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(&m);
ProcessBasedIsolationModule isolationModule;
@@ -323,7 +330,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffers)));
+ Trigger(&resourceOffers)));
driver.start();
@@ -369,8 +376,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
trigger resourcesUnusedTrigger;
EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
.WillOnce(DoAll(InvokeResourcesUnused(&this->allocator),
- Trigger(&resourcesUnusedTrigger),
- Return()))
+ Trigger(&resourcesUnusedTrigger)))
.WillRepeatedly(DoDefault());
// Prevent any resources from being recovered by returning instead
@@ -380,7 +386,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
.WillRepeatedly(Return());
Files files;
- Master m(&(this->allocator), &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(m);
ProcessBasedIsolationModule isolationModule;
@@ -414,7 +420,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
trigger offered;
EXPECT_CALL(sched2, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&offered)))
+ Trigger(&offered)))
.WillRepeatedly(Return());
driver2.start();
@@ -457,11 +463,11 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
EXPECT_CALL(this->allocator, frameworkAdded(_, Eq(frameworkInfo1), _))
.WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator),
- SaveArg<0>(&frameworkId1)));
+ SaveArg<0>(&frameworkId1)));
EXPECT_CALL(this->allocator, frameworkAdded(_, Eq(frameworkInfo2), _))
.WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator),
- SaveArg<0>(&frameworkId2)));
+ SaveArg<0>(&frameworkId2)));
EXPECT_CALL(this->allocator, frameworkDeactivated(_))
.Times(2);
@@ -469,8 +475,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
trigger frameworkRemoved, frameworkRemoved2;
EXPECT_CALL(this->allocator, frameworkRemoved(Eq(ByRef(frameworkId1))))
.WillOnce(DoAll(InvokeFrameworkRemoved(&this->allocator),
- Trigger(&frameworkRemoved),
- Return()));
+ Trigger(&frameworkRemoved)));
EXPECT_CALL(this->allocator, frameworkRemoved(Eq(ByRef(frameworkId2))))
.WillOnce(Trigger(&frameworkRemoved2));
@@ -489,12 +494,12 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
// that it doesn't get processed until we redispatch it after
// the frameworkRemoved trigger.
.WillOnce(DoAll(SaveArg<0>(&frameworkId),
- SaveArg<1>(&slaveId),
- SaveArg<2>(&savedResources)))
+ SaveArg<1>(&slaveId),
+ SaveArg<2>(&savedResources)))
.WillRepeatedly(DoDefault());
Files files;
- Master m(&(this->allocator), &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(&m);
ProcessBasedIsolationModule isolationModule;
@@ -516,7 +521,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&offered)));
+ Trigger(&offered)));
driver.start();
@@ -532,8 +537,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
// Re-dispatch the resourcesRecovered call which we "caught"
// earlier now that the framework has been removed, to test
// that recovering resources from a removed framework works.
- dispatch(this->allocator, &AllocatorProcess::resourcesRecovered,
- frameworkId, slaveId, savedResources);
+ this->a->resourcesRecovered(frameworkId, slaveId, savedResources);
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master);
@@ -544,7 +548,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
vector<Offer> offers2;
EXPECT_CALL(sched2, resourceOffers(_, _))
.WillOnce(DoAll(SaveArg<1>(&offers2),
- Trigger(&offered2)));
+ Trigger(&offered2)));
driver2.start();
@@ -585,7 +589,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailo
trigger frameworkDeactivatedTrigger;
EXPECT_CALL(this->allocator, frameworkDeactivated(_))
.WillOnce(DoAll(InvokeFrameworkDeactivated(&this->allocator),
- Trigger(&frameworkDeactivatedTrigger)))
+ Trigger(&frameworkDeactivatedTrigger)))
.WillOnce(DoDefault());
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -604,7 +608,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailo
.WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0));
Files files;
- Master m(&this->allocator, &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(&m);
MockExecutor exec;
@@ -652,7 +656,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailo
trigger resourceOffersTrigger1;
EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
.WillOnce(DoAll(LaunchTasks(1, 1, 256),
- Trigger(&resourceOffersTrigger1)));
+ Trigger(&resourceOffersTrigger1)));
driver1.start();
@@ -730,7 +734,7 @@ TYPED_TEST(AllocatorTest, FrameworkExite
.WillRepeatedly(DoDefault());
Files files;
- Master m(&this->allocator, &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(m);
MockExecutor exec;
@@ -772,7 +776,7 @@ TYPED_TEST(AllocatorTest, FrameworkExite
trigger resourceOffersTrigger1;
EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
.WillOnce(DoAll(LaunchTasks(1, 2, 512),
- Trigger(&resourceOffersTrigger1)));
+ Trigger(&resourceOffersTrigger1)));
driver1.start();
@@ -787,12 +791,11 @@ TYPED_TEST(AllocatorTest, FrameworkExite
.WillRepeatedly(DeclineOffers());
// The first time sched2 gets an offer, framework 1 has a
- // task running with 2 cpus and 512 mem, leaving 1 cpu and
- // 512 mem.
+ // task running with 2 cpus and 512 mem, leaving 1 cpu and 512 mem.
trigger resourceOffersTrigger2;
EXPECT_CALL(sched2, resourceOffers(_, OfferEq(1, 512)))
.WillOnce(DoAll(LaunchTasks(1, 1, 256),
- Trigger(&resourceOffersTrigger2)));
+ Trigger(&resourceOffersTrigger2)));
// After we kill framework 1, all of its resources should
// have been returned, but framework 2 should still have a
@@ -854,11 +857,11 @@ TYPED_TEST(AllocatorTest, SlaveLost)
trigger slaveRemovedTrigger1, slaveRemovedTrigger2;
EXPECT_CALL(this->allocator, slaveRemoved(_))
.WillOnce(DoAll(InvokeSlaveRemoved(&this->allocator),
- Trigger(&slaveRemovedTrigger1)))
+ Trigger(&slaveRemovedTrigger1)))
.WillOnce(Trigger(&slaveRemovedTrigger2));
Files files;
- Master m(&this->allocator, &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(m);
MockExecutor exec;
@@ -868,7 +871,7 @@ TYPED_TEST(AllocatorTest, SlaveLost)
trigger launchTaskTrigger;
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
- Trigger(&launchTaskTrigger)));
+ Trigger(&launchTaskTrigger)));
EXPECT_CALL(exec, shutdown(_));
@@ -902,7 +905,7 @@ TYPED_TEST(AllocatorTest, SlaveLost)
// Initially, all of slave1's resources are available.
EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 1024)))
.WillOnce(DoAll(LaunchTasks(1, 2, 512),
- Trigger(&resourceOffersTrigger1)));
+ Trigger(&resourceOffersTrigger1)));
// Eventually after slave2 is launched, we should get
// an offer that contains all of slave2's resources
@@ -982,7 +985,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
.WillRepeatedly(DoDefault());
Files files;
- Master m(&this->allocator, &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(m);
MockExecutor exec;
@@ -992,7 +995,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
trigger launchTaskTrigger;
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
- Trigger(&launchTaskTrigger)));
+ Trigger(&launchTaskTrigger)));
trigger shutdownTrigger;
EXPECT_CALL(exec, shutdown(_))
@@ -1029,7 +1032,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
// Initially, all of slave1's resources are available.
EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
.WillOnce(DoAll(LaunchTasks(1, 2, 512),
- Trigger(&resourceOffersTrigger1)));
+ Trigger(&resourceOffersTrigger1)));
// After slave2 launches, all of its resources are
// combined with the resources on slave1 that the
@@ -1109,7 +1112,7 @@ TYPED_TEST(AllocatorTest, TaskFinished)
.WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 1));
Files files;
- Master m(&this->allocator, &files);
+ Master m(this->a, &files);
PID<Master> master = process::spawn(m);
MockExecutor exec;
@@ -1120,9 +1123,9 @@ TYPED_TEST(AllocatorTest, TaskFinished)
trigger launchTaskTrigger;
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(DoAll(SaveArg<0>(&execDriver),
- SaveArg<1>(&taskInfo),
- SendStatusUpdateFromTask(TASK_RUNNING),
- Trigger(&launchTaskTrigger)))
+ SaveArg<1>(&taskInfo),
+ SendStatusUpdateFromTask(TASK_RUNNING),
+ Trigger(&launchTaskTrigger)))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
trigger shutdownTrigger;
@@ -1164,7 +1167,7 @@ TYPED_TEST(AllocatorTest, TaskFinished)
// After the tasks are launched.
EXPECT_CALL(sched1, resourceOffers(_, OfferEq(1, 512)))
.WillOnce(DoAll(DeclineOffers(),
- Trigger(&resourceOffersTrigger1)));
+ Trigger(&resourceOffersTrigger1)));
// After the first task gets killed.
EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 768)))
@@ -1220,9 +1223,9 @@ TYPED_TEST(AllocatorTest, WhitelistSlave
trigger updateWhitelistTrigger1, updateWhitelistTrigger2;
EXPECT_CALL(this->allocator, updateWhitelist(_))
.WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
- Trigger(&updateWhitelistTrigger1)))
+ Trigger(&updateWhitelistTrigger1)))
.WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
- Trigger(&updateWhitelistTrigger2)));
+ Trigger(&updateWhitelistTrigger2)));
EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
.WillRepeatedly(DoDefault());
@@ -1235,7 +1238,7 @@ TYPED_TEST(AllocatorTest, WhitelistSlave
Files files;
flags::Flags<logging::Flags, master::Flags> flags;
flags.whitelist = "file://" + path; // TODO(benh): Put in /tmp.
- Master m(&this->allocator, &files, flags);
+ Master m(this->a, &files, flags);
PID<Master> master = process::spawn(&m);
MockExecutor exec;
Modified: incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp Thu Nov 8 05:51:50 2012
@@ -31,6 +31,7 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
+using mesos::internal::master::Allocator;
using mesos::internal::master::AllocatorProcess;
using mesos::internal::master::Master;
@@ -42,10 +43,10 @@ using std::map;
using std::string;
using std::vector;
+using testing::_;
using testing::DoAll;
using testing::DoDefault;
using testing::Eq;
-using testing::_;
using testing::Return;
using testing::SaveArg;
@@ -53,9 +54,28 @@ using testing::SaveArg;
template <typename T = AllocatorProcess>
class AllocatorZooKeeperTest : public ZooKeeperTest
{
+public:
+ virtual void SetUp()
+ {
+ ZooKeeperTest::SetUp();
+
+ a1 = new Allocator(&allocator1);
+ a2 = new Allocator(&allocator2);
+ }
+
+ virtual void TearDown()
+ {
+ ZooKeeperTest::TearDown();
+
+ delete a1;
+ delete a2;
+ }
+
protected:
T allocator1;
- MockAllocator<T> allocator2;
+ MockAllocatorProcess<T> allocator2;
+ Allocator* a1;
+ Allocator* a2;
};
@@ -70,7 +90,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
trigger frameworkAddedTrigger;
EXPECT_CALL(this->allocator2, frameworkAdded(_, _, _))
.WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator2),
- Trigger(&frameworkAddedTrigger)));
+ Trigger(&frameworkAddedTrigger)));
EXPECT_CALL(this->allocator2, frameworkDeactivated(_));
@@ -88,13 +108,13 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
trigger shutdownMessageTrigger;
EXPECT_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _)
.WillRepeatedly(DoAll(Trigger(&shutdownMessageTrigger),
- Return(true)));
+ Return(true)));
EXPECT_MESSAGE(Eq(ReregisterSlaveMessage().GetTypeName()), _, _)
.WillRepeatedly(Return(true));
Files files;
- Master m(&this->allocator1, &files);
+ Master m(this->a1, &files);
PID<Master> master1 = process::spawn(&m);
string zk = "zk://" + this->server->connectString() + "/znode";
@@ -135,16 +155,16 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
trigger resourceOffersTrigger, resourceOffersTrigger2;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
- LaunchTasks(1, 1, 512),
+ LaunchTasks(1, 1, 512),
Trigger(&resourceOffersTrigger)))
.WillRepeatedly(DoAll(SaveArg<1>(&offers2),
- Trigger(&resourceOffersTrigger2)));
+ Trigger(&resourceOffersTrigger2)));
TaskStatus status;
trigger statusUpdateTrigger;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&statusUpdateTrigger)));
+ Trigger(&statusUpdateTrigger)));
EXPECT_CALL(sched, disconnected(_))
.WillRepeatedly(DoDefault());
@@ -174,7 +194,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
WAIT_UNTIL(shutdownMessageTrigger);
Files files2;
- Master m2(&(this->allocator2), &files2);
+ Master m2(this->a2, &files2);
PID<Master> master2 = process::spawn(m2);
Try<MasterDetector*> detector2 =
@@ -224,7 +244,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
trigger slaveAddedTrigger;
EXPECT_CALL(this->allocator2, slaveAdded(_, _, _))
.WillOnce(DoAll(InvokeSlaveAdded(&this->allocator2),
- Trigger(&slaveAddedTrigger)));
+ Trigger(&slaveAddedTrigger)));
trigger slaveRemovedTrigger;
EXPECT_CALL(this->allocator2, slaveRemoved(_))
@@ -236,13 +256,13 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
trigger shutdownMessageTrigger;
EXPECT_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _)
.WillRepeatedly(DoAll(Trigger(&shutdownMessageTrigger),
- Return(true)));
+ Return(true)));
EXPECT_MESSAGE(Eq(ReregisterFrameworkMessage().GetTypeName()), _, _)
.WillRepeatedly(Return(true));
Files files;
- Master m(&this->allocator1, &files);
+ Master m(this->a1, &files);
PID<Master> master1 = process::spawn(&m);
string zk = "zk://" + this->server->connectString() + "/znode";
@@ -283,16 +303,16 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
trigger resourceOffersTrigger, resourceOffersTrigger2;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
- LaunchTasks(1, 1, 512),
+ LaunchTasks(1, 1, 512),
Trigger(&resourceOffersTrigger)))
.WillRepeatedly(DoAll(SaveArg<1>(&offers2),
- Trigger(&resourceOffersTrigger2)));
+ Trigger(&resourceOffersTrigger2)));
TaskStatus status;
trigger statusUpdateTrigger;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(DoAll(SaveArg<1>(&status),
- Trigger(&statusUpdateTrigger)));
+ Trigger(&statusUpdateTrigger)));
EXPECT_CALL(sched, disconnected(_))
.WillRepeatedly(DoDefault());
@@ -322,7 +342,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
WAIT_UNTIL(shutdownMessageTrigger);
Files files2;
- Master m2(&(this->allocator2), &files2);
+ Master m2(this->a2, &files2);
PID<Master> master2 = process::spawn(m2);
Try<MasterDetector*> detector2 =
Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Thu Nov 8 05:51:50 2012
@@ -29,6 +29,8 @@
#include "local/local.hpp"
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "slave/process_based_isolation_module.hpp"
@@ -41,6 +43,8 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::ProcessBasedIsolationModule;
@@ -69,7 +73,8 @@ TEST(FaultToleranceTest, SlaveLost)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -208,7 +213,8 @@ TEST(FaultToleranceTest, SchedulerFailov
trigger sched1RegisteredCall;
EXPECT_CALL(sched1, registered(&driver1, _, _))
- .WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&sched1RegisteredCall)));
+ .WillOnce(DoAll(SaveArg<1>(&frameworkId),
+ Trigger(&sched1RegisteredCall)));
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
.WillRepeatedly(Return());
@@ -366,7 +372,8 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -463,7 +470,8 @@ TEST(FaultToleranceTest, SchedulerFailov
Clock::pause();
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -520,7 +528,8 @@ TEST(FaultToleranceTest, SchedulerFailov
EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), _,
Not(AnyOf(Eq(master), Eq(slave))))
- .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
+ .WillOnce(DoAll(Trigger(&statusUpdateMsg),
+ Return(true)))
.RetiresOnSaturation();
driver1.start();
@@ -594,7 +603,8 @@ TEST(FaultToleranceTest, SchedulerFailov
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -637,7 +647,8 @@ TEST(FaultToleranceTest, SchedulerFailov
EXPECT_CALL(sched1, registered(&driver1, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
- .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&sched1StatusUpdateCall)));
+ .WillOnce(DoAll(SaveArg<1>(&status),
+ Trigger(&sched1StatusUpdateCall)));
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
@@ -711,7 +722,8 @@ TEST(FaultToleranceTest, SchedulerExit)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -763,7 +775,8 @@ TEST(FaultToleranceTest, SchedulerExit)
.WillOnce(SaveArg<1>(&frameworkId));
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&schedStatusUpdateCall)));
+ .WillOnce(DoAll(SaveArg<1>(&status),
+ Trigger(&schedStatusUpdateCall)));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(DoAll(SaveArg<1>(&offers),
@@ -834,10 +847,12 @@ TEST(FaultToleranceTest, SlaveReliableRe
// Drop the first slave registered message, allow subsequent messages.
EXPECT_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
- .WillOnce(DoAll(Trigger(&slaveRegisteredMsg), Return(true)))
+ .WillOnce(DoAll(Trigger(&slaveRegisteredMsg),
+ Return(true)))
.WillRepeatedly(Return(false));
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -888,7 +903,8 @@ TEST(FaultToleranceTest, SlaveReregister
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);
@@ -917,7 +933,8 @@ TEST(FaultToleranceTest, SlaveReregister
trigger slaveReRegisterMsg;
EXPECT_MESSAGE(Eq(SlaveReregisteredMessage().GetTypeName()), _, _)
- .WillOnce(DoAll(Trigger(&slaveReRegisterMsg), Return(false)));
+ .WillOnce(DoAll(Trigger(&slaveReRegisterMsg),
+ Return(false)));
driver.start();
Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Thu Nov 8 05:51:50 2012
@@ -35,6 +35,8 @@
#include "local/local.hpp"
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "slave/constants.hpp"
@@ -48,6 +50,8 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
@@ -85,7 +89,7 @@ protected:
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- a = new TestAllocatorProcess();
+ a = new Allocator(&allocator);
files = new Files();
m = new Master(a, files);
master = process::spawn(m);
@@ -133,7 +137,8 @@ protected:
startSlave();
}
- TestAllocatorProcess* a;
+ Allocator* a;
+ HierarchicalDRFAllocatorProcess allocator;
Master* m;
TestingIsolationModule* isolationModule;
Slave* s;
Modified: incubator/mesos/trunk/src/tests/master_detector_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_detector_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_detector_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_detector_tests.cpp Thu Nov 8 05:51:50 2012
@@ -31,6 +31,8 @@
#include "detector/detector.hpp"
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "slave/slave.hpp"
@@ -41,6 +43,8 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
@@ -57,7 +61,8 @@ TEST(MasterDetector, File)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- TestAllocatorProcess a;
+ HierarchicalDRFAllocatorProcess allocator;
+ Allocator a(&allocator);
Files files;
Master m(&a, &files);
PID<Master> master = process::spawn(&m);