You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/30 23:53:04 UTC
git commit: Add Slave and Framework struct to
HierarchicalAllocatorProcess. Cleans up the proliferation of hashmaps in
HierarchicalAllocatorProcess.
Updated Branches:
refs/heads/master d9dd5c97a -> 11e8a1dba
Add Slave and Framework struct to HierarchicalAllocatorProcess. Cleans
up the proliferation of hashmaps in HierarchicalAllocatorProcess.
From: Thomas Marshall <tw...@gmail.com>
Review: https://reviews.apache.org/r/11541
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/11e8a1db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/11e8a1db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/11e8a1db
Branch: refs/heads/master
Commit: 11e8a1dba355148dbbbe17f7229cca120534b7b4
Parents: d9dd5c9
Author: Benjamin Hindman <be...@twitter.com>
Authored: Thu May 30 14:51:24 2013 -0700
Committer: Benjamin Hindman <be...@twitter.com>
Committed: Thu May 30 14:51:24 2013 -0700
----------------------------------------------------------------------
.../3rdparty/stout/include/stout/stringify.hpp | 17 +
src/local/local.cpp | 9 +-
src/local/local.hpp | 10 +-
src/master/allocator.hpp | 2 +
src/master/drf_sorter.cpp | 2 +
src/master/drf_sorter.hpp | 2 +
src/master/hierarchical_allocator_process.hpp | 248 ++++++++-------
src/master/main.cpp | 6 +-
src/master/master.cpp | 2 +
src/master/master.hpp | 13 +-
src/master/sorter.hpp | 2 +
src/tests/allocator_tests.cpp | 6 +-
src/tests/allocator_zookeeper_tests.cpp | 5 +-
src/tests/cluster.hpp | 14 +-
src/tests/mesos.cpp | 2 +-
src/tests/mesos.hpp | 34 +-
src/tests/resource_offers_tests.cpp | 3 +-
src/tests/sorter_tests.cpp | 4 +-
18 files changed, 224 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
index 136316d..2bb7290 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/stringify.hpp
@@ -103,6 +103,23 @@ std::string stringify(const std::map<K, V>& map)
}
+template <typename T>
+std::string stringify(const hashset<T>& set)
+{
+ std::ostringstream out;
+ out << "{ ";
+ typename hashset<T>::const_iterator iterator = set.begin();
+ while (iterator != set.end()) {
+ out << stringify(*iterator);
+ if (++iterator != set.end()) {
+ out << ", ";
+ }
+ }
+ out << " }";
+ return out.str();
+}
+
+
template <typename K, typename V>
std::string stringify(const hashmap<K, V>& map)
{
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index c35c19a..3364c15 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -40,10 +40,11 @@
using namespace mesos::internal;
-using mesos::internal::master::Allocator;
-using mesos::internal::master::AllocatorProcess;
-using mesos::internal::master::DRFSorter;
-using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::allocator::Allocator;
+using mesos::internal::master::allocator::AllocatorProcess;
+using mesos::internal::master::allocator::DRFSorter;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
+
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/local/local.hpp
----------------------------------------------------------------------
diff --git a/src/local/local.hpp b/src/local/local.hpp
index a165265..0aa50ef 100644
--- a/src/local/local.hpp
+++ b/src/local/local.hpp
@@ -29,9 +29,14 @@ namespace internal {
// Forward declarations.
namespace master {
-class Allocator;
class Master;
+namespace allocator {
+
+class Allocator;
+
+} // namespace allocator {
+
} // namespace master {
class Configuration;
@@ -41,8 +46,7 @@ namespace local {
// Launch a local cluster with the given flags.
process::PID<master::Master> launch(
const Flags& flags,
- master::Allocator* _allocator = NULL);
-
+ master::allocator::Allocator* _allocator = NULL);
void shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index b68b67d..78c75bb 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -42,6 +42,7 @@ namespace master {
class Master; // Forward declaration.
+namespace allocator {
// Basic model of an allocator: resources are allocated to a framework
// in the form of offers. A framework can refuse some resources in
@@ -338,6 +339,7 @@ inline void Allocator::offersRevived(
frameworkId);
}
+} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/drf_sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.cpp b/src/master/drf_sorter.cpp
index fe31a00..7fcd0ca 100644
--- a/src/master/drf_sorter.cpp
+++ b/src/master/drf_sorter.cpp
@@ -28,6 +28,7 @@ using std::string;
namespace mesos {
namespace internal {
namespace master {
+namespace allocator {
bool DRFComparator::operator () (
const Client& client1,
@@ -226,6 +227,7 @@ set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
return it;
}
+} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/drf_sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.hpp b/src/master/drf_sorter.hpp
index 79566cc..866515e 100644
--- a/src/master/drf_sorter.hpp
+++ b/src/master/drf_sorter.hpp
@@ -32,6 +32,7 @@
namespace mesos {
namespace internal {
namespace master {
+namespace allocator {
struct Client
{
@@ -107,6 +108,7 @@ private:
Resources resources;
};
+} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 28a7879..1048a28 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -27,6 +27,7 @@
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/stopwatch.hpp>
+#include <stout/stringify.hpp>
#include "common/resources.hpp"
@@ -38,6 +39,7 @@
namespace mesos {
namespace internal {
namespace master {
+namespace allocator {
// Forward declarations.
class Filter;
@@ -52,6 +54,68 @@ typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
HierarchicalDRFAllocatorProcess;
+struct Slave
+{
+ Slave() {}
+
+ Slave(const SlaveInfo& _info)
+ : available(_info.resources()),
+ whitelisted(false),
+ info(_info) {}
+
+ Resources resources() const { return info.resources(); }
+
+ std::string hostname() const { return info.hostname(); }
+
+ // Returns true iff this slave is whitelisted and has sufficient
+ // free resources to allocate.
+ bool allocatable() const
+ {
+ // TODO(benh): For now, only make offers when there is some cpu
+ // and memory left. This is an artifact of the original code that
+ // only offered when there was at least 1 cpu "unit" available,
+ // and without doing this a framework might get offered resources
+ // with only memory available (which it obviously will decline)
+ // and then end up waiting the default Filters::refuse_seconds
+ // (unless the framework set it to something different).
+
+ Value::Scalar none;
+ Value::Scalar cpus = available.get("cpus", none);
+ Value::Scalar mem = available.get("mem", none);
+
+ return (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) &&
+ whitelisted;
+ }
+
+ // Contains all of the resources currently free on this slave.
+ Resources available;
+
+ // Indicates if the resources on this slave should be offered to
+ // frameworks.
+ bool whitelisted;
+
+private:
+ SlaveInfo info;
+};
+
+
+struct Framework
+{
+ Framework() {}
+
+ Framework(const FrameworkInfo& _info)
+ : info(_info) {}
+
+ std::string user() const { return info.user(); }
+
+ // Filters that have been added by this framework.
+ hashset<Filter*> filters;
+
+private:
+ FrameworkInfo info;
+};
+
+
// Implements the basic allocator algorithm - first pick a user by
// some criteria, then pick one of their frameworks to allocate to.
template <typename UserSorter, typename FrameworkSorter>
@@ -147,21 +211,15 @@ protected:
Flags flags;
PID<Master> master;
- // Maps FrameworkIDs to user names.
- hashmap<FrameworkID, std::string> users;
+ // Contains all frameworks.
+ hashmap<FrameworkID, Framework> frameworks;
// Maps user names to the Sorter object which contains
// all of that user's frameworks.
hashmap<std::string, FrameworkSorter*> sorters;
- // Maps slaves to their allocatable resources.
- hashmap<SlaveID, Resources> allocatable;
-
// Contains all active slaves.
- hashmap<SlaveID, SlaveInfo> slaves;
-
- // Filters that have been added by frameworks.
- multihashmap<FrameworkID, Filter*> filters;
+ hashmap<SlaveID, Slave> slaves;
// Slaves to send offers for.
Option<hashset<std::string> > whitelist;
@@ -250,7 +308,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
{
CHECK(initialized);
- std::string user = frameworkInfo.user();
+ const std::string& user = frameworkInfo.user();
if (!userSorter->contains(user)) {
userSorter->add(user);
sorters[user] = new FrameworkSorter();
@@ -264,7 +322,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
sorters[user]->add(used);
sorters[user]->allocated(frameworkId.value(), used);
- users[frameworkId] = frameworkInfo.user();
+ frameworks[frameworkId] = Framework(frameworkInfo);
LOG(INFO) << "Added framework " << frameworkId;
@@ -279,7 +337,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
{
CHECK(initialized);
- std::string user = users[frameworkId];
+ CHECK(frameworks.contains(frameworkId));
+ const std::string& user = frameworks[frameworkId].user();
+
// Might not be in 'sorters[user]' because it was previously
// deactivated and never re-added.
if (sorters[user]->contains(frameworkId.value())) {
@@ -289,7 +349,11 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
sorters[user]->remove(frameworkId.value());
}
- users.erase(frameworkId);
+ // Do not delete the filters contained in this
+ // framework's 'filters' hashset yet, see comments in
+ // HierarchicalAllocatorProcess::offersRevived and
+ // HierarchicalAllocatorProcess::expire.
+ frameworks.erase(frameworkId);
// If this user doesn't have any more active frameworks, remove it.
if (sorters[user]->count() == 0) {
@@ -300,16 +364,6 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
userSorter->remove(user);
}
- foreach (Filter* filter, filters.get(frameworkId)) {
- filters.remove(frameworkId, filter);
-
- // Do not delete the filter, see comments in
- // HierarchicalAllocatorProcess::offersRevived and
- // HierarchicalAllocatorProcess::expire.
- }
-
- filters.remove(frameworkId);
-
LOG(INFO) << "Removed framework " << frameworkId;
}
@@ -322,7 +376,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
{
CHECK(initialized);
- std::string user = frameworkInfo.user();
+ const std::string& user = frameworkInfo.user();
sorters[user]->activate(frameworkId.value());
LOG(INFO) << "Activated framework " << frameworkId;
@@ -338,7 +392,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(
{
CHECK(initialized);
- std::string user = users[frameworkId];
+ CHECK(frameworks.contains(frameworkId));
+ const std::string& user = frameworks[frameworkId].user();
+
sorters[user]->deactivate(frameworkId.value());
// Note that the Sorter *does not* remove the resources allocated
@@ -347,15 +403,11 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(
// of the resources that it is using. We might be able to collapse
// the added/removed and activated/deactivated in the future.
- foreach (Filter* filter, filters.get(frameworkId)) {
- filters.remove(frameworkId, filter);
-
- // Do not delete the filter, see comments in
- // HierarchicalAllocatorProcess::offersRevived and
- // HierarchicalAllocatorProcess::expire.
- }
-
- filters.remove(frameworkId);
+ // Do not delete the filters contained in this
+ // framework's 'filters' hashset yet, see comments in
+ // HierarchicalAllocatorProcess::offersRevived and
+ // HierarchicalAllocatorProcess::expire.
+ frameworks[frameworkId].filters.clear();
LOG(INFO) << "Deactivated framework " << frameworkId;
}
@@ -372,7 +424,8 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
CHECK(!slaves.contains(slaveId));
- slaves[slaveId] = slaveInfo;
+ slaves[slaveId] = Slave(slaveInfo);
+ slaves[slaveId].whitelisted = isWhitelisted(slaveId);
userSorter->add(slaveInfo.resources());
@@ -381,8 +434,8 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
foreachpair (const FrameworkID& frameworkId,
const Resources& resources,
used) {
- if (users.contains(frameworkId)) {
- const std::string& user = users[frameworkId];
+ if (frameworks.contains(frameworkId)) {
+ const std::string& user = frameworks[frameworkId].user();
sorters[user]->add(resources);
sorters[user]->allocated(frameworkId.value(), resources);
userSorter->allocated(user, resources);
@@ -391,13 +444,15 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
unused -= resources; // Only want to allocate resources that are not used!
}
- allocatable[slaveId] = unused;
+ slaves[slaveId].available = unused;
LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
<< ") with " << slaveInfo.resources() << " (and " << unused
<< " available)";
- allocate(slaveId);
+ if (slaves[slaveId].allocatable()) {
+ allocate(slaveId);
+ }
}
@@ -414,8 +469,6 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(
slaves.erase(slaveId);
- allocatable.erase(slaveId);
-
// Note that we DO NOT actually delete any filters associated with
// this slave, that will occur when the delayed
// HierarchicalAllocatorProcess::expire gets invoked (or the framework
@@ -435,9 +488,10 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
whitelist = _whitelist;
if (whitelist.isSome()) {
- LOG(INFO) << "Updated slave white list:";
- foreach (const std::string& hostname, whitelist.get()) {
- LOG(INFO) << "\t" << hostname;
+ LOG(INFO) << "Updated slave white list: " << stringify(whitelist.get());
+
+ foreachkey (const SlaveID& slaveId, slaves) {
+ slaves[slaveId].whitelisted = isWhitelisted(slaveId);
}
}
}
@@ -478,16 +532,16 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
// because resourcesUnused is only called as the
// result of a valid task launch by an active
// framework that doesn't use the entire offer.
- CHECK(users.contains(frameworkId));
+ CHECK(frameworks.contains(frameworkId));
- std::string user = users[frameworkId];
+ const std::string& user = frameworks[frameworkId].user();
sorters[user]->unallocated(frameworkId.value(), resources);
sorters[user]->remove(resources);
userSorter->unallocated(user, resources);
// Update resources allocatable on slave.
- CHECK(allocatable.contains(slaveId));
- allocatable[slaveId] += resources;
+ CHECK(slaves.contains(slaveId));
+ slaves[slaveId].available += resources;
// Create a refused resources filter.
Try<Duration> seconds_ = Duration::create(Filters().refuse_seconds());
@@ -517,10 +571,10 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
<< " for " << seconds;
// Create a new filter and delay it's expiration.
- mesos::internal::master::Filter* filter =
+ Filter* filter =
new RefusedFilter(slaveId, resources, Timeout::in(seconds));
- this->filters.put(frameworkId, filter);
+ frameworks[frameworkId].filters.insert(filter);
delay(seconds, self(), &Self::expire, frameworkId, filter);
}
@@ -545,9 +599,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
// Master::offer before we received AllocatorProcess::frameworkRemoved
// or AllocatorProcess::frameworkDeactivated, in which case we will
// have already recovered all of its resources).
- if (users.contains(frameworkId) &&
- sorters[users[frameworkId]]->contains(frameworkId.value())) {
- std::string user = users[frameworkId];
+ if (frameworks.contains(frameworkId) &&
+ sorters[frameworks[frameworkId].user()]->contains(frameworkId.value())) {
+ const std::string& user = frameworks[frameworkId].user();
sorters[user]->unallocated(frameworkId.value(), resources);
sorters[user]->remove(resources);
userSorter->unallocated(user, resources);
@@ -556,12 +610,12 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
// Update resources allocatable on slave (if slave still exists,
// which it might not in the event that we dispatched Master::offer
// before we received Allocator::slaveRemoved).
- if (allocatable.contains(slaveId)) {
- allocatable[slaveId] += resources;
+ if (slaves.contains(slaveId)) {
+ slaves[slaveId].available += resources;
LOG(INFO) << "Recovered " << resources.allocatable()
- << " (total allocatable: " << allocatable[slaveId] << ")"
- << " on slave " << slaveId
+ << " (total allocatable: " << slaves[slaveId].available
+ << ") on slave " << slaveId
<< " from framework " << frameworkId;
}
}
@@ -574,18 +628,14 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(
{
CHECK(initialized);
- foreach (Filter* filter, filters.get(frameworkId)) {
- filters.remove(frameworkId, filter);
-
- // We delete each actual Filter when
- // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
- // Filter here it's possible that the same Filter (i.e., same
- // address) could get reused and HierarchicalAllocatorProcess::expire
- // would expire that filter too soon. Note that this only works
- // right now because ALL Filter types "expire".
- }
+ frameworks[frameworkId].filters.clear();
- filters.remove(frameworkId);
+ // We delete each actual Filter when
+ // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
+ // Filter here it's possible that the same Filter (i.e., same
+ // address) could get reused and HierarchicalAllocatorProcess::expire
+ // would expire that filter too soon. Note that this only works
+ // right now because ALL Filter types "expire".
LOG(INFO) << "Removed filters for framework " << frameworkId;
@@ -651,38 +701,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
return;
}
- // Get out only "available" resources (i.e., resources that are
- // allocatable and above a certain threshold, see below).
- hashmap<SlaveID, Resources> available;
- foreachpair (const SlaveID& slaveId, Resources resources, allocatable) {
- if (!slaveIds.contains(slaveId)) {
- continue;
- }
-
- if (isWhitelisted(slaveId)) {
- resources = resources.allocatable(); // Make sure they're allocatable.
-
- // TODO(benh): For now, only make offers when there is some cpu
- // and memory left. This is an artifact of the original code that
- // only offered when there was at least 1 cpu "unit" available,
- // and without doing this a framework might get offered resources
- // with only memory available (which it obviously will decline)
- // and then end up waiting the default Filters::refuse_seconds
- // (unless the framework set it to something different).
-
- Value::Scalar none;
- Value::Scalar cpus = resources.get("cpus", none);
- Value::Scalar mem = resources.get("mem", none);
-
- if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
- VLOG(1) << "Found available resources: " << resources
- << " on slave " << slaveId;
- available[slaveId] = resources;
- }
- }
- }
-
- if (available.size() == 0) {
+ if (slaveIds.empty()) {
VLOG(1) << "No resources available to allocate!";
return;
}
@@ -694,9 +713,13 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
Resources allocatedResources;
hashmap<SlaveID, Resources> offerable;
- foreachpair (const SlaveID& slaveId,
- const Resources& resources,
- available) {
+ foreach (const SlaveID& slaveId, slaveIds) {
+ if (!slaves[slaveId].allocatable()) {
+ continue;
+ }
+
+ Resources resources = slaves[slaveId].available;
+
// Check whether or not this framework filters this slave.
bool filtered = isFiltered(frameworkId, slaveId, resources);
@@ -708,16 +731,12 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
offerable[slaveId] = resources;
// Update framework and slave resources.
- allocatable[slaveId] -= resources;
+ slaves[slaveId].available -= resources;
allocatedResources += resources;
}
}
- if (offerable.size() > 0) {
- foreachkey (const SlaveID& slaveId, offerable) {
- available.erase(slaveId);
- }
-
+ if (!offerable.empty()) {
sorters[user]->add(allocatedResources);
sorters[user]->allocated(frameworkIdValue, allocatedResources);
userSorter->allocated(user, allocatedResources);
@@ -740,9 +759,11 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
// HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
// keep the address from getting reused possibly causing premature
// expiration).
- if (users.contains(frameworkId) && filters.contains(frameworkId, filter)) {
- filters.remove(frameworkId, filter);
+ if (frameworks.contains(frameworkId) &&
+ frameworks[frameworkId].filters.contains(filter)) {
+ frameworks[frameworkId].filters.erase(filter);
}
+
delete filter;
}
@@ -769,7 +790,9 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
const Resources& resources)
{
bool filtered = false;
- foreach (Filter* filter, filters.get(frameworkId)) {
+
+ CHECK(frameworks.contains(frameworkId));
+ foreach (Filter* filter, frameworks[frameworkId].filters) {
if (filter->filter(slaveId, resources)) {
VLOG(1) << "Filtered " << resources
<< " on slave " << slaveId
@@ -781,6 +804,7 @@ HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
return filtered;
}
+} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 3c1cf4c..19fcb9f 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -114,8 +114,10 @@ int main(int argc, char** argv)
LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
LOG(INFO) << "Starting Mesos master";
- AllocatorProcess* allocatorProcess = new HierarchicalDRFAllocatorProcess();
- Allocator* allocator = new Allocator(allocatorProcess);
+ allocator::AllocatorProcess* allocatorProcess =
+ new allocator::HierarchicalDRFAllocatorProcess();
+ allocator::Allocator* allocator =
+ new allocator::Allocator(allocatorProcess);
Files files;
Master* master = new Master(allocator, &files, flags);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d031b95..a2e4b90 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -60,6 +60,8 @@ namespace mesos {
namespace internal {
namespace master {
+using allocator::Allocator;
+
class WhitelistWatcher : public Process<WhitelistWatcher> {
public:
WhitelistWatcher(const string& _path, Allocator* _allocator)
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 82e3596..8e7b74c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -57,7 +57,12 @@ namespace master {
using namespace process; // Included to make code easier to read.
// Forward declarations.
-class Allocator;
+namespace allocator {
+
+ class Allocator;
+
+}
+
class SlaveObserver;
class WhitelistWatcher;
@@ -68,8 +73,8 @@ struct Slave;
class Master : public ProtobufProcess<Master>
{
public:
- Master(Allocator* allocator, Files* files);
- Master(Allocator* allocator,
+ Master(allocator::Allocator* allocator, Files* files);
+ Master(allocator::Allocator* allocator,
Files* files,
const Flags& flags);
@@ -210,7 +215,7 @@ private:
bool elected;
- Allocator* allocator;
+ allocator::Allocator* allocator;
WhitelistWatcher* whitelistWatcher;
Files* files;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/master/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/sorter.hpp b/src/master/sorter.hpp
index 73db6b1..e54be3b 100644
--- a/src/master/sorter.hpp
+++ b/src/master/sorter.hpp
@@ -25,6 +25,7 @@
namespace mesos {
namespace internal {
namespace master {
+namespace allocator {
// Sorters implement the logic for determining the
// order in which users or frameworks should receive
@@ -78,6 +79,7 @@ public:
virtual int count() = 0;
};
+} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp
index b153dee..32f0a90 100644
--- a/src/tests/allocator_tests.cpp
+++ b/src/tests/allocator_tests.cpp
@@ -38,8 +38,9 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
-using mesos::internal::master::Allocator;
-using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::allocator::Allocator;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
+
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
@@ -693,6 +694,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
slave::Flags flags = this->CreateSlaveFlags();
+
flags.resources = Option<string>("cpus:3;mem:1024");
EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/allocator_zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_zookeeper_tests.cpp b/src/tests/allocator_zookeeper_tests.cpp
index 1034d72..1daaecd 100644
--- a/src/tests/allocator_zookeeper_tests.cpp
+++ b/src/tests/allocator_zookeeper_tests.cpp
@@ -35,8 +35,9 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
-using mesos::internal::master::Allocator;
-using mesos::internal::master::AllocatorProcess;
+using mesos::internal::master::allocator::Allocator;
+using mesos::internal::master::allocator::AllocatorProcess;
+
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 25cd554..f743bb3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -77,7 +77,7 @@ public:
// expected to outlive the launched master (i.e., until it is
// stopped via Masters::stop).
Try<process::PID<master::Master> > start(
- master::AllocatorProcess* allocatorProcess,
+ master::allocator::AllocatorProcess* allocatorProcess,
const master::Flags& flags = master::Flags());
// Stops and cleans up a master at the specified PID.
@@ -106,8 +106,8 @@ public:
detector(NULL) {}
master::Master* master;
- master::Allocator* allocator;
- master::AllocatorProcess* allocatorProcess;
+ master::allocator::Allocator* allocator;
+ master::allocator::AllocatorProcess* allocatorProcess;
MasterDetector* detector;
};
@@ -221,8 +221,8 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
Master master;
- master.allocatorProcess = new master::HierarchicalDRFAllocatorProcess();
- master.allocator = new master::Allocator(master.allocatorProcess);
+ master.allocatorProcess = new master::allocator::HierarchicalDRFAllocatorProcess();
+ master.allocator = new master::allocator::Allocator(master.allocatorProcess);
master.master = new master::Master(master.allocator, &cluster->files, flags);
process::PID<master::Master> pid = process::spawn(master.master);
@@ -240,7 +240,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
inline Try<process::PID<master::Master> > Cluster::Masters::start(
- master::AllocatorProcess* allocatorProcess,
+ master::allocator::AllocatorProcess* allocatorProcess,
const master::Flags& flags)
{
// Disallow multiple masters when not using ZooKeeper.
@@ -250,7 +250,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
Master master;
master.allocatorProcess = NULL;
- master.allocator = new master::Allocator(allocatorProcess);
+ master.allocator = new master::allocator::Allocator(allocatorProcess);
master.master = new master::Master(master.allocator, &cluster->files, flags);
process::PID<master::Master> pid = process::spawn(master.master);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index d811267..6dbf7f3 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -54,7 +54,7 @@ Try<process::PID<master::Master> > MesosTest::StartMaster(
Try<process::PID<master::Master> > MesosTest::StartMaster(
- master::AllocatorProcess* allocator,
+ master::allocator::AllocatorProcess* allocator,
const Option<master::Flags>& flags)
{
return cluster.masters.start(
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 12298ae..fca41aa 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -82,7 +82,7 @@ protected:
// Starts a master with the specified allocator process and flags.
virtual Try<process::PID<master::Master> > StartMaster(
- master::AllocatorProcess* allocator,
+ master::allocator::AllocatorProcess* allocator,
const Option<master::Flags>& flags = None());
// Starts a slave with the specified flags.
@@ -321,8 +321,8 @@ public:
};
-template <typename T = master::AllocatorProcess>
-class MockAllocatorProcess : public master::AllocatorProcess
+template <typename T = master::allocator::AllocatorProcess>
+class MockAllocatorProcess : public master::allocator::AllocatorProcess
{
public:
MockAllocatorProcess()
@@ -404,7 +404,7 @@ public:
};
-typedef ::testing::Types<master::HierarchicalDRFAllocatorProcess>
+typedef ::testing::Types<master::allocator::HierarchicalDRFAllocatorProcess>
AllocatorTypes;
@@ -418,7 +418,7 @@ ACTION_P(InvokeInitialize, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::initialize,
+ &master::allocator::AllocatorProcess::initialize,
arg0,
arg1);
}
@@ -428,7 +428,7 @@ ACTION_P(InvokeFrameworkAdded, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::frameworkAdded,
+ &master::allocator::AllocatorProcess::frameworkAdded,
arg0,
arg1,
arg2);
@@ -439,7 +439,7 @@ ACTION_P(InvokeFrameworkRemoved, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::frameworkRemoved, arg0);
+ &master::allocator::AllocatorProcess::frameworkRemoved, arg0);
}
@@ -447,7 +447,7 @@ ACTION_P(InvokeFrameworkActivated, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::frameworkActivated,
+ &master::allocator::AllocatorProcess::frameworkActivated,
arg0,
arg1);
}
@@ -457,7 +457,7 @@ ACTION_P(InvokeFrameworkDeactivated, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::frameworkDeactivated,
+ &master::allocator::AllocatorProcess::frameworkDeactivated,
arg0);
}
@@ -466,7 +466,7 @@ ACTION_P(InvokeSlaveAdded, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::slaveAdded,
+ &master::allocator::AllocatorProcess::slaveAdded,
arg0,
arg1,
arg2);
@@ -477,7 +477,7 @@ ACTION_P(InvokeSlaveRemoved, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::slaveRemoved,
+ &master::allocator::AllocatorProcess::slaveRemoved,
arg0);
}
@@ -486,7 +486,7 @@ ACTION_P(InvokeUpdateWhitelist, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::updateWhitelist,
+ &master::allocator::AllocatorProcess::updateWhitelist,
arg0);
}
@@ -495,7 +495,7 @@ ACTION_P(InvokeResourcesRequested, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::resourcesRequested,
+ &master::allocator::AllocatorProcess::resourcesRequested,
arg0,
arg1);
}
@@ -506,7 +506,7 @@ ACTION_P(InvokeResourcesUnused, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::resourcesUnused,
+ &master::allocator::AllocatorProcess::resourcesUnused,
arg0,
arg1,
arg2,
@@ -521,7 +521,7 @@ ACTION_P2(InvokeUnusedWithFilters, allocator, timeout)
process::dispatch(
allocator->real,
- &master::AllocatorProcess::resourcesUnused,
+ &master::allocator::AllocatorProcess::resourcesUnused,
arg0,
arg1,
arg2,
@@ -533,7 +533,7 @@ ACTION_P(InvokeResourcesRecovered, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::resourcesRecovered,
+ &master::allocator::AllocatorProcess::resourcesRecovered,
arg0,
arg1,
arg2);
@@ -544,7 +544,7 @@ ACTION_P(InvokeOffersRevived, allocator)
{
process::dispatch(
allocator->real,
- &master::AllocatorProcess::offersRevived,
+ &master::allocator::AllocatorProcess::offersRevived,
arg0);
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index b066403..3d5f02d 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -34,7 +34,8 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
-using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
+
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/11e8a1db/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 95e8d28..0a50b43 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -25,8 +25,8 @@
using namespace mesos::internal;
-using mesos::internal::master::Sorter;
-using mesos::internal::master::DRFSorter;
+using mesos::internal::master::allocator::Sorter;
+using mesos::internal::master::allocator::DRFSorter;
using std::list;
using std::string;