Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC

svn commit: r1140024 [4/15] - in /incubator/mesos/trunk: ./ ec2/ ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/ include/mesos/ src/ src/common/ src/configurator/ src/detector/ src/examples/ src/examples/java/ src/examples/python/...

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Mon Jun 27 06:08:33 2011
@@ -4,29 +4,40 @@
 #include <string>
 #include <vector>
 
-#include <glog/logging.h>
-
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-
 #include <process/process.hpp>
+#include <process/protobuf.hpp>
 
 #include "state.hpp"
 
 #include "common/foreach.hpp"
+#include "common/hashmap.hpp"
+#include "common/hashset.hpp"
 #include "common/multimap.hpp"
 #include "common/resources.hpp"
 #include "common/type_utils.hpp"
+#include "common/units.hpp"
+#include "common/utils.hpp"
 
 #include "configurator/configurator.hpp"
 
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
 
 
 namespace mesos { namespace internal { namespace master {
 
-using foreach::_;
+using namespace process;
 
+// Some forward declarations.
+class Allocator;
+class SlavesManager;
+struct Framework;
+struct Slave;
+struct SlaveResources;
+class SlaveObserver;
+struct Offer;
+
+// TODO(benh): Add units after constants.
+// TODO(benh): Also make configuration options be constants.
 
 // Maximum number of slot offers to have outstanding for each framework.
 const int MAX_OFFERS_PER_FRAMEWORK = 50;
@@ -53,7 +64,7 @@ const double SLAVE_PONG_TIMEOUT = 15.0;
 const int MAX_SLAVE_TIMEOUTS = 5;
 
 // Time to wait for a framework to fail over (TODO(benh): Make configurable).
-const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;
+const double FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;
 
 
 // Reasons why offers might be returned to the Allocator.
@@ -77,17 +88,7 @@ enum TaskRemovalReason
 };
 
 
-// Some forward declarations.
-class Allocator;
-class SlavesManager;
-struct Framework;
-struct Slave;
-struct SlaveResources;
-class SlaveObserver;
-struct SlotOffer;
-
-
-class Master : public MesosProcess<Master>
+class Master : public ProtobufProcess<Master>
 {
 public:
   Master();
@@ -97,18 +98,9 @@ public:
 
   static void registerOptions(Configurator* configurator);
 
-  process::Promise<state::MasterState*> getState();
-  
-  OfferID makeOffer(Framework* framework,
-		    const std::vector<SlaveResources>& resources);
-
-  // Return connected frameworks that are not in the process of being removed
-  std::vector<Framework*> getActiveFrameworks();
-  
-  // Return connected slaves that are not in the process of being removed
-  std::vector<Slave*> getActiveSlaves();
+  Promise<state::MasterState*> getState();
 
-  void newMasterDetected(const std::string& pid);
+  void newMasterDetected(const UPID& pid);
   void noMasterDetected();
   void masterDetectionFailure();
   void registerFramework(const FrameworkInfo& frameworkInfo);
@@ -121,22 +113,17 @@ public:
                           const std::vector<TaskDescription>& tasks,
                           const Params& params);
   void reviveOffers(const FrameworkID& frameworkId);
-  void killTask(const FrameworkID& frameworkId,
-                const TaskID& taskId);
+  void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
   void schedulerMessage(const SlaveID& slaveId,
 			const FrameworkID& frameworkId,
 			const ExecutorID& executorId,
 			const std::string& data);
-  void statusUpdateAck(const FrameworkID& frameworkId,
-                       const TaskID& taskId,
-                       const SlaveID& slaveId);
   void registerSlave(const SlaveInfo& slaveInfo);
   void reregisterSlave(const SlaveID& slaveId,
                        const SlaveInfo& slaveInfo,
                        const std::vector<Task>& tasks);
   void unregisterSlave(const SlaveID& slaveId);
-  void statusUpdate(const FrameworkID& frameworkId,
-                    const TaskStatus& status);
+  void statusUpdate(const StatusUpdate& update, const UPID& pid);
   void executorMessage(const SlaveID& slaveId,
 		       const FrameworkID& frameworkId,
 		       const ExecutorID& executorId,
@@ -144,102 +131,101 @@ public:
   void exitedExecutor(const SlaveID& slaveId,
                       const FrameworkID& frameworkId,
                       const ExecutorID& executorId,
-                      int32_t result);
+                      int32_t status);
   void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
   void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
   void timerTick();
-  void frameworkExpired(const FrameworkID& frameworkId);
+  void frameworkFailoverTimeout(const FrameworkID& frameworkId,
+                                double reregisteredTime);
   void exited();
 
-  Framework* lookupFramework(const FrameworkID& frameworkId);
-  Slave* lookupSlave(const SlaveID& slaveId);
-  SlotOffer* lookupSlotOffer(const OfferID& offerId);
+  // Return connected frameworks that are not in the process of being removed
+  std::vector<Framework*> getActiveFrameworks();
+  
+  // Return connected slaves that are not in the process of being removed
+  std::vector<Slave*> getActiveSlaves();
+
+  OfferID makeOffer(Framework* framework,
+		    const std::vector<SlaveResources>& resources);
   
 protected:
   virtual void operator () ();
   
   void initialize();
 
-  // Process a resource offer reply (for a non-cancelled offer) by launching
-  // the desired tasks (if the offer contains a valid set of tasks) and
-  // reporting any unused resources to the allocator
-  void processOfferReply(SlotOffer* offer,
+  // Process a resource offer reply (for a non-cancelled offer) by
+  // launching the desired tasks (if the offer contains a valid set of
+  // tasks) and reporting any unused resources to the allocator
+  void processOfferReply(Offer* offer,
                          const std::vector<TaskDescription>& tasks,
                          const Params& params);
 
-  // Launch a task described in a slot offer response
+  // Launch a task described in an offer response.
   void launchTask(Framework* framework, const TaskDescription& task);
   
+  void addFramework(Framework* framework);
+
+  // Replace the scheduler for a framework with a new process ID, in
+  // the event of a scheduler failover.
+  void failoverFramework(Framework* framework, const UPID& newPid);
+
   // Terminate a framework, sending it a particular error message
   // TODO: Make the error codes and messages programmer-friendly
   void terminateFramework(Framework* framework,
                           int32_t code,
-                          const std::string& message);
-  
-  // Remove a slot offer (because it was replied to, or we want to rescind it,
-  // or we lost a framework or a slave)
-  void removeSlotOffer(SlotOffer* offer,
-                       OfferReturnReason reason,
-                       const std::vector<SlaveResources>& resourcesLeft);
-
-  void removeTask(Task* task, TaskRemovalReason reason);
-
-  void addFramework(Framework* framework);
-
-  // Replace the scheduler for a framework with a new process ID, in the
-  // event of a scheduler failover.
-  void failoverFramework(Framework* framework, const process::UPID& newPid);
+                          const std::string& error);
 
   // Kill all of a framework's tasks, delete the framework object, and
   // reschedule slot offers for slots that were assigned to this framework
   void removeFramework(Framework* framework);
 
   // Add a slave.
-  void addSlave(Slave* slave);
+  void addSlave(Slave* slave, bool reregister = false);
 
   void readdSlave(Slave* slave, const std::vector<Task>& tasks);
 
   // Lose all of a slave's tasks and delete the slave object
   void removeSlave(Slave* slave);
 
-  virtual Allocator* createAllocator();
+  void removeTask(Framework* framework,
+                  Slave* slave,
+                  Task* task,
+                  TaskRemovalReason reason);
+
+  // Remove a slot offer (because it was replied to, or we want to rescind it,
+  // or we lost a framework or a slave)
+  void removeOffer(Offer* offer,
+                   OfferReturnReason reason,
+                   const std::vector<SlaveResources>& resourcesLeft);
+
+  Framework* getFramework(const FrameworkID& frameworkId);
+  Slave* getSlave(const SlaveID& slaveId);
+  Offer* getOffer(const OfferID& offerId);
 
   FrameworkID newFrameworkId();
   OfferID newOfferId();
   SlaveID newSlaveId();
 
-  const Configuration& getConfiguration();
-
 private:
+  friend struct SlaveRegistrar;
+  friend struct SlaveReregistrar;
+  // TODO(benh): Remove once SimpleAllocator doesn't use Master::get*.
+  friend class SimpleAllocator; 
+
   // TODO(benh): Better naming and name scope for these http handlers.
-  process::Promise<process::HttpResponse> http_info_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_frameworks_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_slaves_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_tasks_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_stats_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_vars(const process::HttpRequest& request);
+  Promise<HttpResponse> http_info_json(const HttpRequest& request);
+  Promise<HttpResponse> http_frameworks_json(const HttpRequest& request);
+  Promise<HttpResponse> http_slaves_json(const HttpRequest& request);
+  Promise<HttpResponse> http_tasks_json(const HttpRequest& request);
+  Promise<HttpResponse> http_stats_json(const HttpRequest& request);
+  Promise<HttpResponse> http_vars(const HttpRequest& request);
 
   const Configuration conf;
 
-  SlavesManager* slavesManager;
-
-  multimap<std::string, uint16_t> slaveHostnamePorts;
-
-  boost::unordered_map<FrameworkID, Framework*> frameworks;
-  boost::unordered_map<SlaveID, Slave*> slaves;
-  boost::unordered_map<OfferID, SlotOffer*> slotOffers;
-
-  boost::unordered_map<process::UPID, FrameworkID> pidToFrameworkId;
-  boost::unordered_map<process::UPID, SlaveID> pidToSlaveId;
-
-  int64_t nextFrameworkId; // Used to give each framework a unique ID.
-  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
-  int64_t nextSlaveId;     // Used to give each slave a unique ID.
+  bool active;
 
-  std::string allocatorType;
   Allocator* allocator;
-
-  bool active;
+  SlavesManager* slavesManager;
 
   // Contains the date the master was launched and
   // some ephemeral token (e.g. returned from
@@ -247,18 +233,24 @@ private:
   // created by this master.
   std::string masterId;
 
+  multimap<std::string, uint16_t> slaveHostnamePorts;
+
+  hashmap<FrameworkID, Framework*> frameworks;
+  hashmap<SlaveID, Slave*> slaves;
+  hashmap<OfferID, Offer*> offers;
+
+  int64_t nextFrameworkId; // Used to give each framework a unique ID.
+  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
+  int64_t nextSlaveId;     // Used to give each slave a unique ID.
+
   // Statistics (initialized in Master::initialize).
   struct {
-    uint64_t launched_tasks;
-    uint64_t finished_tasks;
-    uint64_t killed_tasks;
-    uint64_t failed_tasks;
-    uint64_t lost_tasks;
-    uint64_t valid_status_updates;
-    uint64_t invalid_status_updates;
-    uint64_t valid_framework_messages;
-    uint64_t invalid_framework_messages;
-  } statistics;
+    uint64_t tasks[TaskState_ARRAYSIZE];
+    uint64_t validStatusUpdates;
+    uint64_t invalidStatusUpdates;
+    uint64_t validFrameworkMessages;
+    uint64_t invalidFrameworkMessages;
+  } stats;
 
   // Start time used to calculate uptime.
   double startTime;
@@ -266,49 +258,40 @@ private:
 
 
 // A resource offer.
-struct SlotOffer
+struct Offer
 {
-  OfferID offerId;
-  FrameworkID frameworkId;
-  std::vector<SlaveResources> resources;
+  Offer(const OfferID& _id,
+        const FrameworkID& _frameworkId,
+        const std::vector<SlaveResources>& _resources)
+    : id(_id), frameworkId(_frameworkId), resources(_resources) {}
 
-  SlotOffer(const OfferID& _offerId,
-            const FrameworkID& _frameworkId,
-            const std::vector<SlaveResources>& _resources)
-    : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
+  const OfferID id;
+  const FrameworkID frameworkId;
+  std::vector<SlaveResources> resources;
 };
 
 
 // A connected slave.
 struct Slave
 {
-  SlaveInfo info;
-  SlaveID slaveId;
-  process::UPID pid;
-
-  bool active; // Turns false when slave is being removed
-  double connectTime;
-  double lastHeartbeat;
-  
-  Resources resourcesOffered; // Resources currently in offers
-  Resources resourcesInUse;   // Resources currently used by tasks
-
-  boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
-  boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
-
-  SlaveObserver* observer;
-  
-  Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
-        const process::UPID& _pid, double time)
-    : info(_info), slaveId(_slaveId), pid(_pid), active(true),
-      connectTime(time), lastHeartbeat(time) {}
+  Slave(const SlaveInfo& _info,
+        const SlaveID& _id,
+        const UPID& _pid,
+        double time)
+    : info(_info),
+      id(_id),
+      pid(_pid),
+      active(true),
+      registeredTime(time),
+      lastHeartbeat(time) {}
 
   ~Slave() {}
 
-  Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
+  Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
   {
-    foreachpair (_, Task* task, tasks) {
-      if (task->framework_id() == frameworkId && task->task_id() == taskId) {
+    foreachvalue (Task* task, tasks) {
+      if (task->framework_id() == frameworkId &&
+          task->task_id() == taskId) {
         return task;
       }
     }
@@ -346,6 +329,23 @@ struct Slave
     }
     return resources - (resourcesOffered + resourcesInUse);
   }
+
+  const SlaveID id;
+  const SlaveInfo info;
+
+  UPID pid;
+
+  bool active; // Turns false when slave is being removed
+  double registeredTime;
+  double lastHeartbeat;
+
+  Resources resourcesOffered; // Resources currently in offers
+  Resources resourcesInUse;   // Resources currently used by tasks
+
+  hashmap<std::pair<FrameworkID, TaskID>, Task*> tasks;
+  hashset<Offer*> offers; // Active offers on this slave.
+
+  SlaveObserver* observer;
 };
 
 
@@ -360,71 +360,17 @@ struct SlaveResources
 };
 
 
-class FrameworkFailoverTimer : public process::Process<FrameworkFailoverTimer>
-{
-public:
-  FrameworkFailoverTimer(const process::PID<Master>& _master,
-                         const FrameworkID& _frameworkId)
-    : master(_master), frameworkId(_frameworkId) {}
-
-protected:
-  virtual void operator () ()
-  {
-    link(master);
-    while (true) {
-      receive(FRAMEWORK_FAILOVER_TIMEOUT);
-      if (name() == process::TIMEOUT) {
-        process::dispatch(master, &Master::frameworkExpired, frameworkId);
-        return;
-      } else if (name() == process::EXITED || name() == process::TERMINATE) {
-        return;
-      }
-    }
-  }
-
-private:
-  const process::PID<Master> master;
-  const FrameworkID frameworkId;
-};
-
-
 // A connected framework.
 struct Framework
 {
-  FrameworkInfo info;
-  FrameworkID frameworkId;
-  process::UPID pid;
-
-  bool active; // Turns false when framework is being removed
-  double connectTime;
-
-  boost::unordered_map<TaskID, Task*> tasks;
-  boost::unordered_set<SlotOffer*> slotOffers; // Active offers for framework.
-
-  Resources resources; // Total resources owned by framework (tasks + offers)
-  
-  // Contains a time of unfiltering for each slave we've filtered,
-  // or 0 for slaves that we want to keep filtered forever
-  boost::unordered_map<Slave*, double> slaveFilter;
-
-  // A failover timer if the connection to this framework is lost.
-  FrameworkFailoverTimer* failoverTimer;
+  Framework(const FrameworkInfo& _info, const FrameworkID& _id,
+            const UPID& _pid, double time)
+    : info(_info), id(_id), pid(_pid), active(true),
+      registeredTime(time), reregisteredTime(time) {}
 
-  Framework(const FrameworkInfo& _info, const FrameworkID& _frameworkId,
-            const process::UPID& _pid, double time)
-    : info(_info), frameworkId(_frameworkId), pid(_pid), active(true),
-      connectTime(time), failoverTimer(NULL) {}
-
-  ~Framework()
-  {
-    if (failoverTimer != NULL) {
-      process::post(failoverTimer->self(), process::TERMINATE);
-      process::wait(failoverTimer->self());
-      delete failoverTimer;
-    }
-  }
+  ~Framework() {}
   
-  Task* lookupTask(const TaskID& taskId)
+  Task* getTask(const TaskID& taskId)
   {
     if (tasks.count(taskId) > 0) {
       return tasks[taskId];
@@ -452,19 +398,19 @@ struct Framework
     tasks.erase(taskId);
   }
   
-  void addOffer(SlotOffer* offer)
+  void addOffer(Offer* offer)
   {
-    CHECK(slotOffers.count(offer) == 0);
-    slotOffers.insert(offer);
+    CHECK(offers.count(offer) == 0);
+    offers.insert(offer);
     foreach (const SlaveResources& sr, offer->resources) {
       resources += sr.resources;
     }
   }
 
-  void removeOffer(SlotOffer* offer)
+  void removeOffer(Offer* offer)
   {
-    CHECK(slotOffers.find(offer) != slotOffers.end());
-    slotOffers.erase(offer);
+    CHECK(offers.find(offer) != offers.end());
+    offers.erase(offer);
     foreach (const SlaveResources& sr, offer->resources) {
       resources -= sr.resources;
     }
@@ -478,34 +424,52 @@ struct Framework
   
   void removeExpiredFilters(double now)
   {
-    foreachpaircopy (Slave* slave, double removalTime, slaveFilter) {
+    foreachpair (Slave* slave, double removalTime, utils::copy(slaveFilter)) {
       if (removalTime != 0 && removalTime <= now) {
         slaveFilter.erase(slave);
       }
     }
   }
+
+  const FrameworkID id;
+  const FrameworkInfo info;
+
+  UPID pid;
+
+  bool active; // Turns false when framework is being removed
+  double registeredTime;
+  double reregisteredTime;
+
+  hashmap<TaskID, Task*> tasks;
+  hashset<Offer*> offers; // Active offers for framework.
+
+  Resources resources; // Total resources owned by framework (tasks + offers)
+
+  // Contains a time of unfiltering for each slave we've filtered,
+  // or 0 for slaves that we want to keep filtered forever
+  hashmap<Slave*, double> slaveFilter;
 };
 
 
-// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc.
+// Pretty-printing of Offers, Tasks, Frameworks, Slaves, etc.
 
-inline std::ostream& operator << (std::ostream& stream, const SlotOffer *o)
+inline std::ostream& operator << (std::ostream& stream, const Offer *o)
 {
-  stream << "offer " << o->offerId;
+  stream << "offer " << o->id;
   return stream;
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const Slave *s)
 {
-  stream << "slave " << s->slaveId;
+  stream << "slave " << s->id;
   return stream;
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const Framework *f)
 {
-  stream << "framework " << f->frameworkId;
+  stream << "framework " << f->id;
   return stream;
 }
 

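A note on the statistics change in the hunk above: the named per-state counters collapse into a single array indexed by TaskState, sized with the TaskState_ARRAYSIZE constant protobuf generates for the enum, so the counters track every state automatically. A minimal sketch of how such counters might be maintained (the handler shown is illustrative, not the actual Master code):

    // Illustrative only: bump a per-state counter on a status update,
    // assuming 'stats' is the struct declared in master.hpp above.
    void recordStatusUpdate(const TaskStatus& status)
    {
      stats.tasks[status.state()]++;   // one slot per TaskState value
      stats.validStatusUpdates++;
    }

    // Reading a counter back, e.g. from the http_stats_json handler:
    uint64_t finished = stats.tasks[TASK_FINISHED];
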
Modified: incubator/mesos/trunk/src/master/simple_allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.cpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.cpp Mon Jun 27 06:08:33 2011
@@ -56,7 +56,7 @@ void SimpleAllocator::taskRemoved(Task* 
 {
   LOG(INFO) << "Removed " << task;
   // Remove all refusers from this slave since it has more resources free
-  Slave* slave = master->lookupSlave(task->slave_id());
+  Slave* slave = master->getSlave(task->slave_id());
   CHECK(slave != 0);
   refusers[slave].clear();
   // Re-offer the resources, unless this task was removed due to a lost
@@ -66,7 +66,7 @@ void SimpleAllocator::taskRemoved(Task* 
 }
 
 
-void SimpleAllocator::offerReturned(SlotOffer* offer,
+void SimpleAllocator::offerReturned(Offer* offer,
                                     OfferReturnReason reason,
                                     const vector<SlaveResources>& resLeft)
 {
@@ -74,7 +74,7 @@ void SimpleAllocator::offerReturned(Slot
 
   // If this offer returned due to the framework replying, add it to refusers.
   if (reason == ORR_FRAMEWORK_REPLIED) {
-    Framework* framework = master->lookupFramework(offer->frameworkId);
+    Framework* framework = master->getFramework(offer->frameworkId);
     CHECK(framework != 0);
     foreach (const SlaveResources& r, resLeft) {
       VLOG(1) << "Framework reply leaves " << r.resources.allocatable()
@@ -148,7 +148,7 @@ struct DominantShareComparator
 
     if (share1 == share2)
       // Make the sort deterministic for unit testing.
-      return f1->frameworkId.value() < f2->frameworkId.value();
+      return f1->id.value() < f2->id.value();
     else
       return share1 < share2;
   }
@@ -235,7 +235,7 @@ void SimpleAllocator::makeNewOffers(cons
   }
   
   // Clear refusers on any slave that has been refused by everyone
-  foreachpair (Slave* slave, _, freeResources) {
+  foreachkey (Slave* slave, freeResources) {
     unordered_set<Framework*>& refs = refusers[slave];
     if (refs.size() == ordering.size()) {
       VLOG(1) << "Clearing refusers for " << slave
@@ -251,7 +251,7 @@ void SimpleAllocator::makeNewOffers(cons
       if (refusers[slave].find(framework) == refusers[slave].end() &&
           !framework->filters(slave, resources)) {
         VLOG(1) << "Offering " << resources << " on " << slave
-                << " to framework " << framework->frameworkId;
+                << " to framework " << framework->id;
         offerable.push_back(SlaveResources(slave, resources));
       }
     }

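The foreachkey above (and the foreachvalue in master.hpp) come from common/foreach.hpp and replace the older foreachpair-with-underscore idiom. An illustrative sketch of the three macros over a hashmap (the map contents are made up):

    hashmap<SlaveID, Slave*> slaves;

    foreachkey (const SlaveID& slaveId, slaves) {
      LOG(INFO) << "slave " << slaveId;            // keys only
    }

    foreachvalue (Slave* slave, slaves) {
      LOG(INFO) << slave;                          // values only
    }

    foreachpair (const SlaveID& slaveId, Slave* slave, slaves) {
      LOG(INFO) << slaveId << " runs at " << slave->pid;
    }
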
Modified: incubator/mesos/trunk/src/master/simple_allocator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.hpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.hpp Mon Jun 27 06:08:33 2011
@@ -6,7 +6,7 @@
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 
-#include "messaging/messages.pb.h"
+#include "messages/messages.pb.h"
 
 #include "allocator.hpp"
 
@@ -38,7 +38,7 @@ public:
   
   virtual void taskRemoved(Task* task, TaskRemovalReason reason);
 
-  virtual void offerReturned(SlotOffer* offer,
+  virtual void offerReturned(Offer* offer,
                              OfferReturnReason reason,
                              const std::vector<SlaveResources>& resourcesLeft);
 

Modified: incubator/mesos/trunk/src/master/slaves_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.cpp Mon Jun 27 06:08:33 2011
@@ -1,9 +1,11 @@
+#include <glog/logging.h>
+
 #include <map>
 #include <sstream>
 
 #include <boost/lexical_cast.hpp>
 
-#include <glog/logging.h>
+#include <process/dispatch.hpp>
 
 #include "config/config.hpp"
 
@@ -568,7 +570,7 @@ bool ZooKeeperSlavesManagerStorage::pars
 
 SlavesManager::SlavesManager(const Configuration& conf,
                              const PID<Master>& _master)
-  : process::Process<SlavesManager>("slaves"),
+  : process::ProcessBase("slaves"),
     master(_master)
 {
   // Create the slave manager storage based on configuration.
@@ -754,7 +756,7 @@ void SlavesManager::updateActive(const m
 {
   // Loop through the current active slave hostname:port pairs and
   // remove all that are not found in updated.
-  foreachpaircopy (const string& hostname, uint16_t port, active) {
+  foreachpair (const string& hostname, uint16_t port, utils::copy(active)) {
     if (updated.count(hostname, port) == 0) {
       process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
                         hostname, port);

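The utils::copy(active) in the hunk above is the replacement for foreachpaircopy: iterating over a copy keeps the loop valid while entries are erased from the original container. A hedged sketch of the same idiom (names and values are made up):

    hashmap<std::string, double> filters;  // hostname -> expiry time
    double now = 42.0;                     // illustrative timestamp

    // Erasing from 'filters' mid-iteration would invalidate the
    // iterator, so iterate over a copy instead.
    foreachpair (const std::string& host, double expiry, utils::copy(filters)) {
      if (expiry <= now) {
        filters.erase(host);
      }
    }
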
Modified: incubator/mesos/trunk/src/master/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/state.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/state.hpp (original)
+++ incubator/mesos/trunk/src/master/state.hpp Mon Jun 27 06:08:33 2011
@@ -19,24 +19,24 @@ namespace mesos { namespace internal { n
 struct SlaveResources
 {  
   std::string slave_id;
-  int32_t cpus;
-  int64_t mem;
+  double cpus;
+  double mem;
   
-  SlaveResources(std::string _slaveId, int32_t _cpus, int64_t _mem)
+  SlaveResources(std::string _slaveId, double _cpus, double _mem)
     : slave_id(_slaveId), cpus(_cpus), mem(_mem) {}
 };
 
 
-struct SlotOffer
+struct Offer
 {  
   std::string id;
   std::string framework_id;
   std::vector<SlaveResources *> resources;
   
-  SlotOffer(std::string _id, std::string _frameworkId)
+  Offer(std::string _id, std::string _frameworkId)
     : id(_id), framework_id(_frameworkId) {}
     
-  ~SlotOffer()
+  ~Offer()
   {
     foreach (SlaveResources *sr, resources)
       delete sr;
@@ -48,7 +48,7 @@ struct Slave
 {
   Slave(std::string id_, const std::string& host_,
         const std::string& web_ui_url_,
-	int32_t cpus_, int64_t mem_, time_t connect_)
+	double cpus_, double mem_, double connect_)
     : id(id_), host(host_), web_ui_url(web_ui_url_),
       cpus(cpus_), mem(mem_), connect_time(connect_) {}
 
@@ -57,16 +57,16 @@ struct Slave
   std::string id;
   std::string host;
   std::string web_ui_url;
-  int32_t cpus;
-  int64_t mem;
-  int64_t connect_time;
+  double cpus;
+  double mem;
+  double connect_time;
 };
 
 
 struct Task
 {
   Task(std::string id_, const std::string& name_, std::string framework_id_,
-       std::string slaveId_, std::string state_, int32_t _cpus, int64_t _mem)
+       std::string slaveId_, std::string state_, double _cpus, double _mem)
     : id(id_), name(name_), framework_id(framework_id_), slave_id(slaveId_),
       state(state_), cpus(_cpus), mem(_mem) {}
 
@@ -77,8 +77,8 @@ struct Task
   std::string framework_id;
   std::string slave_id;
   std::string state;
-  int32_t cpus;
-  int64_t mem;
+  double cpus;
+  double mem;
 };
 
 
@@ -86,7 +86,7 @@ struct Framework
 {
   Framework(std::string id_, const std::string& user_,
             const std::string& name_, const std::string& executor_,
-            int32_t cpus_, int64_t mem_, time_t connect_)
+            double cpus_, double mem_, double connect_)
     : id(id_), user(user_), name(name_), executor(executor_),
       cpus(cpus_), mem(mem_), connect_time(connect_) {}
 
@@ -96,7 +96,7 @@ struct Framework
   {
     foreach (Task *task, tasks)
       delete task;
-    foreach (SlotOffer *offer, offers)
+    foreach (Offer *offer, offers)
       delete offer;
   }
 
@@ -104,12 +104,12 @@ struct Framework
   std::string user;
   std::string name;
   std::string executor;
-  int32_t cpus;
-  int64_t mem;
-  int64_t connect_time;
+  double cpus;
+  double mem;
+  double connect_time;
 
   std::vector<Task *> tasks;
-  std::vector<SlotOffer *> offers;
+  std::vector<Offer *> offers;
 };
 
 

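The effect of the int32_t/int64_t-to-double switch above is that the web UI state can now report fractional resource amounts. For illustration (values made up; this is the state.hpp SlaveResources, not the one in master.hpp):

    // Half a CPU and 512 MB, expressible now that cpus/mem are doubles.
    SlaveResources* sr = new SlaveResources("slave-1", 0.5, 512.0);
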
Modified: incubator/mesos/trunk/src/master/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/webui.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/webui.cpp (original)
+++ incubator/mesos/trunk/src/master/webui.cpp Mon Jun 27 06:08:33 2011
@@ -3,6 +3,8 @@
 #include <sstream>
 #include <string>
 
+#include <process/dispatch.hpp>
+
 #include "state.hpp"
 #include "webui.hpp"
 

Added: incubator/mesos/trunk/src/messages/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.hpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.hpp (added)
+++ incubator/mesos/trunk/src/messages/messages.hpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,6 @@
+#ifndef __MESSAGES_HPP__
+#define __MESSAGES_HPP__
+
+#include "messages/messages.pb.h"
+
+#endif // __MESSAGES_HPP__

Added: incubator/mesos/trunk/src/messages/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.proto?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.proto (added)
+++ incubator/mesos/trunk/src/messages/messages.proto Mon Jun 27 06:08:33 2011
@@ -0,0 +1,245 @@
+import "mesos.proto";
+
+package mesos.internal;
+
+// TODO(benh): Provide comments for each of these messages. Also,
+// consider splitting these messages into different "packages" which
+// represent which messages get handled by which components (e.g., the
+// "mesos.internal.executor" package includes messages that the
+// executor handles).
+
+
+// TODO(benh): It would be great if this could just be a
+// TaskDescription wherever it gets used! However, doing so would
+// require adding the framework_id field, the executor_id field, and
+// the state field into TaskDescription (or sending them another
+// way). Also, one performance reason we don't do that now is that
+// whatever data is coupled with a TaskDescription could be large
+// and unnecessary to store.
+message Task {
+  required string name = 1;
+  required TaskID task_id = 2;
+  required FrameworkID framework_id = 3;
+  required ExecutorID executor_id = 4;
+  required SlaveID slave_id = 5;
+  required TaskState state = 6;
+  repeated Resource resources = 7;
+}
+
+
+message StatusUpdate {
+  required FrameworkID framework_id = 1;
+  optional ExecutorID executor_id = 2;
+  optional SlaveID slave_id = 3;
+  required TaskStatus status = 4;
+  required double timestamp = 5;
+  required bytes uuid = 6;
+}
+
+
+message ResourceOffer {
+  required SlaveInfo slave = 1;
+  repeated Resource resources = 2;
+}
+
+
+message ExecutorToFrameworkMessage {
+  required SlaveID slave_id = 1;
+  required FrameworkID framework_id = 2;
+  required ExecutorID executor_id = 3;
+  required bytes data = 4;
+}
+
+
+message FrameworkToExecutorMessage {
+  required SlaveID slave_id = 1;
+  required FrameworkID framework_id = 2;
+  required ExecutorID executor_id = 3;
+  required bytes data = 4;
+}
+
+
+message RegisterFrameworkMessage {
+  required FrameworkInfo framework = 1;
+}
+
+
+message ReregisterFrameworkMessage {
+  required FrameworkID framework_id = 1;
+  required FrameworkInfo framework = 2;
+  required int32 generation = 3;
+}
+
+
+message FrameworkRegisteredMessage {
+  required FrameworkID framework_id = 1;
+}
+
+
+message UnregisterFrameworkMessage {
+  required FrameworkID framework_id = 1;
+}
+
+
+message ResourceOfferMessage {
+  required OfferID offer_id = 1;
+  repeated SlaveOffer offers = 2;
+  repeated string pids = 3;
+}
+
+
+message ResourceOfferReplyMessage {
+  required FrameworkID framework_id = 1;
+  required OfferID offer_id = 2;
+  repeated TaskDescription tasks = 3;
+  optional Params params = 4;
+}
+
+
+message RescindResourceOfferMessage {
+  required OfferID offer_id = 1;
+}
+
+
+message ReviveOffersMessage {
+  required FrameworkID framework_id = 1;
+}
+
+
+message RunTaskMessage {
+  required FrameworkID framework_id = 1;
+  required FrameworkInfo framework = 2;
+  required string pid = 3;
+  required TaskDescription task = 4;
+}
+
+
+message KillTaskMessage {
+  required FrameworkID framework_id = 1;
+  required TaskID task_id = 2;
+}
+
+
+message StatusUpdateMessage {
+  required StatusUpdate update = 1;
+  optional string pid = 2;
+}
+
+
+message StatusUpdateAcknowledgementMessage {
+  required SlaveID slave_id = 1;
+  required FrameworkID framework_id = 2;
+  required TaskID task_id = 3;
+  required bytes uuid = 4;
+}
+
+
+message LostSlaveMessage {
+  required SlaveID slave_id = 1;
+}
+
+
+message FrameworkErrorMessage {
+  required int32 code = 1;
+  required string message = 2;
+}
+
+
+message RegisterSlaveMessage {
+  required SlaveInfo slave = 1;
+}
+
+
+message ReregisterSlaveMessage {
+  required SlaveID slave_id = 1;
+  required SlaveInfo slave = 2;
+  repeated Task tasks = 3;
+}
+
+
+message SlaveRegisteredMessage {
+  required SlaveID slave_id = 1;
+}
+
+
+message SlaveReregisteredMessage {
+  required SlaveID slave_id = 1;
+}
+
+
+message UnregisterSlaveMessage {
+  required SlaveID slave_id = 1;
+}
+
+
+message HeartbeatMessage {
+  required SlaveID slave_id = 1;
+}
+
+
+// Tells a slave to shut down all executors of the given framework.
+message ShutdownFrameworkMessage {
+  required FrameworkID framework_id = 1;
+}
+
+
+// Tells the executor to initiate a shut down by invoking
+// Executor::shutdown.
+message ShutdownExecutorMessage {}
+
+
+message UpdateFrameworkMessage {
+  required FrameworkID framework_id = 1;
+  required string pid = 2;
+}
+
+
+message RegisterExecutorMessage {
+  required FrameworkID framework_id = 1;
+  required ExecutorID executor_id = 2;
+}
+
+
+message ExecutorRegisteredMessage {
+  required ExecutorArgs args = 1;
+}
+
+
+message ExitedExecutorMessage {
+  required SlaveID slave_id = 1;
+  required FrameworkID framework_id = 2;
+  required ExecutorID executor_id = 3;
+  required int32 status = 4;
+}
+
+
+message RegisterProjdMessage {
+  required string project = 1;
+}
+
+
+message ProjdReadyMessage {
+  required string project = 1;
+}
+
+
+message ProjdUpdateResourcesMessage {
+  optional Params params = 1;
+}
+
+
+message FrameworkExpiredMessage {
+  required FrameworkID framework_id = 1;
+}
+
+
+message NoMasterDetectedMessage {}
+
+message NewMasterDetectedMessage {
+  required string pid = 2;
+}
+
+
+message GotMasterTokenMessage {
+  required string token = 1;
+}

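Since each of the messages above is an ordinary protobuf, a sender fills one in with the generated mutators and hands it to libprocess, exactly as sched.cpp does below. A minimal hedged sketch (assumes it runs inside a ProtobufProcess, with 'master' a UPID and the IDs already in hand):

    // Build and send a KillTaskMessage; ProtobufProcess::send
    // serializes the message and names it by its protobuf type.
    KillTaskMessage message;
    message.mutable_framework_id()->MergeFrom(frameworkId);
    message.mutable_task_id()->MergeFrom(taskId);
    send(master, message);
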
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Mon Jun 27 06:08:33 2011
@@ -8,8 +8,6 @@
 
 #include <arpa/inet.h>
 
-#include <google/protobuf/descriptor.h>
-
 #include <iostream>
 #include <map>
 #include <string>
@@ -19,14 +17,14 @@
 
 #include <mesos/scheduler.hpp>
 
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-
+#include <process/dispatch.hpp>
 #include <process/process.hpp>
+#include <process/protobuf.hpp>
 
 #include "configurator/configuration.hpp"
 
 #include "common/fatal.hpp"
+#include "common/hashmap.hpp"
 #include "common/lock.hpp"
 #include "common/logging.hpp"
 #include "common/type_utils.hpp"
@@ -35,24 +33,21 @@
 
 #include "local/local.hpp"
 
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
 
-using boost::cref;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using google::protobuf::RepeatedPtrField;
+using namespace process;
 
-using process::PID;
-using process::UPID;
+using boost::cref;
 
 using std::map;
 using std::string;
 using std::vector;
 
+using process::wait; // Necessary on some OS's to disambiguate.
+
 using std::tr1::bind;
 
 
@@ -64,81 +59,69 @@ namespace mesos { namespace internal {
 // we allow friend functions to invoke 'send', 'post', etc. Therefore,
 // we must make sure that any necessary synchronization is performed.
 
-class SchedulerProcess : public MesosProcess<SchedulerProcess>
+class SchedulerProcess : public ProtobufProcess<SchedulerProcess>
 {
 public:
-  SchedulerProcess(MesosSchedulerDriver* _driver, Scheduler* _sched,
+  SchedulerProcess(MesosSchedulerDriver* _driver,
+                   Scheduler* _sched,
 		   const FrameworkID& _frameworkId,
                    const FrameworkInfo& _framework)
-    : driver(_driver), sched(_sched), frameworkId(_frameworkId),
-      framework(_framework), generation(0), master(UPID()), terminate(false)
-  {
-    install(NEW_MASTER_DETECTED, &SchedulerProcess::newMasterDetected,
-            &NewMasterDetectedMessage::pid);
-
-    install(NO_MASTER_DETECTED, &SchedulerProcess::noMasterDetected);
-
-    install(M2F_REGISTER_REPLY, &SchedulerProcess::registerReply,
-            &FrameworkRegisteredMessage::framework_id);
-
-    install(M2F_RESOURCE_OFFER, &SchedulerProcess::resourceOffer,
-            &ResourceOfferMessage::offer_id,
-            &ResourceOfferMessage::offers,
-            &ResourceOfferMessage::pids);
-
-    install(M2F_RESCIND_OFFER, &SchedulerProcess::rescindOffer,
-            &RescindResourceOfferMessage::offer_id);
-
-    install(M2F_STATUS_UPDATE, &SchedulerProcess::statusUpdate,
-            &StatusUpdateMessage::framework_id,
-            &StatusUpdateMessage::status);
+    : driver(_driver),
+      sched(_sched),
+      frameworkId(_frameworkId),
+      framework(_framework),
+      generation(0),
+      master(UPID())
+  {
+    installProtobufHandler<NewMasterDetectedMessage>(
+        &SchedulerProcess::newMasterDetected,
+        &NewMasterDetectedMessage::pid);
+
+    installProtobufHandler<NoMasterDetectedMessage>(
+        &SchedulerProcess::noMasterDetected);
+
+    installProtobufHandler<FrameworkRegisteredMessage>(
+        &SchedulerProcess::registered,
+        &FrameworkRegisteredMessage::framework_id);
+
+    installProtobufHandler<ResourceOfferMessage>(
+        &SchedulerProcess::resourceOffer,
+        &ResourceOfferMessage::offer_id,
+        &ResourceOfferMessage::offers,
+        &ResourceOfferMessage::pids);
+
+    installProtobufHandler<RescindResourceOfferMessage>(
+        &SchedulerProcess::rescindOffer,
+        &RescindResourceOfferMessage::offer_id);
+
+    installProtobufHandler<StatusUpdateMessage>(
+        &SchedulerProcess::statusUpdate,
+        &StatusUpdateMessage::update,
+        &StatusUpdateMessage::pid);
+
+    installProtobufHandler<LostSlaveMessage>(
+        &SchedulerProcess::lostSlave,
+        &LostSlaveMessage::slave_id);
+
+    installProtobufHandler<ExecutorToFrameworkMessage>(
+        &SchedulerProcess::frameworkMessage,
+        &ExecutorToFrameworkMessage::slave_id,
+        &ExecutorToFrameworkMessage::framework_id,
+        &ExecutorToFrameworkMessage::executor_id,
+        &ExecutorToFrameworkMessage::data);
+
+    installProtobufHandler<FrameworkErrorMessage>(
+        &SchedulerProcess::error,
+        &FrameworkErrorMessage::code,
+        &FrameworkErrorMessage::message);
 
-    install(M2F_LOST_SLAVE, &SchedulerProcess::lostSlave,
-            &LostSlaveMessage::slave_id);
-
-    install(M2F_FRAMEWORK_MESSAGE, &SchedulerProcess::frameworkMessage,
-	    &FrameworkMessageMessage::slave_id,
-	    &FrameworkMessageMessage::framework_id,
-	    &FrameworkMessageMessage::executor_id,
-	    &FrameworkMessageMessage::data);
-
-    install(M2F_ERROR, &SchedulerProcess::error,
-            &FrameworkErrorMessage::code,
-            &FrameworkErrorMessage::message);
-
-    install(process::EXITED, &SchedulerProcess::exited);
+    installMessageHandler(process::EXITED, &SchedulerProcess::exited);
   }
 
   virtual ~SchedulerProcess() {}
 
 protected:
-  virtual void operator () ()
-  {
-    while (true) {
-      // Sending a message to terminate this process is insufficient
-      // because that message might get queued behind a bunch of other
-      // message. So, when it is time to terminate, we set a flag that
-      // gets re-read by this process after every message. In order to
-      // get this correct we must return from each invocation of
-      // 'serve', to check and see if terminate has been set. In
-      // addition, we need to send a dummy message right after we set
-      // terminate just in case there aren't any messages in the
-      // queue. Note that the terminate field is only read by this
-      // process, so we don't need to protect it in any way. In fact,
-      // using a lock to protect it (or for providing atomicity for
-      // cleanup, for example), might lead to deadlock with the client
-      // code because we already use a lock in SchedulerDriver. That
-      // being said, for now we make terminate 'volatile' to guarantee
-      // that each read is getting a fresh copy.
-      // TODO(benh): Do a coherent read so as to avoid using
-      // 'volatile'.
-      if (terminate) return;
-
-      serve(0, true);
-    }
-  }
-
-  void newMasterDetected(const string& pid)
+  void newMasterDetected(const UPID& pid)
   {
     VLOG(1) << "New master at " << pid;
 
@@ -147,16 +130,16 @@ protected:
 
     if (frameworkId == "") {
       // Touched for the very first time.
-      MSG<F2M_REGISTER_FRAMEWORK> out;
-      out.mutable_framework()->MergeFrom(framework);
-      send(master, out);
+      RegisterFrameworkMessage message;
+      message.mutable_framework()->MergeFrom(framework);
+      send(master, message);
     } else {
       // Not the first time, or failing over.
-      MSG<F2M_REREGISTER_FRAMEWORK> out;
-      out.mutable_framework()->MergeFrom(framework);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.set_generation(generation++);
-      send(master, out);
+      ReregisterFrameworkMessage message;
+      message.mutable_framework()->MergeFrom(framework);
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.set_generation(generation++);
+      send(master, message);
     }
 
     active = true;
@@ -170,12 +153,11 @@ protected:
     active = false;
   }
 
-  void registerReply(const FrameworkID& frameworkId)
+  void registered(const FrameworkID& frameworkId)
   {
     VLOG(1) << "Framework registered with " << frameworkId;
     this->frameworkId = frameworkId;
-    process::invoke(bind(&Scheduler::registered, sched, driver,
-                         cref(frameworkId)));
+    invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
   }
 
   void resourceOffer(const OfferID& offerId,
@@ -199,26 +181,26 @@ protected:
       }
     }
 
-    process::invoke(bind(&Scheduler::resourceOffer, sched, driver,
-                         cref(offerId), cref(offers)));
+    invoke(bind(&Scheduler::resourceOffer, sched, driver, cref(offerId),
+                cref(offers)));
   }
 
   void rescindOffer(const OfferID& offerId)
   {
     VLOG(1) << "Rescinded offer " << offerId;
     savedOffers.erase(offerId);
-    process::invoke(bind(&Scheduler::offerRescinded, sched, driver, 
-                         cref(offerId)));
+    invoke(bind(&Scheduler::offerRescinded, sched, driver, cref(offerId)));
   }
 
-  void statusUpdate(const FrameworkID& frameworkId, const TaskStatus& status)
+  void statusUpdate(const StatusUpdate& update, const UPID& pid)
   {
+    const TaskStatus& status = update.status();
+
     VLOG(1) << "Status update: task " << status.task_id()
-            << " of framework " << frameworkId
-            << " is now in state "
-            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+            << " of framework " << update.framework_id()
+            << " is now in state " << status.state();
 
-    CHECK(this->frameworkId == frameworkId);
+    CHECK(frameworkId == update.framework_id());
 
     // TODO(benh): Note that this may be a duplicate status update!
     // Once we get support to try and have a more consistent view
@@ -229,25 +211,27 @@ protected:
     // multiple times (of course, if a scheduler re-uses a TaskID,
     // that could be bad).
 
-    process::invoke(bind(&Scheduler::statusUpdate, sched, driver,
-                         cref(status)));
+    invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
 
-    // Acknowledge the message (we do this last, after we process::invoked
-    // the scheduler, if we did at all, in case it causes a crash,
-    // since this way the message might get resent/routed after
-    // the scheduler comes back online).
-    MSG<F2M_STATUS_UPDATE_ACK> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    out.mutable_slave_id()->MergeFrom(status.slave_id());
-    out.mutable_task_id()->MergeFrom(status.task_id());
-    send(master, out);
+    if (pid) {
+      // Acknowledge the message (we do this last, after we invoked
+      // the scheduler, if we did at all, in case it causes a crash,
+      // since this way the message might get resent/routed after the
+      // scheduler comes back online).
+      StatusUpdateAcknowledgementMessage message;
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_slave_id()->MergeFrom(update.slave_id());
+      message.mutable_task_id()->MergeFrom(status.task_id());
+      message.set_uuid(update.uuid());
+      send(pid, message);
+    }
   }
 
   void lostSlave(const SlaveID& slaveId)
   {
     VLOG(1) << "Lost slave " << slaveId;
     savedSlavePids.erase(slaveId);
-    process::invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
+    invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
   }
 
   void frameworkMessage(const SlaveID& slaveId,
@@ -256,15 +240,14 @@ protected:
 			const string& data)
   {
     VLOG(1) << "Received framework message";
-    process::invoke(bind(&Scheduler::frameworkMessage, sched, driver,
-                         cref(slaveId), cref(executorId), cref(data)));
+    invoke(bind(&Scheduler::frameworkMessage, sched, driver, cref(slaveId),
+                cref(executorId), cref(data)));
   }
 
   void error(int32_t code, const string& message)
   {
     VLOG(1) << "Got error '" << message << "' (code: " << code << ")";
-    process::invoke(bind(&Scheduler::error, sched, driver, code,
-                         cref(message)));
+    invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
   }
 
   void exited()
@@ -277,12 +260,16 @@ protected:
 
   void stop()
   {
+    // Whether or not we send an unregister message, we want to
+    // terminate this process ...
+    terminate(self());
+
     if (!active)
       return;
 
-    MSG<F2M_UNREGISTER_FRAMEWORK> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    send(master, out);
+    UnregisterFrameworkMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    send(master, message);
   }
 
   void killTask(const TaskID& taskId)
@@ -290,10 +277,10 @@ protected:
     if (!active)
       return;
 
-    MSG<F2M_KILL_TASK> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    out.mutable_task_id()->MergeFrom(taskId);
-    send(master, out);
+    KillTaskMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_task_id()->MergeFrom(taskId);
+    send(master, message);
   }
 
   void replyToOffer(const OfferID& offerId,
@@ -303,12 +290,12 @@ protected:
     if (!active)
       return;
 
-    MSG<F2M_RESOURCE_OFFER_REPLY> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    out.mutable_offer_id()->MergeFrom(offerId);
+    ResourceOfferReplyMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_offer_id()->MergeFrom(offerId);
 
     foreachpair (const string& key, const string& value, params) {
-      Param* param = out.mutable_params()->add_param();
+      Param* param = message.mutable_params()->add_param();
       param->set_key(key);
       param->set_value(value);
     }
@@ -318,13 +305,13 @@ protected:
       // framework messages directly.
       savedSlavePids[task.slave_id()] = savedOffers[offerId][task.slave_id()];
 
-      out.add_tasks()->MergeFrom(task);
+      message.add_tasks()->MergeFrom(task);
     }
 
     // Remove the offer since we saved all the PIDs we might use.
     savedOffers.erase(offerId);
 
-    send(master, out);
+    send(master, message);
   }
 
   void reviveOffers()
@@ -332,9 +319,9 @@ protected:
     if (!active)
       return;
 
-    MSG<F2M_REVIVE_OFFERS> out;
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    send(master, out);
+    ReviveOffersMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkId);
+    send(master, message);
   }
 
   void sendFrameworkMessage(const SlaveID& slaveId,
@@ -357,23 +344,22 @@ protected:
       UPID slave = savedSlavePids[slaveId];
       CHECK(slave != UPID());
 
-      // TODO(benh): This is kind of wierd, M2S?
-      MSG<M2S_FRAMEWORK_MESSAGE> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_data(data);
-      send(slave, out);
+      FrameworkToExecutorMessage message;
+      message.mutable_slave_id()->MergeFrom(slaveId);
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_executor_id()->MergeFrom(executorId);
+      message.set_data(data);
+      send(slave, message);
     } else {
       VLOG(1) << "Cannot send directly to slave " << slaveId
 	      << "; sending through master";
 
-      MSG<F2M_FRAMEWORK_MESSAGE> out;
-      out.mutable_slave_id()->MergeFrom(slaveId);
-      out.mutable_framework_id()->MergeFrom(frameworkId);
-      out.mutable_executor_id()->MergeFrom(executorId);
-      out.set_data(data);
-      send(master, out);
+      FrameworkToExecutorMessage message;
+      message.mutable_slave_id()->MergeFrom(slaveId);
+      message.mutable_framework_id()->MergeFrom(frameworkId);
+      message.mutable_executor_id()->MergeFrom(executorId);
+      message.set_data(data);
+      send(master, message);
     }
   }
 
@@ -388,10 +374,9 @@ private:
   UPID master;
 
   volatile bool active;
-  volatile bool terminate;
 
-  unordered_map<OfferID, unordered_map<SlaveID, UPID> > savedOffers;
-  unordered_map<SlaveID, UPID> savedSlavePids;
+  hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
+  hashmap<SlaveID, UPID> savedSlavePids;
 };
 
 }} // namespace mesos { namespace internal {
@@ -526,7 +511,7 @@ MesosSchedulerDriver::~MesosSchedulerDri
   // to the user somehow. Note that we will also wait forever if
   // MesosSchedulerDriver::stop was never called.
   if (process != NULL) {
-    process::wait(process->self());
+    wait(process);
     delete process;
   }
 
@@ -584,7 +569,7 @@ int MesosSchedulerDriver::start()
 
   process = new SchedulerProcess(this, sched, frameworkId, framework);
 
-  UPID pid = process::spawn(process);
+  UPID pid = spawn(process);
 
   // Check and see if we need to launch a local cluster.
   if (url == "local") {
@@ -616,9 +601,7 @@ int MesosSchedulerDriver::stop()
   // getExecutorInfo which threw exceptions, or explicitly called
   // stop. See above in start).
   if (process != NULL) {
-    process::dispatch(process->self(), &SchedulerProcess::stop);
-    process->terminate = true;
-    process::post(process->self(), process::TERMINATE);
+    dispatch(process, &SchedulerProcess::stop);
   }
 
   running = false;
@@ -661,8 +644,7 @@ int MesosSchedulerDriver::killTask(const
     return -1;
   }
 
-  process::dispatch(process->self(), &SchedulerProcess::killTask,
-                    taskId);
+  dispatch(process, &SchedulerProcess::killTask, taskId);
 
   return 0;
 }
@@ -678,8 +660,7 @@ int MesosSchedulerDriver::replyToOffer(c
     return -1;
   }
 
-  process::dispatch(process->self(), &SchedulerProcess::replyToOffer,
-                    offerId, tasks, params);
+  dispatch(process, &SchedulerProcess::replyToOffer, offerId, tasks, params);
 
   return 0;
 }
@@ -693,7 +674,7 @@ int MesosSchedulerDriver::reviveOffers()
     return -1;
   }
 
-  process::dispatch(process->self(), &SchedulerProcess::reviveOffers);
+  dispatch(process, &SchedulerProcess::reviveOffers);
 
   return 0;
 }
@@ -709,8 +690,8 @@ int MesosSchedulerDriver::sendFrameworkM
     return -1;
   }
 
-  process::dispatch(process->self(), &SchedulerProcess::sendFrameworkMessage,
-                    slaveId, executorId, data);
+  dispatch(process, &SchedulerProcess::sendFrameworkMessage,
+           slaveId, executorId, data);
 
   return 0;
 }

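The installProtobufHandler calls above bind a message type to a member function and unpack the listed fields into its arguments, which is what lets the hand-written operator()/serve loop and the volatile terminate flag disappear. A stripped-down hedged sketch of the pattern (HeartbeatMessage is declared in messages.proto above; the class itself is hypothetical):

    class HeartbeatListener : public ProtobufProcess<HeartbeatListener>
    {
    public:
      HeartbeatListener()
      {
        // Deliver HeartbeatMessage::slave_id as the argument.
        installProtobufHandler<HeartbeatMessage>(
            &HeartbeatListener::heartbeat,
            &HeartbeatMessage::slave_id);
      }

    protected:
      void heartbeat(const SlaveID& slaveId)
      {
        VLOG(1) << "Heartbeat from slave " << slaveId;
      }
    };

Spawning such a process and later shutting it down with terminate()/wait() follows the same lifecycle sched.cpp uses above.
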
Modified: incubator/mesos/trunk/src/slave/isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.cpp Mon Jun 27 06:08:33 2011
@@ -6,12 +6,10 @@
 #include "lxc_isolation_module.hpp"
 #endif
 
-using std::string;
 
-using namespace mesos::internal::slave;
+namespace mesos { namespace internal { namespace slave {
 
-
-IsolationModule * IsolationModule::create(const string &type)
+IsolationModule* IsolationModule::create(const std::string &type)
 {
   if (type == "process")
     return new ProcessBasedIsolationModule();
@@ -27,8 +25,11 @@ IsolationModule * IsolationModule::creat
 }
 
 
-void IsolationModule::destroy(IsolationModule *module)
+void IsolationModule::destroy(IsolationModule* module)
 {
-  if (module != NULL)
+  if (module != NULL) {
     delete module;
+  }
 }
+
+}}} // namespace mesos { namespace internal { namespace slave {

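Usage of the factory pair above is symmetric: create() maps a configuration string to a concrete module and destroy() disposes of it. A hedged sketch (the error handling is illustrative; the unknown-type branch is not shown in this hunk):

    IsolationModule* module = IsolationModule::create("process");
    if (module == NULL) {
      LOG(FATAL) << "Unrecognized isolation type";
    }
    // ... use the module ...
    IsolationModule::destroy(module);
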
Modified: incubator/mesos/trunk/src/slave/isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -3,36 +3,51 @@
 
 #include <string>
 
+#include <mesos/mesos.hpp>
+
+#include <process/process.hpp>
+
+#include "configurator/configuration.hpp"
+
+#include "common/resources.hpp"
+
 
 namespace mesos { namespace internal { namespace slave {
 
 class Slave;
-class Framework;
-class Executor;
 
 
-class IsolationModule {
+class IsolationModule : public process::Process<IsolationModule>
+{
 public:
-  static IsolationModule * create(const std::string &type);
-  static void destroy(IsolationModule *module);
+  static IsolationModule* create(const std::string& type);
+  static void destroy(IsolationModule* module);
 
   virtual ~IsolationModule() {}
 
   // Called during slave initialization.
-  virtual void initialize(Slave *slave) {}
+  virtual void initialize(const Configuration& conf,
+                          bool local,
+                          const process::PID<Slave>& slave) = 0;
 
   // Called by the slave to launch an executor for a given framework.
-  virtual void launchExecutor(Framework* framework, Executor* executor) = 0;
+  virtual void launchExecutor(const FrameworkID& frameworkId,
+                              const FrameworkInfo& frameworkInfo,
+                              const ExecutorInfo& executorInfo,
+                              const std::string& directory) = 0;
 
   // Terminate a framework's executor, if it is still running.
   // The executor is expected to be gone after this method exits.
-  virtual void killExecutor(Framework* framework, Executor* executor) = 0;
+  virtual void killExecutor(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId) = 0;
 
   // Update the resource limits for a given framework. This method will
   // be called only after an executor for the framework is started.
-  virtual void resourcesChanged(Framework *framework, Executor* executor) {}
+  virtual void resourcesChanged(const FrameworkID& frameworkId,
+                                const ExecutorID& executorId,
+                                const Resources& resources) = 0;
 };
 
-}}}
+}}} // namespace mesos { namespace internal { namespace slave {
 
-#endif /* __ISOLATION_MODULE_HPP__ */
+#endif // __ISOLATION_MODULE_HPP__

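With all four virtuals now pure and the class itself a process::Process, a concrete module must implement the full interface and can receive dispatches from the slave. A hedged skeleton of a conforming (and entirely hypothetical) subclass:

    class NoopIsolationModule : public IsolationModule
    {
    public:
      virtual void initialize(const Configuration& conf,
                              bool local,
                              const process::PID<Slave>& slave)
      {
        LOG(INFO) << "No-op isolation initialized";
      }

      virtual void launchExecutor(const FrameworkID& frameworkId,
                                  const FrameworkInfo& frameworkInfo,
                                  const ExecutorInfo& executorInfo,
                                  const std::string& directory) {}

      virtual void killExecutor(const FrameworkID& frameworkId,
                                const ExecutorID& executorId) {}

      virtual void resourcesChanged(const FrameworkID& frameworkId,
                                    const ExecutorID& executorId,
                                    const Resources& resources) {}
    };
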
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp Mon Jun 27 06:08:33 2011
@@ -1,277 +1,342 @@
-#include <stdlib.h>
-#include <unistd.h>
-
 #include <algorithm>
+#include <sstream>
+#include <map>
+
+#include <process/dispatch.hpp>
 
 #include "lxc_isolation_module.hpp"
 
 #include "common/foreach.hpp"
+#include "common/type_utils.hpp"
+#include "common/units.hpp"
+#include "common/utils.hpp"
 
 #include "launcher/launcher.hpp"
 
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::list;
-using std::make_pair;
-using std::max;
-using std::ostringstream;
-using std::pair;
-using std::queue;
-using std::string;
-using std::vector;
-
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
 using namespace mesos;
 using namespace mesos::internal;
-using namespace mesos::internal::launcher;
 using namespace mesos::internal::slave;
 
+using namespace process;
+
+using launcher::ExecutorLauncher;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::map;
+using std::max;
+using std::string;
+
+
 namespace {
 
 const int32_t CPU_SHARES_PER_CPU = 1024;
 const int32_t MIN_CPU_SHARES = 10;
-const int64_t MIN_RSS = 128 * Megabyte;
+const int64_t MIN_RSS_MB = 128; // Minimum RSS limit, in megabytes.
 
+
+// TODO(benh): Factor this out into common/utils or possibly into
+// libprocess so that it can handle blocking.
+// Run a shell command formatted with varargs and return its exit code.
+int shell(const char* format, ...)
+{
+  char* cmd;
+  FILE* f;
+  int ret;
+  va_list args;
+  va_start(args, format);
+  if (vasprintf(&cmd, format, args) == -1) {
+    va_end(args);
+    return -1;
+  }
+  va_end(args);
+  if ((f = popen(cmd, "w")) == NULL) {
+    free(cmd);
+    return -1;
+  }
+  ret = pclose(f);
+  if (ret == -1) {
+    LOG(INFO) << "pclose error: " << strerror(errno);
+  }
+  free(cmd);
+  return ret;
 }
 
 
+// Attempt to set a resource limit of a container for a given cgroup
+// property (e.g. cpu.shares). Returns true on success.
+bool setResourceLimit(const string& container,
+                      const string& property,
+                      int64_t value)
+{
+  LOG(INFO) << "Setting " << property
+            << " for container " << container
+            << " to " << value;
+
+  int ret = shell("lxc-cgroup -n %s %s %lld",
+                  container.c_str(),
+                  property.c_str(),
+                  value);
+  if (ret != 0) {
+    LOG(ERROR) << "Failed to set " << property
+               << " for container " << container
+               << ": lxc-cgroup returned " << ret;
+    return false;
+  }
+
+  return true;
+}
+
+} // namespace {
+
+
 LxcIsolationModule::LxcIsolationModule()
-  : initialized(false) {}
+  : initialized(false)
+{
+  // Spawn the reaper. Note that it might send us a message before we
+  // are spawned ourselves; that's okay, as the message will simply be
+  // dropped.
+  reaper = new Reaper();
+  spawn(reaper);
+  dispatch(reaper, &Reaper::addProcessExitedListener, this);
+}
 
 
 LxcIsolationModule::~LxcIsolationModule()
 {
-  // We want to wait until the reaper has completed because it
-  // accesses 'this' in order to make callbacks ... deleting 'this'
-  // could thus lead to a seg fault!
-  if (initialized) {
-    CHECK(reaper != NULL);
-    process::post(reaper->self(), process::TERMINATE);
-    process::wait(reaper->self());
-    delete reaper;
-  }
+  CHECK(reaper != NULL);
+  terminate(reaper);
+  wait(reaper);
+  delete reaper;
 }
 
 
-void LxcIsolationModule::initialize(Slave* slave)
+void LxcIsolationModule::initialize(
+    const Configuration& _conf,
+    bool _local,
+    const PID<Slave>& _slave)
 {
-  this->slave = slave;
+  conf = _conf;
+  local = _local;
+  slave = _slave;
   
-  // Run a basic check to see whether Linux Container tools are available
+  // Check if Linux Container tools are available.
   if (system("lxc-version > /dev/null") != 0) {
     LOG(FATAL) << "Could not run lxc-version; make sure Linux Container "
                 << "tools are installed";
   }
 
   // Check that we are root (it might also be possible to create Linux
-  // containers without being root, but we can support that later)
+  // containers without being root, but we can support that later).
   if (getuid() != 0) {
     LOG(FATAL) << "LXC isolation module requires slave to run as root";
   }
 
-  reaper = new Reaper(this);
-  process::spawn(reaper);
   initialized = true;
 }
 
 
-void LxcIsolationModule::launchExecutor(Framework* framework, Executor* executor)
+void LxcIsolationModule::launchExecutor(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory)
 {
   if (!initialized) {
     LOG(FATAL) << "Cannot launch executors before initialization!";
   }
 
-  infos[framework->frameworkId][executor->info.executor_id()] = new FrameworkInfo();
+  const ExecutorID& executorId = executorInfo.executor_id();
+
+  LOG(INFO) << "Launching '" << executorInfo.uri()
+            << "' for executor '" << executorId
+            << "' of framework " << frameworkId;
 
-  LOG(INFO) << "Starting executor for framework " << framework->frameworkId << ": "
-            << executor->info.uri();
+  // Create a name for the container.
+  std::ostringstream out;
+  out << "mesos.executor-" << executorId
+      << ".framework-" << frameworkId;
 
-  // Get location of Mesos install in order to find mesos-launcher.
-  string mesosHome = slave->getConfiguration().get("home", ".");
-  string mesosLauncher = mesosHome + "/mesos-launcher";
-
-  // Create a name for the container
-  ostringstream oss;
-  oss << "mesos.slave-" << slave->slaveId
-      << ".framework-" << framework->frameworkId;
-  string containerName = oss.str();
+  const string& container = out.str();
 
-  infos[framework->frameworkId][executor->info.executor_id()]->container = containerName;
-  executor->executorStatus = "Container: " + containerName;
+  ContainerInfo* info = new ContainerInfo();
+
+  info->frameworkId = frameworkId;
+  info->executorId = executorId;
+  info->container = container;
+  info->pid = -1;
+
+  infos[frameworkId][executorId] = info;
 
   // Run lxc-execute mesos-launcher using a fork-exec (since lxc-execute
   // does not return until the container is finished). Note that lxc-execute
   // automatically creates the container and will delete it when finished.
   pid_t pid;
-  if ((pid = fork()) == -1)
+  if ((pid = fork()) == -1) {
     PLOG(FATAL) << "Failed to fork to launch lxc-execute";
+  }
 
   if (pid) {
-    // In parent process
-    infos[framework->frameworkId][executor->info.executor_id()]->lxcExecutePid = pid;
-    LOG(INFO) << "Started child for lxc-execute, pid = " << pid;
-    int status;
-  } else {
-    // Create an ExecutorLauncher to set up the environment for executing
-    // an extrernal launcher_main.cpp process (inside of lxc-execute).
+    // In parent process.
+    info->pid = pid;
 
-    const Configuration& conf = slave->getConfiguration();
+    // Tell the slave this executor has started.
+    dispatch(slave, &Slave::executorStarted,
+             frameworkId, executorId, pid);
+  } else {
+    // Close unnecessary file descriptors. Note that we are assuming
+    // stdin, stdout, and stderr can ONLY be found at the POSIX
+    // specified file numbers (0, 1, 2).
+    foreach (const string& entry, utils::os::listdir("/proc/self/fd")) {
+      if (entry != "." && entry != "..") {
+        try {
+          int fd = boost::lexical_cast<int>(entry);
+          if (fd != STDIN_FILENO &&
+              fd != STDOUT_FILENO &&
+              fd != STDERR_FILENO) {
+            close(fd);
+          }
+        } catch (boost::bad_lexical_cast&) {
+          LOG(FATAL) << "Failed to parse file descriptor '" << entry << "'";
+        }
+      }
+    }
 
+    // Create an ExecutorLauncher to set up the environment for executing
+    // an external launcher_main.cpp process (inside of lxc-execute).
     map<string, string> params;
 
-    for (int i = 0; i < framework->info.executor().params().param_size(); i++) {
-      params[framework->info.executor().params().param(i).key()] = 
-	framework->info.executor().params().param(i).value();
+    for (int i = 0; i < executorInfo.params().param_size(); i++) {
+      params[executorInfo.params().param(i).key()] =
+        executorInfo.params().param(i).value();
     }
 
-    ExecutorLauncher* launcher;
-    launcher =
-      new ExecutorLauncher(framework->frameworkId,
-			   executor->info.executor_id(),
-			   executor->info.uri(),
-			   framework->info.user(),
-			   slave->getUniqueWorkDirectory(framework->frameworkId,
-							 executor->info.executor_id()),
-			   slave->self(),
+    ExecutorLauncher* launcher =
+      new ExecutorLauncher(frameworkId,
+			   executorId,
+			   executorInfo.uri(),
+			   frameworkInfo.user(),
+                           directory,
+			   slave,
 			   conf.get("frameworks_home", ""),
 			   conf.get("home", ""),
 			   conf.get("hadoop_home", ""),
-			   !slave->local,
+			   !local,
 			   conf.get("switch_user", true),
+			   container,
 			   params);
+
     launcher->setupEnvironmentForLauncherMain();
+
+    // Get location of Mesos install in order to find mesos-launcher.
+    string mesosLauncher = conf.get("home", ".") + "/bin/mesos-launcher";
     
     // Run lxc-execute.
-    execlp("lxc-execute", "lxc-execute", "-n", containerName.c_str(),
+    execlp("lxc-execute", "lxc-execute", "-n", container.c_str(),
            mesosLauncher.c_str(), (char *) NULL);
-    // If we get here, the execl call failed.
-    fatalerror("Could not exec lxc-execute");
-    // TODO: Exit the slave if this happens
+
+    // If we get here, the execlp call failed.
+    LOG(FATAL) << "Could not exec lxc-execute";
   }
 }
 
 
-void LxcIsolationModule::killExecutor(Framework* framework, Executor* executor)
+void LxcIsolationModule::killExecutor(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
 {
-  string container = infos[framework->frameworkId][executor->info.executor_id()]->container;
-  if (container != "") {
-    LOG(INFO) << "Stopping container " << container;
-    int ret = shell("lxc-stop -n %s", container.c_str());
-    if (ret != 0)
-      LOG(ERROR) << "lxc-stop returned " << ret;
-    infos[framework->frameworkId][executor->info.executor_id()]->container = "";
-    executor->executorStatus = "No executor running";
-    delete infos[framework->frameworkId][executor->info.executor_id()];
-    infos[framework->frameworkId].erase(executor->info.executor_id());
+  if (!infos.contains(frameworkId) ||
+      !infos[frameworkId].contains(executorId)) {
+    LOG(ERROR) << "Asked to kill an unknown executor!";
+    return;
   }
-}
-
 
-void LxcIsolationModule::resourcesChanged(Framework* framework, Executor* executor)
-{
-  if (infos[framework->frameworkId][executor->info.executor_id()]->container != "") {
-    // For now, just try setting the CPUs and memory right away, and kill the
-    // framework if this fails.
-    // A smarter thing to do might be to only update them periodically in a
-    // separate thread, and to give frameworks some time to scale down their
-    // memory usage.
-
-    double cpu = executor->resources.getScalar("cpu", Resource::Scalar()).value();
-    int32_t cpuShares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
-    if (!setResourceLimit(framework, executor, "cpu.shares", cpuShares)) {
-      // Tell slave to kill framework, which will invoke killExecutor.
-      slave->killFramework(framework);
-      return;
-    }
+  ContainerInfo* info = infos[frameworkId][executorId];
 
-    double mem = executor->resources.getScalar("mem", Resource::Scalar()).value();
-    int64_t rssLimit = max((int64_t) mem, MIN_RSS) * 1024LL * 1024LL;
-    if (!setResourceLimit(framework, executor, "memory.limit_in_bytes", rssLimit)) {
-      // Tell slave to kill framework, which will invoke killExecutor.
-      slave->killFramework(framework);
-      return;
-    }
-  }
-}
+  CHECK(info->container != "");
 
+  LOG(INFO) << "Stopping container " << info->container;
 
-bool LxcIsolationModule::setResourceLimit(Framework* framework,
-					  Executor* executor,
-                                          const string& property,
-                                          int64_t value)
-{
-  LOG(INFO) << "Setting " << property << " for framework " << framework->frameworkId
-            << " to " << value;
-  int ret = shell("lxc-cgroup -n %s %s %lld",
-                  infos[framework->frameworkId][executor->info.executor_id()]->container.c_str(),
-                  property.c_str(),
-                  value);
+  int ret = shell("lxc-stop -n %s", info->container.c_str());
   if (ret != 0) {
-    LOG(ERROR) << "Failed to set " << property << " for framework " << framework->frameworkId
-               << ": lxc-cgroup returned " << ret;
-    return false;
+    LOG(ERROR) << "lxc-stop returned " << ret;
+  }
+
+  if (infos[frameworkId].size() == 1) {
+    infos.erase(frameworkId);
   } else {
-    return true;
+    infos[frameworkId].erase(executorId);
   }
+
+  delete info;
+
+  // NOTE: Both frameworkId and executorId are no longer valid because
+  // they have just been deleted above!
 }
 
 
-int LxcIsolationModule::shell(const char* fmt, ...)
+void LxcIsolationModule::resourcesChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
 {
-  char *cmd;
-  FILE *f;
-  int ret;
-  va_list args;
-  va_start(args, fmt);
-  if (vasprintf(&cmd, fmt, args) == -1)
-    return -1;
-  if ((f = popen(cmd, "w")) == NULL)
-    return -1;
-  ret = pclose(f);
-  if (ret == -1)
-    LOG(INFO) << "pclose error: " << strerror(errno);
-  free(cmd);
-  va_end(args);
-  return ret;
-}
+  if (!infos.contains(frameworkId) ||
+      !infos[frameworkId].contains(executorId)) {
+    LOG(ERROR) << "Asked to update resources for an unknown executor!";
+    return;
+  }
 
+  ContainerInfo* info = infos[frameworkId][executorId];
 
-LxcIsolationModule::Reaper::Reaper(LxcIsolationModule* m)
-  : module(m)
-{}
+  CHECK(info->container != "");
 
-  
-void LxcIsolationModule::Reaper::operator () ()
+  const string& container = info->container;
+
+  // For now, just try setting the CPUs and memory right away; if this
+  // fails, the executor should eventually be killed (see TODOs below).
+  // A smarter thing to do might be to only update them periodically in a
+  // separate thread, and to give frameworks some time to scale down their
+  // memory usage.
+  string property;
+  int64_t value;
+
+  double cpu = resources.getScalar("cpu", Resource::Scalar()).value();
+  int32_t cpuShares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
+
+  property = "cpu.shares";
+  value = cpuShares;
+
+  if (!setResourceLimit(container, property, value)) {
+    // TODO(benh): Kill the executor, but do it in such a way that
+    // the slave finds out about it exiting.
+    return;
+  }
+
+  double mem = resources.getScalar("mem", Resource::Scalar()).value();
+  int64_t rssLimit = max((int64_t) mem, MIN_RSS_MB) * 1024LL * 1024LL;
+
+  property = "memory.limit_in_bytes";
+  value = rssLimit;
+
+  if (!setResourceLimit(container, property, value)) {
+    // TODO(benh): Kill the executor, but do it in such a way that
+    // the slave finds out about it exiting.
+    return;
+  }
+}
+
+
+void LxcIsolationModule::processExited(pid_t pid, int status)
 {
-  link(module->slave->self());
-  while (true) {
-    receive(1);
-    if (name() == process::TIMEOUT) {
-      // Check whether any child process has exited
-      pid_t pid;
-      int status;
-      if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
-        foreachpair (const FrameworkID& frameworkId, _, module->infos) {
-          foreachpair (const ExecutorID& executorId, FrameworkInfo* info, module->infos[frameworkId]) {
-	    if (info->lxcExecutePid == pid) {
-	      info->lxcExecutePid = -1;
-	      info->container = "";
-	      LOG(INFO) << "Telling slave of lost framework " << frameworkId;
-	      // TODO(benh): This is broken if/when libprocess is parallel!
-	      module->slave->executorExited(frameworkId, executorId, status);
-	      delete module->infos[frameworkId][executorId];
-	      module->infos[frameworkId].erase(executorId);
-	      break;
-	    }
-	  }
-	}
+  foreachkey (const FrameworkID& frameworkId, infos) {
+    foreachvalue (ContainerInfo* info, infos[frameworkId]) {
+      if (info->pid == pid) {
+        LOG(INFO) << "Telling slave of lost executor "
+                  << info->executorId
+                  << " of framework " << info->frameworkId;
+
+        dispatch(slave, &Slave::executorExited,
+                 info->frameworkId, info->executorId, status);
+
+        // Try to clean up after the executor.
+        killExecutor(info->frameworkId, info->executorId);
+        return;
       }
-    } else if (name() == process::TERMINATE || name() == process::EXITED) {
-      return;
     }
   }
 }
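
The limit arithmetic in resourcesChanged() above maps fractional CPUs
onto cgroup shares and megabytes of memory onto a byte limit. A worked
example with illustrative values (assuming, per the MIN_RSS_MB floor,
that the "mem" scalar is reported in megabytes):

    // cpu.shares: the fractional CPU count is truncated before scaling.
    double cpu = 2.5;
    int32_t cpuShares =
      max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
    // => max(1024 * 2, 10) == 2048 shares.

    // memory.limit_in_bytes: megabytes are clamped to the floor and
    // then converted to bytes.
    double mem = 64;
    int64_t rssLimit = max((int64_t) mem, MIN_RSS_MB) * 1024LL * 1024LL;
    // => max(64, 128) * 1024 * 1024 == 134217728 bytes (128 MB).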

Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -3,66 +3,64 @@
 
 #include <string>
 
-#include <boost/unordered_map.hpp>
-
 #include "isolation_module.hpp"
+#include "reaper.hpp"
 #include "slave.hpp"
 
-#include "messaging/messages.hpp"
+#include "common/hashmap.hpp"
 
 
 namespace mesos { namespace internal { namespace slave {
 
-using std::string;
-using boost::unordered_map;
-
-class LxcIsolationModule : public IsolationModule {
+class LxcIsolationModule
+  : public IsolationModule, public ProcessExitedListener
+{
 public:
   LxcIsolationModule();
 
   virtual ~LxcIsolationModule();
 
-  virtual void initialize(Slave* slave);
-
-  virtual void launchExecutor(Framework* framework, Executor* executor);
-
-  virtual void killExecutor(Framework* framework, Executor* executor);
-
-  virtual void resourcesChanged(Framework* framework, Executor* executor);
+  virtual void initialize(const Configuration& conf,
+                          bool local,
+                          const process::PID<Slave>& slave);
+
+  virtual void launchExecutor(const FrameworkID& frameworkId,
+                              const FrameworkInfo& frameworkInfo,
+                              const ExecutorInfo& executorInfo,
+                              const std::string& directory);
+
+  virtual void killExecutor(const FrameworkID& frameworkId,
+                            const ExecutorID& executorId);
+
+  virtual void resourcesChanged(const FrameworkID& frameworkId,
+                                const ExecutorID& executorId,
+                                const Resources& resources);
 
-protected:
-  // Run a shell command formatted with varargs and return its exit code.
-  int shell(const char* format, ...);
-
-  // Attempt to set a resource limit of a framework's container for a given
-  // cgroup property (e.g. cpu.shares). Returns true on success.
-  bool setResourceLimit(Framework* framework, Executor* executor,
-			const string& property, int64_t value);
+  virtual void processExited(pid_t pid, int status);
 
 private:
-  // Reaps framework containers and tells the slave if they exit
-  class Reaper : public process::Process<Reaper> {
-    LxcIsolationModule* module;
-
-  protected:
-    virtual void operator () ();
-
-  public:
-    Reaper(LxcIsolationModule* module);
-  };
-
-  // Per-framework information object maintained in info hashmap
-  struct FrameworkInfo {
-    string container;    // Name of Linux container used for this framework
-    pid_t lxcExecutePid; // PID of lxc-execute command running the executor
+  // No copying, no assigning.
+  LxcIsolationModule(const LxcIsolationModule&);
+  LxcIsolationModule& operator = (const LxcIsolationModule&);
+
+  // Per-executor information object maintained in the infos hashmap.
+  struct ContainerInfo
+  {
+    FrameworkID frameworkId;
+    ExecutorID executorId;
+    std::string container; // Name of Linux container used for this executor.
+    pid_t pid; // PID of lxc-execute command running the executor.
   };
 
+  // TODO(benh): Make variables const by passing them via constructor.
+  Configuration conf;
+  bool local;
+  process::PID<Slave> slave;
   bool initialized;
-  Slave* slave;
-  boost::unordered_map<FrameworkID, boost::unordered_map<ExecutorID, FrameworkInfo*> > infos;
   Reaper* reaper;
+  hashmap<FrameworkID, hashmap<ExecutorID, ContainerInfo*> > infos;
 };
 
-}}}
+}}} // namespace mesos { namespace internal { namespace slave {
 
-#endif /* __LXC_ISOLATION_MODULE_HPP__ */
+#endif // __LXC_ISOLATION_MODULE_HPP__
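
The private, undefined copy constructor and assignment operator above
are the standard pre-C++11 idiom for making a class noncopyable; in
isolation the pattern looks like:

    class Noncopyable
    {
    public:
      Noncopyable() {}

    private:
      Noncopyable(const Noncopyable&);              // Not implemented.
      Noncopyable& operator = (const Noncopyable&); // Not implemented.
    };

    // Noncopyable a;
    // Noncopyable b = a; // Compile error: copy constructor is private.

Any attempted copy fails at compile time (or at link time from within a
member function), which protects the raw ContainerInfo pointers in the
infos hashmap from accidental double ownership.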

Modified: incubator/mesos/trunk/src/slave/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/main.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/main.cpp (original)
+++ incubator/mesos/trunk/src/slave/main.cpp Mon Jun 27 06:08:33 2011
@@ -1,3 +1,5 @@
+#include <libgen.h>
+
 #include "common/build.hpp"
 #include "common/logging.hpp"
 
@@ -12,9 +14,6 @@
 using namespace mesos::internal;
 using namespace mesos::internal::slave;
 
-using boost::bad_lexical_cast;
-using boost::lexical_cast;
-
 using std::cerr;
 using std::endl;
 using std::string;

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Mon Jun 27 06:08:33 2011
@@ -1,179 +1,197 @@
+#include <signal.h>
+
 #include <map>
-#include <vector>
+
+#include <process/dispatch.hpp>
 
 #include "process_based_isolation_module.hpp"
 
 #include "common/foreach.hpp"
+#include "common/type_utils.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::slave;
 
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
+using namespace process;
 
 using launcher::ExecutorLauncher;
 
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::list;
-using std::make_pair;
 using std::map;
-using std::ostringstream;
-using std::pair;
-using std::queue;
 using std::string;
-using std::vector;
+
+using process::wait; // Necessary on some OS's to disambiguate.
 
 
 ProcessBasedIsolationModule::ProcessBasedIsolationModule()
-  : initialized(false) {}
+  : initialized(false)
+{
+  // Spawn the reaper. Note that it might send us a message before we
+  // are spawned ourselves; that's okay, as the message will simply be
+  // dropped.
+  reaper = new Reaper();
+  spawn(reaper);
+  dispatch(reaper, &Reaper::addProcessExitedListener, this);
+}
 
 
 ProcessBasedIsolationModule::~ProcessBasedIsolationModule()
 {
-  // We need to wait until the reaper has completed because it
-  // accesses 'this' in order to make callbacks ... deleting 'this'
-  // could thus lead to a seg fault!
-  if (initialized) {
-    CHECK(reaper != NULL);
-    process::post(reaper->self(), process::TERMINATE);
-    process::wait(reaper->self());
-    delete reaper;
-  }
+  CHECK(reaper != NULL);
+  terminate(reaper);
+  wait(reaper);
+  delete reaper;
 }
 
 
-void ProcessBasedIsolationModule::initialize(Slave *slave)
+void ProcessBasedIsolationModule::initialize(
+    const Configuration& _conf,
+    bool _local,
+    const PID<Slave>& _slave)
 {
-  this->slave = slave;
-  reaper = new Reaper(this);
-  process::spawn(reaper);
+  conf = _conf;
+  local = _local;
+  slave = _slave;
+
   initialized = true;
 }
 
 
-void ProcessBasedIsolationModule::launchExecutor(Framework* framework, Executor* executor)
+void ProcessBasedIsolationModule::launchExecutor(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory)
 {
-  if (!initialized)
+  if (!initialized) {
     LOG(FATAL) << "Cannot launch executors before initialization!";
+  }
 
-  LOG(INFO) << "Starting executor for framework " << framework->frameworkId
-            << ": " << executor->info.uri();
+  LOG(INFO) << "Launching '" << executorInfo.uri()
+            << "' for executor '" << executorInfo.executor_id()
+            << "' of framework " << frameworkId;
 
   pid_t pid;
-  if ((pid = fork()) == -1)
+  if ((pid = fork()) == -1) {
     PLOG(FATAL) << "Failed to fork to launch new executor";
+  }
 
   if (pid) {
     // In parent process, record the pgid for killpg later.
     LOG(INFO) << "Started executor, OS pid = " << pid;
-    pgids[framework->frameworkId][executor->info.executor_id()] = pid;
-    executor->executorStatus = "PID: " + lexical_cast<string>(pid);
+    pgids[frameworkId][executorInfo.executor_id()] = pid;
+
+    // Tell the slave this executor has started.
+    dispatch(slave, &Slave::executorStarted,
+             frameworkId, executorInfo.executor_id(), pid);
   } else {
     // In child process, make cleanup easier.
-//     if (setpgid(0, 0) < 0)
-//       PLOG(FATAL) << "Failed to put executor in own process group";
-    if ((pid = setsid()) == -1)
+    if ((pid = setsid()) == -1) {
       PLOG(FATAL) << "Failed to put executor in own session";
-    
-    createExecutorLauncher(framework, executor)->run();
+    }
+
+    ExecutorLauncher* launcher =
+      createExecutorLauncher(frameworkId, frameworkInfo,
+                             executorInfo, directory);
+
+    launcher->run();
   }
 }
 
 
-void ProcessBasedIsolationModule::killExecutor(Framework* framework, Executor* executor)
-{
-  if (pgids[framework->frameworkId][executor->info.executor_id()] != -1) {
+void ProcessBasedIsolationModule::killExecutor(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  if (!pgids.contains(frameworkId) ||
+      !pgids[frameworkId].contains(executorId)) {
+    LOG(ERROR) << "Asked to kill an unknown executor!";
+    return;
+  }
+
+  if (pgids[frameworkId][executorId] != -1) {
     // TODO(benh): Consider sending a SIGTERM, then after so much time
     // if it still hasn't exited do a SIGKILL (can use a libprocess
-    // process for this).
-    LOG(INFO) << "Sending SIGKILL to gpid "
-              << pgids[framework->frameworkId][executor->info.executor_id()];
-    killpg(pgids[framework->frameworkId][executor->info.executor_id()], SIGKILL);
-    pgids[framework->frameworkId][executor->info.executor_id()] = -1;
-    executor->executorStatus = "No executor running";
+    // process for this). This might not be necessary because we have
+    // higher-level semantics via the first shutdown phase that gets
+    // initiated by the slave.
+    LOG(INFO) << "Sending SIGKILL to process group "
+              << pgids[frameworkId][executorId];
+
+    killpg(pgids[frameworkId][executorId], SIGKILL);
+
+    if (pgids[frameworkId].size() == 1) {
+      pgids.erase(frameworkId);
+    } else {
+      pgids[frameworkId].erase(executorId);
+    }
+
+    // NOTE: Both frameworkId and executorId are no longer valid
+    // because they have just been deleted above!
 
     // TODO(benh): Kill all of the process's descendants? Perhaps
     // create a new libprocess process that continually tries to kill
     // all the processes that are a descendant of the executor, trying
     // to kill the executor last ... maybe this is just too much of a
     // burden?
-
-    pgids[framework->frameworkId].erase(executor->info.executor_id());
   }
 }
 
 
-void ProcessBasedIsolationModule::resourcesChanged(Framework* framework, Executor* executor)
+void ProcessBasedIsolationModule::resourcesChanged(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const Resources& resources)
 {
   // Do nothing; subclasses may override this.
 }
 
 
-ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(Framework* framework, Executor* executor)
+ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(
+    const FrameworkID& frameworkId,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory)
 {
   // Create a map of parameters for the executor launcher.
   map<string, string> params;
 
-  for (int i = 0; i < executor->info.params().param_size(); i++) {
-    params[executor->info.params().param(i).key()] = 
-      executor->info.params().param(i).value();
-  }
-
-  return 
-    new ExecutorLauncher(framework->frameworkId,
-                         executor->info.executor_id(),
-                         executor->info.uri(),
-                         framework->info.user(),
-                         slave->getUniqueWorkDirectory(framework->frameworkId,
-                                                       executor->info.executor_id()),
-                         slave->self(),
-                         slave->getConfiguration().get("frameworks_home", ""),
-                         slave->getConfiguration().get("home", ""),
-                         slave->getConfiguration().get("hadoop_home", ""),
-                         !slave->local,
-                         slave->getConfiguration().get("switch_user", true),
-                         params);
+  for (int i = 0; i < executorInfo.params().param_size(); i++) {
+    params[executorInfo.params().param(i).key()] = 
+      executorInfo.params().param(i).value();
+  }
+
+  return new ExecutorLauncher(frameworkId,
+                              executorInfo.executor_id(),
+                              executorInfo.uri(),
+                              frameworkInfo.user(),
+                              directory,
+                              slave,
+                              conf.get("frameworks_home", ""),
+                              conf.get("home", ""),
+                              conf.get("hadoop_home", ""),
+                              !local,
+                              conf.get("switch_user", true),
+                              "", // No Linux container in process-based isolation.
+                              params);
 }
 
 
-ProcessBasedIsolationModule::Reaper::Reaper(ProcessBasedIsolationModule* m)
-  : module(m)
-{}
-
-
-void ProcessBasedIsolationModule::Reaper::operator () ()
-{
-  link(module->slave->self());
-  while (true) {
-    receive(1);
-    if (name() == process::TIMEOUT) {
-      // Check whether any child process has exited.
-      pid_t pid;
-      int status;
-      if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
-        foreachpair (const FrameworkID& frameworkId, _, module->pgids) {
-          foreachpair (const ExecutorID& executorId, pid_t pgid, module->pgids[frameworkId]) {
-            if (pgid == pid) {
-              // Kill the process group to clean up the tasks.
-              LOG(INFO) << "Sending SIGKILL to gpid " << pgid;
-              killpg(pgid, SIGKILL);
-              module->pgids[frameworkId][executorId] = -1;
-              LOG(INFO) << "Telling slave of lost executor " << executorId
-                        << " of framework " << frameworkId;
-              // TODO(benh): This is broken if/when libprocess is parallel!
-              module->slave->executorExited(frameworkId, executorId, status);
-              module->pgids[frameworkId].erase(executorId);
-              break;
-            }
-          }
-        }
+void ProcessBasedIsolationModule::processExited(pid_t pid, int status)
+{
+  foreachkey (const FrameworkID& frameworkId, pgids) {
+    foreachpair (const ExecutorID& executorId, pid_t pgid, pgids[frameworkId]) {
+      if (pgid == pid) {
+        LOG(INFO) << "Telling slave of lost executor " << executorId
+                  << " of framework " << frameworkId;
+
+        dispatch(slave, &Slave::executorExited,
+                 frameworkId, executorId, status);
+
+        // Try to clean up after the executor.
+        killExecutor(frameworkId, executorId);
+        return;
       }
-    } else if (name() == process::TERMINATE || name() == process::EXITED) {
-      return;
     }
   }
 }
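
Both isolation modules now delegate child reaping to the shared Reaper
process and receive exit notifications through processExited().
reaper.hpp is not shown in this change, so the following is only a
sketch of the listener interface as inferred from its usage above; the
actual header may differ:

    // Implemented by anyone interested in child process exits.
    class ProcessExitedListener
    {
    public:
      virtual void processExited(pid_t pid, int status) = 0;
    };

    // A libprocess process that periodically reaps children (e.g. via
    // waitpid(-1, &status, WNOHANG)) and forwards each exit to the
    // registered listeners.
    class Reaper : public process::Process<Reaper>
    {
    public:
      void addProcessExitedListener(ProcessExitedListener* listener);
    };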