You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/20 23:41:03 UTC
svn commit: r1375239 [1/2] - in /incubator/mesos/trunk/src: ./ local/
master/ tests/
Author: benh
Date: Mon Aug 20 21:41:02 2012
New Revision: 1375239
URL: http://svn.apache.org/viewvc?rev=1375239&view=rev
Log:
Adding more tests to the allocator and improving the allocator design
to use a hierarchical allocator (two levels, users and frameworks) by
default that uses "sorters" to decide how to rank users and frameworks
(contributed by Thomas Marshall, https://reviews.apache.org/r/6037,
https://reviews.apache.org/r/5599, and
https://reviews.apache.org/r/5913).
Added:
incubator/mesos/trunk/src/master/allocator.cpp
incubator/mesos/trunk/src/master/drf_sorter.cpp
incubator/mesos/trunk/src/master/drf_sorter.hpp
incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
incubator/mesos/trunk/src/master/sorter.hpp
incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
Removed:
incubator/mesos/trunk/src/master/dominant_share_allocator.cpp
incubator/mesos/trunk/src/master/dominant_share_allocator.hpp
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/local/local.cpp
incubator/mesos/trunk/src/local/local.hpp
incubator/mesos/trunk/src/master/allocator.hpp
incubator/mesos/trunk/src/master/flags.hpp
incubator/mesos/trunk/src/master/main.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/tests/allocator_tests.cpp
incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
incubator/mesos/trunk/src/tests/gc_tests.cpp
incubator/mesos/trunk/src/tests/master_detector_tests.cpp
incubator/mesos/trunk/src/tests/master_tests.cpp
incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
incubator/mesos/trunk/src/tests/utils.hpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Mon Aug 20 21:41:02 2012
@@ -151,9 +151,9 @@ noinst_LTLIBRARIES += libmesos_no_third_
nodist_libmesos_no_third_party_la_SOURCES = $(CXX_PROTOS) $(MESSAGES_PROTOS)
libmesos_no_third_party_la_SOURCES = sched/sched.cpp local/local.cpp \
- master/master.cpp master/http.cpp master/slaves_manager.cpp \
- master/frameworks_manager.cpp \
- master/dominant_share_allocator.cpp slave/gc.cpp \
+ master/allocator.cpp master/drf_sorter.cpp \
+ master/frameworks_manager.cpp master/http.cpp master/master.cpp \
+ master/slaves_manager.cpp slave/gc.cpp \
slave/slave.cpp slave/http.cpp slave/isolation_module.cpp \
slave/process_based_isolation_module.cpp slave/reaper.cpp \
launcher/launcher.cpp exec/exec.cpp common/lock.cpp \
@@ -197,10 +197,11 @@ libmesos_no_third_party_la_SOURCES += co
launcher/launcher.hpp linux/cgroups.hpp linux/fs.hpp \
linux/proc.hpp local/flags.hpp local/local.hpp \
logging/flags.hpp logging/logging.hpp master/allocator.hpp \
- master/constants.hpp master/flags.hpp \
- master/frameworks_manager.hpp master/http.hpp \
- master/master.hpp master/dominant_share_allocator.hpp \
- master/slaves_manager.hpp master/webui.hpp \
+ master/constants.hpp master/drf_sorter.hpp \
+ master/flags.hpp master/frameworks_manager.hpp \
+ master/hierarchical_allocator_process.hpp \
+ master/http.hpp master/master.hpp \
+ master/slaves_manager.hpp master/sorter.hpp master/webui.hpp \
messages/messages.hpp slave/constants.hpp slave/flags.hpp \
slave/gc.hpp slave/http.hpp slave/isolation_module.hpp \
slave/isolation_module_factory.hpp \
@@ -781,6 +782,7 @@ mesos_tests_SOURCES = tests/main.cpp tes
tests/exception_tests.cpp \
tests/attributes_tests.cpp \
tests/master_detector_tests.cpp \
+ tests/sorter_tests.cpp \
tests/allocator_tests.cpp
mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -804,7 +806,8 @@ if HAS_JAVA
mesos_tests_SOURCES += tests/zookeeper_server.cpp \
tests/base_zookeeper_test.cpp \
tests/zookeeper_server_tests.cpp \
- tests/zookeeper_tests.cpp
+ tests/zookeeper_tests.cpp \
+ tests/allocator_zookeeper_tests.cpp
mesos_tests_CPPFLAGS += $(JAVA_CPPFLAGS)
mesos_tests_CPPFLAGS += -DZOOKEEPER_VERSION=\"$(ZOOKEEPER_VERSION)\"
mesos_tests_LDFLAGS = $(JAVA_LDFLAGS) $(AM_LDFLAGS)
Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Mon Aug 20 21:41:02 2012
@@ -33,7 +33,6 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"
-#include "master/dominant_share_allocator.hpp"
#include "master/master.hpp"
#include "slave/process_based_isolation_module.hpp"
@@ -41,8 +40,7 @@
using namespace mesos::internal;
-using mesos::internal::master::Allocator;
-using mesos::internal::master::DominantShareAllocator;
+using mesos::internal::master::AllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
@@ -62,7 +60,7 @@ namespace mesos {
namespace internal {
namespace local {
-static Allocator* allocator = NULL;
+static AllocatorProcess* allocator = NULL;
static Master* master = NULL;
static map<IsolationModule*, Slave*> slaves;
static MasterDetector* detector = NULL;
@@ -72,7 +70,7 @@ PID<Master> launch(int numSlaves,
int32_t cpus,
int64_t mem,
bool quiet,
- Allocator* _allocator)
+ AllocatorProcess* _allocator)
{
Configuration configuration;
configuration.set("slaves", "*");
@@ -87,7 +85,7 @@ PID<Master> launch(int numSlaves,
}
-PID<Master> launch(const Configuration& configuration, Allocator* _allocator)
+PID<Master> launch(const Configuration& configuration, AllocatorProcess* _allocator)
{
int numSlaves = configuration.get<int>("num_slaves", 1);
bool quiet = configuration.get<bool>("quiet", false);
@@ -98,7 +96,7 @@ PID<Master> launch(const Configuration&
if (_allocator == NULL) {
// Create default allocator, save it for deleting later.
- _allocator = allocator = new DominantShareAllocator();
+ _allocator = allocator = AllocatorProcess::create("drf", "drf");
} else {
// TODO(benh): Figure out the behavior of allocator pointer and remove the
// else block.
Modified: incubator/mesos/trunk/src/local/local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.hpp (original)
+++ incubator/mesos/trunk/src/local/local.hpp Mon Aug 20 21:41:02 2012
@@ -37,12 +37,12 @@ process::PID<master::Master> launch(int
int32_t cpus,
int64_t mem,
bool quiet,
- master::Allocator* _allocator = NULL);
+ master::AllocatorProcess* _allocator = NULL);
// Launch a local cluster with a given configuration.
process::PID<master::Master> launch(const Configuration& configuration,
- master::Allocator* _allocator = NULL);
+ master::AllocatorProcess* _allocator = NULL);
void shutdown();
Added: incubator/mesos/trunk/src/master/allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/allocator.cpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/allocator.cpp (added)
+++ incubator/mesos/trunk/src/master/allocator.cpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "master/allocator.hpp"
+#include "master/drf_sorter.hpp"
+#include "master/hierarchical_allocator_process.hpp"
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Creates the allocator used by the master.
+//
+// NOTE: 'userSorterType' and 'frameworkSorterType' are currently
+// ignored: a HierarchicalAllocatorProcess that uses DRFSorter at both
+// the user and the framework level is returned no matter what is
+// passed in. The object is heap-allocated here and is not freed by
+// this function.
+AllocatorProcess* AllocatorProcess::create(
+ const string& userSorterType,
+ const string& frameworkSorterType)
+{
+ return new HierarchicalAllocatorProcess<DRFSorter, DRFSorter>();
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/trunk/src/master/allocator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/allocator.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/allocator.hpp (original)
+++ incubator/mesos/trunk/src/master/allocator.hpp Mon Aug 20 21:41:02 2012
@@ -20,7 +20,6 @@
#define __ALLOCATOR_HPP__
#include <stout/hashmap.hpp>
-#include <stout/option.hpp>
#include "common/resources.hpp"
@@ -38,9 +37,9 @@ namespace master {
// framework when tasks finish/fail (or are lost due to a slave
// failure) or when an offer is rescinded.
-class Allocator : public process::Process<Allocator> {
+class AllocatorProcess : public process::Process<AllocatorProcess> {
public:
- virtual ~Allocator() {}
+ virtual ~AllocatorProcess() {}
virtual void initialize(const Flags& flags,
const process::PID<Master>& master) = 0;
@@ -88,6 +87,9 @@ public:
// Whenever a framework that has filtered resources wants to revive
// offers for those resources the master invokes this callback.
virtual void offersRevived(const FrameworkID& frameworkId) = 0;
+
+ static AllocatorProcess* create(const std::string& userSorterType,
+ const std::string& frameworkSorterType);
};
} // namespace master {
Added: incubator/mesos/trunk/src/master/drf_sorter.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.cpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.cpp (added)
+++ incubator/mesos/trunk/src/master/drf_sorter.cpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "master/drf_sorter.hpp"
+
+using std::list;
+using std::set;
+using std::string;
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Strict ordering for clients: ascending by share, with the name as a
+// tie-breaker so that distinct clients holding equal shares never
+// compare as equivalent inside the set.
+bool DRFComparator::operator () (
+    const Client& client1,
+    const Client& client2)
+{
+  if (client1.share != client2.share) {
+    return client1.share < client2.share;
+  }
+
+  return client1.name < client2.name;
+}
+
+
+// Registers a new client: it is inserted into the sorted set with a
+// share of zero and given an empty allocation record.
+void DRFSorter::add(const string& name)
+{
+  Client entry;
+  entry.share = 0;
+  entry.name = name;
+
+  clients.insert(entry);
+  allocations[name] = Resources::parse("");
+}
+
+
+// Forgets a client entirely: drops it from the sorted set (if it is
+// currently there) and discards its allocation record.
+void DRFSorter::remove(const string& name)
+{
+  set<Client, DRFComparator>::iterator position = find(name);
+  if (position != clients.end()) {
+    clients.erase(position);
+  }
+
+  allocations.erase(name);
+}
+
+
+// Puts a known client (one that has an allocation record) into the
+// sorted set, ranked by its freshly computed share.
+void DRFSorter::activate(const string& name)
+{
+  CHECK(allocations.contains(name));
+
+  Client entry;
+  entry.share = calculateShare(name);
+  entry.name = name;
+
+  clients.insert(entry);
+}
+
+
+// Takes a client out of the sorted set. Its allocation record is left
+// untouched; a client that is not in the set is ignored.
+void DRFSorter::deactivate(const string& name)
+{
+  set<Client, DRFComparator>::iterator position = find(name);
+  if (position == clients.end()) {
+    return;
+  }
+
+  clients.erase(position);
+}
+
+
+// Records that 'resources' were handed to the named client.
+void DRFSorter::allocated(
+    const string& name,
+    const Resources& resources)
+{
+  allocations[name] += resources;
+
+  // While the totals are marked dirty every share is going to be
+  // recomputed anyway, so repositioning this one client now would be
+  // wasted work.
+  if (dirty) {
+    return;
+  }
+
+  update(name);
+}
+
+
+// Returns the resources currently recorded as allocated to the named
+// client, or an empty Resources if the client is unknown.
+Resources DRFSorter::allocation(
+    const string& name)
+{
+  // Do not use operator[] for an unknown name: it would insert an
+  // empty entry, and this read-only query would then make contains()
+  // return true and count() grow for a client that was never added.
+  if (!allocations.contains(name)) {
+    return Resources();
+  }
+
+  return allocations[name];
+}
+
+
+// Records that 'resources' were taken back from the named client.
+void DRFSorter::unallocated(
+    const string& name,
+    const Resources& resources)
+{
+  allocations[name] -= resources;
+
+  // A pending full recalculation (dirty) makes the per-client update
+  // redundant.
+  if (dirty) {
+    return;
+  }
+
+  update(name);
+}
+
+
+// Grows the pool of total resources that shares are measured against.
+void DRFSorter::add(const Resources& _resources)
+{
+  // A change in the totals invalidates every share. The recalculation
+  // is deferred until sort() so that several changes arriving before
+  // the next allocation only cost one pass.
+  dirty = true;
+
+  resources += _resources;
+}
+
+
+// Shrinks the pool of total resources that shares are measured
+// against and flags all shares as needing recalculation.
+void DRFSorter::remove(const Resources& _resources)
+{
+  dirty = true;
+
+  resources -= _resources;
+}
+
+
+// Returns the names of all clients in the sorted set, ordered by
+// ascending share (ties broken by name, see DRFComparator).
+//
+// If the total resources changed since the last call, every client's
+// share is recomputed first.
+list<string> DRFSorter::sort()
+{
+  if (dirty) {
+    // The share is part of the ordering key, so the set has to be
+    // rebuilt rather than modified in place.
+    set<Client, DRFComparator> temp;
+
+    set<Client, DRFComparator>::iterator it;
+    for (it = clients.begin(); it != clients.end(); ++it) {
+      Client client;
+      client.name = it->name;
+      client.share = calculateShare(it->name);
+      temp.insert(client);
+    }
+
+    clients = temp;
+
+    // All shares are now current. Clear the flag so that later calls
+    // do not recompute everything again, and so that allocated() and
+    // unallocated() go back to updating single clients.
+    dirty = false;
+  }
+
+  list<string> ret;
+
+  set<Client, DRFComparator>::iterator it;
+  for (it = clients.begin(); it != clients.end(); ++it) {
+    ret.push_back(it->name);
+  }
+
+  return ret;
+}
+
+
+// Returns true if the named client has an allocation record. The
+// check is made against 'allocations', not 'clients', so a client
+// that was deactivated (but not removed) still counts as contained.
+bool DRFSorter::contains(const string& name)
+{
+ return allocations.contains(name);
+}
+
+
+// Returns the number of clients that have an allocation record. This
+// is the size of 'allocations', so deactivated clients are included.
+int DRFSorter::count()
+{
+ return allocations.size();
+}
+
+// Recomputes the share of the named client and repositions it in the
+// sorted set. Clients that are not in the set are left alone.
+void DRFSorter::update(const string& name)
+{
+  set<Client, DRFComparator>::iterator it = find(name);
+
+  // A deactivated client keeps its allocation record but is not in
+  // 'clients'. Erasing the end iterator is undefined behaviour, and
+  // re-inserting the client here would silently reactivate it, so
+  // there is nothing to do in that case.
+  if (it == clients.end()) {
+    return;
+  }
+
+  // The share is part of the ordering key, so the element has to be
+  // removed and inserted again instead of being modified in place.
+  clients.erase(it);
+
+  Client client;
+  client.name = name;
+  client.share = calculateShare(name);
+  clients.insert(client);
+}
+
+
+// Returns the dominant share of the named client: the largest ratio,
+// over all scalar resources with a positive total, of the client's
+// allocated amount to the total amount. Returns 0 if there is no such
+// resource.
+double DRFSorter::calculateShare(const string& name)
+{
+ double share = 0;
+
+ // TODO(benh): This implementation of "dominant resource fairness"
+ // currently does not take into account resources that are not
+ // scalars.
+
+ foreach (const Resource& resource, resources) {
+ if (resource.type() == Value::SCALAR) {
+ double total = resource.scalar().value();
+
+ // Skip resources with no capacity to avoid dividing by zero.
+ if (total > 0) {
+ // 'none' is handed to get() as the fallback value; presumably it
+ // is what comes back when the client holds none of this resource.
+ Value::Scalar none;
+ const Value::Scalar& scalar = allocations[name].get(resource.name(), none);
+ share = std::max(share, scalar.value() / total);
+ }
+ }
+ }
+
+ return share;
+}
+
+
+// Linear search of the sorted set by client name (the set is ordered
+// by share, so it cannot be searched by name directly). Returns
+// clients.end() if no client with that name is present.
+set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
+{
+  set<Client, DRFComparator>::iterator position = clients.begin();
+
+  while (position != clients.end() && !(name == position->name)) {
+    ++position;
+  }
+
+  return position;
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
Added: incubator/mesos/trunk/src/master/drf_sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.hpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.hpp (added)
+++ incubator/mesos/trunk/src/master/drf_sorter.hpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __DRF_SORTER_HPP__
+#define __DRF_SORTER_HPP__
+
+#include "master/sorter.hpp"
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// An entry in the DRF sorter's ordered set: a client name together
+// with the share last computed for it.
+struct Client
+{
+ // Identifier of the client.
+ std::string name;
+
+ // Share used as the primary sort key by DRFComparator.
+ double share;
+};
+
+
+// Ordering predicate for the set of Clients kept by DRFSorter.
+// Orders by ascending share and breaks ties by name (see the
+// definition in drf_sorter.cpp).
+struct DRFComparator
+{
+ virtual bool operator () (const Client& client1,
+ const Client& client2);
+};
+
+
+typedef std::set<Client, DRFComparator> drfSet;
+
+
+// A Sorter that ranks clients by their dominant resource share: the
+// client with the smallest share comes first.
+class DRFSorter : public Sorter
+{
+public:
+  // 'dirty' must start out with a defined value: it is read by
+  // allocated(), unallocated() and sort() before anything assigns it.
+  DRFSorter() : dirty(false) {}
+
+  virtual ~DRFSorter() {}
+
+  // Adds a client with an empty allocation and a share of zero.
+  virtual void add(const std::string& name);
+
+  // Removes a client and its allocation record.
+  virtual void remove(const std::string& name);
+
+  // Re-inserts a known client into the sorted set.
+  virtual void activate(const std::string& name);
+
+  // Takes a client out of the sorted set but keeps its allocation
+  // record.
+  virtual void deactivate(const std::string& name);
+
+  // Records resources given to a client.
+  virtual void allocated(const std::string& name,
+                         const Resources& resources);
+
+  // Records resources taken back from a client.
+  virtual void unallocated(const std::string& name,
+                           const Resources& resources);
+
+  // Returns the resources recorded as allocated to a client.
+  virtual Resources allocation(const std::string& name);
+
+  // Adds to the total resources that shares are measured against.
+  virtual void add(const Resources& resources);
+
+  // Subtracts from the total resources that shares are measured
+  // against.
+  virtual void remove(const Resources& resources);
+
+  // Returns the names of the clients in the sorted set, ordered by
+  // ascending share.
+  virtual std::list<std::string> sort();
+
+  // Returns true if the client has an allocation record.
+  virtual bool contains(const std::string& name);
+
+  // Returns the number of clients that have an allocation record.
+  virtual int count();
+
+private:
+  // Recalculates the share for the client and moves
+  // it in 'clients' accordingly.
+  void update(const std::string& name);
+
+  // Returns the dominant resource share for the client.
+  double calculateShare(const std::string& name);
+
+  // Returns an iterator to the specified client, if
+  // it exists in this Sorter.
+  std::set<Client, DRFComparator>::iterator find(const std::string& name);
+
+  // If true, sort() will recalculate all shares.
+  bool dirty;
+
+  // A set of Clients (names and shares) sorted by share.
+  std::set<Client, DRFComparator> clients;
+
+  // Maps client names to the resources they have been allocated.
+  hashmap<std::string, Resources> allocations;
+
+  // Total resources.
+  Resources resources;
+};
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __DRF_SORTER_HPP__
Modified: incubator/mesos/trunk/src/master/flags.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/flags.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/flags.hpp (original)
+++ incubator/mesos/trunk/src/master/flags.hpp Mon Aug 20 21:41:02 2012
@@ -61,6 +61,20 @@ public:
"should be of the form: file://path/to/file",
"*");
+ add(&Flags::user_sorter,
+ "user_sorter",
+ "Policy to use for allocating resources\n"
+ "between users. May be one of:\n"
+ " dominant_resource_fairness (drf)",
+ "drf");
+
+ add(&Flags::framework_sorter,
+ "framework_sorter",
+ "Policy to use for allocating resources\n"
+ "between a given user's frameworks. Options\n"
+ "are the same as for user_allocator",
+ "drf");
+
add(&Flags::batch_seconds,
"batch_seconds",
"Seconds to wait between batch allocations",
@@ -77,6 +91,8 @@ public:
std::string webui_dir;
uint16_t webui_port;
std::string whitelist;
+ std::string user_sorter;
+ std::string framework_sorter;
double batch_seconds;
Option<std::string> cluster;
};
Added: incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp (added)
+++ incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
+#define __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
+
+#include <process/delay.hpp>
+#include <process/timer.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/timer.hpp>
+
+#include "common/resources.hpp"
+
+#include "master/allocator.hpp"
+#include "master/master.hpp"
+#include "master/sorter.hpp"
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Forward declarations.
+class Filter;
+
+
+// Implements the basic allocator algorithm - first pick
+// a user by some criteria, then pick one of their
+// frameworks to allocate to.
+// Two-level allocator: 'UserSorter' ranks users and, for each user, a
+// 'FrameworkSorter' ranks that user's frameworks.
+template <class UserSorter, class FrameworkSorter>
+class HierarchicalAllocatorProcess : public AllocatorProcess
+{
+public:
+  // 'userSorter' is only created in initialize(); start it out as NULL
+  // so that it never holds an indeterminate value.
+  HierarchicalAllocatorProcess() : initialized(false), userSorter(NULL) {}
+
+  // Releases the sorters owned by this allocator: the per-user
+  // framework sorters still registered and the user sorter.
+  virtual ~HierarchicalAllocatorProcess()
+  {
+    typename hashmap<std::string, FrameworkSorter*>::iterator it;
+    for (it = sorters.begin(); it != sorters.end(); ++it) {
+      delete it->second;
+    }
+
+    delete userSorter;
+  }
+
+  process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> > self()
+  {
+    return process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >(this);
+  }
+
+  void initialize(const Flags& flags,
+                  const process::PID<Master>& _master);
+
+  void frameworkAdded(const FrameworkID& frameworkId,
+                      const FrameworkInfo& frameworkInfo,
+                      const Resources& used);
+
+  void frameworkRemoved(const FrameworkID& frameworkId);
+
+  void frameworkActivated(const FrameworkID& frameworkId,
+                          const FrameworkInfo& frameworkInfo);
+
+  void frameworkDeactivated(const FrameworkID& frameworkId);
+
+  void slaveAdded(const SlaveID& slaveId,
+                  const SlaveInfo& slaveInfo,
+                  const hashmap<FrameworkID, Resources>& used);
+
+  void slaveRemoved(const SlaveID& slaveId);
+
+  void updateWhitelist(
+      const Option<hashset<std::string> >& whitelist);
+
+  void resourcesRequested(
+      const FrameworkID& frameworkId,
+      const std::vector<Request>& requests);
+
+  void resourcesUnused(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources,
+      const Option<Filters>& filters);
+
+  void resourcesRecovered(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources);
+
+  void offersRevived(const FrameworkID& frameworkId);
+
+protected:
+  // Callback for doing batch allocations.
+  void batch();
+
+  // Allocate any allocatable resources.
+  void allocate();
+
+  // Allocate resources just from the specified slave.
+  void allocate(const SlaveID& slaveId);
+
+  // Allocate resources from the specified slaves.
+  void allocate(const hashset<SlaveID>& slaveIds);
+
+  // Remove a filter for the specified framework.
+  void expire(const FrameworkID& frameworkId, Filter* filter);
+
+  // Checks whether the slave is whitelisted.
+  bool isWhitelisted(const SlaveID& slave);
+
+  // Returns true if there is a filter for this framework
+  // on this slave.
+  bool isFiltered(const FrameworkID& frameworkId,
+                  const SlaveID& slaveId,
+                  const Resources& resources);
+
+  // Set by initialize(); checked at the top of every callback.
+  bool initialized;
+
+  Flags flags;
+  PID<Master> master;
+
+  // Maps FrameworkIDs to user names.
+  hashmap<FrameworkID, std::string> users;
+
+  // Maps user names to the Sorter object which contains
+  // all of that user's frameworks.
+  hashmap<std::string, FrameworkSorter*> sorters;
+
+  // Maps slaves to their allocatable resources.
+  hashmap<SlaveID, Resources> allocatable;
+
+  // Contains all active slaves.
+  hashmap<SlaveID, SlaveInfo> slaves;
+
+  // Filters that have been added by frameworks.
+  multihashmap<FrameworkID, Filter*> filters;
+
+  // Slaves to send offers for.
+  Option<hashset<std::string> > whitelist;
+
+  // Sorter containing all active users. NULL until initialize() runs.
+  UserSorter* userSorter;
+};
+
+
+// Used to represent "filters" for resources unused in offers.
+// Used to represent "filters" for resources unused in offers.
+// Abstract interface; see RefusedFilter for the implementation in
+// this file.
+class Filter
+{
+public:
+ virtual ~Filter() {}
+
+ // Returns true if 'resources' on slave 'slaveId' are covered by this
+ // filter.
+ virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
+};
+
+
+// A filter recording that some resources on one slave were refused,
+// valid until its timeout runs out.
+class RefusedFilter : public Filter
+{
+public:
+ RefusedFilter(const SlaveID& _slaveId,
+ const Resources& _resources,
+ const Timeout& _timeout)
+ : slaveId(_slaveId),
+ resources(_resources),
+ timeout(_timeout) {}
+
+ // Matches when the slave is the one that was refused, the given
+ // resources are contained in the refused resources, and the timeout
+ // still has time remaining.
+ virtual bool filter(const SlaveID& slaveId, const Resources& resources)
+ {
+ return slaveId == this->slaveId &&
+ resources <= this->resources && // Refused resources are superset.
+ timeout.remaining() > 0.0;
+ }
+
+ // Slave on which the resources were refused.
+ const SlaveID slaveId;
+
+ // The refused resources.
+ const Resources resources;
+
+ // How long the filter stays in effect.
+ const Timeout timeout;
+};
+
+
+// Stores the configuration and the master's PID, creates the user
+// sorter, and schedules the first batch allocation.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::initialize(
+    const Flags& _flags,
+    const process::PID<Master>& _master)
+{
+  typedef HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> Self;
+
+  flags = _flags;
+  master = _master;
+  initialized = true;
+  userSorter = new UserSorter();
+
+  // Kick off the periodic batch allocation.
+  delay(flags.batch_seconds, self(), &Self::batch);
+}
+
+
+// Registers a framework under its user, accounts for the resources it
+// is already using, and triggers an allocation.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const Resources& used)
+{
+  CHECK(initialized);
+
+  const std::string owner = frameworkInfo.user();
+
+  // First framework of this user: register the user and give it a
+  // framework sorter of its own.
+  if (!userSorter->contains(owner)) {
+    userSorter->add(owner);
+    sorters[owner] = new FrameworkSorter();
+  }
+
+  FrameworkSorter* frameworkSorter = sorters[owner];
+
+  CHECK(!frameworkSorter->contains(frameworkId.value()));
+  frameworkSorter->add(frameworkId.value());
+
+  // Update the allocation to this framework.
+  userSorter->allocated(owner, used);
+  frameworkSorter->add(used);
+  frameworkSorter->allocated(frameworkId.value(), used);
+
+  users[frameworkId] = owner;
+
+  LOG(INFO) << "Added framework " << frameworkId;
+
+  allocate();
+}
+
+
+// Removes a framework: gives back its recorded allocation, drops it
+// from its user's sorter, removes the user as well once the user's
+// sorter is empty, and detaches all of the framework's filters.
+//
+// NOTE(review): 'users[frameworkId]' and 'sorters[user]' are accessed
+// without a containment check, so this appears to assume the framework
+// was previously added -- confirm against the callers.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(const FrameworkID& frameworkId)
+{
+ CHECK(initialized);
+
+ std::string user = users[frameworkId];
+ // Might not be in 'sorters[user]' because it was previously
+ // deactivated and never re-added.
+ if (sorters[user]->contains(frameworkId.value())) {
+ Resources allocation = sorters[user]->allocation(frameworkId.value());
+ userSorter->unallocated(user, allocation);
+ sorters[user]->remove(allocation);
+ sorters[user]->remove(frameworkId.value());
+ }
+
+ users.erase(frameworkId);
+
+ // If this user doesn't have any more active frameworks, remove it.
+ if (sorters[user]->count() == 0) {
+ // Take the pointer out of the map before deleting it so that the
+ // map never holds a dangling entry.
+ Sorter* s = sorters[user];
+ sorters.erase(user);
+ delete s;
+
+ userSorter->remove(user);
+ }
+
+ foreach (Filter* filter, filters.get(frameworkId)) {
+ filters.remove(frameworkId, filter);
+
+ // Do not delete the filter, see comments in
+ // HierarchicalAllocatorProcess::offersRevived and
+ // HierarchicalAllocatorProcess::expire.
+ }
+
+ filters.remove(frameworkId);
+
+ LOG(INFO) << "Removed framework " << frameworkId;
+}
+
+
+// Marks a framework as active again in its user's sorter and triggers
+// an allocation.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo)
+{
+  CHECK(initialized);
+
+  const std::string owner = frameworkInfo.user();
+  FrameworkSorter* frameworkSorter = sorters[owner];
+  frameworkSorter->activate(frameworkId.value());
+
+  LOG(INFO) << "Activated framework " << frameworkId;
+
+  allocate();
+}
+
+
+// Deactivates a framework in its user's sorter and detaches all of
+// the framework's filters. The framework's recorded allocation is
+// kept (see the note in the body).
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(const FrameworkID& frameworkId)
+{
+ CHECK(initialized);
+
+ std::string user = users[frameworkId];
+ sorters[user]->deactivate(frameworkId.value());
+
+ // Note that the Sorter *does not* remove the resources allocated
+ // to this framework. For now, this is important because if the
+ // framework fails over and is activated, we still want a record
+ // of the resources that it is using. We might be able to collapse
+ // the added/removed and activated/deactivated in the future.
+
+ foreach (Filter* filter, filters.get(frameworkId)) {
+ filters.remove(frameworkId, filter);
+
+ // Do not delete the filter, see comments in
+ // HierarchicalAllocatorProcess::offersRevived and
+ // HierarchicalAllocatorProcess::expire.
+ }
+
+ filters.remove(frameworkId);
+
+ LOG(INFO) << "Deactivated framework " << frameworkId;
+}
+
+
+// Registers a new slave. The slave's resources are added to the user
+// sorter's total, the resources already used on it ('used', keyed by
+// framework) are charged to the known frameworks and their users, and
+// whatever remains is recorded as allocatable and offered right away.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
+ const SlaveID& slaveId,
+ const SlaveInfo& slaveInfo,
+ const hashmap<FrameworkID, Resources>& used)
+{
+ CHECK(initialized);
+
+ CHECK(!slaves.contains(slaveId));
+
+ slaves[slaveId] = slaveInfo;
+
+ userSorter->add(slaveInfo.resources());
+
+ Resources unused = slaveInfo.resources();
+
+ foreachpair (const FrameworkID& frameworkId, const Resources& resources, used) {
+ // Only frameworks this allocator knows about are charged; the
+ // resources are subtracted from 'unused' either way.
+ if (users.contains(frameworkId)) {
+ const std::string& user = users[frameworkId];
+ sorters[user]->add(resources);
+ sorters[user]->allocated(frameworkId.value(), resources);
+ userSorter->allocated(user, resources);
+ }
+
+ unused -= resources; // Only want to allocate resources that are not used!
+ }
+
+ allocatable[slaveId] = unused;
+
+ LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
+ << ") with " << slaveInfo.resources()
+ << " (and " << unused << " available)";
+
+ allocate(slaveId);
+}
+
+
+// Forgets a slave: subtracts its resources from the user sorter's
+// total and drops it from 'slaves' and 'allocatable'.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(const SlaveID& slaveId)
+{
+ CHECK(initialized);
+
+ CHECK(slaves.contains(slaveId));
+
+ // Must happen before the erase below, which discards the SlaveInfo.
+ userSorter->remove(slaves[slaveId].resources());
+
+ slaves.erase(slaveId);
+
+ allocatable.erase(slaveId);
+
+ // Note that we DO NOT actually delete any filters associated with
+ // this slave, that will occur when the delayed
+ // HierarchicalAllocatorProcess::expire gets invoked (or the framework
+ // that applied the filters gets removed).
+
+ LOG(INFO) << "Removed slave " << slaveId;
+}
+
+
+// Replaces the slave whitelist and, when one is set, logs each
+// hostname on it.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
+    const Option<hashset<std::string> >& _whitelist)
+{
+  CHECK(initialized);
+
+  whitelist = _whitelist;
+
+  // Nothing to report when no whitelist is in effect.
+  if (!whitelist.isSome()) {
+    return;
+  }
+
+  LOG(INFO) << "Updated slave white list:";
+  foreach (const std::string& hostname, whitelist.get()) {
+    LOG(INFO) << "\t" << hostname;
+  }
+}
+
+
+// Invoked when a framework requests resources. The request is only
+// logged; 'requests' is not used by this implementation.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRequested(
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests)
+{
+ CHECK(initialized);
+
+ LOG(INFO) << "Received resource request from framework " << frameworkId;
+}
+
+
+// Handles resources that a framework left unused on a slave. They are
+// taken back from the framework and its user, made allocatable on the
+// slave again, and, unless the refuse timeout is zero, a RefusedFilter
+// is installed for the framework and scheduled to expire.
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources,
+ const Option<Filters>& filters)
+{
+ CHECK(initialized);
+
+ if (resources.allocatable().size() == 0) {
+ return;
+ }
+
+ VLOG(1) << "Framework " << frameworkId
+ << " left " << resources.allocatable()
+ << " unused on slave " << slaveId;
+
+ // Update resources allocated to framework. It is
+ // not possible for the user to not be in users
+ // because resourcesUnused is only called as the
+ // result of a valid task launch by an active
+ // framework that doesn't use the entire offer.
+ CHECK(users.contains(frameworkId));
+
+ std::string user = users[frameworkId];
+ sorters[user]->unallocated(frameworkId.value(), resources);
+ sorters[user]->remove(resources);
+ userSorter->unallocated(user, resources);
+
+ // Update resources allocatable on slave.
+ CHECK(allocatable.contains(slaveId));
+ allocatable[slaveId] += resources;
+
+ // Create a refused resources filter. When the caller supplies no
+ // filters, the refuse_seconds of a default-constructed Filters is
+ // used.
+ double timeout = filters.isSome()
+ ? filters.get().refuse_seconds()
+ : Filters().refuse_seconds();
+
+ if (timeout != 0.0) {
+ LOG(INFO) << "Framework " << frameworkId
+ << " filtered slave " << slaveId
+ << " for " << timeout << " seconds\n";
+
+ // Create a new filter and delay its expiration. The parameter
+ // 'filters' hides the member of the same name, hence 'this->'.
+ mesos::internal::master::Filter* filter = new RefusedFilter(slaveId, resources, timeout);
+ this->filters.put(frameworkId, filter);
+
+ delay(timeout, self(),
+ &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire, frameworkId, filter);
+ }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources)
+{ // Gives resources back to the allocator; tolerates the framework and/or slave already being gone.
+ CHECK(initialized);
+
+ if (resources.allocatable().size() == 0) {
+ return; // Nothing allocatable to recover.
+ }
+
+ // Updated resources allocated to framework (if framework still
+ // exists, which it might not in the event that we dispatched
+ // Master::offer before we received AllocatorProcess::frameworkRemoved
+ // or AllocatorProcess::frameworkDeactivated, in which case we will
+ // have already recovered all of its resources).
+ if (users.contains(frameworkId) &&
+ sorters[users[frameworkId]]->contains(frameworkId.value())) {
+ std::string user = users[frameworkId];
+ sorters[user]->unallocated(frameworkId.value(), resources); // Same three-step undo as in resourcesUnused().
+ sorters[user]->remove(resources);
+ userSorter->unallocated(user, resources);
+ }
+
+ // Update resources allocatable on slave (if slave still exists,
+ // which it might not in the event that we dispatched Master::offer
+ // before we received Allocator::slaveRemoved).
+ if (allocatable.contains(slaveId)) {
+ allocatable[slaveId] += resources;
+
+ VLOG(1) << "Recovered " << resources.allocatable()
+ << " on slave " << slaveId
+ << " from framework " << frameworkId;
+
+ }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(const FrameworkID& frameworkId)
+{ // Detaches every filter of the framework (without deleting them) and runs a full allocation.
+ CHECK(initialized);
+
+ foreach (Filter* filter, filters.get(frameworkId)) {
+ filters.remove(frameworkId, filter);
+
+ // We delete each actual Filter when
+ // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
+ // Filter here it's possible that the same Filter (i.e., same
+ // address) could get reused and HierarchicalAllocatorProcess::expire
+ // would expire that filter too soon. Note that this only works
+ // right now because ALL Filter types "expire".
+ }
+
+ filters.remove(frameworkId); // Drop whatever remains under the framework's key.
+
+ LOG(INFO) << "Removed filters for framework " << frameworkId;
+
+ allocate(); // Resources previously filtered for this framework can now be offered to it.
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch()
+{ // Periodic allocation: allocates over all slaves, then re-schedules itself.
+ CHECK(initialized);
+ allocate();
+ delay(flags.batch_seconds, self(), // Next run is scheduled only after this allocation has finished.
+ &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate()
+{ // Runs an allocation over every known slave and logs how long it took.
+ CHECK(initialized);
+
+ ::Timer timer;
+ timer.start();
+
+ allocate(slaves.keys()); // Delegates to the hashset<SlaveID> overload.
+
+ LOG(INFO) << "Performed allocation for "
+ << slaves.size() << " slaves in "
+ << timer.elapsed().millis() << " milliseconds";
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const SlaveID& slaveId)
+{ // Runs an allocation restricted to a single slave and logs how long it took.
+ CHECK(initialized);
+
+ hashset<SlaveID> slaveIds; // Wrap the single id so the set-based overload can be reused.
+ slaveIds.insert(slaveId);
+
+ ::Timer timer;
+ timer.start();
+
+ allocate(slaveIds);
+
+ LOG(INFO) << "Performed allocation for slave "
+ << slaveId << " in "
+ << timer.elapsed().millis() << " milliseconds";
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const hashset<SlaveID>& slaveIds)
+{ // Core allocation: offers each eligible slave's free resources, whole, to the first framework (in sorter order) that does not filter it.
+ CHECK(initialized);
+
+ if (userSorter->count() == 0) {
+ VLOG(1) << "No users to allocate resources!";
+ return;
+ }
+
+ // Get out only "available" resources (i.e., resources that are
+ // allocatable and above a certain threshold, see below).
+ hashmap<SlaveID, Resources> available;
+ foreachpair (const SlaveID& slaveId, Resources resources, allocatable) { // 'resources' is a by-value copy; it is narrowed below without touching 'allocatable'.
+ if (!slaveIds.contains(slaveId)) {
+ continue; // Only the slaves the caller asked about are considered.
+ }
+
+ if (isWhitelisted(slaveId)) {
+ resources = resources.allocatable(); // Make sure they're allocatable.
+
+ // TODO(benh): For now, only make offers when there is some cpu
+ // and memory left. This is an artifact of the original code that
+ // only offered when there was at least 1 cpu "unit" available,
+ // and without doing this a framework might get offered resources
+ // with only memory available (which it obviously will decline)
+ // and then end up waiting the default Filters::refuse_seconds
+ // (unless the framework set it to something different).
+
+ Value::Scalar none; // Default-constructed scalar used as the fallback when "cpus"/"mem" is absent.
+ Value::Scalar cpus = resources.get("cpus", none);
+ Value::Scalar mem = resources.get("mem", none);
+
+ if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) { // NOTE(review): cpus uses >= but mem uses > -- confirm the asymmetry is intended.
+ VLOG(1) << "Found available resources: " << resources
+ << " on slave " << slaveId;
+ available[slaveId] = resources;
+ }
+ }
+ }
+
+ if (available.size() == 0) {
+ VLOG(1) << "No resources available to allocate!";
+ return;
+ }
+
+ foreach (const std::string& user, userSorter->sort()) { // Users in the order chosen by the user sorter.
+ foreach (const std::string& frameworkIdValue, sorters[user]->sort()) { // Then that user's frameworks, in their sorter's order.
+ FrameworkID frameworkId;
+ frameworkId.set_value(frameworkIdValue);
+
+ Resources allocatedResources; // Sum of everything offered to this framework in this pass.
+ hashmap<SlaveID, Resources> offerable;
+ foreachpair (const SlaveID& slaveId, const Resources& resources, available) {
+ // Check whether or not this framework filters this slave.
+ bool filtered = isFiltered(frameworkId, slaveId, resources);
+
+ if (!filtered) {
+ VLOG(1) << "Offering " << resources
+ << " on slave " << slaveId
+ << " to framework " << frameworkId;
+ offerable[slaveId] = resources;
+
+ // Update framework and slave resources.
+ allocatable[slaveId] -= resources;
+ allocatedResources += resources;
+ }
+ }
+
+ if (offerable.size() > 0) {
+ foreachkey (const SlaveID& slaveId, offerable) {
+ available.erase(slaveId); // Offered slaves are not shown to later frameworks in this pass.
+ }
+
+ sorters[user]->add(allocatedResources); // Grow this user's framework-sorter pool by what was just handed out.
+ sorters[user]->allocated(frameworkIdValue, allocatedResources);
+ userSorter->allocated(user, allocatedResources);
+
+ dispatch(master, &Master::offer, frameworkId, offerable); // The master is told asynchronously via dispatch.
+ }
+ }
+ }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
+ const FrameworkID& frameworkId,
+ Filter* filter)
+{ // Delayed callback scheduled by resourcesUnused(): detaches the filter if still registered, then deletes it.
+ // The filter might have already been removed (e.g., if the
+ // framework no longer exists or in
+ // HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
+ // keep the address from getting reused possibly causing premature
+ // expiration).
+ if (users.contains(frameworkId) &&
+ filters.contains(frameworkId, filter)) {
+ filters.remove(frameworkId, filter);
+ }
+
+ delete filter; // Unconditional: this is the only place in view where a Filter is deleted.
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isWhitelisted(
+ const SlaveID& slaveId)
+{ // True when no whitelist is set, or when the slave's hostname is in the whitelist.
+ CHECK(initialized);
+
+ CHECK(slaves.contains(slaveId)); // The slave must be known; its recorded hostname is looked up below.
+
+ return whitelist.isNone() ||
+ whitelist.get().contains(slaves[slaveId].hostname());
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& resources)
+{ // True if any filter registered for the framework rejects these resources on this slave.
+ bool filtered = false;
+ foreach (Filter* filter, filters.get(frameworkId)) {
+ if (filter->filter(slaveId, resources)) {
+ VLOG(1) << "Filtered " << resources
+ << " on slave " << slaveId
+ << " for framework " << frameworkId;
+ filtered = true;
+ break; // The first matching filter decides; the rest are not consulted.
+ }
+ }
+ return filtered;
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
Modified: incubator/mesos/trunk/src/master/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/main.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/main.cpp (original)
+++ incubator/mesos/trunk/src/master/main.cpp Mon Aug 20 21:41:02 2012
@@ -33,7 +33,6 @@
#include "logging/logging.hpp"
#include "master/allocator.hpp"
-#include "master/dominant_share_allocator.hpp"
#include "master/master.hpp"
#include "master/webui.hpp"
@@ -116,7 +115,8 @@ int main(int argc, char** argv)
LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
LOG(INFO) << "Starting Mesos master";
- Allocator* allocator = new DominantShareAllocator();
+ AllocatorProcess* allocator = AllocatorProcess::create(flags.user_sorter,
+ flags.framework_sorter);
Master* master = new Master(allocator, flags);
process::spawn(master);
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Mon Aug 20 21:41:02 2012
@@ -60,7 +60,7 @@ namespace master {
class WhitelistWatcher : public Process<WhitelistWatcher> {
public:
- WhitelistWatcher(const string& _path, Allocator* _allocator)
+ WhitelistWatcher(const string& _path, AllocatorProcess* _allocator)
: path(_path), allocator(_allocator) {}
protected:
@@ -104,7 +104,7 @@ protected:
// Send the whitelist to allocator, if necessary.
if (whitelist != lastWhitelist) {
- dispatch(allocator, &Allocator::updateWhitelist, whitelist);
+ dispatch(allocator, &AllocatorProcess::updateWhitelist, whitelist);
}
// Check again.
@@ -113,7 +113,7 @@ protected:
}
private:
- Allocator* allocator;
+ AllocatorProcess* allocator;
const string path;
Option<hashset<string> > lastWhitelist;
};
@@ -255,14 +255,14 @@ struct SlaveReregistrar
};
-Master::Master(Allocator* _allocator)
+Master::Master(AllocatorProcess* _allocator)
: ProcessBase("master"),
allocator(_allocator),
flags()
{}
-Master::Master(Allocator* _allocator,
+Master::Master(AllocatorProcess* _allocator,
const flags::Flags<logging::Flags, master::Flags>& _flags)
: ProcessBase("master"),
allocator(_allocator),
@@ -321,7 +321,7 @@ void Master::initialize()
// Spawn the allocator.
spawn(allocator);
- dispatch(allocator, &Allocator::initialize, flags, self());
+ dispatch(allocator, &AllocatorProcess::initialize, flags, self());
// Parse the white list
whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
@@ -484,7 +484,7 @@ void Master::exited(const UPID& pid)
framework->active = false;
// Tell the allocator to stop allocating resources to this framework.
- dispatch(allocator, &Allocator::frameworkDeactivated, framework->id);
+ dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
double failoverTimeout = framework->info.failover_timeout();
@@ -498,7 +498,7 @@ void Master::exited(const UPID& pid)
// Remove the framework's offers.
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
offer->framework_id(),
offer->slave_id(),
Resources(offer->resources()));
@@ -647,7 +647,7 @@ void Master::reregisterFramework(const F
// replied to the offers but the driver might have dropped
// those messages since it wasn't connected to the master.
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
offer->framework_id(), offer->slave_id(), offer->resources());
removeOffer(offer);
}
@@ -743,7 +743,7 @@ void Master::deactivateFramework(const F
void Master::resourceRequest(const FrameworkID& frameworkId,
const vector<Request>& requests)
{
- dispatch(allocator, &Allocator::resourcesRequested, frameworkId, requests);
+ dispatch(allocator, &AllocatorProcess::resourcesRequested, frameworkId, requests);
}
@@ -793,7 +793,7 @@ void Master::reviveOffers(const Framewor
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
LOG(INFO) << "Reviving offers for framework " << framework->id;
- dispatch(allocator, &Allocator::offersRevived, framework->id);
+ dispatch(allocator, &AllocatorProcess::offersRevived, framework->id);
}
}
@@ -1108,7 +1108,7 @@ void Master::exitedExecutor(const SlaveI
<< " (" << slave->info.hostname() << ")"
<< " exited with status " << status;
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
frameworkId,
slaveId,
Resources(executor.resources()));
@@ -1190,7 +1190,7 @@ void Master::offer(const FrameworkID& fr
<< " has terminated or is inactive";
foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
frameworkId, slaveId, offered);
}
return;
@@ -1206,7 +1206,7 @@ void Master::offer(const FrameworkID& fr
<< frameworkId << " because slave " << slaveId
<< " is not valid";
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
frameworkId, slaveId, offered);
continue;
}
@@ -1507,7 +1507,7 @@ void Master::processTasks(Offer* offer,
if (unusedResources.allocatable().size() > 0) {
// Tell the allocator about the unused (e.g., refused) resources.
- dispatch(allocator, &Allocator::resourcesUnused,
+ dispatch(allocator, &AllocatorProcess::resourcesUnused,
offer->framework_id(),
offer->slave_id(),
unusedResources,
@@ -1593,7 +1593,7 @@ void Master::addFramework(Framework* fra
message.mutable_master_info()->MergeFrom(info);
send(framework->pid, message);
- dispatch(allocator, &Allocator::frameworkAdded,
+ dispatch(allocator, &AllocatorProcess::frameworkAdded,
framework->id, framework->info, framework->resources);
}
@@ -1618,7 +1618,7 @@ void Master::failoverFramework(Framework
// Make sure we can get offers again.
if (!framework->active) {
framework->active = true;
- dispatch(allocator, &Allocator::frameworkActivated,
+ dispatch(allocator, &AllocatorProcess::frameworkActivated,
framework->id, framework->info);
}
@@ -1637,7 +1637,7 @@ void Master::failoverFramework(Framework
// these resources to this framework if it wants.
// TODO(benh): Consider just reoffering these to
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
offer->framework_id(),
offer->slave_id(),
Resources(offer->resources()));
@@ -1650,7 +1650,7 @@ void Master::removeFramework(Framework*
{
if (framework->active) {
// Tell the allocator to stop allocating resources to this framework.
- dispatch(allocator, &Allocator::frameworkDeactivated, framework->id);
+ dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
}
// Tell slaves to shutdown the framework.
@@ -1671,7 +1671,7 @@ void Master::removeFramework(Framework*
// Remove the framework's offers (if they weren't removed before).
foreach (Offer* offer, utils::copy(framework->offers)) {
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
offer->framework_id(),
offer->slave_id(),
Resources(offer->resources()));
@@ -1685,7 +1685,7 @@ void Master::removeFramework(Framework*
foreachpair (const ExecutorID& executorId,
const ExecutorInfo& executorInfo,
framework->executors[slaveId]) {
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
framework->id,
slave->id,
executorInfo.resources());
@@ -1709,7 +1709,8 @@ void Master::removeFramework(Framework*
// Delete it.
frameworks.erase(framework->id);
- dispatch(allocator, &Allocator::frameworkRemoved, framework->id);
+ dispatch(allocator, &AllocatorProcess::frameworkRemoved, framework->id);
+
delete framework;
}
@@ -1747,7 +1748,7 @@ void Master::addSlave(Slave* slave, bool
spawn(slave->observer);
if (!reregister) {
- dispatch(allocator, &Allocator::slaveAdded,
+ dispatch(allocator, &AllocatorProcess::slaveAdded,
slave->id, slave->info, hashmap<FrameworkID, Resources>());
}
}
@@ -1817,7 +1818,7 @@ void Master::readdSlave(Slave* slave,
resources[task.framework_id()] += task.resources();
}
- dispatch(allocator, &Allocator::slaveAdded,
+ dispatch(allocator, &AllocatorProcess::slaveAdded,
slave->id, slave->info, resources);
}
@@ -1892,7 +1893,7 @@ void Master::removeSlave(Slave* slave)
// Delete it.
slaves.erase(slave->id);
- dispatch(allocator, &Allocator::slaveRemoved, slave->id);
+ dispatch(allocator, &AllocatorProcess::slaveRemoved, slave->id);
delete slave;
}
@@ -1911,7 +1912,7 @@ void Master::removeTask(Task* task)
slave->removeTask(task);
// Tell the allocator about the recovered resources.
- dispatch(allocator, &Allocator::resourcesRecovered,
+ dispatch(allocator, &AllocatorProcess::resourcesRecovered,
task->framework_id(),
task->slave_id(),
Resources(task->resources()));
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Mon Aug 20 21:41:02 2012
@@ -56,7 +56,7 @@ namespace master {
using namespace process; // Included to make code easier to read.
// Some forward declarations.
-class Allocator;
+class AllocatorProcess;
class SlavesManager;
struct Framework;
struct Slave;
@@ -67,8 +67,8 @@ class WhitelistWatcher;
class Master : public ProtobufProcess<Master>
{
public:
- Master(Allocator* allocator);
- Master(Allocator* allocator,
+ Master(AllocatorProcess* allocator);
+ Master(AllocatorProcess* allocator,
const flags::Flags<logging::Flags, master::Flags>& flags);
virtual ~Master();
@@ -204,7 +204,7 @@ private:
bool elected;
- Allocator* allocator;
+ AllocatorProcess* allocator;
SlavesManager* slavesManager;
WhitelistWatcher* whitelistWatcher;
Added: incubator/mesos/trunk/src/master/sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/sorter.hpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/sorter.hpp (added)
+++ incubator/mesos/trunk/src/master/sorter.hpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SORTER_HPP__
+#define __SORTER_HPP__
+
+#include "master/master.hpp"
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Sorters implement the logic for determining the
+// order in which users or frameworks should receive
+// resource allocations.
+class Sorter // Abstract interface: every operation below is pure virtual.
+{
+public:
+ virtual ~Sorter() {}
+
+ // Adds a client to allocate resources to. A client
+ // may be a user or a framework.
+ virtual void add(const std::string& client) = 0;
+
+ // Removes a client.
+ virtual void remove(const std::string& client) = 0;
+
+ // Re-adds a client to the sort after it was deactivated.
+ virtual void activate(const std::string& client) = 0;
+
+ // Removes a client from the sort, so it won't get
+ // allocated to.
+ virtual void deactivate(const std::string& client) = 0;
+
+ // Specify that resources have been allocated to the
+ // given client.
+ virtual void allocated(const std::string& client,
+ const Resources& resources) = 0;
+
+ // Specify that resources have been unallocated from
+ // the given client.
+ virtual void unallocated(const std::string& client,
+ const Resources& resources) = 0;
+
+ // Returns the resources that have been allocated to
+ // this client.
+ virtual Resources allocation(const std::string& client) = 0;
+
+ // Add resources to the total pool of resources this
+ // Sorter should consider.
+ virtual void add(const Resources& resources) = 0; // Overload of add(client): takes pool resources, not a client name.
+
+ // Remove resources from the total pool.
+ virtual void remove(const Resources& resources) = 0; // Overload of remove(client): takes pool resources, not a client name.
+
+ // Returns a list of all clients, in the order that they
+ // should be allocated to, according to this Sorter's policy.
+ virtual std::list<std::string> sort() = 0;
+
+ // Returns true if this Sorter contains the specified client,
+ // either active or deactivated.
+ virtual bool contains(const std::string& client) = 0;
+
+ // Returns the number of clients this Sorter contains,
+ // either active or deactivated.
+ virtual int count() = 0;
+};
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SORTER_HPP__
Modified: incubator/mesos/trunk/src/tests/allocator_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_tests.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_tests.cpp Mon Aug 20 21:41:02 2012
@@ -25,20 +25,17 @@
#include "detector/detector.hpp"
#include "master/allocator.hpp"
-#include "master/dominant_share_allocator.hpp"
#include "master/master.hpp"
#include "slave/process_based_isolation_module.hpp"
-#include "tests/base_zookeeper_test.hpp"
#include "tests/utils.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::test;
-using mesos::internal::master::Allocator;
-using mesos::internal::master::DominantShareAllocator;
+using mesos::internal::master::AllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::ProcessBasedIsolationModule;
@@ -46,34 +43,21 @@ using mesos::internal::slave::Slave;
using process::PID;
+using std::map;
using std::string;
using std::vector;
-using testing::AnyNumber;
+using testing::_;
using testing::ByRef;
+using testing::DoAll;
using testing::DoDefault;
using testing::Eq;
-using testing::_;
+using testing::InSequence;
using testing::Return;
using testing::SaveArg;
-void checkResources(vector<Offer> offers, int cpus, int mem)
-{
- EXPECT_EQ(offers.size(), 1);
-
- EXPECT_EQ(offers[0].resources().size(), 2);
-
- foreach (const Resource& resource, offers[0].resources()) {
- if (resource.name() == "cpus") {
- EXPECT_EQ(resource.scalar().value(), cpus);
- } else if (resource.name() == "mem") {
- EXPECT_EQ(resource.scalar().value(), mem);
- }
- }
-}
-
-TEST(AllocatorTest, DominantShareAllocator)
+TEST(AllocatorTest, DRFAllocatorProcess)
{
FrameworkInfo frameworkInfo1;
frameworkInfo1.set_name("framework1");
@@ -90,7 +74,7 @@ TEST(AllocatorTest, DominantShareAllocat
frameworkInfo3.set_user("user1");
FrameworkID frameworkId3;
- MockAllocator<DominantShareAllocator> a;
+ MockAllocator<TestAllocatorProcess > a;
EXPECT_CALL(a, initialize(_, _));
@@ -165,7 +149,7 @@ TEST(AllocatorTest, DominantShareAllocat
WAIT_UNTIL(resourceOfferTrigger);
- checkResources(offers1, 2, 1024);
+ EXPECT_THAT(offers1, OfferEq(2, 1024));
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master);
@@ -193,7 +177,7 @@ TEST(AllocatorTest, DominantShareAllocat
WAIT_UNTIL(resourceOfferTrigger2);
- checkResources(offers2, 1, 512);
+ EXPECT_THAT(offers2, OfferEq(1, 512));
Resources resources3 = Resources::parse("cpus:3;mem:2048");
Slave s3(resources3, true, &isolationModule);
@@ -202,7 +186,7 @@ TEST(AllocatorTest, DominantShareAllocat
WAIT_UNTIL(resourceOfferTrigger3);
- checkResources(offers3, 3, 2048);
+ EXPECT_THAT(offers3, OfferEq(3, 2048));
MockScheduler sched3;
MesosSchedulerDriver driver3(&sched3, frameworkInfo3, master);
@@ -226,7 +210,7 @@ TEST(AllocatorTest, DominantShareAllocat
WAIT_UNTIL(resourceOfferTrigger4);
- checkResources(offers4, 4, 4096);
+ EXPECT_THAT(offers4, OfferEq(4, 4096));
driver3.stop();
driver2.stop();
@@ -253,7 +237,7 @@ TEST(AllocatorTest, DominantShareAllocat
}
-template <typename T = Allocator>
+template <typename T>
class AllocatorTest : public ::testing::Test
{
protected:
@@ -274,7 +258,6 @@ protected:
// Causes all TYPED_TEST(AllocatorTest, ...) to be run for
// each of the specified Allocator classes.
-typedef ::testing::Types<DominantShareAllocator> AllocatorTypes;
TYPED_TEST_CASE(AllocatorTest, AllocatorTypes);
@@ -326,7 +309,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
WAIT_UNTIL(resourceOffers);
- checkResources(offers, 2, 1024);
+ EXPECT_THAT(offers, OfferEq(2, 1024));
driver.stop();
@@ -417,7 +400,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
WAIT_UNTIL(offered);
- checkResources(offers, 1, 512);
+ EXPECT_THAT(offers, OfferEq(1, 512));
driver1.stop();
driver2.stop();
@@ -517,7 +500,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
WAIT_UNTIL(offered);
- checkResources(offers, 2, 1024);
+ EXPECT_THAT(offers, OfferEq(2, 1024));
driver.stop();
driver.join();
@@ -527,7 +510,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
// Re-dispatch the resourcesRecovered call which we "caught"
// earlier now that the framework has been removed, to test
// that recovering resources from a removed framework works.
- dispatch(this->allocator, &Allocator::resourcesRecovered,
+ dispatch(this->allocator, &AllocatorProcess::resourcesRecovered,
frameworkId, slaveId, savedResources);
MockScheduler sched2;
@@ -545,7 +528,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
WAIT_UNTIL(offered2);
- checkResources(offers2, 2, 1024);
+ EXPECT_THAT(offers2, OfferEq(2, 1024));
driver2.stop();
driver2.join();
@@ -562,126 +545,725 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
}
-template <typename T = Allocator>
-class MasterFailoverAllocatorTest : public BaseZooKeeperTest
+TYPED_TEST(AllocatorTest, SchedulerFailover)
{
-public:
- static void SetUpTestCase() {
- BaseZooKeeperTest::SetUpTestCase();
- }
+ MockFilter filter;
+ process::filter(&filter);
-protected:
- T allocator1;
- MockAllocator<T> allocator2;
-};
+ EXPECT_MESSAGE(filter, Eq(UnregisterFrameworkMessage().GetTypeName()), _, _)
+ .WillRepeatedly(Return(true));
+
+ EXPECT_CALL(this->allocator, initialize(_, _));
+
+ EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+ trigger frameworkRemovedTrigger;
+ EXPECT_CALL(this->allocator, frameworkRemoved(_))
+ .WillOnce(Trigger(&frameworkRemovedTrigger));
+
+ EXPECT_CALL(this->allocator, frameworkActivated(_, _));
+
+ trigger frameworkDeactivatedTrigger;
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_))
+ .WillOnce(DoAll(InvokeFrameworkDeactivated(&this->allocator),
+ Trigger(&frameworkDeactivatedTrigger)))
+ .WillOnce(DoDefault());
+ EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+ trigger slaveRemovedTrigger;
+ EXPECT_CALL(this->allocator, slaveRemoved(_))
+ .WillOnce(Trigger(&slaveRemovedTrigger));
+
+ trigger resourcesRecoveredTrigger;
+ EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+ .WillRepeatedly(DoDefault());
+
+ // We don't filter the unused resources to make sure that
+ // they get offered to the framework as soon as it fails over.
+ EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+ .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0));
+
+ Master m(&this->allocator);
+ PID<Master> master = process::spawn(&m);
-// Runs TYPED_TEST(MasterFailoverAllocatorTest, ...) on all AllocatorTypes.
-TYPED_TEST_CASE(MasterFailoverAllocatorTest, AllocatorTypes);
+ MockExecutor exec;
+ EXPECT_CALL(exec, registered(_, _, _, _));
-TYPED_TEST(MasterFailoverAllocatorTest, MasterFailover)
+ trigger launchTaskTrigger;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(Trigger(&launchTaskTrigger));
+
+ EXPECT_CALL(exec, shutdown(_));
+
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+ TestingIsolationModule isolationModule(execs);
+
+ EXPECT_CALL(isolationModule, resourcesChanged(_, _, _));
+
+ Resources resources = Resources::parse("cpus:3;mem:1024");
+ Slave s(resources, true, &isolationModule);
+ PID<Slave> slave = process::spawn(&s);
+ BasicMasterDetector detector(master, slave, true);
+
+ FrameworkInfo frameworkInfo1;
+ frameworkInfo1.set_name("framework1");
+ frameworkInfo1.set_user("user1");
+ frameworkInfo1.set_failover_timeout(.5);
+
+ // Launch the first (i.e., failing) scheduler.
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, frameworkInfo1, master);
+
+ FrameworkID frameworkId;
+
+ EXPECT_CALL(sched1, registered(&driver1, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ // We don't know how many times the allocator might try to
+ // allocate to the failing framework before it gets killed.
+ EXPECT_CALL(sched1, resourceOffers(_, _))
+ .WillRepeatedly(DeclineOffers());
+
+ // Initially, all cluster resources are available.
+ trigger resourceOffersTrigger1;
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+ .WillOnce(DoAll(LaunchTasks(1, 1, 256),
+ Trigger(&resourceOffersTrigger1)));
+
+ driver1.start();
+
+ WAIT_UNTIL(resourceOffersTrigger1);
+
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler.
+ MockScheduler sched2;
+
+ FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+ framework2 = DEFAULT_FRAMEWORK_INFO;
+ framework2.mutable_id()->MergeFrom(frameworkId);
+
+ MesosSchedulerDriver driver2(&sched2, framework2, master);
+
+ EXPECT_CALL(sched2, registered(_, frameworkId, _));
+
+ // Even though the scheduler failed over, the 1 cpu, 256 mem
+ // task that it launched earlier should still be running, so
+ // only 2 cpus and 768 mem are available.
+ trigger resourceOffersTrigger2;
+ EXPECT_CALL(sched2, resourceOffers(_, OfferEq(2, 768)))
+ .WillOnce(Trigger(&resourceOffersTrigger2));
+
+ // Ensures that the task has been completely launched
+ // before we have the framework fail over.
+ WAIT_UNTIL(launchTaskTrigger);
+
+ driver1.stop();
+
+ WAIT_UNTIL(frameworkDeactivatedTrigger);
+
+ driver2.start();
+
+ WAIT_UNTIL(resourceOffersTrigger2);
+
+ driver2.stop();
+ driver2.join();
+
+ WAIT_UNTIL(frameworkRemovedTrigger);
+
+ process::terminate(slave);
+ process::wait(slave);
+
+ WAIT_UNTIL(slaveRemovedTrigger);
+
+ process::terminate(master);
+ process::wait(master);
+
+ process::filter(NULL);
+}
+
+
+TYPED_TEST(AllocatorTest, FrameworkExited)
{
- trigger slaveAdded;
- EXPECT_CALL(this->allocator2, initialize(_, _));
+ EXPECT_CALL(this->allocator, initialize(_, _));
+
+ EXPECT_CALL(this->allocator, frameworkAdded(_, _, _))
+ .Times(2);
- trigger frameworkAdded;
- EXPECT_CALL(this->allocator2, frameworkAdded(_, _, _));
+ EXPECT_CALL(this->allocator, frameworkRemoved(_))
+ .Times(2);
- EXPECT_CALL(this->allocator2, frameworkDeactivated(_));
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_))
+ .Times(2);
+
+ EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+ trigger slaveRemovedTrigger;
+ EXPECT_CALL(this->allocator, slaveRemoved(_))
+ .WillOnce(Trigger(&slaveRemovedTrigger));
+
+ EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+ .WillRepeatedly(DoDefault());
+
+ EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+ .WillRepeatedly(DoDefault());
+
+ Master m(&this->allocator);
+ PID<Master> master = process::spawn(m);
+
+ MockExecutor exec;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(2);
+
+ trigger launchTaskTrigger;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(Trigger(&launchTaskTrigger))
+ .WillOnce(DoDefault());
+
+ trigger shutdownTrigger;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(DoDefault())
+ .WillOnce(Trigger(&shutdownTrigger));
+
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+ TestingIsolationModule isolationModule(execs);
+ EXPECT_CALL(isolationModule, resourcesChanged(_, _, _))
+ .Times(2);
+
+ Resources resources1 = Resources::parse("cpus:3;mem:1024");
+ Slave s1(resources1, true, &isolationModule);
+ PID<Slave> slave1 = process::spawn(s1);
+ BasicMasterDetector detector1(master, slave1, true);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched1, registered(_, _, _));
+
+ EXPECT_CALL(sched1, resourceOffers(_, _))
+ .WillRepeatedly(DeclineOffers());
+
+ // The first time the framework is offered resources,
+ // all of the cluster's resources should be available.
+ trigger resourceOffersTrigger1;
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+ .WillOnce(DoAll(LaunchTasks(1, 2, 512),
+ Trigger(&resourceOffersTrigger1)));
+
+ driver1.start();
+
+ WAIT_UNTIL(resourceOffersTrigger1);
+
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched2, registered(_, _, _));
+
+ EXPECT_CALL(sched2, resourceOffers(_, _))
+ .WillRepeatedly(DeclineOffers());
+
+ // The first time sched2 gets an offer, framework 1 has a
+ // task running with 2 cpus and 512 mem, leaving 1 cpu and
+ // 512 mem.
+ trigger resourceOffersTrigger2;
+ EXPECT_CALL(sched2, resourceOffers(_, OfferEq(1, 512)))
+ .WillOnce(DoAll(LaunchTasks(1, 1, 256),
+ Trigger(&resourceOffersTrigger2)));
+
+ // After we kill framework 1, all of its resources should
+ // have been returned, but framework 2 should still have a
+ // task with 1 cpu and 256 mem, leaving 2 cpus and 768 mem.
+ trigger resourceOffersTrigger3;
+ EXPECT_CALL(sched2, resourceOffers(_, OfferEq(2, 768)))
+ .WillOnce(Trigger(&resourceOffersTrigger3));
+
+ driver2.start();
+
+ WAIT_UNTIL(resourceOffersTrigger2);
+
+ // Ensures that framework 1's task is completely launched
+ // before we kill the framework to test if its resources
+ // are recovered correctly.
+ WAIT_UNTIL(launchTaskTrigger);
+
+ driver1.stop();
+ driver1.join();
+
+ WAIT_UNTIL(resourceOffersTrigger3);
+
+ driver2.stop();
+ driver2.join();
+
+ WAIT_UNTIL(shutdownTrigger);
+
+ process::terminate(slave1);
+ process::wait(slave1);
+
+ WAIT_UNTIL(slaveRemovedTrigger);
+
+ process::terminate(master);
+ process::wait(master);
+}
+
+
+TYPED_TEST(AllocatorTest, SlaveLost)
+{
+ EXPECT_CALL(this->allocator, initialize(_, _));
+
+ EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
trigger frameworkRemovedTrigger;
- EXPECT_CALL(this->allocator2, frameworkRemoved(_))
+ EXPECT_CALL(this->allocator, frameworkRemoved(_))
.WillOnce(Trigger(&frameworkRemovedTrigger));
- EXPECT_CALL(this->allocator2, slaveAdded(_, _, _));
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+ EXPECT_CALL(this->allocator, slaveAdded(_, _, _))
+ .Times(2);
+
+ EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _));
+
+ EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+ .WillRepeatedly(DoDefault());
+
+ trigger slaveRemovedTrigger1, slaveRemovedTrigger2;
+ EXPECT_CALL(this->allocator, slaveRemoved(_))
+ .WillOnce(DoAll(InvokeSlaveRemoved(&this->allocator),
+ Trigger(&slaveRemovedTrigger1)))
+ .WillOnce(Trigger(&slaveRemovedTrigger2));
+
+ Master m(&this->allocator);
+ PID<Master> master = process::spawn(m);
+
+ MockExecutor exec;
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ trigger launchTaskTrigger;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
+ Trigger(&launchTaskTrigger)));
+
+ EXPECT_CALL(exec, shutdown(_));
+
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+ TestingIsolationModule isolationModule(execs);
+
+ EXPECT_CALL(isolationModule, resourcesChanged(_, _, _));
+
+ Resources resources1 = Resources::parse("cpus:2;mem:1024");
+ Slave s1(resources1, true, &isolationModule);
+ PID<Slave> slave1 = process::spawn(s1);
+ BasicMasterDetector detector1(master, slave1, true);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched1, registered(_, _, _));
+
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .WillRepeatedly(DoDefault());
+
+ EXPECT_CALL(sched1, slaveLost(_, _));
+
+ trigger resourceOffersTrigger1, resourceOffersTrigger2;
+ {
+ // Ensures that the following EXPECT_CALLs happen in order.
+ InSequence dummy;
+
+ // Initially, all of slave1's resources are available.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 1024)))
+ .WillOnce(DoAll(LaunchTasks(1, 2, 512),
+ Trigger(&resourceOffersTrigger1)));
+
+ // Eventually after slave2 is launched, we should get
+ // an offer that contains all of slave2's resources
+ // and none of slave1's resources.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 256)))
+ .WillOnce(Trigger(&resourceOffersTrigger2));
+ }
+
+ driver1.start();
+
+ WAIT_UNTIL(resourceOffersTrigger1);
+
+ // Ensures the task is completely launched before we
+ // kill the slave, to test that the task's resources
+ // are recovered correctly (i.e. never reallocated
+ // since the slave is killed)
+ WAIT_UNTIL(launchTaskTrigger);
+
+ process::terminate(slave1);
+ process::wait(slave1);
+
+ WAIT_UNTIL(slaveRemovedTrigger1);
+
+ ProcessBasedIsolationModule isolationModule2;
+ Resources resources2 = Resources::parse("cpus:3;mem:256");
+ Slave s2(resources2, true, &isolationModule2);
+ PID<Slave> slave2 = process::spawn(s2);
+ BasicMasterDetector detector2(master, slave2, true);
+
+ WAIT_UNTIL(resourceOffersTrigger2);
+
+ driver1.stop();
+ driver1.join();
+
+ WAIT_UNTIL(frameworkRemovedTrigger);
+
+ process::terminate(slave2);
+ process::wait(slave2);
+
+ WAIT_UNTIL(slaveRemovedTrigger2);
+
+ process::terminate(master);
+ process::wait(master);
+}
+
+
+TYPED_TEST(AllocatorTest, SlaveAdded)
+{
+ EXPECT_CALL(this->allocator, initialize(_, _));
+
+ EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+ trigger frameworkRemovedTrigger;
+ EXPECT_CALL(this->allocator, frameworkRemoved(_))
+ .WillOnce(Trigger(&frameworkRemovedTrigger));
+
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+ EXPECT_CALL(this->allocator, slaveAdded(_, _, _))
+ .Times(2);
trigger slaveRemovedTrigger;
- EXPECT_CALL(this->allocator2, slaveRemoved(_))
+ EXPECT_CALL(this->allocator, slaveRemoved(_))
+ .WillOnce(DoDefault())
.WillOnce(Trigger(&slaveRemovedTrigger));
- EXPECT_CALL(this->allocator2, resourcesRecovered(_, _, _))
+ // We filter the first time so that the unused resources
+ // on slave1 from the task launch won't get reoffered
+ // immediately and will get combined with slave2's
+ // resources for a single offer.
+ EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+ .WillOnce(InvokeUnusedWithFilters(&this->allocator, .1))
+ .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0));
+
+ EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
.WillRepeatedly(DoDefault());
- MockFilter filter;
- process::filter(&filter);
+ Master m(&this->allocator);
+ PID<Master> master = process::spawn(m);
- EXPECT_MESSAGE(filter, Eq(ShutdownMessage().GetTypeName()), _, _)
- .WillRepeatedly(Return(true));
+ MockExecutor exec;
- Master m1(&(this->allocator1));
- PID<Master> master1 = process::spawn(m1);
+ EXPECT_CALL(exec, registered(_, _, _, _));
- string zk = "zk://" + this->zks->connectString() + "/znode";
- Try<MasterDetector*> detector =
- MasterDetector::create(zk, master1, true, true);
- CHECK(!detector.isError())
- << "Failed to create a master detector: " << detector.error();
+ trigger launchTaskTrigger;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
+ Trigger(&launchTaskTrigger)));
- ProcessBasedIsolationModule isolationModule;
+ trigger shutdownTrigger;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(Trigger(&shutdownTrigger));
- Resources resources = Resources::parse("cpus:2;mem:1024");
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(s);
+ TestingIsolationModule isolationModule(execs);
- Try<MasterDetector*> slave_detector =
- MasterDetector::create(zk, slave, false, true);
- CHECK(!slave_detector.isError())
- << "Failed to create a master detector: " << slave_detector.error();
+ EXPECT_CALL(isolationModule, resourcesChanged(_, _, _));
- MockScheduler sched;
- MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, zk);
+ Resources resources1 = Resources::parse("cpus:3;mem:1024");
+ Slave s1(resources1, true, &isolationModule);
+ PID<Slave> slave1 = process::spawn(s1);
+ BasicMasterDetector detector1(master, slave1, true);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched1, registered(_, _, _));
+
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .WillRepeatedly(DoDefault());
+
+ EXPECT_CALL(sched1, resourceOffers(_, _))
+ .WillRepeatedly(DeclineOffers());
+
+ trigger resourceOffersTrigger1, resourceOffersTrigger2;
+ {
+ // Ensures that the following EXPECT_CALLs happen in order.
+ InSequence dummy;
+
+ // Initially, all of slave1's resources are available.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+ .WillOnce(DoAll(LaunchTasks(1, 2, 512),
+ Trigger(&resourceOffersTrigger1)));
+
+ // After slave2 launches, all of its resources are
+ // combined with the resources on slave1 that the
+ // task isn't using.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(5, 2560)))
+ .WillOnce(Trigger(&resourceOffersTrigger2));
+ }
+
+ driver1.start();
+
+ WAIT_UNTIL(resourceOffersTrigger1);
+
+ // Wait for the resourcesUnused filter above to time out so all
+ // resources not used by the launched task get offered together.
+ // NOTE(review): POSIX sleep() takes whole seconds, so .1 may
+ // truncate to 0. TODO(tmarshall): use Clock::advance() instead.
+ sleep(.1);
+
+ WAIT_UNTIL(launchTaskTrigger);
+
+ Resources resources2 = Resources::parse("cpus:4;mem:2048");
+ Slave s2(resources2, true, &isolationModule);
+ PID<Slave> slave2 = process::spawn(s2);
+ BasicMasterDetector detector2(master, slave2, true);
+
+ WAIT_UNTIL(resourceOffersTrigger2);
+
+ driver1.stop();
+ driver1.join();
+
+ WAIT_UNTIL(frameworkRemovedTrigger);
+
+ WAIT_UNTIL(shutdownTrigger);
+
+ process::terminate(slave1);
+ process::wait(slave1);
+
+ process::terminate(slave2);
+ process::wait(slave2);
+
+ WAIT_UNTIL(slaveRemovedTrigger);
+
+ process::terminate(master);
+ process::wait(master);
+}
- trigger resourceOffers, resourceOffers2;
- vector<Offer> offers, offers2;
+TYPED_TEST(AllocatorTest, TaskFinished)
+{
+ EXPECT_CALL(this->allocator, initialize(_, _));
+
+ EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+ EXPECT_CALL(this->allocator, frameworkRemoved(_));
+
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+ EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+ trigger slaveRemovedTrigger;
+ EXPECT_CALL(this->allocator, slaveRemoved(_))
+ .WillOnce(Trigger(&slaveRemovedTrigger));
- EXPECT_CALL(sched, registered(_, _, _))
+ EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+ .WillRepeatedly(DoDefault());
+
+ // The first time we don't filter because we want to see
+ // the unused resources from the task launch get reoffered
+ // to us, but when that offer is returned unused we do
+ // filter so that they won't get reoffered again and will
+ // instead get combined with the recovered resources from
+ // the task finishing for one offer.
+ EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+ .WillOnce(InvokeUnusedWithFilters(&this->allocator, 0))
+ .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 1));
+
+ Master m(&this->allocator);
+ PID<Master> master = process::spawn(m);
+
+ MockExecutor exec;
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ ExecutorDriver* execDriver;
+ TaskInfo taskInfo;
+ trigger launchTaskTrigger;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(DoAll(SaveArg<0>(&execDriver),
+ SaveArg<1>(&taskInfo),
+ SendStatusUpdateFromTask(TASK_RUNNING),
+ Trigger(&launchTaskTrigger)))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ trigger shutdownTrigger;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(Trigger(&shutdownTrigger));
+
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+ TestingIsolationModule isolationModule(execs);
+ EXPECT_CALL(isolationModule, resourcesChanged(_, _, _))
.Times(2);
- EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(DoAll(SaveArg<1>(&offers),
- Trigger(&resourceOffers)))
- .WillOnce(DoAll(SaveArg<1>(&offers2),
- Trigger(&resourceOffers2)));
+ Resources resources1 = Resources::parse("cpus:3;mem:1024");
+ Slave s1(resources1, true, &isolationModule);
+ PID<Slave> slave1 = process::spawn(s1);
+ BasicMasterDetector detector1(master, slave1, true);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched1, registered(_, _, _));
- EXPECT_CALL(sched, disconnected(_))
+ EXPECT_CALL(sched1, statusUpdate(_, _))
.WillRepeatedly(DoDefault());
- driver.start();
+ EXPECT_CALL(sched1, resourceOffers(_, _))
+ .WillRepeatedly(DeclineOffers());
- WAIT_UNTIL(resourceOffers);
+ trigger resourceOffersTrigger1, resourceOffersTrigger2;
+ {
+ // Ensures that the following EXPECT_CALLs happen in order.
+ InSequence dummy;
- checkResources(offers, 2, 1024);
+ // Initially, all of the slave's resources.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+ .WillOnce(LaunchTasks(2, 1, 256));
+
+ // After the tasks are launched.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(1, 512)))
+ .WillOnce(DoAll(DeclineOffers(),
+ Trigger(&resourceOffersTrigger1)));
+
+ // After the first task finishes.
+ EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 768)))
+ .WillOnce(Trigger(&resourceOffersTrigger2));
+ }
- process::terminate(master1);
- process::wait(master1);
- MasterDetector::destroy(detector.get());
+ driver1.start();
- Master m2(&(this->allocator2));
- PID<Master> master2 = process::spawn(m2);
+ WAIT_UNTIL(resourceOffersTrigger1);
- Try<MasterDetector*> detector2 =
- MasterDetector::create(zk, master2, true, true);
- CHECK(!detector2.isError())
- << "Failed to create a master detector: " << detector2.error();
+ WAIT_UNTIL(launchTaskTrigger);
- WAIT_UNTIL(resourceOffers2);
+ TaskStatus status;
+ status.mutable_task_id()->MergeFrom(taskInfo.task_id());
+ status.set_state(TASK_FINISHED);
+ execDriver->sendStatusUpdate(status);
- checkResources(offers2, 2, 1024);
+ WAIT_UNTIL(resourceOffersTrigger2);
- driver.stop();
+ driver1.stop();
+ driver1.join();
- WAIT_UNTIL(frameworkRemovedTrigger);
+ WAIT_UNTIL(shutdownTrigger);
+
+ process::terminate(slave1);
+ process::wait(slave1);
+
+ WAIT_UNTIL(slaveRemovedTrigger);
+
+ process::terminate(master);
+ process::wait(master);
+}
+
+
+TYPED_TEST(AllocatorTest, WhitelistSlave)
+{
+ EXPECT_CALL(this->allocator, initialize(_, _));
+
+ EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+ EXPECT_CALL(this->allocator, frameworkRemoved(_));
+
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+ EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+ trigger slaveRemovedTrigger;
+ EXPECT_CALL(this->allocator, slaveRemoved(_))
+ .WillOnce(Trigger(&slaveRemovedTrigger));
+
+ trigger updateWhitelistTrigger1, updateWhitelistTrigger2;
+ EXPECT_CALL(this->allocator, updateWhitelist(_))
+ .WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
+ Trigger(&updateWhitelistTrigger1)))
+ .WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
+ Trigger(&updateWhitelistTrigger2)));
+
+ EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+ .WillRepeatedly(DoDefault());
+
+ // Create a dummy whitelist, so that no resources will get allocated.
+ string hosts = "dummy-slave";
+ string path = "whitelist.txt";
+ CHECK (os::write(path, hosts).isSome()) << "Error writing whitelist";
+
+ flags::Flags<logging::Flags, master::Flags> flags;
+ flags.whitelist = "file://" + path; // TODO(benh): Put in /tmp.
+ Master m(&this->allocator, flags);
+ PID<Master> master = process::spawn(&m);
+
+ MockExecutor exec;
+
+ map<ExecutorID, Executor*> execs;
+ execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+ TestingIsolationModule isolationModule(execs);
+ Resources resources = Resources::parse("cpus:2;mem:1024");
+ Slave s(resources, true, &isolationModule);
+ PID<Slave> slave = process::spawn(&s);
+
+ BasicMasterDetector detector(master, slave, true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ trigger resourceOffersTrigger;
+ EXPECT_CALL(sched, resourceOffers(_, OfferEq(2, 1024)))
+ .WillOnce(Trigger(&resourceOffersTrigger));
+
+ WAIT_UNTIL(updateWhitelistTrigger1);
+
+ driver.start();
+
+ // Give the allocator some time to confirm that it doesn't
+ // make an allocation.
+ sleep(1);
+ EXPECT_FALSE(resourceOffersTrigger.value);
+
+ // Update the whitelist to include the slave, so that
+ // the allocator will start making allocations.
+ Try<string> hostname = os::hostname();
+ ASSERT_TRUE(hostname.isSome());
+ hosts = hostname.get() + "\n" + "dummy-slave";
+ CHECK (os::write(path, hosts).isSome()) << "Error writing whitelist";
+
+ // Give the WhitelistWatcher some time to notice that
+ // the whitelist has changed.
+ sleep(4);
+
+ WAIT_UNTIL(updateWhitelistTrigger2);
+
+ WAIT_UNTIL(resourceOffersTrigger);
+
+ driver.stop();
+ driver.join();
process::terminate(slave);
process::wait(slave);
WAIT_UNTIL(slaveRemovedTrigger);
- process::terminate(master2);
- process::wait(master2);
+ process::terminate(master);
+ process::wait(master);
- process::filter(NULL);
+ os::rm(path);
}