You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/11/08 06:51:51 UTC

svn commit: r1406932 [1/2] - in /incubator/mesos/trunk/src: ./ local/ master/ sched/ tests/

Author: benh
Date: Thu Nov  8 05:51:50 2012
New Revision: 1406932

URL: http://svn.apache.org/viewvc?rev=1406932&view=rev
Log:
Created an Allocator facade that wraps AllocatorProcess. Also improved
MockAllocatorProcess.

From: Vinod Kone <vi...@gmail.com>
Review: https://reviews.apache.org/r/7720

Removed:
    incubator/mesos/trunk/src/master/allocator.cpp
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/local/local.cpp
    incubator/mesos/trunk/src/local/local.hpp
    incubator/mesos/trunk/src/master/allocator.hpp
    incubator/mesos/trunk/src/master/drf_sorter.cpp
    incubator/mesos/trunk/src/master/drf_sorter.hpp
    incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
    incubator/mesos/trunk/src/master/main.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/master/sorter.hpp
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/tests/allocator_tests.cpp
    incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
    incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
    incubator/mesos/trunk/src/tests/gc_tests.cpp
    incubator/mesos/trunk/src/tests/master_detector_tests.cpp
    incubator/mesos/trunk/src/tests/master_tests.cpp
    incubator/mesos/trunk/src/tests/resource_offers_tests.cpp
    incubator/mesos/trunk/src/tests/utils.hpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Thu Nov  8 05:51:50 2012
@@ -152,7 +152,7 @@ noinst_LTLIBRARIES += libmesos_no_third_
 nodist_libmesos_no_third_party_la_SOURCES = $(CXX_PROTOS) $(MESSAGES_PROTOS)
 
 libmesos_no_third_party_la_SOURCES = sched/sched.cpp local/local.cpp	\
-	master/allocator.cpp master/drf_sorter.cpp			\
+	master/drf_sorter.cpp						\
 	master/frameworks_manager.cpp master/http.cpp master/master.cpp	\
 	master/slaves_manager.cpp slave/gc.cpp slave/state.cpp		\
 	slave/slave.cpp slave/http.cpp slave/isolation_module.cpp	\

Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Thu Nov  8 05:51:50 2012
@@ -33,6 +33,9 @@
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
+#include "master/allocator.hpp"
+#include "master/drf_sorter.hpp"
+#include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
 #include "slave/process_based_isolation_module.hpp"
@@ -40,7 +43,10 @@
 
 using namespace mesos::internal;
 
+using mesos::internal::master::Allocator;
 using mesos::internal::master::AllocatorProcess;
+using mesos::internal::master::DRFSorter;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
@@ -60,19 +66,20 @@ namespace mesos {
 namespace internal {
 namespace local {
 
-static AllocatorProcess* allocator = NULL;
+static Allocator* allocator = NULL;
+static AllocatorProcess* allocatorProcess = NULL;
 static Master* master = NULL;
 static map<IsolationModule*, Slave*> slaves;
 static MasterDetector* detector = NULL;
 static Files* files = NULL;
 
-
-PID<Master> launch(int numSlaves,
-                   double cpus,
-                   uint64_t mem,
-                   uint64_t disk,
-                   bool quiet,
-                   AllocatorProcess* _allocator)
+PID<Master> launch(
+    int numSlaves,
+    double cpus,
+    uint64_t mem,
+    uint64_t disk,
+    bool quiet,
+    Allocator* _allocator)
 {
   Configuration configuration;
   configuration.set("slaves", "*");
@@ -87,8 +94,7 @@ PID<Master> launch(int numSlaves,
 }
 
 
-PID<Master> launch(const Configuration& configuration,
-                   AllocatorProcess* _allocator)
+PID<Master> launch(const Configuration& configuration, Allocator* _allocator)
 {
   int numSlaves = configuration.get<int>("num_slaves", 1);
 
@@ -98,11 +104,13 @@ PID<Master> launch(const Configuration& 
 
   if (_allocator == NULL) {
     // Create default allocator, save it for deleting later.
-    _allocator = allocator = AllocatorProcess::create("drf", "drf");
+    allocatorProcess = new HierarchicalDRFAllocatorProcess();
+    _allocator = allocator = new Allocator(allocatorProcess);
   } else {
     // TODO(benh): Figure out the behavior of allocator pointer and remove the
     // else block.
     allocator = NULL;
+    allocatorProcess = NULL;
   }
 
   files = new Files();
@@ -142,6 +150,7 @@ void shutdown()
     process::wait(master->self());
     delete master;
     delete allocator;
+    delete allocatorProcess;
     master = NULL;
 
     // TODO(benh): Ugh! Because the isolation module calls back into the

Modified: incubator/mesos/trunk/src/local/local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.hpp (original)
+++ incubator/mesos/trunk/src/local/local.hpp Thu Nov  8 05:51:50 2012
@@ -21,15 +21,22 @@
 
 #include <process/process.hpp>
 
-#include "configurator/configuration.hpp"
-
-#include "master/allocator.hpp"
-#include "master/master.hpp"
-
 namespace mesos {
 namespace internal {
+
+// Forward declarations.
+namespace master {
+
+class Allocator;
+class Master;
+
+} // namespace master {
+
+class Configuration;
+
 namespace local {
 
+
 // Launch a local cluster with a given number of slaves and given numbers
 // of CPUs and memory per slave. Additionally one can also toggle whether
 // to initialize Google Logging and whether to log quietly.
@@ -38,12 +45,12 @@ process::PID<master::Master> launch(int 
                                     uint64_t mem,
                                     uint64_t disk,
                                     bool quiet,
-                                    master::AllocatorProcess* _allocator = NULL);
+                                    master::Allocator* _allocator = NULL);
 
 
 // Launch a local cluster with a given configuration.
 process::PID<master::Master> launch(const Configuration& configuration,
-                                    master::AllocatorProcess* _allocator = NULL);
+                                    master::Allocator* _allocator = NULL);
 
 
 void shutdown();

Modified: incubator/mesos/trunk/src/master/allocator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/allocator.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/allocator.hpp (original)
+++ incubator/mesos/trunk/src/master/allocator.hpp Thu Nov  8 05:51:50 2012
@@ -19,47 +19,69 @@
 #ifndef __ALLOCATOR_HPP__
 #define __ALLOCATOR_HPP__
 
+#include <string>
+
+#include <process/future.hpp>
+#include <process/dispatch.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
 #include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/option.hpp>
 
 #include "common/resources.hpp"
 
 #include "master/flags.hpp"
-#include "master/master.hpp"
 
+#include "messages/messages.hpp"
 
 namespace mesos {
 namespace internal {
 namespace master {
 
+class Master; // Forward declaration.
+
+
 // Basic model of an allocator: resources are allocated to a framework
 // in the form of offers. A framework can refuse some resources in
 // offers and run tasks in others. Resources can be recovered from a
 // framework when tasks finish/fail (or are lost due to a slave
 // failure) or when an offer is rescinded.
-
-class AllocatorProcess : public process::Process<AllocatorProcess> {
+// NOTE: New Allocators should implement this interface.
+class AllocatorProcess : public process::Process<AllocatorProcess>
+{
 public:
+  AllocatorProcess() {}
+
   virtual ~AllocatorProcess() {}
 
-  virtual void initialize(const Flags& flags,
-                          const process::PID<Master>& master) = 0;
+  virtual void initialize(
+      const Flags& flags,
+      const process::PID<Master>& master) = 0;
 
-  virtual void frameworkAdded(const FrameworkID& frameworkId,
-                              const FrameworkInfo& frameworkInfo,
-                              const Resources& used) = 0;
+  virtual void frameworkAdded(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo,
+      const Resources& used) = 0;
 
-  virtual void frameworkRemoved(const FrameworkID& frameworkId) = 0;
+  virtual void frameworkRemoved(
+      const FrameworkID& frameworkId) = 0;
 
-  virtual void frameworkActivated(const FrameworkID& frameworkId,
-                                  const FrameworkInfo& frameworkInfo) = 0;
+  virtual void frameworkActivated(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo) = 0;
 
-  virtual void frameworkDeactivated(const FrameworkID& frameworkId) = 0;
+  virtual void frameworkDeactivated(
+      const FrameworkID& frameworkId) = 0;
 
-  virtual void slaveAdded(const SlaveID& slaveId,
-                          const SlaveInfo& slaveInfo,
-                          const hashmap<FrameworkID, Resources>& used) = 0;
+  virtual void slaveAdded(
+      const SlaveID& slaveId,
+      const SlaveInfo& slaveInfo,
+      const hashmap<FrameworkID, Resources>& used) = 0;
 
-  virtual void slaveRemoved(const SlaveID& slaveId) = 0;
+  virtual void slaveRemoved(
+      const SlaveID& slaveId) = 0;
 
   virtual void updateWhitelist(
       const Option<hashset<std::string> >& whitelist) = 0;
@@ -86,12 +108,236 @@ public:
 
   // Whenever a framework that has filtered resources wants to revive
   // offers for those resources the master invokes this callback.
-  virtual void offersRevived(const FrameworkID& frameworkId) = 0;
+  virtual void offersRevived(
+      const FrameworkID& frameworkId) = 0;
+};
+
+
+// This is a wrapper around the AllocatorProcess interface.
+// NOTE: DO NOT subclass this class when implementing a new allocator.
+// Implement AllocatorProcess (above) instead!
+class Allocator
+{
+public:
+  // The AllocatorProcess object passed to the constructor is
+  // spawned and terminated by the allocator. But it is the responsibility
+  // of the caller to de-allocate the object, if necessary.
+  Allocator(AllocatorProcess* _process);
+
+  virtual ~Allocator();
+
+  void initialize(
+      const Flags& flags,
+      const process::PID<Master>& master);
+
+  void frameworkAdded(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo,
+      const Resources& used);
+
+  void frameworkRemoved(
+      const FrameworkID& frameworkId);
+
+  void frameworkActivated(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo);
 
-  static AllocatorProcess* create(const std::string& userSorterType,
-				  const std::string& frameworkSorterType);
+  void frameworkDeactivated(
+      const FrameworkID& frameworkId);
+
+  void slaveAdded(
+      const SlaveID& slaveId,
+      const SlaveInfo& slaveInfo,
+      const hashmap<FrameworkID, Resources>& used);
+
+  void slaveRemoved(
+      const SlaveID& slaveId);
+
+  void updateWhitelist(
+      const Option<hashset<std::string> >& whitelist);
+
+  void resourcesRequested(
+      const FrameworkID& frameworkId,
+      const std::vector<Request>& requests);
+
+  void resourcesUnused(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources,
+      const Option<Filters>& filters);
+
+  void resourcesRecovered(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources);
+
+  void offersRevived(
+      const FrameworkID& frameworkId);
+
+private:
+  Allocator(const Allocator&); // Not copyable.
+  Allocator& operator=(const Allocator&); // Not assignable.
+
+  AllocatorProcess* process;
 };
 
+
+inline Allocator::Allocator(AllocatorProcess* _process)
+  : process(_process)
+{
+  process::spawn(process);
+}
+
+
+inline Allocator::~Allocator()
+{
+  process::terminate(process);
+  process::wait(process);
+}
+
+
+inline void Allocator::initialize(
+    const Flags& flags,
+    const process::PID<Master>& master)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::initialize,
+      flags,
+      master);
+}
+
+
+inline void Allocator::frameworkAdded(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const Resources& used)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::frameworkAdded,
+      frameworkId,
+      frameworkInfo,
+      used);
+}
+
+
+inline void Allocator::frameworkRemoved(
+    const FrameworkID& frameworkId)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::frameworkRemoved,
+      frameworkId);
+}
+
+
+inline void Allocator::frameworkActivated(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::frameworkActivated,
+      frameworkId,
+      frameworkInfo);
+}
+
+
+inline void Allocator::frameworkDeactivated(
+    const FrameworkID& frameworkId)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::frameworkDeactivated,
+      frameworkId);
+}
+
+
+inline void Allocator::slaveAdded(
+    const SlaveID& slaveId,
+    const SlaveInfo& slaveInfo,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::slaveAdded,
+      slaveId,
+      slaveInfo,
+      used);
+}
+
+
+inline void Allocator::slaveRemoved(const SlaveID& slaveId)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::slaveRemoved,
+      slaveId);
+}
+
+
+inline void Allocator::updateWhitelist(
+    const Option<hashset<std::string> >& whitelist)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::updateWhitelist,
+      whitelist);
+}
+
+
+inline void Allocator::resourcesRequested(
+    const FrameworkID& frameworkId,
+    const std::vector<Request>& requests)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::resourcesRequested,
+      frameworkId,
+      requests);
+}
+
+
+inline void Allocator::resourcesUnused(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources,
+    const Option<Filters>& filters)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::resourcesUnused,
+      frameworkId,
+      slaveId,
+      resources,
+      filters);
+}
+
+
+inline void Allocator::resourcesRecovered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& resources)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::resourcesRecovered,
+      frameworkId,
+      slaveId,
+      resources);
+}
+
+
+inline void Allocator::offersRevived(
+    const FrameworkID& frameworkId)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::offersRevived,
+      frameworkId);
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

Modified: incubator/mesos/trunk/src/master/drf_sorter.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.cpp (original)
+++ incubator/mesos/trunk/src/master/drf_sorter.cpp Thu Nov  8 05:51:50 2012
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include "logging/logging.hpp"
+
 #include "master/drf_sorter.hpp"
 
 using std::list;

Modified: incubator/mesos/trunk/src/master/drf_sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/drf_sorter.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/drf_sorter.hpp (original)
+++ incubator/mesos/trunk/src/master/drf_sorter.hpp Thu Nov  8 05:51:50 2012
@@ -19,6 +19,13 @@
 #ifndef __DRF_SORTER_HPP__
 #define __DRF_SORTER_HPP__
 
+#include <set>
+#include <string>
+
+#include <stout/hashmap.hpp>
+
+#include "common/resources.hpp"
+
 #include "master/sorter.hpp"
 
 
@@ -58,10 +65,10 @@ public:
   virtual void deactivate(const std::string& name);
 
   virtual void allocated(const std::string& name,
-			 const Resources& resources);
+                         const Resources& resources);
 
   virtual void unallocated(const std::string& name,
-			   const Resources& resources);
+                           const Resources& resources);
 
   virtual Resources allocation(const std::string& name);
 

Modified: incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp (original)
+++ incubator/mesos/trunk/src/master/hierarchical_allocator_process.hpp Thu Nov  8 05:51:50 2012
@@ -21,7 +21,6 @@
 
 #include <process/delay.hpp>
 #include <process/timeout.hpp>
-#include <process/timer.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
@@ -33,50 +32,62 @@
 #include "master/master.hpp"
 #include "master/sorter.hpp"
 
-
 namespace mesos {
 namespace internal {
 namespace master {
 
 // Forward declarations.
+class DRFSorter;
 class Filter;
 
 
-// Implements the basic allocator algorithm - first pick
-// a user by some criteria, then pick one of their
-// frameworks to allocate to.
-template <class UserSorter, class FrameworkSorter>
+// We forward declare the hierarchical allocator process so that we
+// can typedef an instantiation of it with DRF sorters.
+template <typename UserSorter, typename FrameworkSorter>
+class HierarchicalAllocatorProcess;
+
+typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
+HierarchicalDRFAllocatorProcess;
+
+
+// Implements the basic allocator algorithm - first pick a user by
+// some criteria, then pick one of their frameworks to allocate to.
+template <typename UserSorter, typename FrameworkSorter>
 class HierarchicalAllocatorProcess : public AllocatorProcess
 {
 public:
-  HierarchicalAllocatorProcess() : initialized(false) {}
+  HierarchicalAllocatorProcess();
 
-  virtual ~HierarchicalAllocatorProcess() {}
+  virtual ~HierarchicalAllocatorProcess();
 
-  process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> > self()
-  {
-    return process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >(this);
-  }
+  process::PID<HierarchicalAllocatorProcess> self();
 
-  void initialize(const Flags& flags,
-		  const process::PID<Master>& _master);
+  void initialize(
+      const Flags& flags,
+      const process::PID<Master>& _master);
 
-  void frameworkAdded(const FrameworkID& frameworkId,
-			      const FrameworkInfo& frameworkInfo,
-			      const Resources& used);
+  void frameworkAdded(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo,
+      const Resources& used);
 
-  void frameworkRemoved(const FrameworkID& frameworkId);
+  void frameworkRemoved(
+      const FrameworkID& frameworkId);
 
-  void frameworkActivated(const FrameworkID& frameworkId,
-                                  const FrameworkInfo& frameworkInfo);
+  void frameworkActivated(
+      const FrameworkID& frameworkId,
+      const FrameworkInfo& frameworkInfo);
 
-  void frameworkDeactivated(const FrameworkID& frameworkId);
+  void frameworkDeactivated(
+      const FrameworkID& frameworkId);
 
-  void slaveAdded(const SlaveID& slaveId,
-		  const SlaveInfo& slaveInfo,
-		  const hashmap<FrameworkID, Resources>& used);
+  void slaveAdded(
+      const SlaveID& slaveId,
+      const SlaveInfo& slaveInfo,
+      const hashmap<FrameworkID, Resources>& used);
 
-  void slaveRemoved(const SlaveID& slaveId);
+  void slaveRemoved(
+      const SlaveID& slaveId);
 
   void updateWhitelist(
       const Option<hashset<std::string> >& whitelist);
@@ -96,9 +107,14 @@ public:
       const SlaveID& slaveId,
       const Resources& resources);
 
-  void offersRevived(const FrameworkID& frameworkId);
+  void offersRevived(
+      const FrameworkID& frameworkId);
 
 protected:
+  // Useful typedefs for dispatch/delay/defer to self()/this.
+  typedef HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> Self;
+  typedef HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> This;
+
   // Callback for doing batch allocations.
   void batch();
 
@@ -119,9 +135,10 @@ protected:
 
   // Returns true if there is a filter for this framework
   // on this slave.
-  bool isFiltered(const FrameworkID& frameworkId,
-		  const SlaveID& slaveId,
-		  const Resources& resources);
+  bool isFiltered(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const Resources& resources);
 
   bool initialized;
 
@@ -157,25 +174,25 @@ class Filter
 {
 public:
   virtual ~Filter() {}
+
   virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
 };
 
 
-class RefusedFilter : public Filter
+class RefusedFilter: public Filter
 {
 public:
-  RefusedFilter(const SlaveID& _slaveId,
-                const Resources& _resources,
-                const Timeout& _timeout)
-    : slaveId(_slaveId),
-      resources(_resources),
-      timeout(_timeout) {}
+  RefusedFilter(
+      const SlaveID& _slaveId,
+      const Resources& _resources,
+      const Timeout& _timeout)
+    : slaveId(_slaveId), resources(_resources), timeout(_timeout) {}
 
   virtual bool filter(const SlaveID& slaveId, const Resources& resources)
   {
     return slaveId == this->slaveId &&
-      resources <= this->resources && // Refused resources are superset.
-      timeout.remaining() > Seconds(0);
+           resources <= this->resources && // Refused resources are superset.
+           timeout.remaining() > Seconds(0);
   }
 
   const SlaveID slaveId;
@@ -185,7 +202,27 @@ public:
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::initialize(
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::HierarchicalAllocatorProcess()
+  : initialized(false) {}
+
+
+template <class UserSorter, class FrameworkSorter>
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::~HierarchicalAllocatorProcess()
+{}
+
+
+template <class UserSorter, class FrameworkSorter>
+process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::self()
+{
+  return
+    process::PID<HierarchicalAllocatorProcess<UserSorter, FrameworkSorter> >(this);
+}
+
+
+template <class UserSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::initialize(
     const Flags& _flags,
     const process::PID<Master>& _master)
 {
@@ -194,20 +231,23 @@ void HierarchicalAllocatorProcess<UserSo
   initialized = true;
   userSorter = new UserSorter();
 
-  delay(flags.allocation_interval, self(),
-	&HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+  VLOG(1) << "Initializing hierarchical allocator process "
+          << "with master : " << master;
+
+  delay(flags.allocation_interval, self(), &Self::batch);
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkAdded(
     const FrameworkID& frameworkId,
     const FrameworkInfo& frameworkInfo,
     const Resources& used)
 {
   CHECK(initialized);
 
-  std::string user  = frameworkInfo.user();
+  std::string user = frameworkInfo.user();
   if (!userSorter->contains(user)) {
     userSorter->add(user);
     sorters[user] = new FrameworkSorter();
@@ -230,7 +270,9 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(const FrameworkID& frameworkId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkRemoved(
+    const FrameworkID& frameworkId)
 {
   CHECK(initialized);
 
@@ -270,7 +312,8 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkActivated(
     const FrameworkID& frameworkId,
     const FrameworkInfo& frameworkInfo)
 {
@@ -286,7 +329,9 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(const FrameworkID& frameworkId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::frameworkDeactivated(
+    const FrameworkID& frameworkId)
 {
   CHECK(initialized);
 
@@ -314,7 +359,8 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveAdded(
     const SlaveID& slaveId,
     const SlaveInfo& slaveInfo,
     const hashmap<FrameworkID, Resources>& used)
@@ -329,7 +375,9 @@ void HierarchicalAllocatorProcess<UserSo
 
   Resources unused = slaveInfo.resources();
 
-  foreachpair (const FrameworkID& frameworkId, const Resources& resources, used) {
+  foreachpair (const FrameworkID& frameworkId,
+               const Resources& resources,
+               used) {
     if (users.contains(frameworkId)) {
       const std::string& user = users[frameworkId];
       sorters[user]->add(resources);
@@ -343,15 +391,17 @@ void HierarchicalAllocatorProcess<UserSo
   allocatable[slaveId] = unused;
 
   LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
-            << ") with " << slaveInfo.resources()
-            << " (and " << unused << " available)";
+            << ") with " << slaveInfo.resources() << " (and " << unused
+            << " available)";
 
   allocate(slaveId);
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(const SlaveID& slaveId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::slaveRemoved(
+    const SlaveID& slaveId)
 {
   CHECK(initialized);
 
@@ -373,7 +423,8 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::updateWhitelist(
     const Option<hashset<std::string> >& _whitelist)
 {
   CHECK(initialized);
@@ -390,7 +441,8 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRequested(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRequested(
     const FrameworkID& frameworkId,
     const std::vector<Request>& requests)
 {
@@ -401,7 +453,8 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesUnused(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     const Resources& resources,
@@ -440,8 +493,8 @@ void HierarchicalAllocatorProcess<UserSo
 
   if (seconds != Seconds(0)) {
     LOG(INFO) << "Framework " << frameworkId
-	      << " filtered slave " << slaveId
-	      << " for " << seconds;
+              << " filtered slave " << slaveId
+              << " for " << seconds;
 
     // Create a new filter and delay it's expiration.
     mesos::internal::master::Filter* filter =
@@ -449,16 +502,14 @@ void HierarchicalAllocatorProcess<UserSo
 
     this->filters.put(frameworkId, filter);
 
-    delay(seconds, self(),
-          &HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire,
-          frameworkId,
-          filter);
+    delay(seconds, self(), &Self::expire, frameworkId, filter);
   }
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::resourcesRecovered(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     const Resources& resources)
@@ -491,13 +542,14 @@ void HierarchicalAllocatorProcess<UserSo
     VLOG(1) << "Recovered " << resources.allocatable()
             << " on slave " << slaveId
             << " from framework " << frameworkId;
-
   }
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(const FrameworkID& frameworkId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::offersRevived(
+    const FrameworkID& frameworkId)
 {
   CHECK(initialized);
 
@@ -521,17 +573,18 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch()
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch()
 {
   CHECK(initialized);
   allocate();
-  delay(flags.allocation_interval, self(),
-	&HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::batch);
+  delay(flags.allocation_interval, self(), &Self::batch);
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate()
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate()
 {
   CHECK(initialized);
 
@@ -540,13 +593,15 @@ void HierarchicalAllocatorProcess<UserSo
 
   allocate(slaves.keys());
 
-  LOG(INFO) << "Performed allocation for " << slaves.size()
-            << " slaves in " << stopwatch.elapsed();
+  LOG(INFO) << "Performed allocation for " << slaves.size() << " slaves in "
+            << stopwatch.elapsed();
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const SlaveID& slaveId)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
+    const SlaveID& slaveId)
 {
   CHECK(initialized);
 
@@ -558,13 +613,15 @@ void HierarchicalAllocatorProcess<UserSo
 
   allocate(slaveIds);
 
-  LOG(INFO) << "Performed allocation for slave " << slaveId
-            << " in " << stopwatch.elapsed();
+  LOG(INFO) << "Performed allocation for slave " << slaveId << " in "
+            << stopwatch.elapsed();
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(const hashset<SlaveID>& slaveIds)
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::allocate(
+    const hashset<SlaveID>& slaveIds)
 {
   CHECK(initialized);
 
@@ -597,9 +654,9 @@ void HierarchicalAllocatorProcess<UserSo
       Value::Scalar mem = resources.get("mem", none);
 
       if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
-	VLOG(1) << "Found available resources: " << resources
-		<< " on slave " << slaveId;
-	available[slaveId] = resources;
+        VLOG(1) << "Found available resources: " << resources
+                << " on slave " << slaveId;
+        available[slaveId] = resources;
       }
     }
   }
@@ -616,32 +673,35 @@ void HierarchicalAllocatorProcess<UserSo
 
       Resources allocatedResources;
       hashmap<SlaveID, Resources> offerable;
-      foreachpair (const SlaveID& slaveId, const Resources& resources, available) {
-	// Check whether or not this framework filters this slave.
-	bool filtered = isFiltered(frameworkId, slaveId, resources);
-
-	if (!filtered) {
-	  VLOG(1) << "Offering " << resources
-		  << " on slave " << slaveId
-		  << " to framework " << frameworkId;
-	  offerable[slaveId] = resources;
-
-	  // Update framework and slave resources.
-	  allocatable[slaveId] -= resources;
-	  allocatedResources += resources;
-	}
+      foreachpair (const SlaveID& slaveId,
+                   const Resources& resources,
+                   available) {
+        // Check whether or not this framework filters this slave.
+        bool filtered = isFiltered(frameworkId, slaveId, resources);
+
+        if (!filtered) {
+          VLOG(1)
+            << "Offering " << resources << " on slave " << slaveId
+            << " to framework " << frameworkId;
+
+          offerable[slaveId] = resources;
+
+          // Update framework and slave resources.
+          allocatable[slaveId] -= resources;
+          allocatedResources += resources;
+        }
       }
 
       if (offerable.size() > 0) {
-	foreachkey (const SlaveID& slaveId, offerable) {
-	  available.erase(slaveId);
-	}
-
-	sorters[user]->add(allocatedResources);
-	sorters[user]->allocated(frameworkIdValue, allocatedResources);
-	userSorter->allocated(user, allocatedResources);
+        foreachkey (const SlaveID& slaveId, offerable) {
+          available.erase(slaveId);
+        }
+
+        sorters[user]->add(allocatedResources);
+        sorters[user]->allocated(frameworkIdValue, allocatedResources);
+        userSorter->allocated(user, allocatedResources);
 
-	dispatch(master, &Master::offer, frameworkId, offerable);
+        dispatch(master, &Master::offer, frameworkId, offerable);
       }
     }
   }
@@ -649,7 +709,8 @@ void HierarchicalAllocatorProcess<UserSo
 
 
 template <class UserSorter, class FrameworkSorter>
-void HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
+void
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::expire(
     const FrameworkID& frameworkId,
     Filter* filter)
 {
@@ -658,17 +719,16 @@ void HierarchicalAllocatorProcess<UserSo
   // HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
   // keep the address from getting reused possibly causing premature
   // expiration).
-  if (users.contains(frameworkId) &&
-      filters.contains(frameworkId, filter)) {
+  if (users.contains(frameworkId) && filters.contains(frameworkId, filter)) {
     filters.remove(frameworkId, filter);
   }
-
   delete filter;
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isWhitelisted(
+bool
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isWhitelisted(
     const SlaveID& slaveId)
 {
   CHECK(initialized);
@@ -676,12 +736,13 @@ bool HierarchicalAllocatorProcess<UserSo
   CHECK(slaves.contains(slaveId));
 
   return whitelist.isNone() ||
-    whitelist.get().contains(slaves[slaveId].hostname());
+         whitelist.get().contains(slaves[slaveId].hostname());
 }
 
 
 template <class UserSorter, class FrameworkSorter>
-bool HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
+bool
+HierarchicalAllocatorProcess<UserSorter, FrameworkSorter>::isFiltered(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     const Resources& resources)
@@ -690,8 +751,8 @@ bool HierarchicalAllocatorProcess<UserSo
   foreach (Filter* filter, filters.get(frameworkId)) {
     if (filter->filter(slaveId, resources)) {
       VLOG(1) << "Filtered " << resources
-	      << " on slave " << slaveId
-	      << " for framework " << frameworkId;
+              << " on slave " << slaveId
+              << " for framework " << frameworkId;
       filtered = true;
       break;
     }

Modified: incubator/mesos/trunk/src/master/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/main.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/main.cpp (original)
+++ incubator/mesos/trunk/src/master/main.cpp Thu Nov  8 05:51:50 2012
@@ -33,6 +33,8 @@
 #include "logging/logging.hpp"
 
 #include "master/allocator.hpp"
+#include "master/drf_sorter.hpp"
+#include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
 using namespace mesos::internal;
@@ -114,8 +116,9 @@ int main(int argc, char** argv)
   LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
   LOG(INFO) << "Starting Mesos master";
 
-  AllocatorProcess* allocator = AllocatorProcess::create(flags.user_sorter,
-							 flags.framework_sorter);
+  AllocatorProcess* allocatorProcess = new HierarchicalDRFAllocatorProcess();
+  Allocator* allocator = new Allocator(allocatorProcess);
+
   Files files;
   Master* master = new Master(allocator, &files, flags);
   process::spawn(master);
@@ -129,6 +132,7 @@ int main(int argc, char** argv)
   process::wait(master->self());
   delete master;
   delete allocator;
+  delete allocatorProcess;
 
   MasterDetector::destroy(detector.get());
 

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Thu Nov  8 05:51:50 2012
@@ -62,7 +62,7 @@ namespace master {
 
 class WhitelistWatcher : public Process<WhitelistWatcher> {
 public:
-  WhitelistWatcher(const string& _path, AllocatorProcess* _allocator)
+  WhitelistWatcher(const string& _path, Allocator* _allocator)
   : path(_path), allocator(_allocator) {}
 
 protected:
@@ -106,7 +106,7 @@ protected:
 
     // Send the whitelist to allocator, if necessary.
     if (whitelist != lastWhitelist) {
-      dispatch(allocator, &AllocatorProcess::updateWhitelist, whitelist);
+      allocator->updateWhitelist(whitelist);
     }
 
     // Check again.
@@ -116,7 +116,7 @@ protected:
 
 private:
   const string path;
-  AllocatorProcess* allocator;
+  Allocator* allocator;
   Option<hashset<string> > lastWhitelist;
 };
 
@@ -257,14 +257,14 @@ struct SlaveReregistrar
 };
 
 
-Master::Master(AllocatorProcess* _allocator, Files* _files)
+Master::Master(Allocator* _allocator, Files* _files)
   : ProcessBase("master"),
     flags(),
     allocator(_allocator),
     files(_files) {}
 
 
-Master::Master(AllocatorProcess* _allocator,
+Master::Master(Allocator* _allocator,
                Files* _files,
                const flags::Flags<logging::Flags, master::Flags>& _flags)
   : ProcessBase("master"),
@@ -322,9 +322,8 @@ void Master::initialize()
   slavesManager = new SlavesManager(flags, self());
   spawn(slavesManager);
 
-  // Spawn the allocator.
-  spawn(allocator);
-  dispatch(allocator, &AllocatorProcess::initialize, flags, self());
+  // Initialize the allocator.
+  allocator->initialize(flags, self());
 
   // Parse the white list
   whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
@@ -474,9 +473,6 @@ void Master::finalize()
   foreachvalue (Slave* slave, slaves) {
     send(slave->pid, ShutdownMessage());
   }
-
-  terminate(allocator);
-  wait(allocator);
 }
 
 
@@ -492,7 +488,7 @@ void Master::exited(const UPID& pid)
       framework->active = false;
 
       // Tell the allocator to stop allocating resources to this framework.
-      dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
+      allocator->frameworkDeactivated(framework->id);
 
       Seconds failoverTimeout(framework->info.failover_timeout());
 
@@ -506,10 +502,9 @@ void Master::exited(const UPID& pid)
 
       // Remove the framework's offers.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-                 offer->framework_id(),
-                 offer->slave_id(),
-                 Resources(offer->resources()));
+        allocator->resourcesRecovered(offer->framework_id(),
+                                      offer->slave_id(),
+                                      Resources(offer->resources()));
         removeOffer(offer);
       }
       return;
@@ -667,8 +662,9 @@ void Master::reregisterFramework(const F
       // replied to the offers but the driver might have dropped
       // those messages since it wasn't connected to the master.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-		 offer->framework_id(), offer->slave_id(), offer->resources());
+        allocator->resourcesRecovered(offer->framework_id(),
+                                      offer->slave_id(),
+                                      offer->resources());
         removeOffer(offer);
       }
 
@@ -763,7 +759,7 @@ void Master::deactivateFramework(const F
 void Master::resourceRequest(const FrameworkID& frameworkId,
                              const vector<Request>& requests)
 {
-  dispatch(allocator, &AllocatorProcess::resourcesRequested, frameworkId, requests);
+  allocator->resourcesRequested(frameworkId, requests);
 }
 
 
@@ -813,7 +809,7 @@ void Master::reviveOffers(const Framewor
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
     LOG(INFO) << "Reviving offers for framework " << framework->id;
-    dispatch(allocator, &AllocatorProcess::offersRevived, framework->id);
+    allocator->offersRevived(framework->id);
   }
 }
 
@@ -1128,10 +1124,9 @@ void Master::exitedExecutor(const SlaveI
                 << " (" << slave->info.hostname() << ")"
                 << " exited with status " << status;
 
-      dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-               frameworkId,
-               slaveId,
-               Resources(executor.resources()));
+      allocator->resourcesRecovered(frameworkId,
+                                    slaveId,
+                                    Resources(executor.resources()));
 
       // Remove executor from slave and framework.
       slave->removeExecutor(frameworkId, executorId);
@@ -1210,8 +1205,7 @@ void Master::offer(const FrameworkID& fr
                  << " has terminated or is inactive";
 
     foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
-      dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-               frameworkId, slaveId, offered);
+      allocator->resourcesRecovered(frameworkId, slaveId, offered);
     }
     return;
   }
@@ -1226,8 +1220,7 @@ void Master::offer(const FrameworkID& fr
                    << frameworkId << " because slave " << slaveId
                    << " is not valid";
 
-      dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-               frameworkId, slaveId, offered);
+      allocator->resourcesRecovered(frameworkId, slaveId, offered);
       continue;
     }
 
@@ -1527,11 +1520,10 @@ void Master::processTasks(Offer* offer,
 
   if (unusedResources.allocatable().size() > 0) {
     // Tell the allocator about the unused (e.g., refused) resources.
-    dispatch(allocator, &AllocatorProcess::resourcesUnused,
-	     offer->framework_id(),
-	     offer->slave_id(),
-	     unusedResources,
-	     filters);
+    allocator->resourcesUnused(offer->framework_id(),
+                               offer->slave_id(),
+                               unusedResources,
+                               filters);
   }
 
   removeOffer(offer);
@@ -1584,6 +1576,7 @@ Resources Master::launchTask(const TaskI
 
   // Tell the slave to launch the task!
   LOG(INFO) << "Launching task " << task.task_id()
+            << " of framework " << framework->id
             << " with resources " << task.resources() << " on slave "
             << slave->id << " (" << slave->info.hostname() << ")";
 
@@ -1613,8 +1606,9 @@ void Master::addFramework(Framework* fra
   message.mutable_master_info()->MergeFrom(info);
   send(framework->pid, message);
 
-  dispatch(allocator, &AllocatorProcess::frameworkAdded,
-           framework->id, framework->info, framework->resources);
+  allocator->frameworkAdded(framework->id,
+                            framework->info,
+                            framework->resources);
 }
 
 
@@ -1638,8 +1632,7 @@ void Master::failoverFramework(Framework
   // Make sure we can get offers again.
   if (!framework->active) {
     framework->active = true;
-    dispatch(allocator, &AllocatorProcess::frameworkActivated,
-             framework->id, framework->info);
+    allocator->frameworkActivated(framework->id, framework->info);
   }
 
   framework->reregisteredTime = Clock::now();
@@ -1657,10 +1650,9 @@ void Master::failoverFramework(Framework
   // these resources to this framework if it wants.
   // TODO(benh): Consider just reoffering these to
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-             offer->framework_id(),
-             offer->slave_id(),
-             Resources(offer->resources()));
+    allocator->resourcesRecovered(offer->framework_id(),
+                                  offer->slave_id(),
+                                  Resources(offer->resources()));
     removeOffer(offer);
   }
 }
@@ -1670,7 +1662,7 @@ void Master::removeFramework(Framework* 
 {
   if (framework->active) {
     // Tell the allocator to stop allocating resources to this framework.
-    dispatch(allocator, &AllocatorProcess::frameworkDeactivated, framework->id);
+    allocator->frameworkDeactivated(framework->id);
   }
 
   // Tell slaves to shutdown the framework.
@@ -1691,10 +1683,9 @@ void Master::removeFramework(Framework* 
 
   // Remove the framework's offers (if they weren't removed before).
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-             offer->framework_id(),
-             offer->slave_id(),
-             Resources(offer->resources()));
+    allocator->resourcesRecovered(offer->framework_id(),
+                                  offer->slave_id(),
+                                  Resources(offer->resources()));
     removeOffer(offer);
   }
 
@@ -1705,10 +1696,9 @@ void Master::removeFramework(Framework* 
       foreachpair (const ExecutorID& executorId,
                    const ExecutorInfo& executorInfo,
                    framework->executors[slaveId]) {
-        dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-                 framework->id,
-                 slave->id,
-                 executorInfo.resources());
+        allocator->resourcesRecovered(framework->id,
+                                      slave->id,
+                                      executorInfo.resources());
         slave->removeExecutor(framework->id, executorId);
       }
     }
@@ -1728,7 +1718,7 @@ void Master::removeFramework(Framework* 
 
   // Delete it.
   frameworks.erase(framework->id);
-  dispatch(allocator, &AllocatorProcess::frameworkRemoved, framework->id);
+  allocator->frameworkRemoved(framework->id);
 
   delete framework;
 }
@@ -1767,8 +1757,9 @@ void Master::addSlave(Slave* slave, bool
   spawn(slave->observer);
 
   if (!reregister) {
-    dispatch(allocator, &AllocatorProcess::slaveAdded,
-             slave->id, slave->info, hashmap<FrameworkID, Resources>());
+    allocator->slaveAdded(slave->id,
+                          slave->info,
+                          hashmap<FrameworkID, Resources>());
   }
 }
 
@@ -1837,8 +1828,7 @@ void Master::readdSlave(Slave* slave,
     resources[task.framework_id()] += task.resources();
   }
 
-  dispatch(allocator, &AllocatorProcess::slaveAdded,
-           slave->id, slave->info, resources);
+  allocator->slaveAdded(slave->id, slave->info, resources);
 }
 
 
@@ -1912,7 +1902,7 @@ void Master::removeSlave(Slave* slave)
 
   // Delete it.
   slaves.erase(slave->id);
-  dispatch(allocator, &AllocatorProcess::slaveRemoved, slave->id);
+  allocator->slaveRemoved(slave->id);
   delete slave;
 }
 
@@ -1931,10 +1921,9 @@ void Master::removeTask(Task* task)
   slave->removeTask(task);
 
   // Tell the allocator about the recovered resources.
-  dispatch(allocator, &AllocatorProcess::resourcesRecovered,
-           task->framework_id(),
-           task->slave_id(),
-           Resources(task->resources()));
+  allocator->resourcesRecovered(task->framework_id(),
+                                task->slave_id(),
+                                Resources(task->resources()));
 
   delete task;
 }

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Thu Nov  8 05:51:50 2012
@@ -56,20 +56,21 @@ namespace master {
 
 using namespace process; // Included to make code easier to read.
 
-// Some forward declarations.
-class AllocatorProcess;
+// Forward declarations.
+class Allocator;
 class SlavesManager;
-struct Framework;
-struct Slave;
 class SlaveObserver;
 class WhitelistWatcher;
 
+struct Framework;
+struct Slave;
+
 
 class Master : public ProtobufProcess<Master>
 {
 public:
-  Master(AllocatorProcess* allocator, Files* files);
-  Master(AllocatorProcess* allocator,
+  Master(Allocator* allocator, Files* files);
+  Master(Allocator* allocator,
          Files* files,
          const flags::Flags<logging::Flags, master::Flags>& flags);
 
@@ -208,7 +209,7 @@ private:
 
   bool elected;
 
-  AllocatorProcess* allocator;
+  Allocator* allocator;
   SlavesManager* slavesManager;
   WhitelistWatcher* whitelistWatcher;
   Files* files;

Modified: incubator/mesos/trunk/src/master/sorter.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/sorter.hpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/sorter.hpp (original)
+++ incubator/mesos/trunk/src/master/sorter.hpp Thu Nov  8 05:51:50 2012
@@ -19,7 +19,7 @@
 #ifndef __SORTER_HPP__
 #define __SORTER_HPP__
 
-#include "master/master.hpp"
+#include <list>
 
 
 namespace mesos {
@@ -44,22 +44,18 @@ public:
   // Readds a client to the sort after deactivate.
   virtual void activate(const std::string& client) = 0;
 
-  // Removes a client from the sort, so it won't get
-  // allocated to.
+  // Removes a client from the sort, so it won't get allocated to.
   virtual void deactivate(const std::string& client) = 0;
 
-  // Specify that resources have been allocated to the
-  // given client.
+  // Specify that resources have been allocated to the given client.
   virtual void allocated(const std::string& client,
-			 const Resources& resources) = 0;
+                         const Resources& resources) = 0;
 
-  // Specify that resources have been unallocated from
-  // the given client.
+  // Specify that resources have been unallocated from the given client.
   virtual void unallocated(const std::string& client,
-			   const Resources& resources) = 0;
+                           const Resources& resources) = 0;
 
-  // Returns the resources that have been allocated to
-  // this client.
+  // Returns the resources that have been allocated to this client.
   virtual Resources allocation(const std::string& client) = 0;
 
   // Add resources to the total pool of resources this

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Thu Nov  8 05:51:50 2012
@@ -418,7 +418,7 @@ protected:
 
   void stop(bool failover)
   {
-    VLOG(1) << "Stopping the framework";
+    VLOG(1) << "Stopping framework '" << framework.id() << "'";
 
     // Whether or not we send an unregister message, we want to
     // terminate this process.
@@ -442,7 +442,7 @@ protected:
   // SchedulerProcess::stop.
   void abort()
   {
-    VLOG(1) << "Aborting the framework";
+    VLOG(1) << "Aborting framework '" << framework.id() << "'";
 
     aborted = true;
 

Modified: incubator/mesos/trunk/src/tests/allocator_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_tests.cpp Thu Nov  8 05:51:50 2012
@@ -20,11 +20,15 @@
 
 #include <mesos/scheduler.hpp>
 
+#include <process/clock.hpp>
+#include <process/pid.hpp>
+
 #include "configurator/configuration.hpp"
 
 #include "detector/detector.hpp"
 
 #include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
 #include "slave/process_based_isolation_module.hpp"
@@ -36,12 +40,14 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
-using mesos::internal::master::AllocatorProcess;
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::ProcessBasedIsolationModule;
 using mesos::internal::slave::Slave;
 
+using process::Clock;
 using process::PID;
 
 using std::map;
@@ -91,55 +97,56 @@ TEST(AllocatorTest, DRFAllocatorProcess)
   frameworkInfo3.set_user("user1");
   FrameworkID frameworkId3;
 
-  MockAllocator<TestAllocatorProcess > a;
+  MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
 
-  EXPECT_CALL(a, initialize(_, _));
+  EXPECT_CALL(allocator, initialize(_, _));
 
-  EXPECT_CALL(a, frameworkAdded(_, Eq(frameworkInfo1), _))
-    .WillOnce(DoAll(InvokeFrameworkAdded(&a),
-		    SaveArg<0>(&frameworkId1)));
+  EXPECT_CALL(allocator, frameworkAdded(_, Eq(frameworkInfo1), _))
+    .WillOnce(DoAll(InvokeFrameworkAdded(&allocator),
+                    SaveArg<0>(&frameworkId1)));
 
   trigger framework2Added;
-  EXPECT_CALL(a, frameworkAdded(_, Eq(frameworkInfo2), _))
-    .WillOnce(DoAll(InvokeFrameworkAdded(&a),
-		    SaveArg<0>(&frameworkId2),
-		    Trigger(&framework2Added)));
+  EXPECT_CALL(allocator, frameworkAdded(_, Eq(frameworkInfo2), _))
+    .WillOnce(DoAll(InvokeFrameworkAdded(&allocator),
+                    SaveArg<0>(&frameworkId2),
+                    Trigger(&framework2Added)));
 
   trigger framework3Added;
-  EXPECT_CALL(a, frameworkAdded(_, Eq(frameworkInfo3), _))
-    .WillOnce(DoAll(InvokeFrameworkAdded(&a),
-		    SaveArg<0>(&frameworkId3),
-		    Trigger(&framework3Added)));
+  EXPECT_CALL(allocator, frameworkAdded(_, Eq(frameworkInfo3), _))
+    .WillOnce(DoAll(InvokeFrameworkAdded(&allocator),
+                    SaveArg<0>(&frameworkId3),
+                    Trigger(&framework3Added)));
 
-  EXPECT_CALL(a, frameworkDeactivated(_))
+  EXPECT_CALL(allocator, frameworkDeactivated(_))
     .Times(3);
 
-  EXPECT_CALL(a, frameworkRemoved(Eq(ByRef(frameworkId1))));
+  EXPECT_CALL(allocator, frameworkRemoved(Eq(ByRef(frameworkId1))));
 
-  EXPECT_CALL(a, frameworkRemoved(Eq(ByRef(frameworkId2))));
+  EXPECT_CALL(allocator, frameworkRemoved(Eq(ByRef(frameworkId2))));
 
   trigger lastFrameworkRemoved;
-  EXPECT_CALL(a, frameworkRemoved(Eq(ByRef(frameworkId3))))
+  EXPECT_CALL(allocator, frameworkRemoved(Eq(ByRef(frameworkId3))))
     .WillOnce(Trigger(&lastFrameworkRemoved));
 
   SlaveID slaveId4;
-  EXPECT_CALL(a, slaveAdded(_, _, _))
+  EXPECT_CALL(allocator, slaveAdded(_, _, _))
     .WillOnce(DoDefault())
     .WillOnce(DoDefault())
     .WillOnce(DoDefault())
-    .WillOnce(DoAll(InvokeSlaveAdded(&a),
-		    SaveArg<0>(&slaveId4)));
+    .WillOnce(DoAll(InvokeSlaveAdded(&allocator),
+                    SaveArg<0>(&slaveId4)));
 
-  EXPECT_CALL(a, slaveRemoved(_))
+  EXPECT_CALL(allocator, slaveRemoved(_))
     .Times(3);
 
   trigger lastSlaveRemoved;
-  EXPECT_CALL(a, slaveRemoved(Eq(ByRef(slaveId4))))
+  EXPECT_CALL(allocator, slaveRemoved(Eq(ByRef(slaveId4))))
     .WillOnce(Trigger(&lastSlaveRemoved));
 
-  EXPECT_CALL(a, resourcesRecovered(_, _, _))
+  EXPECT_CALL(allocator, resourcesRecovered(_, _, _))
     .WillRepeatedly(DoDefault());
 
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(m);
@@ -160,7 +167,7 @@ TEST(AllocatorTest, DRFAllocatorProcess)
   trigger resourceOfferTrigger;
   EXPECT_CALL(sched1, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers1),
-		    Trigger(&resourceOfferTrigger)))
+                    Trigger(&resourceOfferTrigger)))
     .WillRepeatedly(Return());
 
   driver1.start();
@@ -178,9 +185,9 @@ TEST(AllocatorTest, DRFAllocatorProcess)
   trigger resourceOfferTrigger2, resourceOfferTrigger3;
   EXPECT_CALL(sched2, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers2),
-		    Trigger(&resourceOfferTrigger2)))
+                    Trigger(&resourceOfferTrigger2)))
     .WillOnce(DoAll(SaveArg<1>(&offers3),
-		    Trigger(&resourceOfferTrigger3)))
+                    Trigger(&resourceOfferTrigger3)))
     .WillRepeatedly(Return());
 
   driver2.start();
@@ -215,7 +222,7 @@ TEST(AllocatorTest, DRFAllocatorProcess)
   trigger resourceOfferTrigger4;
   EXPECT_CALL(sched3, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers4),
-		    Trigger(&resourceOfferTrigger4)));
+                    Trigger(&resourceOfferTrigger4)));
 
   driver3.start();
 
@@ -261,16 +268,16 @@ class AllocatorTest : public ::testing::
 protected:
   virtual void SetUp()
   {
-    process::spawn(allocator.real);
+    a = new Allocator(&allocator);
   }
 
   virtual void TearDown()
   {
-    process::terminate(allocator.real);
-    process::wait(allocator.real);
+    delete a;
   }
 
-  MockAllocator<T> allocator;
+  MockAllocatorProcess<T> allocator;
+  Allocator* a;
 };
 
 
@@ -301,7 +308,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
   EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _));
 
   Files files;
-  Master m(&this->allocator, &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(&m);
 
   ProcessBasedIsolationModule isolationModule;
@@ -323,7 +330,7 @@ TYPED_TEST(AllocatorTest, MockAllocator)
 
   EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
-		    Trigger(&resourceOffers)));
+                    Trigger(&resourceOffers)));
 
   driver.start();
 
@@ -369,8 +376,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
   trigger resourcesUnusedTrigger;
   EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _))
     .WillOnce(DoAll(InvokeResourcesUnused(&this->allocator),
-		    Trigger(&resourcesUnusedTrigger),
-		    Return()))
+                    Trigger(&resourcesUnusedTrigger)))
     .WillRepeatedly(DoDefault());
 
   // Prevent any resources from being recovered by returning instead
@@ -380,7 +386,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
     .WillRepeatedly(Return());
 
   Files files;
-  Master m(&(this->allocator), &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(m);
 
   ProcessBasedIsolationModule isolationModule;
@@ -414,7 +420,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnuse
   trigger offered;
   EXPECT_CALL(sched2, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
-		    Trigger(&offered)))
+                    Trigger(&offered)))
     .WillRepeatedly(Return());
 
   driver2.start();
@@ -457,11 +463,11 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
 
   EXPECT_CALL(this->allocator, frameworkAdded(_, Eq(frameworkInfo1), _))
     .WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator),
-		    SaveArg<0>(&frameworkId1)));
+                    SaveArg<0>(&frameworkId1)));
 
   EXPECT_CALL(this->allocator, frameworkAdded(_, Eq(frameworkInfo2), _))
     .WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator),
-		    SaveArg<0>(&frameworkId2)));
+                    SaveArg<0>(&frameworkId2)));
 
   EXPECT_CALL(this->allocator, frameworkDeactivated(_))
     .Times(2);
@@ -469,8 +475,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
   trigger frameworkRemoved, frameworkRemoved2;
   EXPECT_CALL(this->allocator, frameworkRemoved(Eq(ByRef(frameworkId1))))
     .WillOnce(DoAll(InvokeFrameworkRemoved(&this->allocator),
-		    Trigger(&frameworkRemoved),
-		    Return()));
+                    Trigger(&frameworkRemoved)));
 
   EXPECT_CALL(this->allocator, frameworkRemoved(Eq(ByRef(frameworkId2))))
     .WillOnce(Trigger(&frameworkRemoved2));
@@ -489,12 +494,12 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
     // that it doesn't get processed until we redispatch it after
     // the frameworkRemoved trigger.
     .WillOnce(DoAll(SaveArg<0>(&frameworkId),
-		    SaveArg<1>(&slaveId),
-		    SaveArg<2>(&savedResources)))
+                    SaveArg<1>(&slaveId),
+                    SaveArg<2>(&savedResources)))
     .WillRepeatedly(DoDefault());
 
   Files files;
-  Master m(&(this->allocator), &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(&m);
 
   ProcessBasedIsolationModule isolationModule;
@@ -516,7 +521,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
 
   EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
-		    Trigger(&offered)));
+                    Trigger(&offered)));
 
   driver.start();
 
@@ -532,8 +537,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
   // Re-dispatch the resourcesRecovered call which we "caught"
   // earlier now that the framework has been removed, to test
   // that recovering resources from a removed framework works.
-  dispatch(this->allocator, &AllocatorProcess::resourcesRecovered,
-	   frameworkId, slaveId, savedResources);
+  this->a->resourcesRecovered(frameworkId, slaveId, savedResources);
 
   MockScheduler sched2;
   MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master);
@@ -544,7 +548,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDisp
   vector<Offer> offers2;
   EXPECT_CALL(sched2, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers2),
-		    Trigger(&offered2)));
+                    Trigger(&offered2)));
 
   driver2.start();
 
@@ -585,7 +589,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailo
   trigger frameworkDeactivatedTrigger;
   EXPECT_CALL(this->allocator, frameworkDeactivated(_))
     .WillOnce(DoAll(InvokeFrameworkDeactivated(&this->allocator),
-		    Trigger(&frameworkDeactivatedTrigger)))
+                    Trigger(&frameworkDeactivatedTrigger)))
     .WillOnce(DoDefault());
 
   EXPECT_CALL(this->allocator, slaveAdded(_, _, _));
@@ -604,7 +608,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailo
     .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0));
 
   Files files;
-  Master m(&this->allocator, &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(&m);
 
   MockExecutor exec;
@@ -652,7 +656,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailo
   trigger resourceOffersTrigger1;
   EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
     .WillOnce(DoAll(LaunchTasks(1, 1, 256),
-		    Trigger(&resourceOffersTrigger1)));
+                    Trigger(&resourceOffersTrigger1)));
 
   driver1.start();
 
@@ -730,7 +734,7 @@ TYPED_TEST(AllocatorTest, FrameworkExite
     .WillRepeatedly(DoDefault());
 
   Files files;
-  Master m(&this->allocator, &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(m);
 
   MockExecutor exec;
@@ -772,7 +776,7 @@ TYPED_TEST(AllocatorTest, FrameworkExite
   trigger resourceOffersTrigger1;
   EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
     .WillOnce(DoAll(LaunchTasks(1, 2, 512),
-		    Trigger(&resourceOffersTrigger1)));
+                    Trigger(&resourceOffersTrigger1)));
 
   driver1.start();
 
@@ -787,12 +791,11 @@ TYPED_TEST(AllocatorTest, FrameworkExite
     .WillRepeatedly(DeclineOffers());
 
   // The first time sched2 gets an offer, framework 1 has a
-  // task running with 2 cpus and 512 mem, leaving 1 cpu and
-  // 512 mem.
+  // task running with 2 cpus and 512 mem, leaving 1 cpu and 512 mem.
   trigger resourceOffersTrigger2;
   EXPECT_CALL(sched2, resourceOffers(_, OfferEq(1, 512)))
     .WillOnce(DoAll(LaunchTasks(1, 1, 256),
-		    Trigger(&resourceOffersTrigger2)));
+                    Trigger(&resourceOffersTrigger2)));
 
   // After we kill framework 1, all of it's resources should
   // have been returned, but framework 2 should still have a
@@ -854,11 +857,11 @@ TYPED_TEST(AllocatorTest, SlaveLost)
   trigger slaveRemovedTrigger1, slaveRemovedTrigger2;
   EXPECT_CALL(this->allocator, slaveRemoved(_))
     .WillOnce(DoAll(InvokeSlaveRemoved(&this->allocator),
-		    Trigger(&slaveRemovedTrigger1)))
+                    Trigger(&slaveRemovedTrigger1)))
     .WillOnce(Trigger(&slaveRemovedTrigger2));
 
   Files files;
-  Master m(&this->allocator, &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(m);
 
   MockExecutor exec;
@@ -868,7 +871,7 @@ TYPED_TEST(AllocatorTest, SlaveLost)
   trigger launchTaskTrigger;
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
-		    Trigger(&launchTaskTrigger)));
+                    Trigger(&launchTaskTrigger)));
 
   EXPECT_CALL(exec, shutdown(_));
 
@@ -902,7 +905,7 @@ TYPED_TEST(AllocatorTest, SlaveLost)
     // Initially, all of slave1's resources are avaliable.
     EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 1024)))
       .WillOnce(DoAll(LaunchTasks(1, 2, 512),
-		      Trigger(&resourceOffersTrigger1)));
+                      Trigger(&resourceOffersTrigger1)));
 
     // Eventually after slave2 is launched, we should get
     // an offer that contains all of slave2's resources
@@ -982,7 +985,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
     .WillRepeatedly(DoDefault());
 
   Files files;
-  Master m(&this->allocator, &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(m);
 
   MockExecutor exec;
@@ -992,7 +995,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
   trigger launchTaskTrigger;
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
-		    Trigger(&launchTaskTrigger)));
+                    Trigger(&launchTaskTrigger)));
 
   trigger shutdownTrigger;
   EXPECT_CALL(exec, shutdown(_))
@@ -1029,7 +1032,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded)
     // Initially, all of slave1's resources are avaliable.
     EXPECT_CALL(sched1, resourceOffers(_, OfferEq(3, 1024)))
       .WillOnce(DoAll(LaunchTasks(1, 2, 512),
-		      Trigger(&resourceOffersTrigger1)));
+                      Trigger(&resourceOffersTrigger1)));
 
     // After slave2 launches, all of its resources are
     // combined with the resources on slave1 that the
@@ -1109,7 +1112,7 @@ TYPED_TEST(AllocatorTest, TaskFinished)
     .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 1));
 
   Files files;
-  Master m(&this->allocator, &files);
+  Master m(this->a, &files);
   PID<Master> master = process::spawn(m);
 
   MockExecutor exec;
@@ -1120,9 +1123,9 @@ TYPED_TEST(AllocatorTest, TaskFinished)
   trigger launchTaskTrigger;
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(DoAll(SaveArg<0>(&execDriver),
-		    SaveArg<1>(&taskInfo),
-		    SendStatusUpdateFromTask(TASK_RUNNING),
-		    Trigger(&launchTaskTrigger)))
+                    SaveArg<1>(&taskInfo),
+                    SendStatusUpdateFromTask(TASK_RUNNING),
+                    Trigger(&launchTaskTrigger)))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
   trigger shutdownTrigger;
@@ -1164,7 +1167,7 @@ TYPED_TEST(AllocatorTest, TaskFinished)
     // After the tasks are launched.
     EXPECT_CALL(sched1, resourceOffers(_, OfferEq(1, 512)))
       .WillOnce(DoAll(DeclineOffers(),
-		      Trigger(&resourceOffersTrigger1)));
+                      Trigger(&resourceOffersTrigger1)));
 
     // After the first task gets killed.
     EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 768)))
@@ -1220,9 +1223,9 @@ TYPED_TEST(AllocatorTest, WhitelistSlave
   trigger updateWhitelistTrigger1, updateWhitelistTrigger2;
   EXPECT_CALL(this->allocator, updateWhitelist(_))
     .WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
-		    Trigger(&updateWhitelistTrigger1)))
+                    Trigger(&updateWhitelistTrigger1)))
     .WillOnce(DoAll(InvokeUpdateWhitelist(&this->allocator),
-		    Trigger(&updateWhitelistTrigger2)));
+                    Trigger(&updateWhitelistTrigger2)));
 
   EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _))
     .WillRepeatedly(DoDefault());
@@ -1235,7 +1238,7 @@ TYPED_TEST(AllocatorTest, WhitelistSlave
   Files files;
   flags::Flags<logging::Flags, master::Flags> flags;
   flags.whitelist = "file://" + path; // TODO(benh): Put in /tmp.
-  Master m(&this->allocator, &files, flags);
+  Master m(this->a, &files, flags);
   PID<Master> master = process::spawn(&m);
 
   MockExecutor exec;

Modified: incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp Thu Nov  8 05:51:50 2012
@@ -31,6 +31,7 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using mesos::internal::master::Allocator;
 using mesos::internal::master::AllocatorProcess;
 using mesos::internal::master::Master;
 
@@ -42,10 +43,10 @@ using std::map;
 using std::string;
 using std::vector;
 
+using testing::_;
 using testing::DoAll;
 using testing::DoDefault;
 using testing::Eq;
-using testing::_;
 using testing::Return;
 using testing::SaveArg;
 
@@ -53,9 +54,28 @@ using testing::SaveArg;
 template <typename T = AllocatorProcess>
 class AllocatorZooKeeperTest : public ZooKeeperTest
 {
+public:
+  virtual void SetUp()
+  {
+    ZooKeeperTest::SetUp();
+
+    a1 = new Allocator(&allocator1);
+    a2 = new Allocator(&allocator2);
+  }
+
+  virtual void TearDown()
+  {
+    ZooKeeperTest::TearDown();
+
+    delete a1;
+    delete a2;
+  }
+
 protected:
   T allocator1;
-  MockAllocator<T> allocator2;
+  MockAllocatorProcess<T> allocator2;
+  Allocator* a1;
+  Allocator* a2;
 };
 
 
@@ -70,7 +90,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
   trigger frameworkAddedTrigger;
   EXPECT_CALL(this->allocator2, frameworkAdded(_, _, _))
     .WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator2),
-		    Trigger(&frameworkAddedTrigger)));
+                    Trigger(&frameworkAddedTrigger)));
 
   EXPECT_CALL(this->allocator2, frameworkDeactivated(_));
 
@@ -88,13 +108,13 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
   trigger shutdownMessageTrigger;
   EXPECT_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _)
     .WillRepeatedly(DoAll(Trigger(&shutdownMessageTrigger),
-			  Return(true)));
+                          Return(true)));
 
   EXPECT_MESSAGE(Eq(ReregisterSlaveMessage().GetTypeName()), _, _)
     .WillRepeatedly(Return(true));
 
   Files files;
-  Master m(&this->allocator1, &files);
+  Master m(this->a1, &files);
   PID<Master> master1 = process::spawn(&m);
 
   string zk = "zk://" + this->server->connectString() + "/znode";
@@ -135,16 +155,16 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
   trigger resourceOffersTrigger, resourceOffersTrigger2;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
-		    LaunchTasks(1, 1, 512),
+                    LaunchTasks(1, 1, 512),
                     Trigger(&resourceOffersTrigger)))
     .WillRepeatedly(DoAll(SaveArg<1>(&offers2),
-			  Trigger(&resourceOffersTrigger2)));
+                          Trigger(&resourceOffersTrigger2)));
 
   TaskStatus status;
   trigger statusUpdateTrigger;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(DoAll(SaveArg<1>(&status),
-		    Trigger(&statusUpdateTrigger)));
+                    Trigger(&statusUpdateTrigger)));
 
   EXPECT_CALL(sched, disconnected(_))
     .WillRepeatedly(DoDefault());
@@ -174,7 +194,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
   WAIT_UNTIL(shutdownMessageTrigger);
 
   Files files2;
-  Master m2(&(this->allocator2), &files2);
+  Master m2(this->a2, &files2);
   PID<Master> master2 = process::spawn(m2);
 
   Try<MasterDetector*> detector2 =
@@ -224,7 +244,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
   trigger slaveAddedTrigger;
   EXPECT_CALL(this->allocator2, slaveAdded(_, _, _))
     .WillOnce(DoAll(InvokeSlaveAdded(&this->allocator2),
-		    Trigger(&slaveAddedTrigger)));
+                    Trigger(&slaveAddedTrigger)));
 
   trigger slaveRemovedTrigger;
   EXPECT_CALL(this->allocator2, slaveRemoved(_))
@@ -236,13 +256,13 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
   trigger shutdownMessageTrigger;
   EXPECT_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _)
     .WillRepeatedly(DoAll(Trigger(&shutdownMessageTrigger),
-			  Return(true)));
+                          Return(true)));
 
   EXPECT_MESSAGE(Eq(ReregisterFrameworkMessage().GetTypeName()), _, _)
     .WillRepeatedly(Return(true));
 
   Files files;
-  Master m(&this->allocator1, &files);
+  Master m(this->a1, &files);
   PID<Master> master1 = process::spawn(&m);
 
   string zk = "zk://" + this->server->connectString() + "/znode";
@@ -283,16 +303,16 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
   trigger resourceOffersTrigger, resourceOffersTrigger2;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
-		    LaunchTasks(1, 1, 512),
+                    LaunchTasks(1, 1, 512),
                     Trigger(&resourceOffersTrigger)))
     .WillRepeatedly(DoAll(SaveArg<1>(&offers2),
-			  Trigger(&resourceOffersTrigger2)));
+                          Trigger(&resourceOffersTrigger2)));
 
   TaskStatus status;
   trigger statusUpdateTrigger;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(DoAll(SaveArg<1>(&status),
-		    Trigger(&statusUpdateTrigger)));
+                    Trigger(&statusUpdateTrigger)));
 
   EXPECT_CALL(sched, disconnected(_))
     .WillRepeatedly(DoDefault());
@@ -322,7 +342,7 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
   WAIT_UNTIL(shutdownMessageTrigger);
 
   Files files2;
-  Master m2(&(this->allocator2), &files2);
+  Master m2(this->a2, &files2);
   PID<Master> master2 = process::spawn(m2);
 
   Try<MasterDetector*> detector2 =

Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Thu Nov  8 05:51:50 2012
@@ -29,6 +29,8 @@
 
 #include "local/local.hpp"
 
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
 #include "slave/process_based_isolation_module.hpp"
@@ -41,6 +43,8 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::ProcessBasedIsolationModule;
@@ -69,7 +73,8 @@ TEST(FaultToleranceTest, SlaveLost)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -208,7 +213,8 @@ TEST(FaultToleranceTest, SchedulerFailov
   trigger sched1RegisteredCall;
 
   EXPECT_CALL(sched1, registered(&driver1, _, _))
-    .WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&sched1RegisteredCall)));
+    .WillOnce(DoAll(SaveArg<1>(&frameworkId),
+                    Trigger(&sched1RegisteredCall)));
 
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
     .WillRepeatedly(Return());
@@ -366,7 +372,8 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -463,7 +470,8 @@ TEST(FaultToleranceTest, SchedulerFailov
 
   Clock::pause();
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -520,7 +528,8 @@ TEST(FaultToleranceTest, SchedulerFailov
 
   EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), _,
              Not(AnyOf(Eq(master), Eq(slave))))
-    .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
+    .WillOnce(DoAll(Trigger(&statusUpdateMsg),
+                    Return(true)))
     .RetiresOnSaturation();
 
   driver1.start();
@@ -594,7 +603,8 @@ TEST(FaultToleranceTest, SchedulerFailov
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -637,7 +647,8 @@ TEST(FaultToleranceTest, SchedulerFailov
   EXPECT_CALL(sched1, registered(&driver1, _, _))
     .WillOnce(SaveArg<1>(&frameworkId));
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
-    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&sched1StatusUpdateCall)));
+    .WillOnce(DoAll(SaveArg<1>(&status),
+                    Trigger(&sched1StatusUpdateCall)));
 
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
@@ -711,7 +722,8 @@ TEST(FaultToleranceTest, SchedulerExit)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -763,7 +775,8 @@ TEST(FaultToleranceTest, SchedulerExit)
     .WillOnce(SaveArg<1>(&frameworkId));
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&schedStatusUpdateCall)));
+    .WillOnce(DoAll(SaveArg<1>(&status),
+                    Trigger(&schedStatusUpdateCall)));
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
@@ -834,10 +847,12 @@ TEST(FaultToleranceTest, SlaveReliableRe
 
   // Drop the first slave registered message, allow subsequent messages.
   EXPECT_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(Trigger(&slaveRegisteredMsg), Return(true)))
+    .WillOnce(DoAll(Trigger(&slaveRegisteredMsg),
+                    Return(true)))
     .WillRepeatedly(Return(false));
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -888,7 +903,8 @@ TEST(FaultToleranceTest, SlaveReregister
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);
@@ -917,7 +933,8 @@ TEST(FaultToleranceTest, SlaveReregister
   trigger slaveReRegisterMsg;
 
   EXPECT_MESSAGE(Eq(SlaveReregisteredMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(Trigger(&slaveReRegisterMsg), Return(false)));
+    .WillOnce(DoAll(Trigger(&slaveReRegisterMsg),
+                    Return(false)));
 
   driver.start();
 

Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Thu Nov  8 05:51:50 2012
@@ -35,6 +35,8 @@
 
 #include "local/local.hpp"
 
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
 #include "slave/constants.hpp"
@@ -48,6 +50,8 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
@@ -85,7 +89,7 @@ protected:
   {
     ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-    a = new TestAllocatorProcess();
+    a = new Allocator(&allocator);
     files = new Files();
     m = new Master(a, files);
     master = process::spawn(m);
@@ -133,7 +137,8 @@ protected:
     startSlave();
   }
 
-  TestAllocatorProcess* a;
+  Allocator* a;
+  HierarchicalDRFAllocatorProcess allocator;
   Master* m;
   TestingIsolationModule* isolationModule;
   Slave* s;

Modified: incubator/mesos/trunk/src/tests/master_detector_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_detector_tests.cpp?rev=1406932&r1=1406931&r2=1406932&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_detector_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_detector_tests.cpp Thu Nov  8 05:51:50 2012
@@ -31,6 +31,8 @@
 
 #include "detector/detector.hpp"
 
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
 #include "slave/slave.hpp"
@@ -41,6 +43,8 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
@@ -57,7 +61,8 @@ TEST(MasterDetector, File)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  TestAllocatorProcess a;
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator a(&allocator);
   Files files;
   Master m(&a, &files);
   PID<Master> master = process::spawn(&m);