Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/20 23:41:03 UTC

svn commit: r1375239 [1/2] - in /incubator/mesos/trunk/src: ./ local/ master/ tests/

Author: benh
Date: Mon Aug 20 21:41:02 2012
New Revision: 1375239

URL: http://svn.apache.org/viewvc?rev=1375239&view=rev
Log:
Adding more tests to the allocator and improving the allocator design:
the default is now a hierarchical allocator (two levels: users, then
frameworks) that uses "sorters" to decide how to rank users and frameworks
(contributed by Thomas Marshall, https://reviews.apache.org/r/6037,
https://reviews.apache.org/r/5599, and
https://reviews.apache.org/r/5913).
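
For context, the new default allocator is composed from two "sorter"
types: one ranking users, and one ranking each user's frameworks. A
minimal sketch of how a caller obtains it (this mirrors the new code in
local.cpp and main.cpp below; "drf" is currently the only sorter type):

    #include "master/allocator.hpp"

    using mesos::internal::master::AllocatorProcess;

    // Both levels (users and frameworks) use dominant resource fairness.
    AllocatorProcess* allocator = AllocatorProcess::create("drf", "drf");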

Added:
    incubator/mesos/trunk/src/master/allocator.cpp
    incubator/mesos/trunk/src/master/drf_sorter.cpp
    incubator/mesos/trunk/src/master/drf_sorter.hpp
    incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
    incubator/mesos/trunk/src/master/sorter.hpp
    incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
Removed:
    incubator/mesos/trunk/src/master/dominant_share_allocator.cpp
    incubator/mesos/trunk/src/master/dominant_share_allocator.hpp
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/local/local.cpp
    incubator/mesos/trunk/src/local/local.hpp
    incubator/mesos/trunk/src/master/allocator.hpp
    incubator/mesos/trunk/src/master/flags.hpp
    incubator/mesos/trunk/src/master/main.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/tests/allocator_tests.cpp
    incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
    incubator/mesos/trunk/src/tests/gc_tests.cpp
    incubator/mesos/trunk/src/tests/master_detector_tests.cpp
    incubator/mesos/trunk/src/tests/master_tests.cpp
    incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
    incubator/mesos/trunk/src/tests/utils.hpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Mon Aug 20 21:41:02 2012
@@ -151,9 +151,9 @@ noinst_LTLIBRARIES += libmesos_no_third_
 nodist_libmesos_no_third_party_la_SOURCES = $(CXX_PROTOS) $(MESSAGES_PROTOS)
 
 libmesos_no_third_party_la_SOURCES = sched/sched.cpp local/local.cpp	\
-	master/master.cpp master/http.cpp master/slaves_manager.cpp	\
-	master/frameworks_manager.cpp					\
-	master/dominant_share_allocator.cpp slave/gc.cpp		\
+	master/allocator.cpp master/drf_sorter.cpp			\
+	master/frameworks_manager.cpp master/http.cpp master/master.cpp	\
+	master/slaves_manager.cpp slave/gc.cpp				\
 	slave/slave.cpp slave/http.cpp slave/isolation_module.cpp	\
 	slave/process_based_isolation_module.cpp slave/reaper.cpp	\
 	launcher/launcher.cpp exec/exec.cpp common/lock.cpp		\
@@ -197,10 +197,11 @@ libmesos_no_third_party_la_SOURCES += co
 	launcher/launcher.hpp linux/cgroups.hpp linux/fs.hpp		\
 	linux/proc.hpp local/flags.hpp local/local.hpp			\
 	logging/flags.hpp logging/logging.hpp master/allocator.hpp	\
-	master/constants.hpp master/flags.hpp				\
-	master/frameworks_manager.hpp master/http.hpp			\
-	master/master.hpp master/dominant_share_allocator.hpp		\
-	master/slaves_manager.hpp master/webui.hpp			\
+	master/constants.hpp master/drf_sorter.hpp			\
+	master/flags.hpp master/frameworks_manager.hpp			\
+	master/hierarchical_allocator_process.hpp			\
+	master/http.hpp master/master.hpp				\
+	master/slaves_manager.hpp master/sorter.hpp master/webui.hpp	\
 	messages/messages.hpp slave/constants.hpp slave/flags.hpp	\
 	slave/gc.hpp slave/http.hpp slave/isolation_module.hpp		\
 	slave/isolation_module_factory.hpp				\
@@ -781,6 +782,7 @@ mesos_tests_SOURCES = tests/main.cpp tes
 	              tests/exception_tests.cpp				\
 	              tests/attributes_tests.cpp			\
 	              tests/master_detector_tests.cpp			\
+	              tests/sorter_tests.cpp				\
 	              tests/allocator_tests.cpp
 
 mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -804,7 +806,8 @@ if HAS_JAVA
   mesos_tests_SOURCES += tests/zookeeper_server.cpp		\
                          tests/base_zookeeper_test.cpp		\
                          tests/zookeeper_server_tests.cpp	\
-                         tests/zookeeper_tests.cpp
+                         tests/zookeeper_tests.cpp		\
+                         tests/allocator_zookeeper_tests.cpp
   mesos_tests_CPPFLAGS += $(JAVA_CPPFLAGS)
   mesos_tests_CPPFLAGS += -DZOOKEEPER_VERSION=\"$(ZOOKEEPER_VERSION)\"
   mesos_tests_LDFLAGS = $(JAVA_LDFLAGS) $(AM_LDFLAGS)

Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Mon Aug 20 21:41:02 2012
@@ -33,7 +33,6 @@
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
-#include "master/dominant_share_allocator.hpp"
 #include "master/master.hpp"
 
 #include "slave/process_based_isolation_module.hpp"
@@ -41,8 +40,7 @@
 
 using namespace mesos::internal;
 
-using mesos::internal::master::Allocator;
-using mesos::internal::master::DominantShareAllocator;
+using mesos::internal::master::AllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
@@ -62,7 +60,7 @@ namespace mesos {
 namespace internal {
 namespace local {
 
-static Allocator* allocator = NULL;
+static AllocatorProcess* allocator = NULL;
 static Master* master = NULL;
 static map<IsolationModule*, Slave*> slaves;
 static MasterDetector* detector = NULL;
@@ -72,7 +70,7 @@ PID<Master> launch(int numSlaves,
                    int32_t cpus,
                    int64_t mem,
                    bool quiet,
-                   Allocator* _allocator)
+                   AllocatorProcess* _allocator)
 {
   Configuration configuration;
   configuration.set("slaves", "*");
@@ -87,7 +85,7 @@ PID<Master> launch(int numSlaves,
 }
 
 
-PID<Master> launch(const Configuration& configuration, Allocator* _allocator)
+PID<Master> launch(const Configuration& configuration, AllocatorProcess* _allocator)
 {
   int numSlaves = configuration.get<int>("num_slaves", 1);
   bool quiet = configuration.get<bool>("quiet", false);
@@ -98,7 +96,7 @@ PID<Master> launch(const Configuration& 
 
   if (_allocator == NULL) {
     // Create default allocator, save it for deleting later.
-    _allocator = allocator = new DominantShareAllocator();
+    _allocator = allocator = AllocatorProcess::create("drf", "drf");
   } else {
     // TODO(benh): Figure out the behavior of allocator pointer and remove the
     // else block.

Modified: incubator/mesos/trunk/src/local/local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.hpp (original)
+++ incubator/mesos/trunk/src/local/local.hpp Mon Aug 20 21:41:02 2012
@@ -37,12 +37,12 @@ process::PID<master::Master> launch(int 
                                     int32_t cpus,
                                     int64_t mem,
                                     bool quiet,
-                                    master::Allocator* _allocator = NULL);
+                                    master::AllocatorProcess* _allocator = NULL);
 
 
 // Launch a local cluster with a given configuration.
 process::PID<master::Master> launch(const Configuration& configuration,
-                                    master::Allocator* _allocator = NULL);
+                                    master::AllocatorProcess* _allocator = NULL);
 
 
 void shutdown();

Added: incubator/mesos/trunk/src/master/allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/allocator.cpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/allocator.cpp (added)
+++ incubator/mesos/trunk/src/master/allocator.cpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "master/allocator.hpp"
+#include "master/drf_sorter.hpp"
+#include "master/hierarchical_allocator_process.hpp"
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+AllocatorProcess* AllocatorProcess::create(
+    const string& userSorterType,
+    const string& frameworkSorterType)
+{
+  return new HierarchicalAllocatorProcess<DRFSorter, DRFSorter>();
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
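
Note that in this revision create() ignores both sorter-type arguments
and always returns a HierarchicalAllocatorProcess<DRFSorter, DRFSorter>;
the parameters are plumbed through, presumably so that other sorter
policies can be selected here later.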

Modified: incubator/mesos/trunk/src/master/allocator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/allocator.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/allocator.hpp (original)
+++ incubator/mesos/trunk/src/master/allocator.hpp Mon Aug 20 21:41:02 2012
@@ -20,7 +20,6 @@
 #define __ALLOCATOR_HPP__
 
 #include <stout/hashmap.hpp>
-#include <stout/option.hpp>
 
 #include "common/resources.hpp"
 
@@ -38,9 +37,9 @@ namespace master {
 // framework when tasks finish/fail (or are lost due to a slave
 // failure) or when an offer is rescinded.
 
-class Allocator : public process::Process<Allocator> {
+class AllocatorProcess : public process::Process<AllocatorProcess> {
 public:
-  virtual ~Allocator() {}
+  virtual ~AllocatorProcess() {}
 
   virtual void initialize(const Flags& flags,
                           const process::PID<Master>& master) = 0;
@@ -88,6 +87,9 @@ public:
   // Whenever a framework that has filtered resources wants to revive
   // offers for those resources the master invokes this callback.
   virtual void offersRevived(const FrameworkID& frameworkId) = 0;
+
+  static AllocatorProcess* create(const std::string& userSorterType,
+				  const std::string& frameworkSorterType);
 };
 
 } // namespace master {

Added: incubator/mesos/trunk/src/master/drf_sorter.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.cpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.cpp (added)
+++ incubator/mesos/trunk/src/master/drf_sorter.cpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "master/drf_sorter.hpp"
+
+using std::list;
+using std::set;
+using std::string;
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+bool DRFComparator::operator () (
+    const Client& client1,
+    const Client& client2)
+{
+  if (client1.share == client2.share) {
+    return client1.name < client2.name;
+  }
+  return client1.share < client2.share;
+}
+
+
+void DRFSorter::add(const string& name)
+{
+  Client client;
+  client.name = name;
+  client.share = 0;
+  clients.insert(client);
+
+  allocations[name] = Resources::parse("");
+}
+
+
+void DRFSorter::remove(const string& name)
+{
+  set<Client, DRFComparator>::iterator it = find(name);
+
+  if (it != clients.end()) {
+    clients.erase(it);
+  }
+
+  allocations.erase(name);
+}
+
+
+void DRFSorter::activate(const string& name)
+{
+  CHECK(allocations.contains(name));
+
+  Client client;
+  client.name = name;
+  client.share = calculateShare(name);
+  clients.insert(client);
+}
+
+
+void DRFSorter::deactivate(const string& name)
+{
+  set<Client, DRFComparator>::iterator it = find(name);
+
+  if (it != clients.end()) {
+    clients.erase(it);
+  }
+}
+
+
+void DRFSorter::allocated(
+    const string& name,
+    const Resources& resources)
+{
+  allocations[name] += resources;
+
+  // If the total resources have changed, we're going to
+  // recalculate all the shares, so don't bother just
+  // updating this client.
+  if (!dirty) {
+    update(name);
+  }
+}
+
+
+Resources DRFSorter::allocation(
+    const string& name)
+{
+  return allocations[name];
+}
+
+
+void DRFSorter::unallocated(
+    const string& name,
+    const Resources& resources)
+{
+  allocations[name] -= resources;
+
+  if (!dirty) {
+    update(name);
+  }
+}
+
+
+void DRFSorter::add(const Resources& _resources)
+{
+  resources += _resources;
+
+  // We have to recalculate all shares when the total resources
+  // change, but we put it off until sort is called
+  // so that if something else changes before the next allocation
+  // we don't recalculate everything twice.
+  dirty = true;
+}
+
+
+void DRFSorter::remove(const Resources& _resources)
+{
+  resources -= _resources;
+  dirty = true;
+}
+
+
+list<string> DRFSorter::sort()
+{
+  if (dirty) {
+    set<Client, DRFComparator> temp;
+
+    set<Client, DRFComparator>::iterator it;
+    for (it = clients.begin(); it != clients.end(); it++) {
+      Client client;
+      client.name = (*it).name;
+      client.share = calculateShare((*it).name);
+      temp.insert(client);
+    }
+
+    clients = temp;
+  }
+
+  list<string> ret;
+
+  set<Client, DRFComparator>::iterator it;
+  for (it = clients.begin(); it != clients.end(); it++) {
+    ret.push_back((*it).name);
+  }
+
+  return ret;
+}
+
+
+bool DRFSorter::contains(const string& name)
+{
+  return allocations.contains(name);
+}
+
+
+int DRFSorter::count()
+{
+  return allocations.size();
+}
+
+void DRFSorter::update(const string& name)
+{
+  set<Client, DRFComparator>::iterator it;
+  it = find(name);
+  clients.erase(it);
+
+  Client client;
+  client.name = name;
+  client.share = calculateShare(name);
+  clients.insert(client);
+}
+
+
+double DRFSorter::calculateShare(const string& name)
+{
+  double share = 0;
+
+  // TODO(benh): This implementation of "dominant resource fairness"
+  // currently does not take into account resources that are not
+  // scalars.
+
+  foreach (const Resource& resource, resources) {
+    if (resource.type() == Value::SCALAR) {
+      double total = resource.scalar().value();
+
+      if (total > 0) {
+	Value::Scalar none;
+	const Value::Scalar& scalar = allocations[name].get(resource.name(), none);
+	share = std::max(share, scalar.value() / total);
+      }
+    }
+  }
+
+  return share;
+}
+
+
+set<Client, DRFComparator>::iterator DRFSorter::find(const string& name)
+{
+  set<Client, DRFComparator>::iterator it;
+  for (it = clients.begin(); it != clients.end(); it++) {
+    if (name == (*it).name) {
+      break;
+    }
+  }
+
+  return it;
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
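
The dominant share computed by calculateShare() above is the maximum,
over all scalar resources, of the client's allocated fraction of the
total; DRFComparator then orders clients by ascending share, breaking
ties by name. A hedged usage sketch (namespace assumptions noted in the
comments; Resources::parse syntax as used elsewhere in this patch):

    #include <list>
    #include <string>

    #include "master/drf_sorter.hpp"

    // Namespace assumptions: Resources comes from common/resources.hpp.
    using namespace mesos;
    using namespace mesos::internal;
    using mesos::internal::master::DRFSorter;

    int main()
    {
      DRFSorter sorter;
      sorter.add(Resources::parse("cpus:10;mem:1000")); // Total pool.

      sorter.add("user1");
      sorter.add("user2");

      // user1's dominant share is max(2/10, 100/1000) = 0.2 (cpus).
      sorter.allocated("user1", Resources::parse("cpus:2;mem:100"));

      // user2's dominant share is max(1/10, 400/1000) = 0.4 (mem).
      sorter.allocated("user2", Resources::parse("cpus:1;mem:400"));

      // Clients come back in ascending dominant share order:
      // "user1" (0.2), then "user2" (0.4).
      std::list<std::string> order = sorter.sort();

      return 0;
    }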

Added: incubator/mesos/trunk/src/master/drf_sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.hpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.hpp (added)
+++ incubator/mesos/trunk/src/master/drf_sorter.hpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __DRF_SORTER_HPP__
+#define __DRF_SORTER_HPP__
+
+#include "master/sorter.hpp"
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+struct Client
+{
+  std::string name;
+  double share;
+};
+
+
+struct DRFComparator
+{
+  virtual bool operator () (const Client& client1,
+                            const Client& client2);
+};
+
+
+typedef std::set<Client, DRFComparator> drfSet;
+
+
+class DRFSorter : public Sorter
+{
+public:
+  virtual ~DRFSorter() {}
+
+  virtual void add(const std::string& name);
+
+  virtual void remove(const std::string& name);
+
+  virtual void activate(const std::string& name);
+
+  virtual void deactivate(const std::string& name);
+
+  virtual void allocated(const std::string& name,
+			 const Resources& resources);
+
+  virtual void unallocated(const std::string& name,
+			   const Resources& resources);
+
+  virtual Resources allocation(const std::string& name);
+
+  virtual void add(const Resources& resources);
+
+  virtual void remove(const Resources& resources);
+
+  virtual std::list<std::string> sort();
+
+  virtual bool contains(const std::string& name);
+
+  virtual int count();
+
+private:
+  // Recalculates the share for the client and moves
+  // it in 'clients' accordingly.
+  void update(const std::string& name);
+
+  // Returns the dominant resource share for the client.
+  double calculateShare(const std::string& name);
+
+  // Returns an iterator to the specified client, if
+  // it exists in this Sorter.
+  std::set<Client, DRFComparator>::iterator find(const std::string& name);
+
+  // If true, sort() will recalculate all shares.
+  bool dirty;
+
+  // A set of Clients (names and shares) sorted by share.
+  std::set<Client, DRFComparator> clients;
+
+  // Maps client names to the resources they have been allocated.
+  hashmap<std::string, Resources> allocations;
+
+  // Total resources.
+  Resources resources;
+};
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __DRF_SORTER_HPP__

Modified: incubator/mesos/trunk/src/master/flags.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/flags.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/flags.hpp (original)
+++ incubator/mesos/trunk/src/master/flags.hpp Mon Aug 20 21:41:02 2012
@@ -61,6 +61,20 @@ public:
         "should be of the form: file://path/to/file",
         "*");
 
+    add(&Flags::user_sorter,
+        "user_sorter",
+        "Policy to use for allocating resources\n"
+        "between users. May be one of:\n"
+        "  dominant_resource_fairness (drf)",
+        "drf");
+ 
+    add(&Flags::framework_sorter,
+        "framework_sorter",
+        "Policy to use for allocating resources\n"
+        "between a given user's frameworks. Options\n"
+        "are the same as for user_allocator",
+        "drf");
+
     add(&Flags::batch_seconds,
         "batch_seconds",
         "Seconds to wait between batch allocations",
@@ -77,6 +91,8 @@ public:
   std::string webui_dir;
   uint16_t webui_port;
   std::string whitelist;
+  std::string user_sorter;
+  std::string framework_sorter;
   double batch_seconds;
   Option<std::string> cluster;
 };
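
A hedged invocation sketch using the two new flags (the master binary
name is illustrative; the flag names and their "drf" defaults come from
the code above):

    mesos-master --user_sorter=drf --framework_sorter=drf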

Added: incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp (added)
+++ incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
+#define __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
+
+#include <process/delay.hpp>
+#include <process/timer.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/timer.hpp>
+
+#include "common/resources.hpp"
+
+#include "master/allocator.hpp"
+#include "master/master.hpp"
+#include "master/sorter.hpp"
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Forward declarations.
+class Filter;
+
+
+// Implements the basic allocator algorithm - first pick
+// a user by some criteria, then pick one of their
+// frameworks to allocate to.
+template <class UserSorter, class FrameworkSorter>
+class HierarchicalAllocatorProcess : public AllocatorProcess
+{
+public:
+  HierarchicalAllocatorProcess() : initialized(false) {}
+
+  virtual ~HierarchicalAllocatorProcess() {}
+
+  process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> > self()
+  {
+    return process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >(this);
+  }
+
+  void initialize(const Flags& flags,
+		  const process::PID<Master>& _master);
+
+  void frameworkAdded(const FrameworkID& frameworkId,
+			      const FrameworkInfo& frameworkInfo,
+			      const Resources& used);
+
+  void frameworkRemoved(const FrameworkID& frameworkId);
+
+  void frameworkActivated(const FrameworkID& frameworkId,
+                                  const FrameworkInfo& frameworkInfo);
+
+  void frameworkDeactivated(const FrameworkID& frameworkId);
+
+  void slaveAdded(const SlaveID& slaveId,
+		  const SlaveInfo& slaveInfo,
+		  const hashmap<FrameworkID, Resources>& used);
+
+  void slaveRemoved(const SlaveID& slaveId);
+
+  void updateWhitelist(
+      const Option<hashset<std::string> >& whitelist);
+
+  void resourcesRequested(
+      const FrameworkID& frameworkId,
+      const std::vector<Request>& requests);
+
+  void resourcesUnused(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources,
+      const Option<Filters>& filters);
+
+  void resourcesRecovered(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources);
+
+  void offersRevived(const FrameworkID& frameworkId);
+
+protected:
+  // Callback for doing batch allocations.
+  void batch();
+
+  // Allocate any allocatable resources.
+  void allocate();
+
+  // Allocate resources just from the specified slave.
+  void allocate(const SlaveID& slaveId);
+
+  // Allocate resources from the specified slaves.
+  void allocate(const hashset<SlaveID>& slaveIds);
+
+  // Remove a filter for the specified framework.
+  void expire(const FrameworkID& frameworkId, Filter* filter);
+
+  // Checks whether the slave is whitelisted.
+  bool isWhitelisted(const SlaveID& slave);
+
+  // Returns true if there is a filter for this framework
+  // on this slave.
+  bool isFiltered(const FrameworkID& frameworkId,
+		  const SlaveID& slaveId,
+		  const Resources& resources);
+
+  bool initialized;
+
+  Flags flags;
+  PID<Master> master;
+
+  // Maps FrameworkIDs to user names.
+  hashmap<FrameworkID, std::string> users;
+
+  // Maps user names to the Sorter object which contains
+  // all of that user's frameworks.
+  hashmap<std::string, FrameworkSorter*> sorters;
+
+  // Maps slaves to their allocatable resources.
+  hashmap<SlaveID, Resources> allocatable;
+
+  // Contains all active slaves.
+  hashmap<SlaveID, SlaveInfo> slaves;
+
+  // Filters that have been added by frameworks.
+  multihashmap<FrameworkID, Filter*> filters;
+
+  // Slaves to send offers for.
+  Option<hashset<std::string> > whitelist;
+
+  // Sorter containing all active users.
+  UserSorter* userSorter;
+};
+
+
+// Used to represent "filters" for resources unused in offers.
+class Filter
+{
+public:
+  virtual ~Filter() {}
+  virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
+};
+
+
+class RefusedFilter : public Filter
+{
+public:
+  RefusedFilter(const SlaveID& _slaveId,
+                const Resources& _resources,
+                const Timeout& _timeout)
+    : slaveId(_slaveId),
+      resources(_resources),
+      timeout(_timeout) {}
+
+  virtual bool filter(const SlaveID& slaveId, const Resources& resources)
+  {
+    return slaveId == this->slaveId &&
+      resources <= this->resources && // Refused resources are superset.
+      timeout.remaining() > 0.0;
+  }
+
+  const SlaveID slaveId;
+  const Resources resources;
+  const Timeout timeout;
+};
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::initialize(
+    const Flags& _flags,
+    const process::PID<Master>& _master)
+{
+  flags = _flags;
+  master = _master;
+  initialized = true;
+  userSorter = new UserSorter();
+
+  delay(flags.batch_seconds, self(),
+	&HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const Resources& used)
+{
+  CHECK(initialized);
+
+  std::string user = frameworkInfo.user();
+  if (!userSorter->contains(user)) {
+    userSorter->add(user);
+    sorters[user] = new FrameworkSorter();
+  }
+
+  CHECK(!sorters[user]->contains(frameworkId.value()));
+  sorters[user]->add(frameworkId.value());
+
+  // Update the allocation to this framework.
+  userSorter->allocated(user, used);
+  sorters[user]->add(used);
+  sorters[user]->allocated(frameworkId.value(), used);
+
+  users[frameworkId] = frameworkInfo.user();
+
+  LOG(INFO) << "Added framework " << frameworkId;
+
+  allocate();
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  std::string user = users[frameworkId];
+  // Might not be in 'sorters[user]' because it was previously
+  // deactivated and never re-added.
+  if (sorters[user]->contains(frameworkId.value())) {
+    Resources allocation = sorters[user]->allocation(frameworkId.value());
+    userSorter->unallocated(user, allocation);
+    sorters[user]->remove(allocation);
+    sorters[user]->remove(frameworkId.value());
+  }
+
+  users.erase(frameworkId);
+
+  // If this user doesn't have any more active frameworks, remove it.
+  if (sorters[user]->count() == 0) {
+    Sorter* s = sorters[user];
+    sorters.erase(user);
+    delete s;
+
+    userSorter->remove(user);
+  }
+
+  foreach (Filter* filter, filters.get(frameworkId)) {
+    filters.remove(frameworkId, filter);
+
+    // Do not delete the filter, see comments in
+    // HierarchicalAllocatorProcess::offersRevived and
+    // HierarchicalAllocatorProcess::expire.
+  }
+
+  filters.remove(frameworkId);
+
+  LOG(INFO) << "Removed framework " << frameworkId;
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo)
+{
+  CHECK(initialized);
+
+  std::string user = frameworkInfo.user();
+  sorters[user]->activate(frameworkId.value());
+
+  LOG(INFO) << "Activated framework " << frameworkId;
+
+  allocate();
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  std::string user = users[frameworkId];
+  sorters[user]->deactivate(frameworkId.value());
+
+  // Note that the Sorter *does not* remove the resources allocated
+  // to this framework. For now, this is important because if the
+  // framework fails over and is activated, we still want a record
+  // of the resources that it is using. We might be able to collapse
+  // the added/removed and activated/deactivated in the future.
+
+  foreach (Filter* filter, filters.get(frameworkId)) {
+    filters.remove(frameworkId, filter);
+
+    // Do not delete the filter, see comments in
+    // HierarchicalAllocatorProcess::offersRevived and
+    // HierarchicalAllocatorProcess::expire.
+  }
+
+  filters.remove(frameworkId);
+
+  LOG(INFO) << "Deactivated framework " << frameworkId;
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
+    const SlaveID& slaveId,
+    const SlaveInfo& slaveInfo,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  CHECK(initialized);
+
+  CHECK(!slaves.contains(slaveId));
+
+  slaves[slaveId] = slaveInfo;
+
+  userSorter->add(slaveInfo.resources());
+
+  Resources unused = slaveInfo.resources();
+
+  foreachpair (const FrameworkID& frameworkId, const Resources& resources, used) {
+    if (users.contains(frameworkId)) {
+      const std::string& user = users[frameworkId];
+      sorters[user]->add(resources);
+      sorters[user]->allocated(frameworkId.value(), resources);
+      userSorter->allocated(user, resources);
+    }
+
+    unused -= resources; // Only want to allocate resources that are not used!
+  }
+
+  allocatable[slaveId] = unused;
+
+  LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
+            << ") with " << slaveInfo.resources()
+            << " (and " << unused << " available)";
+
+  allocate(slaveId);
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(const SlaveID& slaveId)
+{
+  CHECK(initialized);
+
+  CHECK(slaves.contains(slaveId));
+
+  userSorter->remove(slaves[slaveId].resources());
+
+  slaves.erase(slaveId);
+
+  allocatable.erase(slaveId);
+
+  // Note that we DO NOT actually delete any filters associated with
+  // this slave, that will occur when the delayed
+  // HierarchicalAllocatorProcess::expire gets invoked (or the framework
+  // that applied the filters gets removed).
+
+  LOG(INFO) << "Removed slave " << slaveId;
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
+    const Option<hashset<std::string> >& _whitelist)
+{
+  CHECK(initialized);
+
+  whitelist = _whitelist;
+
+  if (whitelist.isSome()) {
+    LOG(INFO) << "Updated slave white list:";
+    foreach (const std::string& hostname, whitelist.get()) {
+      LOG(INFO) << "\t" << hostname;
+    }
+  }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRequested(
+    const FrameworkID& frameworkId,
+    const std::vector<Request>& requests)
+{
+  CHECK(initialized);
+
+  LOG(INFO) << "Received resource request from framework " << frameworkId;
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources,
+    const Option<Filters>& filters)
+{
+  CHECK(initialized);
+
+  if (resources.allocatable().size() == 0) {
+    return;
+  }
+
+  VLOG(1) << "Framework " << frameworkId
+          << " left " << resources.allocatable()
+          << " unused on slave " << slaveId;
+
+  // Update resources allocated to framework. It is
+  // not possible for the user to not be in users
+  // because resourcesUnused is only called as the
+  // result of a valid task launch by an active
+  // framework that doesn't use the entire offer.
+  CHECK(users.contains(frameworkId));
+
+  std::string user = users[frameworkId];
+  sorters[user]->unallocated(frameworkId.value(), resources);
+  sorters[user]->remove(resources);
+  userSorter->unallocated(user, resources);
+
+  // Update resources allocatable on slave.
+  CHECK(allocatable.contains(slaveId));
+  allocatable[slaveId] += resources;
+
+  // Create a refused resources filter.
+  double timeout = filters.isSome()
+    ? filters.get().refuse_seconds()
+    : Filters().refuse_seconds();
+
+  if (timeout != 0.0) {
+    LOG(INFO) << "Framework " << frameworkId
+	      << " filtered slave " << slaveId
+	      << " for " << timeout << " seconds\n";
+
+    // Create a new filter and delay its expiration.
+    mesos::internal::master::Filter* filter = new RefusedFilter(slaveId, resources, timeout);
+    this->filters.put(frameworkId, filter);
+
+    delay(timeout, self(),
+          &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire, frameworkId, filter);
+  }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources)
+{
+  CHECK(initialized);
+
+  if (resources.allocatable().size() == 0) {
+    return;
+  }
+
+  // Update resources allocated to framework (if framework still
+  // exists, which it might not in the event that we dispatched
+  // Master::offer before we received AllocatorProcess::frameworkRemoved
+  // or AllocatorProcess::frameworkDeactivated, in which case we will
+  // have already recovered all of its resources).
+  if (users.contains(frameworkId) &&
+      sorters[users[frameworkId]]->contains(frameworkId.value())) {
+    std::string user = users[frameworkId];
+    sorters[user]->unallocated(frameworkId.value(), resources);
+    sorters[user]->remove(resources);
+    userSorter->unallocated(user, resources);
+  }
+
+  // Update resources allocatable on slave (if slave still exists,
+  // which it might not in the event that we dispatched Master::offer
+  // before we received Allocator::slaveRemoved).
+  if (allocatable.contains(slaveId)) {
+    allocatable[slaveId] += resources;
+
+    VLOG(1) << "Recovered " << resources.allocatable()
+            << " on slave " << slaveId
+            << " from framework " << frameworkId;
+
+  }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(const FrameworkID& frameworkId)
+{
+  CHECK(initialized);
+
+  foreach (Filter* filter, filters.get(frameworkId)) {
+    filters.remove(frameworkId, filter);
+
+    // We delete each actual Filter when
+    // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
+    // Filter here it's possible that the same Filter (i.e., same
+    // address) could get reused and HierarchicalAllocatorProcess::expire
+    // would expire that filter too soon. Note that this only works
+    // right now because ALL Filter types "expire".
+  }
+
+  filters.remove(frameworkId);
+
+  LOG(INFO) << "Removed filters for framework " << frameworkId;
+
+  allocate();
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch()
+{
+  CHECK(initialized);
+  allocate();
+  delay(flags.batch_seconds, self(),
+	&HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate()
+{
+  CHECK(initialized);
+
+  ::Timer timer;
+  timer.start();
+
+  allocate(slaves.keys());
+
+  LOG(INFO) << "Performed allocation for "
+            << slaves.size() << " slaves in "
+            << timer.elapsed().millis() << " milliseconds";
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const SlaveID& slaveId)
+{
+  CHECK(initialized);
+
+  hashset<SlaveID> slaveIds;
+  slaveIds.insert(slaveId);
+
+  ::Timer timer;
+  timer.start();
+
+  allocate(slaveIds);
+
+  LOG(INFO) << "Performed allocation for slave "
+            << slaveId << " in "
+            << timer.elapsed().millis() << " milliseconds";
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const hashset<SlaveID>& slaveIds)
+{
+  CHECK(initialized);
+
+  if (userSorter->count() == 0) {
+    VLOG(1) << "No users to allocate resources!";
+    return;
+  }
+
+  // Get out only "available" resources (i.e., resources that are
+  // allocatable and above a certain threshold, see below).
+  hashmap<SlaveID, Resources> available;
+  foreachpair (const SlaveID& slaveId, Resources resources, allocatable) {
+    if (!slaveIds.contains(slaveId)) {
+      continue;
+    }
+
+    if (isWhitelisted(slaveId)) {
+      resources = resources.allocatable(); // Make sure they're allocatable.
+
+      // TODO(benh): For now, only make offers when there is some cpu
+      // and memory left. This is an artifact of the original code that
+      // only offered when there was at least 1 cpu "unit" available,
+      // and without doing this a framework might get offered resources
+      // with only memory available (which it obviously will decline)
+      // and then end up waiting the default Filters::refuse_seconds
+      // (unless the framework set it to something different).
+
+      Value::Scalar none;
+      Value::Scalar cpus = resources.get("cpus", none);
+      Value::Scalar mem = resources.get("mem", none);
+
+      if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
+	VLOG(1) << "Found available resources: " << resources
+		<< " on slave " << slaveId;
+	available[slaveId] = resources;
+      }
+    }
+  }
+
+  if (available.size() == 0) {
+    VLOG(1) << "No resources available to allocate!";
+    return;
+  }
+
+  foreach (const std::string& user, userSorter->sort()) {
+    foreach (const std::string& frameworkIdValue, sorters[user]->sort()) {
+      FrameworkID frameworkId;
+      frameworkId.set_value(frameworkIdValue);
+
+      Resources allocatedResources;
+      hashmap<SlaveID, Resources> offerable;
+      foreachpair (const SlaveID& slaveId, const Resources& resources, available) {
+	// Check whether or not this framework filters this slave.
+	bool filtered = isFiltered(frameworkId, slaveId, resources);
+
+	if (!filtered) {
+	  VLOG(1) << "Offering " << resources
+		  << " on slave " << slaveId
+		  << " to framework " << frameworkId;
+	  offerable[slaveId] = resources;
+
+	  // Update framework and slave resources.
+	  allocatable[slaveId] -= resources;
+	  allocatedResources += resources;
+	}
+      }
+
+      if (offerable.size() > 0) {
+	foreachkey (const SlaveID& slaveId, offerable) {
+	  available.erase(slaveId);
+	}
+
+	sorters[user]->add(allocatedResources);
+	sorters[user]->allocated(frameworkIdValue, allocatedResources);
+	userSorter->allocated(user, allocatedResources);
+
+	dispatch(master, &Master::offer, frameworkId, offerable);
+      }
+    }
+  }
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
+    const FrameworkID& frameworkId,
+    Filter* filter)
+{
+  // The filter might have already been removed (e.g., if the
+  // framework no longer exists or in
+  // HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
+  // keep the address from getting reused possibly causing premature
+  // expiration).
+  if (users.contains(frameworkId) &&
+      filters.contains(frameworkId, filter)) {
+    filters.remove(frameworkId, filter);
+  }
+
+  delete filter;
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isWhitelisted(
+    const SlaveID& slaveId)
+{
+  CHECK(initialized);
+
+  CHECK(slaves.contains(slaveId));
+
+  return whitelist.isNone() ||
+    whitelist.get().contains(slaves[slaveId].hostname());
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources)
+{
+  bool filtered = false;
+  foreach (Filter* filter, filters.get(frameworkId)) {
+    if (filter->filter(slaveId, resources)) {
+      VLOG(1) << "Filtered " << resources
+	      << " on slave " << slaveId
+	      << " for framework " << frameworkId;
+      filtered = true;
+      break;
+    }
+  }
+  return filtered;
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
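
A hedged lifecycle sketch (this mirrors Master::initialize() in the
master.cpp changes below; 'flags' is assumed to be the master's Flags
and 'master' the master's PID):

    using mesos::internal::master::AllocatorProcess;
    using mesos::internal::master::DRFSorter;
    using mesos::internal::master::HierarchicalAllocatorProcess;

    AllocatorProcess* allocator =
      new HierarchicalAllocatorProcess<DRFSorter, DRFSorter>();

    // The allocator is a libprocess process: spawn it, then dispatch
    // initialize() so it can start its batch allocation timer.
    process::spawn(allocator);
    process::dispatch(allocator, &AllocatorProcess::initialize,
                      flags, master);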

Modified: incubator/mesos/trunk/src/master/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/main.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/main.cpp (original)
+++ incubator/mesos/trunk/src/master/main.cpp Mon Aug 20 21:41:02 2012
@@ -33,7 +33,6 @@
 #include "logging/logging.hpp"
 
 #include "master/allocator.hpp"
-#include "master/dominant_share_allocator.hpp"
 #include "master/master.hpp"
 #include "master/webui.hpp"
 
@@ -116,7 +115,8 @@ int main(int argc, char** argv)
   LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
   LOG(INFO) << "Starting Mesos master";
 
-  Allocator* allocator = new DominantShareAllocator();
+  AllocatorProcess* allocator = AllocatorProcess::create(flags.user_sorter,
+							 flags.framework_sorter);
 
   Master* master = new Master(allocator, flags);
   process::spawn(master);

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Mon Aug 20 21:41:02 2012
@@ -60,7 +60,7 @@ namespace master {
 
 class WhitelistWatcher : public Process<WhitelistWatcher> {
 public:
-  WhitelistWatcher(const string& _path, Allocator* _allocator)
+  WhitelistWatcher(const string& _path, AllocatorProcess* _allocator)
   : path(_path), allocator(_allocator) {}
 
 protected:
@@ -104,7 +104,7 @@ protected:
 
     // Send the whitelist to allocator, if necessary.
     if (whitelist != lastWhitelist) {
-      dispatch(allocator, &Allocator::updateWhitelist, whitelist);
+      dispatch(allocator, &AllocatorProcess::updateWhitelist, whitelist);
     }
 
     // Check again.
@@ -113,7 +113,7 @@ protected:
   }
 
 private:
-  Allocator* allocator;
+  AllocatorProcess* allocator;
   const string path;
   Option<hashset<string> > lastWhitelist;
 };
@@ -255,14 +255,14 @@ struct SlaveReregistrar
 };
 
 
-Master::Master(Allocator* _allocator)
+Master::Master(AllocatorProcess* _allocator)
   : ProcessBase("master"),
     allocator(_allocator),
     flags()
 {}
 
 
-Master::Master(Allocator* _allocator,
+Master::Master(AllocatorProcess* _allocator,
                const flags::Flags<logging::Flags, master::Flags>& _flags)
   : ProcessBase("master"),
     allocator(_allocator),
@@ -321,7 +321,7 @@ void Master::initialize()
 
   // Spawn the allocator.
   spawn(allocator);
-  dispatch(allocator, &Allocator::initialize, flags, self());
+  dispatch(allocator, &AllocatorProcess::initialize, flags, self());
 
   // Parse the white list
   whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
@@ -484,7 +484,7 @@ void Master::exited(const UPID& pid)
       framework->active = false;
 
       // Tell the allocator to stop allocating resources to this framework.
-      dispatch(allocator, &Allocator::frameworkDeactivated, framework->id);
+      dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
 
       double failoverTimeout = framework->info.failover_timeout();
 
@@ -498,7 +498,7 @@ void Master::exited(const UPID& pid)
 
       // Remove the framework's offers.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        dispatch(allocator, &Allocator::resourcesRecovered,
+        dispatch(allocator, &AllocatorProcess::resourcesRecovered,
                  offer->framework_id(),
                  offer->slave_id(),
                  Resources(offer->resources()));
@@ -647,7 +647,7 @@ void Master::reregisterFramework(const F
       // replied to the offers but the driver might have dropped
       // those messages since it wasn't connected to the master.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        dispatch(allocator, &Allocator::resourcesRecovered,
+        dispatch(allocator, &AllocatorProcess::resourcesRecovered,
 		 offer->framework_id(), offer->slave_id(), offer->resources());
         removeOffer(offer);
       }
@@ -743,7 +743,7 @@ void Master::deactivateFramework(const F
 void Master::resourceRequest(const FrameworkID& frameworkId,
                              const vector<Request>& requests)
 {
-  dispatch(allocator, &Allocator::resourcesRequested, frameworkId, requests);
+  dispatch(allocator, &AllocatorProcess::resourcesRequested, frameworkId, requests);
 }
 
 
@@ -793,7 +793,7 @@ void Master::reviveOffers(const Framewor
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
     LOG(INFO) << "Reviving offers for framework " << framework->id;
-    dispatch(allocator, &Allocator::offersRevived, framework->id);
+    dispatch(allocator, &AllocatorProcess::offersRevived, framework->id);
   }
 }
 
@@ -1108,7 +1108,7 @@ void Master::exitedExecutor(const SlaveI
                 << " (" << slave->info.hostname() << ")"
                 << " exited with status " << status;
 
-      dispatch(allocator, &Allocator::resourcesRecovered,
+      dispatch(allocator, &AllocatorProcess::resourcesRecovered,
                frameworkId,
                slaveId,
                Resources(executor.resources()));
@@ -1190,7 +1190,7 @@ void Master::offer(const FrameworkID& fr
                  << " has terminated or is inactive";
 
     foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
-      dispatch(allocator, &Allocator::resourcesRecovered,
+      dispatch(allocator, &AllocatorProcess::resourcesRecovered,
                frameworkId, slaveId, offered);
     }
     return;
@@ -1206,7 +1206,7 @@ void Master::offer(const FrameworkID& fr
                    << frameworkId << " because slave " << slaveId
                    << " is not valid";
 
-      dispatch(allocator, &Allocator::resourcesRecovered,
+      dispatch(allocator, &AllocatorProcess::resourcesRecovered,
                frameworkId, slaveId, offered);
       continue;
     }
@@ -1507,7 +1507,7 @@ void Master::processTasks(Offer* offer,
 
   if (unusedResources.allocatable().size() > 0) {
     // Tell the allocator about the unused (e.g., refused) resources.
-    dispatch(allocator, &Allocator::resourcesUnused,
+    dispatch(allocator, &AllocatorProcess::resourcesUnused,
 	     offer->framework_id(),
 	     offer->slave_id(),
 	     unusedResources,
@@ -1593,7 +1593,7 @@ void Master::addFramework(Framework* fra
   message.mutable_master_info()->MergeFrom(info);
   send(framework->pid, message);
 
-  dispatch(allocator, &Allocator::frameworkAdded,
+  dispatch(allocator, &AllocatorProcess::frameworkAdded,
            framework->id, framework->info, framework->resources);
 }
 
@@ -1618,7 +1618,7 @@ void Master::failoverFramework(Framework
   // Make sure we can get offers again.
   if (!framework->active) {
     framework->active = true;
-    dispatch(allocator, &Allocator::frameworkActivated,
+    dispatch(allocator, &AllocatorProcess::frameworkActivated,
              framework->id, framework->info);
   }
 
@@ -1637,7 +1637,7 @@ void Master::failoverFramework(Framework
   // these resources to this framework if it wants.
   // TODO(benh): Consider just reoffering these to
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    dispatch(allocator, &Allocator::resourcesRecovered,
+    dispatch(allocator, &AllocatorProcess::resourcesRecovered,
              offer->framework_id(),
              offer->slave_id(),
              Resources(offer->resources()));
@@ -1650,7 +1650,7 @@ void Master::removeFramework(Framework* 
 {
   if (framework->active) {
     // Tell the allocator to stop allocating resources to this framework.
-    dispatch(allocator, &Allocator::frameworkDeactivated, framework->id);
+    dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
   }
 
   // Tell slaves to shutdown the framework.
@@ -1671,7 +1671,7 @@ void Master::removeFramework(Framework* 
 
   // Remove the framework's offers (if they weren't removed before).
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    dispatch(allocator, &Allocator::resourcesRecovered,
+    dispatch(allocator, &AllocatorProcess::resourcesRecovered,
              offer->framework_id(),
              offer->slave_id(),
              Resources(offer->resources()));
@@ -1685,7 +1685,7 @@ void Master::removeFramework(Framework* 
       foreachpair (const ExecutorID& executorId,
                    const ExecutorInfo& executorInfo,
                    framework->executors[slaveId]) {
-        dispatch(allocator, &Allocator::resourcesRecovered,
+        dispatch(allocator, &AllocatorProcess::resourcesRecovered,
                  framework->id,
                  slave->id,
                  executorInfo.resources());
@@ -1709,7 +1709,8 @@ void Master::removeFramework(Framework* 
 
   // Delete it.
   frameworks.erase(framework->id);
-  dispatch(allocator, &Allocator::frameworkRemoved, framework->id);
+  dispatch(allocator, &AllocatorProcess::frameworkRemoved, framework->id);
+
   delete framework;
 }
 
@@ -1747,7 +1748,7 @@ void Master::addSlave(Slave* slave, bool
   spawn(slave->observer);
 
   if (!reregister) {
-    dispatch(allocator, &Allocator::slaveAdded,
+    dispatch(allocator, &AllocatorProcess::slaveAdded,
              slave->id, slave->info, hashmap<FrameworkID, Resources>());
   }
 }
@@ -1817,7 +1818,7 @@ void Master::readdSlave(Slave* slave,
     resources[task.framework_id()] += task.resources();
   }
 
-  dispatch(allocator, &Allocator::slaveAdded,
+  dispatch(allocator, &AllocatorProcess::slaveAdded,
            slave->id, slave->info, resources);
 }
 
@@ -1892,7 +1893,7 @@ void Master::removeSlave(Slave* slave)
 
   // Delete it.
   slaves.erase(slave->id);
-  dispatch(allocator, &Allocator::slaveRemoved, slave->id);
+  dispatch(allocator, &AllocatorProcess::slaveRemoved, slave->id);
   delete slave;
 }
 
@@ -1911,7 +1912,7 @@ void Master::removeTask(Task* task)
   slave->removeTask(task);
 
   // Tell the allocator about the recovered resources.
-  dispatch(allocator, &Allocator::resourcesRecovered,
+  dispatch(allocator, &AllocatorProcess::resourcesRecovered,
            task->framework_id(),
            task->slave_id(),
            Resources(task->resources()));

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Mon Aug 20 21:41:02 2012
@@ -56,7 +56,7 @@ namespace master {
 using namespace process; // Included to make code easier to read.
 
 // Some forward declarations.
-class Allocator;
+class AllocatorProcess;
 class SlavesManager;
 struct Framework;
 struct Slave;
@@ -67,8 +67,8 @@ class WhitelistWatcher;
 class Master : public ProtobufProcess<Master>
 {
 public:
-  Master(Allocator* allocator);
-  Master(Allocator* allocator,
+  Master(AllocatorProcess* allocator);
+  Master(AllocatorProcess* allocator,
          const flags::Flags<logging::Flags, master::Flags>& flags);
 
   virtual ~Master();
@@ -204,7 +204,7 @@ private:
 
   bool elected;
 
-  Allocator* allocator;
+  AllocatorProcess* allocator;
   SlavesManager* slavesManager;
   WhitelistWatcher* whitelistWatcher;
 

Added: incubator/mesos/trunk/src/master/sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/sorter.hpp?rev=1375239&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master/sorter.hpp (added)
+++ incubator/mesos/trunk/src/master/sorter.hpp Mon Aug 20 21:41:02 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SORTER_HPP__
+#define __SORTER_HPP__
+
+#include "master/master.hpp"
+
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Sorters implement the logic for determining the
+// order in which users or frameworks should receive
+// resource allocations.
+class Sorter
+{
+public:
+  virtual ~Sorter() {}
+
+  // Adds a client to allocate resources to. A client
+  // may be a user or a framework.
+  virtual void add(const std::string& client) = 0;
+
+  // Removes a client.
+  virtual void remove(const std::string& client) = 0;
+
+  // Re-adds a client to the sort after a deactivate.
+  virtual void activate(const std::string& client) = 0;
+
+  // Removes a client from the sort, so it won't get
+  // allocated to.
+  virtual void deactivate(const std::string& client) = 0;
+
+  // Specify that resources have been allocated to the
+  // given client.
+  virtual void allocated(const std::string& client,
+			 const Resources& resources) = 0;
+
+  // Specify that resources have been unallocated from
+  // the given client.
+  virtual void unallocated(const std::string& client,
+			   const Resources& resources) = 0;
+
+  // Returns the resources that have been allocated to
+  // this client.
+  virtual Resources allocation(const std::string& client) = 0;
+
+  // Add resources to the total pool of resources this
+  // Sorter should consider.
+  virtual void add(const Resources& resources) = 0;
+
+  // Remove resources from the total pool.
+  virtual void remove(const Resources& resources) = 0;
+
+  // Returns a list of all clients, in the order that they
+  // should be allocated to, according to this Sorter's policy.
+  virtual std::list<std::string> sort() = 0;
+
+  // Returns true if this Sorter contains the specified client,
+  // either active or deactivated.
+  virtual bool contains(const std::string& client) = 0;
+
+  // Returns the number of clients this Sorter contains,
+  // either active or deactivated.
+  virtual int count() = 0;
+};
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SORTER_HPP__

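The interface above stays deliberately small; to see how the two
add()/remove() overloads, sort(), and allocated() compose, here is a
minimal sketch of the loop a hierarchical allocator might run on top
of a Sorter. The offer() helper is hypothetical and stands in for
whatever actually sends resources to a client:

  // Illustrative only: drive a Sorter (e.g., a DRFSorter) through
  // one allocation pass.
  void allocate(Sorter* sorter, const Resources& available)
  {
    sorter->add(available); // Grow the total pool the sorter considers.

    // Walk clients in policy order (for DRF, increasing dominant
    // share) so the "neediest" client is considered first.
    foreach (const std::string& client, sorter->sort()) {
      Resources granted = offer(client, available); // Hypothetical.
      sorter->allocated(client, granted); // Feeds the next sort().
    }
  }
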
Modified: incubator/mesos/trunk/src/tests/allocator_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_tests.cpp?rev=1375239&r1=1375238&r2=1375239&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_tests.cpp Mon Aug 20 21:41:02 2012
@@ -25,20 +25,17 @@
 #include "detector/detector.hpp"
 
 #include "master/allocator.hpp"
-#include "master/dominant_share_allocator.hpp"
 #include "master/master.hpp"
 
 #include "slave/process_based_isolation_module.hpp"
 
-#include "tests/base_zookeeper_test.hpp"
 #include "tests/utils.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::test;
 
-using mesos::internal::master::Allocator;
-using mesos::internal::master::DominantShareAllocator;
+using mesos::internal::master::AllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::ProcessBasedIsolationModule;
@@ -46,34 +43,21 @@ using mesos::internal::slave::Slave;
 
 using process::PID;
 
+using std::map;
 using std::string;
 using std::vector;
 
-using testing::AnyNumber;
+using testing::_;
 using testing::ByRef;
+using testing::DoAll;
 using testing::DoDefault;
 using testing::Eq;
-using testing::_;
+using testing::InSequence;
 using testing::Return;
 using testing::SaveArg;
 
-void checkResources(vector<Offer> offers, int cpus, int mem)
-{
-  EXPECT_EQ(offers.size(), 1);
-
-  EXPECT_EQ(offers[0].resources().size(), 2);
-
-  foreach (const Resource& resource, offers[0].resources()) {
-    if (resource.name() == "cpus") {
-      EXPECT_EQ(resource.scalar().value(), cpus);
-    } else if (resource.name() == "mem") {
-      EXPECT_EQ(resource.scalar().value(), mem);
-    }
-  }
-}
 
-
-TEST(AllocatorTest, DominantShareAllocator)
+TEST(AllocatorTest, DRFAllocatorProcess)
 {
   FrameworkInfo frameworkInfo1;
   frameworkInfo1.set_name("framework1");
@@ -90,7 +74,7 @@ TEST(AllocatorTest, DominantShareAllocat
   frameworkInfo3.set_user("user1");
   FrameworkID frameworkId3;
 
-  MockAllocator<DominantShareAllocator> a;
+  MockAllocator<TestAllocatorProcess> a;
 
   EXPECT_CALL(a, initialize(_, _));
 
@@ -165,7 +149,7 @@ TEST(AllocatorTest, DominantShareAllocat
 
   WAIT_UNTIL(resourceOfferTrigger);
 
-  checkResources(offers1, 2, 1024);
+  EXPECT_THAT(offers1, OfferEq(2, 1024));
 
   MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master);
@@ -193,7 +177,7 @@ TEST(AllocatorTest, DominantShareAllocat
 
   WAIT_UNTIL(resourceOfferTrigger2);
 
-  checkResources(offers2, 1, 512);
+  EXPECT_THAT(offers2, OfferEq(1, 512));
 
   Resources resources3 = Resources::parse("cpus:3;mem:2048");
   Slave s3(resources3, true, &isolationModule);
@@ -202,7 +186,7 @@ TEST(AllocatorTest, DominantShareAllocat
 
   WAIT_UNTIL(resourceOfferTrigger3);
 
-  checkResources(offers3, 3, 2048);
+  EXPECT_THAT(offers3, OfferEq(3, 2048));
 
   MockScheduler sched3;
   MesosSchedulerDriver driver3(&sched3, frameworkInfo3, master);
@@ -226,7 +210,7 @@ TEST(AllocatorTest, DominantShareAllocat
 
   WAIT_UNTIL(resourceOfferTrigger4);
 
-  checkResources(offers4, 4, 4096);
+  EXPECT_THAT(offers4, OfferEq(4, 4096));
 
   driver3.stop();
   driver2.stop();
@@ -253,7 +237,7 @@ TEST(AllocatorTest, DominantShareAllocat
 }
 
 
-template <typename T = Allocator>
+template <typename T>
 class AllocatorTest : public ::testing::Test
 {
 protected:
@@ -274,7 +258,6 @@ protected:
 
 // Causes all TYPED_TEST(AllocatorTest, ...) to be run for
 // each of the specified Allocator classes.
-typedef ::testing::Types<DominantShareAllocator> AllocatorTypes;
 TYPED_TEST_CASE(AllocatorTest, AllocatorTypes);
 
 
@@ -326,7 +309,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
 
   WAIT_UNTIL(resourceOffers);
 
-  checkResources(offers, 2, 1024);
+  EXPECT_THAT(offers, OfferEq(2, 1024));
 
   driver.stop();
 
@@ -417,7 +400,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
 
   WAIT_UNTIL(offered);
 
-  checkResources(offers, 1, 512);
+  EXPECT_THAT(offers, OfferEq(1, 512));
 
   driver1.stop();
   driver2.stop();
@@ -517,7 +500,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
 
   WAIT_UNTIL(offered);
 
-  checkResources(offers, 2, 1024);
+  EXPECT_THAT(offers, OfferEq(2, 1024));
 
   driver.stop();
   driver.join();
@@ -527,7 +510,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
   // Re-dispatch the resourcesRecovered call which we "caught"
   // earlier now that the framework has been removed, to test
   // that recovering resources from a removed framework works.
-  dispatch(this->allocator, &Allocator::resourcesRecovered,
+  dispatch(this->allocator, &AllocatorProcess::resourcesRecovered,
 	   frameworkId, slaveId, savedResources);
 
   MockScheduler sched2;
@@ -545,7 +528,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
 
   WAIT_UNTIL(offered2);
 
-  checkResources(offers2, 2, 1024);
+  EXPECT_THAT(offers2, OfferEq(2, 1024));
 
   driver2.stop();
   driver2.join();
@@ -562,126 +545,725 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
 }
 
 
-template <typename T = Allocator>
-class MasterFailoverAllocatorTest : public BaseZooKeeperTest
+TYPED_TEST(AllocatorTest, SchedulerFailover)
 {
-public:
-  static void SetUpTestCase() {
-    BaseZooKeeperTest::SetUpTestCase();
-  }
+  MockFilter filter;
+  process::filter(&filter);
 
-protected:
-  T allocator1;
-  MockAllocator<T> allocator2;
-};
+  EXPECT_MESSAGE(filter, Eq(UnregisterFrameworkMessage().GetTypeName()), _, _)
+    .WillRepeatedly(Return(true));
+
+  EXPECT_CALL(this->allocator, initialize(_, _));
+
+  EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+  trigger frameworkRemovedTrigger;
+  EXPECT_CALL(this->allocator, frameworkRemoved(_))
+    .WillOnce(Trigger(&frameworkRemovedTrigger));
+
+  EXPECT_CALL(this->allocator, frameworkActivated(_, _));
+
+  trigger frameworkDeactivatedTrigger;
+  EXPECT_CALL(this->allocator, frameworkDeactivated(_))
+    .WillOnce(DoAll(InvokeFrameworkDeactivated(&this->allocator),
+		    Trigger(&frameworkDeactivatedTrigger)))
+    .WillOnce(DoDefault());
 
+  EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+  trigger slaveRemovedTrigger;
+  EXPECT_CALL(this->allocator, slaveRemoved(_))
+    .WillOnce(Trigger(&slaveRemovedTrigger));
+
+  trigger resourcesRecoveredTrigger;
+  EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+    .WillRepeatedly(DoDefault());
+
+  // We don't filter the unused resources to make sure that
+  // they get offered to the framework as soon as it fails over.
+  EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+    .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0));
+
+  Master m(&this->allocator);
+  PID<Master> master = process::spawn(&m);
 
-// Runs TYPED_TEST(MasterFailoverAllocatorTest, ...) on all AllocatorTypes.
-TYPED_TEST_CASE(MasterFailoverAllocatorTest, AllocatorTypes);
+  MockExecutor exec;
 
+  EXPECT_CALL(exec, registered(_, _, _, _));
 
-TYPED_TEST(MasterFailoverAllocatorTest, MasterFailover)
+  trigger launchTaskTrigger;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(Trigger(&launchTaskTrigger));
+
+  EXPECT_CALL(exec, shutdown(_));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  EXPECT_CALL(isolationModule, resourcesChanged(_, _, _));
+
+  Resources resources = Resources::parse("cpus:3;mem:1024");
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+  BasicMasterDetector detector(master, slave, true);
+
+  FrameworkInfo frameworkInfo1;
+  frameworkInfo1.set_name("framework1");
+  frameworkInfo1.set_user("user1");
+  frameworkInfo1.set_failover_timeout(.5);
+
+  // Launch the first (i.e., failing) scheduler.
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, frameworkInfo1, master);
+
+  FrameworkID frameworkId;
+
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  // We don't know how many times the allocator might try to
+  // allocate to the failing framework before it gets killed.
+  EXPECT_CALL(sched1, resourceOffers(_, _))
+    .WillRepeatedly(DeclineOffers());
+
+  // Initially, all cluster resources are available.
+  trigger resourceOffersTrigger1;
+  EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+    .WillOnce(DoAll(LaunchTasks(1, 1, 256),
+		    Trigger(&resourceOffersTrigger1)));
+
+  driver1.start();
+
+  WAIT_UNTIL(resourceOffersTrigger1);
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler.
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId);
+
+  MesosSchedulerDriver driver2(&sched2, framework2, master);
+
+  EXPECT_CALL(sched2, registered(_, frameworkId, _));
+
+  // Even though the scheduler failed over, the 1 cpu, 256 mem
+  // task that it launched earlier should still be running, so
+  // only 2 cpus and 768 mem are available.
+  trigger resourceOffersTrigger2;
+  EXPECT_CALL(sched2, resourceOffers(_, OfferEq(2, 768)))
+    .WillOnce(Trigger(&resourceOffersTrigger2));
+
+  // Ensures that the task has been completely launched
+  // before we have the framework fail over.
+  WAIT_UNTIL(launchTaskTrigger);
+
+  driver1.stop();
+
+  WAIT_UNTIL(frameworkDeactivatedTrigger);
+
+  driver2.start();
+
+  WAIT_UNTIL(resourceOffersTrigger2);
+
+  driver2.stop();
+  driver2.join();
+
+  WAIT_UNTIL(frameworkRemovedTrigger);
+
+  process::terminate(slave);
+  process::wait(slave);
+
+  WAIT_UNTIL(slaveRemovedTrigger);
+
+  process::terminate(master);
+  process::wait(master);
+
+  process::filter(NULL);
+}
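
These tests synchronize on triggers instead of raw sleeps: an
EXPECT_CALL fires Trigger(&t) as a side effect when the mocked call
actually happens, and WAIT_UNTIL(t) blocks the test until it has. A
condensed sketch of the pattern (the names here are illustrative, not
taken from this patch):

  // Illustrative pattern, mirroring the tests in this file.
  trigger registeredTrigger;
  EXPECT_CALL(sched, registered(_, _, _))
    .WillOnce(Trigger(&registeredTrigger)); // Fires on the real call.

  driver.start();

  // Blocks until the trigger fires, so no fixed sleep is needed.
  WAIT_UNTIL(registeredTrigger);
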
+
+
+TYPED_TEST(AllocatorTest, FrameworkExited)
 {
-  trigger slaveAdded;
-  EXPECT_CALL(this->allocator2, initialize(_, _));
+  EXPECT_CALL(this->allocator, initialize(_, _));
+
+  EXPECT_CALL(this->allocator, frameworkAdded(_, _, _))
+    .Times(2);
 
-  trigger frameworkAdded;
-  EXPECT_CALL(this->allocator2, frameworkAdded(_, _, _));
+  EXPECT_CALL(this->allocator, frameworkRemoved(_))
+    .Times(2);
 
-  EXPECT_CALL(this->allocator2, frameworkDeactivated(_));
+  EXPECT_CALL(this->allocator, frameworkDeactivated(_))
+    .Times(2);
+
+  EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+  trigger slaveRemovedTrigger;
+  EXPECT_CALL(this->allocator, slaveRemoved(_))
+    .WillOnce(Trigger(&slaveRemovedTrigger));
+
+  EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+    .WillRepeatedly(DoDefault());
+
+  EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+    .WillRepeatedly(DoDefault());
+
+  Master m(&this->allocator);
+  PID<Master> master = process::spawn(m);
+
+  MockExecutor exec;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(2);
+
+  trigger launchTaskTrigger;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(Trigger(&launchTaskTrigger))
+    .WillOnce(DoDefault());
+
+  trigger shutdownTrigger;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(DoDefault())
+    .WillOnce(Trigger(&shutdownTrigger));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+  EXPECT_CALL(isolationModule, resourcesChanged(_, _, _))
+    .Times(2);
+
+  Resources resources1 = Resources::parse("cpus:3;mem:1024");
+  Slave s1(resources1, true, &isolationModule);
+  PID<Slave> slave1 = process::spawn(s1);
+  BasicMasterDetector detector1(master, slave1, true);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched1, registered(_, _, _));
+
+  EXPECT_CALL(sched1, resourceOffers(_, _))
+    .WillRepeatedly(DeclineOffers());
+
+  // The first time the framework is offered resources,
+  // all of the cluster's resources should be available.
+  trigger resourceOffersTrigger1;
+  EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+    .WillOnce(DoAll(LaunchTasks(1, 2, 512),
+		    Trigger(&resourceOffersTrigger1)));
+
+  driver1.start();
+
+  WAIT_UNTIL(resourceOffersTrigger1);
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched2, registered(_, _, _));
+
+  EXPECT_CALL(sched2, resourceOffers(_, _))
+    .WillRepeatedly(DeclineOffers());
+
+  // The first time sched2 gets an offer, framework 1 has a
+  // task running with 2 cpus and 512 mem, leaving 1 cpu and
+  // 512 mem.
+  trigger resourceOffersTrigger2;
+  EXPECT_CALL(sched2, resourceOffers(_, OfferEq(1, 512)))
+    .WillOnce(DoAll(LaunchTasks(1, 1, 256),
+		    Trigger(&resourceOffersTrigger2)));
+
+  // After we kill framework 1, all of its resources should
+  // have been returned, but framework 2 should still have a
+  // task with 1 cpu and 256 mem, leaving 2 cpus and 768 mem.
+  trigger resourceOffersTrigger3;
+  EXPECT_CALL(sched2, resourceOffers(_, OfferEq(2, 768)))
+    .WillOnce(Trigger(&resourceOffersTrigger3));
+
+  driver2.start();
+
+  WAIT_UNTIL(resourceOffersTrigger2);
+
+  // Ensures that framework 1's task is completely launched
+  // before we kill the framework to test if its resources
+  // are recovered correctly.
+  WAIT_UNTIL(launchTaskTrigger);
+
+  driver1.stop();
+  driver1.join();
+
+  WAIT_UNTIL(resourceOffersTrigger3);
+
+  driver2.stop();
+  driver2.join();
+
+  WAIT_UNTIL(shutdownTrigger);
+
+  process::terminate(slave1);
+  process::wait(slave1);
+
+  WAIT_UNTIL(slaveRemovedTrigger);
+
+  process::terminate(master);
+  process::wait(master);
+}
+
+
+TYPED_TEST(AllocatorTest, SlaveLost)
+{
+  EXPECT_CALL(this->allocator, initialize(_, _));
+
+  EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
 
   trigger frameworkRemovedTrigger;
-  EXPECT_CALL(this->allocator2, frameworkRemoved(_))
+  EXPECT_CALL(this->allocator, frameworkRemoved(_))
     .WillOnce(Trigger(&frameworkRemovedTrigger));
 
-  EXPECT_CALL(this->allocator2, slaveAdded(_, _, _));
+  EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+  EXPECT_CALL(this->allocator, slaveAdded(_, _, _))
+    .Times(2);
+
+  EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _));
+
+  EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+    .WillRepeatedly(DoDefault());
+
+  trigger slaveRemovedTrigger1, slaveRemovedTrigger2;
+  EXPECT_CALL(this->allocator, slaveRemoved(_))
+    .WillOnce(DoAll(InvokeSlaveRemoved(&this->allocator),
+		    Trigger(&slaveRemovedTrigger1)))
+    .WillOnce(Trigger(&slaveRemovedTrigger2));
+
+  Master m(&this->allocator);
+  PID<Master> master = process::spawn(m);
+
+  MockExecutor exec;
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  trigger launchTaskTrigger;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
+		    Trigger(&launchTaskTrigger)));
+
+  EXPECT_CALL(exec, shutdown(_));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  EXPECT_CALL(isolationModule, resourcesChanged(_, _, _));
+
+  Resources resources1 = Resources::parse("cpus:2;mem:1024");
+  Slave s1(resources1, true, &isolationModule);
+  PID<Slave> slave1 = process::spawn(s1);
+  BasicMasterDetector detector1(master, slave1, true);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched1, registered(_, _, _));
+
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .WillRepeatedly(DoDefault());
+
+  EXPECT_CALL(sched1, slaveLost(_, _));
+
+  trigger resourceOffersTrigger1, resourceOffersTrigger2;
+  {
+    // Ensures that the following EXPECT_CALLs happen in order.
+    InSequence dummy;
+
+    // Initially, all of slave1's resources are available.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 1024)))
+      .WillOnce(DoAll(LaunchTasks(1, 2, 512),
+		      Trigger(&resourceOffersTrigger1)));
+
+    // Eventually after slave2 is launched, we should get
+    // an offer that contains all of slave2's resources
+    // and none of slave1's resources.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 256)))
+      .WillOnce(Trigger(&resourceOffersTrigger2));
+  }
+
+  driver1.start();
+
+  WAIT_UNTIL(resourceOffersTrigger1);
+
+  // Ensures the task is completely launched before we
+  // kill the slave, to test that the task's resources
+  // are recovered correctly (i.e., never reallocated,
+  // since the slave is killed).
+  WAIT_UNTIL(launchTaskTrigger);
+
+  process::terminate(slave1);
+  process::wait(slave1);
+
+  WAIT_UNTIL(slaveRemovedTrigger1);
+
+  ProcessBasedIsolationModule isolationModule2;
+  Resources resources2 = Resources::parse("cpus:3;mem:256");
+  Slave s2(resources2, true, &isolationModule2);
+  PID<Slave> slave2 = process::spawn(s2);
+  BasicMasterDetector detector2(master, slave2, true);
+
+  WAIT_UNTIL(resourceOffersTrigger2);
+
+  driver1.stop();
+  driver1.join();
+
+  WAIT_UNTIL(frameworkRemovedTrigger);
+
+  process::terminate(slave2);
+  process::wait(slave2);
+
+  WAIT_UNTIL(slaveRemovedTrigger2);
+
+  process::terminate(master);
+  process::wait(master);
+}
+
+
+TYPED_TEST(AllocatorTest, SlaveAdded)
+{
+  EXPECT_CALL(this->allocator, initialize(_, _));
+
+  EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+  trigger frameworkRemovedTrigger;
+  EXPECT_CALL(this->allocator, frameworkRemoved(_))
+    .WillOnce(Trigger(&frameworkRemovedTrigger));
+
+  EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+  EXPECT_CALL(this->allocator, slaveAdded(_, _, _))
+    .Times(2);
 
   trigger slaveRemovedTrigger;
-  EXPECT_CALL(this->allocator2, slaveRemoved(_))
+  EXPECT_CALL(this->allocator, slaveRemoved(_))
+    .WillOnce(DoDefault())
     .WillOnce(Trigger(&slaveRemovedTrigger));
 
-  EXPECT_CALL(this->allocator2, resourcesRecovered(_, _, _))
+  // We filter the first time so that the unused resources
+  // on slave1 from the task launch won't get reoffered
+  // immediately and will get combined with slave2's
+  // resources for a single offer.
+  EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+    .WillOnce(InvokeUnusedWithFilters(&this->allocator, .1))
+    .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0));
+
+  EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
     .WillRepeatedly(DoDefault());
 
-  MockFilter filter;
-  process::filter(&filter);
+  Master m(&this->allocator);
+  PID<Master> master = process::spawn(m);
 
-  EXPECT_MESSAGE(filter, Eq(ShutdownMessage().GetTypeName()), _, _)
-    .WillRepeatedly(Return(true));
+  MockExecutor exec;
 
-  Master m1(&(this->allocator1));
-  PID<Master> master1 = process::spawn(m1);
+  EXPECT_CALL(exec, registered(_, _, _, _));
 
-  string zk = "zk://" + this->zks->connectString() + "/znode";
-  Try<MasterDetector*> detector =
-    MasterDetector::create(zk, master1, true, true);
-  CHECK(!detector.isError())
-    << "Failed to create a master detector: " << detector.error();
+  trigger launchTaskTrigger;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
+		    Trigger(&launchTaskTrigger)));
 
-  ProcessBasedIsolationModule isolationModule;
+  trigger shutdownTrigger;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(Trigger(&shutdownTrigger));
 
-  Resources resources = Resources::parse("cpus:2;mem:1024");
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
 
-  Slave s(resources, true, &isolationModule);
-  PID<Slave> slave = process::spawn(s);
+  TestingIsolationModule isolationModule(execs);
 
-  Try<MasterDetector*> slave_detector =
-    MasterDetector::create(zk, slave, false, true);
-  CHECK(!slave_detector.isError())
-    << "Failed to create a master detector: " << slave_detector.error();
+  EXPECT_CALL(isolationModule, resourcesChanged(_, _, _));
 
-  MockScheduler sched;
-  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, zk);
+  Resources resources1 = Resources::parse("cpus:3;mem:1024");
+  Slave s1(resources1, true, &isolationModule);
+  PID<Slave> slave1 = process::spawn(s1);
+  BasicMasterDetector detector1(master, slave1, true);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched1, registered(_, _, _));
+
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .WillRepeatedly(DoDefault());
+
+  EXPECT_CALL(sched1, resourceOffers(_, _))
+    .WillRepeatedly(DeclineOffers());
+
+  trigger resourceOffersTrigger1, resourceOffersTrigger2;
+  {
+    // Ensures that the following EXPECT_CALLs happen in order.
+    InSequence dummy;
+
+    // Initially, all of slave1's resources are available.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+      .WillOnce(DoAll(LaunchTasks(1, 2, 512),
+		      Trigger(&resourceOffersTrigger1)));
+
+    // After slave2 launches, all of its resources are
+    // combined with the resources on slave1 that the
+    // task isn't using.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(5, 2560)))
+      .WillOnce(Trigger(&resourceOffersTrigger2));
+  }
+
+  driver1.start();
+
+  WAIT_UNTIL(resourceOffersTrigger1);
+
+  // Wait until the filter from resourcesUnused above times
+  // out so that all resources not used by the launched task
+  // will get offered together. TODO(tmarshall): replace this
+  // with a Clock::advance().
+  usleep(100000); // 0.1 secs; sleep(.1) would truncate to sleep(0).
+
+  WAIT_UNTIL(launchTaskTrigger);
+
+  Resources resources2 = Resources::parse("cpus:4;mem:2048");
+  Slave s2(resources2, true, &isolationModule);
+  PID<Slave> slave2 = process::spawn(s2);
+  BasicMasterDetector detector2(master, slave2, true);
+
+  WAIT_UNTIL(resourceOffersTrigger2);
+
+  driver1.stop();
+  driver1.join();
+
+  WAIT_UNTIL(frameworkRemovedTrigger);
+
+  WAIT_UNTIL(shutdownTrigger);
+
+  process::terminate(slave1);
+  process::wait(slave1);
+
+  process::terminate(slave2);
+  process::wait(slave2);
+
+  WAIT_UNTIL(slaveRemovedTrigger);
+
+  process::terminate(master);
+  process::wait(master);
+}
 
-  trigger resourceOffers, resourceOffers2;
 
-  vector<Offer> offers, offers2;
+TYPED_TEST(AllocatorTest, TaskFinished)
+{
+  EXPECT_CALL(this->allocator, initialize(_, _));
+
+  EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+  EXPECT_CALL(this->allocator, frameworkRemoved(_));
+
+  EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+  EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+  trigger slaveRemovedTrigger;
+  EXPECT_CALL(this->allocator, slaveRemoved(_))
+    .WillOnce(Trigger(&slaveRemovedTrigger));
 
-  EXPECT_CALL(sched, registered(_, _, _))
+  EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+    .WillRepeatedly(DoDefault());
+
+  // The first time we don't filter, because we want the
+  // unused resources from the task launch to be reoffered
+  // to us. When that offer comes back unused we do filter,
+  // so that those resources won't be reoffered again and
+  // will instead be combined with the resources recovered
+  // from the finished task into a single offer.
+  EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
+    .WillOnce(InvokeUnusedWithFilters(&this->allocator, 0))
+    .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 1));
+
+  Master m(&this->allocator);
+  PID<Master> master = process::spawn(m);
+
+  MockExecutor exec;
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  ExecutorDriver* execDriver;
+  TaskInfo taskInfo;
+  trigger launchTaskTrigger;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(DoAll(SaveArg<0>(&execDriver),
+		    SaveArg<1>(&taskInfo),
+		    SendStatusUpdateFromTask(TASK_RUNNING),
+		    Trigger(&launchTaskTrigger)))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  trigger shutdownTrigger;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(Trigger(&shutdownTrigger));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+  EXPECT_CALL(isolationModule, resourcesChanged(_, _, _))
     .Times(2);
 
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers),
-		    Trigger(&resourceOffers)))
-    .WillOnce(DoAll(SaveArg<1>(&offers2),
-		    Trigger(&resourceOffers2)));
+  Resources resources1 = Resources::parse("cpus:3;mem:1024");
+  Slave s1(resources1, true, &isolationModule);
+  PID<Slave> slave1 = process::spawn(s1);
+  BasicMasterDetector detector1(master, slave1, true);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched1, registered(_, _, _));
 
-  EXPECT_CALL(sched, disconnected(_))
+  EXPECT_CALL(sched1, statusUpdate(_, _))
     .WillRepeatedly(DoDefault());
 
-  driver.start();
+  EXPECT_CALL(sched1, resourceOffers(_, _))
+    .WillRepeatedly(DeclineOffers());
 
-  WAIT_UNTIL(resourceOffers);
+  trigger resourceOffersTrigger1, resourceOffersTrigger2;
+  {
+    // Ensures that the following EXPECT_CALLs happen in order.
+    InSequence dummy;
 
-  checkResources(offers, 2, 1024);
+    // Initially, all of the slave's resources are offered.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
+      .WillOnce(LaunchTasks(2, 1, 256));
+
+    // After the tasks are launched.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(1, 512)))
+      .WillOnce(DoAll(DeclineOffers(),
+		      Trigger(&resourceOffersTrigger1)));
+
+    // After the first task finishes.
+    EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 768)))
+      .WillOnce(Trigger(&resourceOffersTrigger2));
+  }
 
-  process::terminate(master1);
-  process::wait(master1);
-  MasterDetector::destroy(detector.get());
+  driver1.start();
 
-  Master m2(&(this->allocator2));
-  PID<Master> master2 = process::spawn(m2);
+  WAIT_UNTIL(resourceOffersTrigger1);
 
-  Try<MasterDetector*> detector2 =
-    MasterDetector::create(zk, master2, true, true);
-  CHECK(!detector2.isError())
-    << "Failed to create a master detector: " << detector2.error();
+  WAIT_UNTIL(launchTaskTrigger);
 
-  WAIT_UNTIL(resourceOffers2);
+  TaskStatus status;
+  status.mutable_task_id()->MergeFrom(taskInfo.task_id());
+  status.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(status);
 
-  checkResources(offers2, 2, 1024);
+  WAIT_UNTIL(resourceOffersTrigger2);
 
-  driver.stop();
+  driver1.stop();
+  driver1.join();
 
-  WAIT_UNTIL(frameworkRemovedTrigger);
+  WAIT_UNTIL(shutdownTrigger);
+
+  process::terminate(slave1);
+  process::wait(slave1);
+
+  WAIT_UNTIL(slaveRemovedTrigger);
+
+  process::terminate(master);
+  process::wait(master);
+}
+
+
+TYPED_TEST(AllocatorTest, WhitelistSlave)
+{
+  EXPECT_CALL(this->allocator, initialize(_, _));
+
+  EXPECT_CALL(this->allocator, frameworkAdded(_, _, _));
+
+  EXPECT_CALL(this->allocator, frameworkRemoved(_));
+
+  EXPECT_CALL(this->allocator, frameworkDeactivated(_));
+
+  EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
+
+  trigger slaveRemovedTrigger;
+  EXPECT_CALL(this->allocator, slaveRemoved(_))
+    .WillOnce(Trigger(&slaveRemovedTrigger));
+
+  trigger updateWhitelistTrigger1, updateWhitelistTrigger2;
+  EXPECT_CALL(this->allocator, updateWhitelist(_))
+    .WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
+		    Trigger(&updateWhitelistTrigger1)))
+    .WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
+		    Trigger(&updateWhitelistTrigger2)));
+
+  EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
+    .WillRepeatedly(DoDefault());
+
+  // Create a dummy whitelist, so that no resources will get allocated.
+  string hosts = "dummy-slave";
+  string path = "whitelist.txt";
+  CHECK(os::write(path, hosts).isSome()) << "Error writing whitelist";
+
+  flags::Flags<logging::Flags, master::Flags> flags;
+  flags.whitelist = "file://" + path; // TODO(benh): Put in /tmp.
+  Master m(&this->allocator, flags);
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  trigger resourceOffersTrigger;
+  EXPECT_CALL(sched, resourceOffers(_, OfferEq(2, 1024)))
+    .WillOnce(Trigger(&resourceOffersTrigger));
+
+  WAIT_UNTIL(updateWhitelistTrigger1);
+
+  driver.start();
+
+  // Give the allocator some time, so we can confirm that it
+  // doesn't make an allocation.
+  sleep(1);
+  EXPECT_FALSE(resourceOffersTrigger.value);
+
+  // Update the whitelist to include the slave, so that
+  // the allocator will start making allocations.
+  Try<string> hostname = os::hostname();
+  ASSERT_TRUE(hostname.isSome());
+  hosts = hostname.get() + "\n" + "dummy-slave";
+  CHECK(os::write(path, hosts).isSome()) << "Error writing whitelist";
+
+  // Give the WhitelistWatcher some time to notice that
+  // the whitelist has changed.
+  sleep(4);
+
+  WAIT_UNTIL(updateWhitelistTrigger2);
+
+  WAIT_UNTIL(resourceOffersTrigger);
+
+  driver.stop();
+  driver.join();
 
   process::terminate(slave);
   process::wait(slave);
 
   WAIT_UNTIL(slaveRemovedTrigger);
 
-  process::terminate(master2);
-  process::wait(master2);
+  process::terminate(master);
+  process::wait(master);
 
-  process::filter(NULL);
+  os::rm(path);
 }
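
Finally, since the default sorter introduced by this commit ranks
clients by dominant share, a small worked example may help: in a pool
of 10 cpus and 20480 mem, a client using 2 cpus and 1024 mem has
shares 0.2 and 0.05, so its dominant share is 0.2; a client using
1 cpu and 8192 mem has dominant share 0.4, so DRF would offer to the
first client next. A hedged sketch of that arithmetic (not the actual
drf_sorter.cpp code):

  #include <algorithm>

  // A client's dominant share is its largest fractional use of any
  // single resource in the total pool.
  double dominantShare(double cpusUsed, double memUsed,
                       double cpusTotal, double memTotal)
  {
    return std::max(cpusUsed / cpusTotal, memUsed / memTotal);
  }

  // dominantShare(2, 1024, 10, 20480) == 0.2
  // dominantShare(1, 8192, 10, 20480) == 0.4  (offered later)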