You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:06:19 UTC

svn commit: r1132242 - in /incubator/mesos/trunk: include/ src/common/ src/examples/ src/examples/java/ src/exec/ src/launcher/ src/master/ src/messaging/ src/sched/ src/slave/ src/tests/

Author: benh
Date: Sun Jun  5 09:06:18 2011
New Revision: 1132242

URL: http://svn.apache.org/viewvc?rev=1132242&view=rev
Log:
Initial support for multiple executors per framework per slave.

Modified:
    incubator/mesos/trunk/include/mesos.proto
    incubator/mesos/trunk/src/common/foreach.hpp
    incubator/mesos/trunk/src/common/type_utils.hpp
    incubator/mesos/trunk/src/examples/cpp_test_framework.cpp
    incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java
    incubator/mesos/trunk/src/examples/java/TestFramework.java
    incubator/mesos/trunk/src/examples/memhog.cpp
    incubator/mesos/trunk/src/examples/scheduled_memhog.cpp
    incubator/mesos/trunk/src/exec/exec.cpp
    incubator/mesos/trunk/src/launcher/launcher.cpp
    incubator/mesos/trunk/src/launcher/launcher.hpp
    incubator/mesos/trunk/src/launcher/main.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/messaging/messages.proto
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/isolation_module.hpp
    incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
    incubator/mesos/trunk/src/slave/main.cpp
    incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
    incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/master_test.cpp
    incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp
    incubator/mesos/trunk/src/tests/utils.hpp

Modified: incubator/mesos/trunk/include/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos.proto?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos.proto Sun Jun  5 09:06:18 2011
@@ -23,6 +23,11 @@ message TaskID {
 }
 
 
+message ExecutorID {
+  required string value = 1;
+}
+
+
 message Param {
   required string key = 1;
   required string value = 2;
@@ -99,6 +104,7 @@ message TaskDescription {
   required SlaveID slave_id = 3;
   repeated Resource resources = 4;
   optional bytes data = 5;
+  optional ExecutorInfo executor = 6;
 }
 
 
@@ -121,9 +127,10 @@ message TaskStatus {
 
 
 message ExecutorInfo {
-  required string uri = 1;
-  optional Params params = 2;
-  optional bytes data = 3;
+  required ExecutorID executor_id = 1;
+  required string uri = 2;
+  optional Params params = 3;
+  optional bytes data = 4;
 }
 
 
@@ -144,7 +151,8 @@ message SlaveInfo {
 
 message FrameworkMessage {
   required SlaveID slave_id = 1;
-  optional bytes data = 2;
+  required ExecutorID executor_id = 2;
+  optional bytes data = 3;
 }
 
 

Modified: incubator/mesos/trunk/src/common/foreach.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/foreach.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/foreach.hpp (original)
+++ incubator/mesos/trunk/src/common/foreach.hpp Sun Jun  5 09:06:18 2011
@@ -29,6 +29,17 @@ namespace foreach {
 
 const boost::tuples::detail::swallow_assign _ = boost::tuples::ignore;
 
+template <typename T> T copy(const T& t) { return t; }
+
 }
 
+
+#define foreachcopy(VAR, COL)                   \
+  foreach (VAR, foreach::copy(COL))
+
+#define foreachpaircopy(VARFIRST, VARSECOND, COL)       \
+  foreachpair (VARFIRST, VARSECOND, foreach::copy(COL))
+
+
+
 #endif /* FOREACH_HPP */

Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Sun Jun  5 09:06:18 2011
@@ -47,6 +47,13 @@ inline std::ostream& operator << (std::o
 }
 
 
+inline std::ostream& operator << (std::ostream& stream,
+                                  const ExecutorID& executorId)
+{
+  stream << executorId.value();
+  return stream;
+}
+
 inline bool operator == (const FrameworkID& left, const FrameworkID& right)
 {
   return left.value() == right.value();
@@ -71,6 +78,12 @@ inline bool operator == (const TaskID& l
 }
 
 
+inline bool operator == (const ExecutorID& left, const ExecutorID& right)
+{
+  return left.value() == right.value();
+}
+
+
 inline bool operator == (const FrameworkID& left, const std::string& right)
 {
   return left.value() == right;
@@ -95,6 +108,12 @@ inline bool operator == (const TaskID& l
 }
 
 
+inline bool operator == (const ExecutorID& left, const std::string& right)
+{
+  return left.value() == right;
+}
+
+
 inline size_t hash_value(const FrameworkID& frameworkId)
 {
   size_t seed = 0;
@@ -127,6 +146,14 @@ inline size_t hash_value(const TaskID& t
 }
 
 
+inline size_t hash_value(const ExecutorID& executorId)
+{
+  size_t seed = 0;
+  boost::hash_combine(seed, executorId.value());
+  return seed;
+}
+
+
 namespace internal {
 
 inline std::ostream& operator << (std::ostream& stream,

Modified: incubator/mesos/trunk/src/examples/cpp_test_framework.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/cpp_test_framework.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/cpp_test_framework.cpp (original)
+++ incubator/mesos/trunk/src/examples/cpp_test_framework.cpp Sun Jun  5 09:06:18 2011
@@ -36,6 +36,7 @@ public:
   virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
   {
     ExecutorInfo executor;
+    executor.mutable_executor_id()->set_value("default");
     executor.set_uri(uri);
     return executor;
   }

Modified: incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java (original)
+++ incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java Sun Jun  5 09:06:18 2011
@@ -18,7 +18,9 @@ public class TestExceptionFramework {
       try {
         File file = new File("./test_executor");
         return ExecutorInfo.newBuilder()
-          .setUri(file.getCanonicalPath()).build();
+          .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+          .setUri(file.getCanonicalPath())
+          .build();
       } catch (Throwable t) {
         throw new RuntimeException(t);
       }

Modified: incubator/mesos/trunk/src/examples/java/TestFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/java/TestFramework.java?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/java/TestFramework.java (original)
+++ incubator/mesos/trunk/src/examples/java/TestFramework.java Sun Jun  5 09:06:18 2011
@@ -32,7 +32,9 @@ public class TestFramework {
       try {
         File file = new File("./test_executor");
         return ExecutorInfo.newBuilder()
-          .setUri(file.getCanonicalPath()).build();
+          .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+          .setUri(file.getCanonicalPath())
+          .build();
       } catch (Throwable t) {
         throw new RuntimeException(t);
       }

Modified: incubator/mesos/trunk/src/examples/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/memhog.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/memhog.cpp (original)
+++ incubator/mesos/trunk/src/examples/memhog.cpp Sun Jun  5 09:06:18 2011
@@ -36,6 +36,7 @@ public:
   virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
   {
     ExecutorInfo executor;
+    executor.mutable_executor_id()->set_value("default");
     executor.set_uri(uri);
     return executor;
   }

Modified: incubator/mesos/trunk/src/examples/scheduled_memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/scheduled_memhog.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/scheduled_memhog.cpp (original)
+++ incubator/mesos/trunk/src/examples/scheduled_memhog.cpp Sun Jun  5 09:06:18 2011
@@ -82,6 +82,7 @@ public:
   virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
   {
     ExecutorInfo executor;
+    executor.mutable_executor_id()->set_value("default");
     executor.set_uri(uri);
     return executor;
   }

Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun  5 09:06:18 2011
@@ -18,16 +18,16 @@
 
 #include "messaging/messages.hpp"
 
-using std::cerr;
-using std::endl;
-using std::string;
+using namespace mesos;
+using namespace mesos::internal;
 
 using boost::bind;
 using boost::cref;
 using boost::unordered_map;
 
-using namespace mesos;
-using namespace mesos::internal;
+using std::cerr;
+using std::endl;
+using std::string;
 
 
 namespace mesos { namespace internal {
@@ -37,9 +37,10 @@ class ExecutorProcess : public MesosProc
 public:
   ExecutorProcess(const PID& _slave, MesosExecutorDriver* _driver,
                   Executor* _executor, const FrameworkID& _frameworkId,
-                  bool _local)
+                  const ExecutorID& _executorId, bool _local)
     : slave(_slave), driver(_driver), executor(_executor),
-      frameworkId(_frameworkId), local(_local), terminate(false) {}
+      frameworkId(_frameworkId), executorId(_executorId),
+      local(_local), terminate(false) {}
 
   ~ExecutorProcess() {}
 
@@ -51,6 +52,7 @@ protected:
     // Register with slave.
     Message<E2S_REGISTER_EXECUTOR> out;
     out.mutable_framework_id()->MergeFrom(frameworkId);
+    out.mutable_executor_id()->MergeFrom(executorId);
     send(slave, out);
 
     while(true) {
@@ -149,6 +151,7 @@ private:
   MesosExecutorDriver* driver;
   Executor* executor;
   FrameworkID frameworkId;
+  ExecutorID executorId;
   SlaveID slaveId;
   bool local;
 
@@ -203,6 +206,7 @@ int MesosExecutorDriver::start()
 
   PID slave;
   FrameworkID frameworkId;
+  ExecutorID executorId;
 
   char* value;
   std::istringstream iss;
@@ -234,7 +238,16 @@ int MesosExecutorDriver::start()
 
   frameworkId.set_value(value);
 
-  process = new ExecutorProcess(slave, this, executor, frameworkId, local);
+  /* Get executor ID from environment. */
+  value = getenv("MESOS_EXECUTOR_ID");
+
+  if (value == NULL)
+    fatal("expecting MESOS_EXECUTOR_ID in environment");
+
+  executorId.set_value(value);
+
+  process =
+    new ExecutorProcess(slave, this, executor, frameworkId, executorId, local);
 
   Process::spawn(process);
 
@@ -312,11 +325,15 @@ int MesosExecutorDriver::sendFrameworkMe
     return -1;
   }
 
-  // Validate that they set the correct slave ID.
+  // Validate that they set the correct slave ID and executor ID.
   if (!(process->slaveId == message.slave_id())) {
     return -1;
   }
 
+  if (!(process->executorId == message.executor_id())) {
+    return -1;
+  }
+
   // TODO(benh): Do a dispatch to Executor first?
   Message<E2S_FRAMEWORK_MESSAGE> out;
   out.mutable_framework_id()->MergeFrom(process->frameworkId);

Modified: incubator/mesos/trunk/src/launcher/launcher.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/launcher/launcher.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/launcher/launcher.cpp (original)
+++ incubator/mesos/trunk/src/launcher/launcher.cpp Sun Jun  5 09:06:18 2011
@@ -31,7 +31,8 @@ using std::string;
 using std::vector;
 
 
-ExecutorLauncher::ExecutorLauncher(FrameworkID _frameworkId,
+ExecutorLauncher::ExecutorLauncher(const FrameworkID& _frameworkId,
+                                   const ExecutorID& _executorId,
                                    const string& _executorUri,
                                    const string& _user,
                                    const string& _workDirectory,
@@ -42,7 +43,8 @@ ExecutorLauncher::ExecutorLauncher(Frame
                                    bool _redirectIO,
                                    bool _shouldSwitchUser,
                                    const map<string, string>& _params)
-  : frameworkId(_frameworkId), executorUri(_executorUri), user(_user),
+  : frameworkId(_frameworkId), executorId(_executorId),
+    executorUri(_executorUri), user(_user),
     workDirectory(_workDirectory), slavePid(_slavePid),
     frameworksHome(_frameworksHome), mesosHome(_mesosHome),
     hadoopHome(_hadoopHome), redirectIO(_redirectIO), 
@@ -228,6 +230,7 @@ void ExecutorLauncher::setupEnvironment(
   // Set Mesos environment variables to pass slave ID, framework ID, etc.
   setenv("MESOS_SLAVE_PID", slavePid.c_str(), true);
   setenv("MESOS_FRAMEWORK_ID", frameworkId.value().c_str(), true);
+  setenv("MESOS_EXECUTOR_ID", executorId.value().c_str(), true);
   
   // Set LIBPROCESS_PORT so that we bind to a random free port.
   setenv("LIBPROCESS_PORT", "0", true);

Modified: incubator/mesos/trunk/src/launcher/launcher.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/launcher/launcher.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/launcher/launcher.hpp (original)
+++ incubator/mesos/trunk/src/launcher/launcher.hpp Sun Jun  5 09:06:18 2011
@@ -31,6 +31,7 @@ using std::vector;
 class ExecutorLauncher {
 protected:
   FrameworkID frameworkId;
+  ExecutorID executorId;
   string executorUri;
   string user;
   string workDirectory; // Directory in which the framework should run
@@ -43,7 +44,8 @@ protected:
   map<string, string> params; // Key-value params in framework's ExecutorInfo
 
 public:
-  ExecutorLauncher(FrameworkID _frameworkId, const string& _executorUri,
+  ExecutorLauncher(const FrameworkID& _frameworkId,
+                   const ExecutorID& _executorId, const string& _executorUri,
                    const string& _user, const string& _workDirectory,
                    const string& _slavePid, const string& _frameworksHome,
                    const string& _mesosHome, const string& _hadoopHome, 

Modified: incubator/mesos/trunk/src/launcher/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/launcher/main.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/launcher/main.cpp (original)
+++ incubator/mesos/trunk/src/launcher/main.cpp Sun Jun  5 09:06:18 2011
@@ -26,7 +26,11 @@ int main(int argc, char **argv)
   FrameworkID frameworkId;
   frameworkId.set_value(getenvOrFail("MESOS_FRAMEWORK_ID"));
 
+  ExecutorID executorId;
+  executorId.set_value(getenvOrFail("MESOS_EXECUTOR_ID"));
+
   ExecutorLauncher(frameworkId,
+                   executorId,
                    getenvOrFail("MESOS_EXECUTOR_URI"),
                    getenvOrFail("MESOS_USER"),
                    getenvOrFail("MESOS_WORK_DIRECTORY"),

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 09:06:18 2011
@@ -805,8 +805,7 @@ void Master::operator () ()
     }
 
     case M2M_TIMER_TICK: {
-      unordered_map<SlaveID, Slave *> slavesCopy = slaves;
-      foreachpair (_, Slave *slave, slavesCopy) {
+      foreachpaircopy (_, Slave *slave, slaves) {
 	if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
 	  LOG(INFO) << slave
                     << " missing heartbeats ... considering disconnected";
@@ -848,8 +847,7 @@ void Master::operator () ()
 	  framework->active = false;
 
           // Remove the framework's slot offers.
-          unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
-          foreach (SlotOffer* offer, slotOffersCopy) {
+          foreachcopy (SlotOffer* offer, framework->slotOffers) {
             removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
           }
 
@@ -1182,8 +1180,7 @@ void Master::failoverFramework(Framework
 
   // Remove the framework's slot offers (if they weren't removed before)..
   // TODO(benh): Consider just reoffering these to the new framework.
-  unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
-  foreach (SlotOffer* offer, slotOffersCopy) {
+  foreachcopy (SlotOffer* offer, framework->slotOffers) {
     removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
   }
 
@@ -1231,16 +1228,14 @@ void Master::removeFramework(Framework *
   }
 
   // Remove pointers to the framework's tasks in slaves
-  unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
-  foreachpair (_, Task *task, tasksCopy) {
+  foreachpaircopy (_, Task *task, framework->tasks) {
     Slave *slave = lookupSlave(task->slave_id());
     CHECK(slave != NULL);
     removeTask(task, TRR_FRAMEWORK_LOST);
   }
   
   // Remove the framework's slot offers (if they weren't removed before).
-  unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
-  foreach (SlotOffer* offer, slotOffersCopy) {
+  foreachcopy (SlotOffer* offer, framework->slotOffers) {
     removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
   }
 
@@ -1264,8 +1259,7 @@ void Master::removeSlave(Slave *slave)
   // TODO: Notify allocator that a slave removal is beginning?
   
   // Remove pointers to slave's tasks in frameworks, and send status updates
-  unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
-  foreachpair (_, Task *task, tasksCopy) {
+  foreachpaircopy (_, Task *task, slave->tasks) {
     Framework *framework = lookupFramework(task->framework_id());
     // A framework might not actually exist because the master failed
     // over and the framework hasn't reconnected. This can be a tricky
@@ -1288,8 +1282,7 @@ void Master::removeSlave(Slave *slave)
   }
 
   // Remove slot offers from the slave; this will also rescind them
-  unordered_set<SlotOffer *> slotOffersCopy = slave->slotOffers;
-  foreach (SlotOffer *offer, slotOffersCopy) {
+  foreachcopy (SlotOffer *offer, slave->slotOffers) {
     // Only report resources on slaves other than this one to the allocator
     vector<SlaveResources> otherSlaveResources;
     foreach (const SlaveResources& r, offer->resources) {

Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun  5 09:06:18 2011
@@ -3,16 +3,18 @@ import "mesos.proto";
 package mesos.internal;
 
 // TODO(benh): It would be great if this could just be a
-// TaskDescription wherever it gets used! One performance reason why
-// we don't do that now is because storing whatever data is coupled
-// with a TaskDescription could be large and unnecessary.
+// TaskDescription wherever it gets used! We would need to add both
+// the framework_id field and the state field into TaskDescription
+// though. Also, one performance reason why we don't do that now is
+// because storing whatever data is coupled with a TaskDescription
+// could be large and unnecessary.
 message Task {
-  required string name = 1;
-  required TaskID task_id = 2;
-  required FrameworkID framework_id = 3;
-  required SlaveID slave_id = 4;
-  repeated Resource resources = 5;
-  required TaskState state = 6;
+  required FrameworkID framework_id = 1;
+  required TaskState state = 2;
+  required string name = 3;
+  required TaskID task_id = 4;
+  required SlaveID slave_id = 5;
+  repeated Resource resources = 6;
 }
 
 
@@ -147,6 +149,7 @@ message UpdateFrameworkMessage {
 
 message RegisterExecutorMessage {
   required FrameworkID framework_id = 1;
+  required ExecutorID executor_id = 2;
 }
 
 
@@ -158,7 +161,8 @@ message ExecutorRegisteredMessage {
 message ExitedExecutorMessage {
   required SlaveID slave_id = 1;
   required FrameworkID framework_id = 2;
-  required int32 status = 3;
+  required ExecutorID executor_id = 3;
+  required int32 status = 4;
 }
 
 

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun  5 09:06:18 2011
@@ -726,6 +726,11 @@ int MesosSchedulerDriver::sendFrameworkM
     return -1;
   }
 
+  if (!message.has_executor_id()) {
+    VLOG(1) << "Missing ExecutorID (executor_id), cannot send message";
+    return -1;
+  }
+
   Process::dispatch(process, &SchedulerProcess::sendFrameworkMessage, message);
 
   return 0;

Modified: incubator/mesos/trunk/src/slave/isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.hpp Sun Jun  5 09:06:18 2011
@@ -6,8 +6,9 @@
 
 namespace mesos { namespace internal { namespace slave {
 
-class Framework;
 class Slave;
+class Framework;
+class Executor;
 
 
 class IsolationModule {
@@ -21,15 +22,15 @@ public:
   virtual void initialize(Slave *slave) {}
 
   // Called by the slave to launch an executor for a given framework.
-  virtual void startExecutor(Framework *framework) = 0;
+  virtual void launchExecutor(Framework* framework, Executor* executor) = 0;
 
   // Terminate a framework's executor, if it is still running.
   // The executor is expected to be gone after this method exits.
-  virtual void killExecutor(Framework *framework) = 0;
+  virtual void killExecutor(Framework* framework, Executor* executor) = 0;
 
   // Update the resource limits for a given framework. This method will
   // be called only after an executor for the framework is started.
-  virtual void resourcesChanged(Framework *framework) {}
+  virtual void resourcesChanged(Framework *framework, Executor* executor) {}
 };
 
 }}}

Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Sun Jun  5 09:06:18 2011
@@ -51,11 +51,11 @@ public:
 
   virtual void initialize(Slave* slave);
 
-  virtual void startExecutor(Framework* framework);
+  virtual void launchExecutor(Framework* framework, Executor* executor);
 
-  virtual void killExecutor(Framework* framework);
+  virtual void killExecutor(Framework* framework, Executor* executor);
 
-  virtual void resourcesChanged(Framework* framework);
+  virtual void resourcesChanged(Framework* framework, Executor* executor);
 
 protected:
   // Run a shell command formatted with varargs and return its exit code.

Modified: incubator/mesos/trunk/src/slave/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/main.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/main.cpp (original)
+++ incubator/mesos/trunk/src/slave/main.cpp Sun Jun  5 09:06:18 2011
@@ -6,12 +6,15 @@
 #include "slave.hpp"
 #include "webui.hpp"
 
-using boost::lexical_cast;
-using boost::bad_lexical_cast;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
 
-using namespace std;
+using boost::bad_lexical_cast;
+using boost::lexical_cast;
 
-using namespace mesos::internal::slave;
+using std::cerr;
+using std::endl;
+using std::string;
 
 
 void usage(const char *programName, const Configurator& configurator)

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun  5 09:06:18 2011
@@ -13,6 +13,8 @@ using boost::lexical_cast;
 using boost::unordered_map;
 using boost::unordered_set;
 
+using launcher::ExecutorLauncher;
+
 using std::cerr;
 using std::cout;
 using std::endl;
@@ -53,13 +55,13 @@ void ProcessBasedIsolationModule::initia
 }
 
 
-void ProcessBasedIsolationModule::startExecutor(Framework* framework)
+void ProcessBasedIsolationModule::launchExecutor(Framework* framework, Executor* executor)
 {
   if (!initialized)
     LOG(FATAL) << "Cannot launch executors before initialization!";
 
   LOG(INFO) << "Starting executor for framework " << framework->frameworkId
-            << ": " << framework->info.executor().uri();
+            << ": " << executor->info.uri();
 
   pid_t pid;
   if ((pid = fork()) == -1)
@@ -68,30 +70,31 @@ void ProcessBasedIsolationModule::startE
   if (pid) {
     // In parent process, record the pgid for killpg later.
     LOG(INFO) << "Started executor, OS pid = " << pid;
-    pgids[framework->frameworkId] = pid;
-    framework->executorStatus = "PID: " + lexical_cast<string>(pid);
+    pgids[framework->frameworkId][executor->info.executor_id()] = pid;
+    executor->executorStatus = "PID: " + lexical_cast<string>(pid);
   } else {
     // In child process, make cleanup easier.
 //     if (setpgid(0, 0) < 0)
 //       PLOG(FATAL) << "Failed to put executor in own process group";
     if ((pid = setsid()) == -1)
       PLOG(FATAL) << "Failed to put executor in own session";
-
-    createExecutorLauncher(framework)->run();
+    
+    createExecutorLauncher(framework, executor)->run();
   }
 }
 
 
-void ProcessBasedIsolationModule::killExecutor(Framework* framework)
+void ProcessBasedIsolationModule::killExecutor(Framework* framework, Executor* executor)
 {
-  if (pgids[framework->frameworkId] != -1) {
+  if (pgids[framework->frameworkId][executor->info.executor_id()] != -1) {
     // TODO(benh): Consider sending a SIGTERM, then after so much time
     // if it still hasn't exited do a SIGKILL (can use a libprocess
     // process for this).
-    LOG(INFO) << "Sending SIGKILL to gpid " << pgids[framework->frameworkId];
-    killpg(pgids[framework->frameworkId], SIGKILL);
-    pgids[framework->frameworkId] = -1;
-    framework->executorStatus = "No executor running";
+    LOG(INFO) << "Sending SIGKILL to gpid "
+              << pgids[framework->frameworkId][executor->info.executor_id()];
+    killpg(pgids[framework->frameworkId][executor->info.executor_id()], SIGKILL);
+    pgids[framework->frameworkId][executor->info.executor_id()] = -1;
+    executor->executorStatus = "No executor running";
 
     // TODO(benh): Kill all of the process's descendants? Perhaps
     // create a new libprocess process that continually tries to kill
@@ -99,39 +102,39 @@ void ProcessBasedIsolationModule::killEx
     // to kill the executor last ... maybe this is just too much of a
     // burden?
 
-    pgids.erase(framework->frameworkId);
+    pgids[framework->frameworkId].erase(executor->info.executor_id());
   }
 }
 
 
-void ProcessBasedIsolationModule::resourcesChanged(Framework* framework)
+void ProcessBasedIsolationModule::resourcesChanged(Framework* framework, Executor* executor)
 {
   // Do nothing; subclasses may override this.
 }
 
 
-ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(Framework* framework)
+ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(Framework* framework, Executor* executor)
 {
-  const Configuration& conf = slave->getConfiguration();
-
+  // Create a map of parameters for the executor launcher.
   map<string, string> params;
 
-  for (int i = 0; i < framework->info.executor().params().param_size(); i++) {
-    params[framework->info.executor().params().param(i).key()] = 
-      framework->info.executor().params().param(i).value();
+  for (int i = 0; i < executor->info.params().param_size(); i++) {
+    params[executor->info.params().param(i).key()] = 
+      executor->info.params().param(i).value();
   }
 
-  return
+  return 
     new ExecutorLauncher(framework->frameworkId,
-                         framework->info.executor().uri(),
+                         executor->info.executor_id(),
+                         executor->info.uri(),
                          framework->info.user(),
                          slave->getUniqueWorkDirectory(framework->frameworkId),
                          slave->self(),
-                         conf.get("frameworks_home", ""),
-                         conf.get("home", ""),
-                         conf.get("hadoop_home", ""),
+                         slave->getConfiguration().get("frameworks_home", ""),
+                         slave->getConfiguration().get("home", ""),
+                         slave->getConfiguration().get("hadoop_home", ""),
                          !slave->local,
-                         conf.get("switch_user", true),
+                         slave->getConfiguration().get("switch_user", true),
                          params);
 }
 
@@ -151,18 +154,20 @@ void ProcessBasedIsolationModule::Reaper
       pid_t pid;
       int status;
       if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
-        foreachpair (const FrameworkID& frameworkId, pid_t pgid,
-                     module->pgids) {
-          if (pgid == pid) {
-            // Kill the process group to clean up the tasks.
-            LOG(INFO) << "Sending SIGKILL to gpid " << pgid;
-            killpg(pgid, SIGKILL);
-            module->pgids[frameworkId] = -1;
-            LOG(INFO) << "Telling slave of lost framework " << frameworkId;
-            // TODO(benh): This is broken if/when libprocess is parallel!
-            module->slave->executorExited(frameworkId, status);
-            module->pgids.erase(frameworkId);
-            break;
+        foreachpair (const FrameworkID& frameworkId, _, module->pgids) {
+          foreachpair (const ExecutorID& executorId, pid_t pgid, module->pgids[frameworkId]) {
+            if (pgid == pid) {
+              // Kill the process group to clean up the tasks.
+              LOG(INFO) << "Sending SIGKILL to gpid " << pgid;
+              killpg(pgid, SIGKILL);
+              module->pgids[frameworkId][executorId] = -1;
+              LOG(INFO) << "Telling slave of lost executor " << executorId
+                        << " of framework " << frameworkId;
+              // TODO(benh): This is broken if/when libprocess is parallel!
+              module->slave->executorExited(frameworkId, executorId, status);
+              module->pgids[frameworkId].erase(executorId);
+              break;
+            }
           }
         }
       }

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Sun Jun  5 09:06:18 2011
@@ -15,11 +15,20 @@
 
 namespace mesos { namespace internal { namespace slave {
 
-using boost::unordered_map;
-using mesos::internal::launcher::ExecutorLauncher;
-
 class ProcessBasedIsolationModule : public IsolationModule {
 public:
+  ProcessBasedIsolationModule();
+
+  virtual ~ProcessBasedIsolationModule();
+
+  virtual void initialize(Slave *slave);
+
+  virtual void launchExecutor(Framework* framework, Executor* executor);
+
+  virtual void killExecutor(Framework* framework, Executor* executor);
+
+  virtual void resourcesChanged(Framework* framework, Executor* executor);
+
   // Reaps child processes and tells the slave if they exit
   class Reaper : public Process {
     ProcessBasedIsolationModule* module;
@@ -34,25 +43,6 @@ public:
   // Extra shutdown message for reaper
   enum { SHUTDOWN_REAPER = PROCESS_MSGID };
 
-private:
-  bool initialized;
-  Slave* slave;
-  unordered_map<FrameworkID, pid_t> pgids;
-  Reaper* reaper;
-
-public:
-  ProcessBasedIsolationModule();
-
-  virtual ~ProcessBasedIsolationModule();
-
-  virtual void initialize(Slave *slave);
-
-  virtual void startExecutor(Framework *framework);
-
-  virtual void killExecutor(Framework* framework);
-
-  virtual void resourcesChanged(Framework* framework);
-
 protected:
   // Main method executed after a fork() to create a Launcher for launching
   // an executor's process. The Launcher will create the child's working
@@ -61,7 +51,13 @@ protected:
   // Subclasses of ProcessBasedIsolationModule that wish to override the
   // default launching behavior should override createLauncher() and return
   // their own Launcher object (including possibly a subclass of Launcher).
-  virtual ExecutorLauncher* createExecutorLauncher(Framework* framework);
+  virtual launcher::ExecutorLauncher* createExecutorLauncher(Framework* framework, Executor* executor);
+
+private:
+  bool initialized;
+  Slave* slave;
+  boost::unordered_map<FrameworkID, boost::unordered_map<ExecutorID, pid_t> > pgids;
+  Reaper* reaper;
 };
 
 }}}

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 09:06:18 2011
@@ -90,39 +90,42 @@ state::SlaveState *Slave::getState()
     new state::SlaveState(BUILD_DATE, BUILD_USER, slaveId.value(),
                           cpus.value(), mem.value(), self(), master);
 
-  foreachpair(_, Framework *f, frameworks) {
-    Resources resources(f->resources);
-    Resource::Scalar cpus;
-    Resource::Scalar mem;
-    cpus.set_value(-1);
-    mem.set_value(-1);
-    cpus = resources.getScalar("cpus", cpus);
-    mem = resources.getScalar("mem", mem);
-
-    state::Framework *framework =
-      new state::Framework(f->frameworkId.value(), f->info.name(),
-                           f->info.executor().uri(), f->executorStatus,
-                           cpus.value(), mem.value());
-
-    state->frameworks.push_back(framework);
-
-    foreachpair(_, Task *t, f->tasks) {
-      Resources resources(t->resources());
-      Resource::Scalar cpus;
-      Resource::Scalar mem;
-      cpus.set_value(-1);
-      mem.set_value(-1);
-      cpus = resources.getScalar("cpus", cpus);
-      mem = resources.getScalar("mem", mem);
-
-      state::Task *task =
-        new state::Task(t->task_id().value(), t->name(),
-                        TaskState_descriptor()->FindValueByNumber(t->state())->name(),
-                        cpus.value(), mem.value());
+//   foreachpair (_, Framework *f, frameworks) {
 
-      framework->tasks.push_back(task);
-    }
-  }
+//     foreachpair (_, Executor* e, f->executors) {
+
+//       Resources resources(e->resources);
+//       Resource::Scalar cpus;
+//       Resource::Scalar mem;
+//       cpus.set_value(-1);
+//       mem.set_value(-1);
+//       cpus = resources.getScalar("cpus", cpus);
+//       mem = resources.getScalar("mem", mem);
+
+//     state::Framework *framework =
+//       new state::Framework(f->frameworkId.value(), f->info.name(),
+//                            f->info.executor().uri(), f->executorStatus,
+//                            cpus.value(), mem.value());
+
+//     state->frameworks.push_back(framework);
+
+//     foreachpair(_, Task *t, f->tasks) {
+//       Resources resources(t->resources());
+//       Resource::Scalar cpus;
+//       Resource::Scalar mem;
+//       cpus.set_value(-1);
+//       mem.set_value(-1);
+//       cpus = resources.getScalar("cpus", cpus);
+//       mem = resources.getScalar("mem", mem);
+
+//       state::Task *task =
+//         new state::Task(t->task_id().value(), t->name(),
+//                         TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+//                         cpus.value(), mem.value());
+
+//       framework->tasks.push_back(task);
+//     }
+//   }
 
   return state;
 }
@@ -181,11 +184,13 @@ void Slave::operator () ()
           out.mutable_slave_id()->MergeFrom(slaveId);
           out.mutable_slave()->MergeFrom(slave);
 
-	  foreachpair(_, Framework *framework, frameworks) {
-	    foreachpair(_, Task *task, framework->tasks) {
-              out.add_task()->MergeFrom(*task);
-	    }
-	  }
+	  foreachpair (_, Framework* framework, frameworks) {
+	    foreachpair (_, Executor* executor, framework->executors) {
+              foreachpair (_, Task* task, executor->tasks) {
+                out.add_task()->MergeFrom(*task);
+              }
+            }
+          }
 
 	  send(master, out);
 	}
@@ -241,33 +246,48 @@ void Slave::operator () ()
         LOG(INFO) << "Got assigned task " << task.task_id()
                   << " for framework " << msg.framework_id();
 
-        // Start an executor if one isn't already running.
         Framework *framework = getFramework(msg.framework_id());
         if (framework == NULL) {
           framework =
-            new Framework(msg.framework(), msg.framework_id(), msg.pid());
-
+            new Framework(msg.framework_id(), msg.framework(), msg.pid());
           frameworks[msg.framework_id()] = framework;
-          isolationModule->startExecutor(framework);
         }
 
-        // Create a local task.
-        Task *t = framework->addTask(task);
-
-        // Either send the task to an executor or queue the task until
-        // the executor has started.
-        Executor *executor = getExecutor(msg.framework_id());
+        // Either send the task to an executor or start a new executor
+        // and queue the task until the executor has started.
+        Executor* executor = task.has_executor()
+          ? framework->getExecutor(task.executor().executor_id())
+          : framework->getExecutor(framework->info.executor().executor_id());
+        
         if (executor != NULL) {
-          Message<S2E_RUN_TASK> out;
-          out.mutable_framework()->MergeFrom(framework->info);
-          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-          out.set_pid(framework->pid);
-          out.mutable_task()->MergeFrom(task);
-          send(executor->pid, out);
-          isolationModule->resourcesChanged(framework);
+          if (!executor->pid) {
+            // Queue task until the executor starts up.
+            executor->queuedTasks.push_back(task);
+          } else {
+            // Add the task to the executor.
+            executor->addTask(task);
+
+            Message<S2E_RUN_TASK> out;
+            out.mutable_framework()->MergeFrom(framework->info);
+            out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+            out.set_pid(framework->pid);
+            out.mutable_task()->MergeFrom(task);
+            send(executor->pid, out);
+            isolationModule->resourcesChanged(framework, executor);
+          }
         } else {
-          // Executor not yet registered; queue task for when it starts up
-          framework->queuedTasks.push_back(task);
+          // Launch an executor for this task.
+          if (task.has_executor()) {
+            executor = framework->createExecutor(task.executor());
+          } else {
+            executor = framework->createExecutor(framework->info.executor());
+          }
+
+          // Queue task until the executor starts up.
+          executor->queuedTasks.push_back(task);
+
+          // Tell the isolation module to launch the executor.
+          isolationModule->launchExecutor(framework, executor);
         }
         break;
       }
@@ -280,45 +300,47 @@ void Slave::operator () ()
 
         Framework* framework = getFramework(msg.framework_id());
         if (framework != NULL) {
-	  // Tell the executor to kill the task if it is up and
-	  // running, if not 
-	  Executor* executor = getExecutor(msg.framework_id());
-	  if (executor != NULL) {
-	    Message<S2E_KILL_TASK> out;
-	    out.mutable_framework_id()->MergeFrom(msg.framework_id());
-	    out.mutable_task_id()->MergeFrom(msg.task_id());
-	    send(executor->pid, out);
-	  } else {
-	    // Update the resources locally, if an executor comes up
-	    // after this then it just won't receive this task.
-	    framework->removeTask(msg.task_id());
-	    isolationModule->resourcesChanged(framework);
-
-	    Message<S2M_STATUS_UPDATE> out;
-	    out.mutable_framework_id()->MergeFrom(msg.framework_id());
-	    TaskStatus *status = out.mutable_status();
-	    status->mutable_task_id()->MergeFrom(msg.task_id());
-	    status->mutable_slave_id()->MergeFrom(slaveId);
-	    status->set_state(TASK_LOST);
-
-	    int seq = rsend(master, framework->pid, out);
-	    seqs[msg.framework_id()].insert(seq);
-	  }
-	} else {
-	  LOG(ERROR) << "Cannot kill task " << msg.task_id()
-		     << " of framework " << msg.framework_id()
-		     << " because no such framework is running";
+          // Tell the executor to kill the task if it is up and
+          // running; otherwise, consider the task lost.
+          Executor* executor = framework->getExecutor(msg.task_id());
+          if (executor == NULL || !executor->pid) {
+            // Update the resources locally; if an executor comes up
+            // after this, it just won't receive this task.
+            executor->removeTask(msg.task_id());
+            isolationModule->resourcesChanged(framework, executor);
+
+            Message<S2M_STATUS_UPDATE> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            TaskStatus *status = out.mutable_status();
+            status->mutable_task_id()->MergeFrom(msg.task_id());
+            status->mutable_slave_id()->MergeFrom(slaveId);
+            status->set_state(TASK_LOST);
+
+            int seq = rsend(master, framework->pid, out);
+            seqs[msg.framework_id()].insert(seq);
+          } else {
+            // Otherwise, send a message to the executor and wait for
+            // it to send us a status update.
+            Message<S2E_KILL_TASK> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            out.mutable_task_id()->MergeFrom(msg.task_id());
+            send(executor->pid, out);
+          }
+        } else {
+          LOG(WARNING) << "Cannot kill task " << msg.task_id()
+                       << " of framework " << msg.framework_id()
+                       << " because no such framework is running";
 
           Message<S2M_STATUS_UPDATE> out;
-	  out.mutable_framework_id()->MergeFrom(msg.framework_id());
+          out.mutable_framework_id()->MergeFrom(msg.framework_id());
           TaskStatus *status = out.mutable_status();
           status->mutable_task_id()->MergeFrom(msg.task_id());
           status->mutable_slave_id()->MergeFrom(slaveId);
           status->set_state(TASK_LOST);
 
-	  int seq = rsend(master, out);
-	  seqs[msg.framework_id()].insert(seq);
-	}
+          int seq = rsend(master, out);
+          seqs[msg.framework_id()].insert(seq);
+        }
         break;
       }
 
@@ -336,22 +358,35 @@ void Slave::operator () ()
       case M2S_FRAMEWORK_MESSAGE: {
         const Message<M2S_FRAMEWORK_MESSAGE>&msg = message();
 
-        const FrameworkMessage& message = msg.message();
+        Framework* framework = getFramework(msg.framework_id());
+        if (framework != NULL) {
+          const FrameworkMessage& message = msg.message();
 
-        Executor* executor = getExecutor(msg.framework_id());
-        if (executor != NULL) {
-          Message<S2E_FRAMEWORK_MESSAGE> out;
-          out.mutable_framework_id()->MergeFrom(msg.framework_id());
-          out.mutable_message()->MergeFrom(message);
-          send(executor->pid, out);
+          Executor* executor = framework->getExecutor(message.executor_id());
+          if (executor == NULL) {
+            LOG(WARNING) << "Dropping message for executor "
+                         << message.executor_id() << " of framework "
+                         << msg.framework_id()
+                         << " because executor does not exist";
+          } else if (!executor->pid) {
+            // TODO(*): If executor is not started, queue framework message?
+            // (It's probably okay to just drop it since frameworks can have
+            // the executor send a message to the master to say when it's ready.)
+            LOG(WARNING) << "Dropping message for executor "
+                         << message.executor_id() << " of framework "
+                         << msg.framework_id()
+                         << " because executor is not running";
+          } else {
+            Message<S2E_FRAMEWORK_MESSAGE> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            out.mutable_message()->MergeFrom(message);
+            send(executor->pid, out);
+          }
         } else {
-          VLOG(1) << "Dropping framework message for framework "
-                  << msg.framework_id()
-                  << " because its executor is not running";
-        }
-        // TODO(*): If executor is not started, queue framework message?
-        // (It's probably okay to just drop it since frameworks can have
-        // the executor send a message to the master to say when it's ready.)
+          LOG(WARNING) << "Dropping message for framework "
+                       << msg.framework_id()
+                       << " because it does not exist";
+        }
         break;
       }
 
@@ -370,37 +405,50 @@ void Slave::operator () ()
       case E2S_REGISTER_EXECUTOR: {
         const Message<E2S_REGISTER_EXECUTOR>& msg = message();
 
-        LOG(INFO) << "Got executor registration for framework "
+        LOG(INFO) << "Got registration for executor "
+                  << msg.executor_id() << " of framework "
                   << msg.framework_id();
 
         Framework* framework = getFramework(msg.framework_id());
         if (framework != NULL) {
-          Executor* executor = getExecutor(msg.framework_id());
-          if (executor != NULL) {
-            LOG(ERROR) << "Executor for framework " << msg.framework_id()
-                       << "already exists";
+          Executor* executor = framework->getExecutor(msg.executor_id());
+
+          // Check the status of the executor.
+          if (executor == NULL) {
+            LOG(WARNING) << "Not expecting executor " << msg.executor_id()
+                         << " of framework " << msg.framework_id();
+            send(from(), S2E_KILL_EXECUTOR);
+          } else if (executor->pid != PID()) {
+            LOG(WARNING) << "Not good, executor " << msg.executor_id()
+                         << " of framework " << msg.framework_id()
+                         << " is already running";
             send(from(), S2E_KILL_EXECUTOR);
-            break;
+          } else {
+            // Save the pid for the executor.
+            executor->pid = from();
+
+            // Now that the executor is up, set its resource limits.
+            isolationModule->resourcesChanged(framework, executor);
+
+            // Tell executor it's registered and give it any queued tasks.
+            Message<S2E_REGISTER_REPLY> out;
+            ExecutorArgs* args = out.mutable_args();
+            args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+            args->set_name(framework->info.name());
+            args->mutable_slave_id()->MergeFrom(slaveId);
+            args->set_hostname(hostname);
+            args->set_data(framework->info.executor().data());
+            send(executor->pid, out);
+            sendQueuedTasks(framework, executor);
           }
-
-          executor = new Executor(msg.framework_id(), from());
-          executors[msg.framework_id()] = executor;
-
-          // Now that the executor is up, set its resource limits
-          isolationModule->resourcesChanged(framework);
-
-          // Tell executor that it's registered and give it its queued tasks
-          Message<S2E_REGISTER_REPLY> out;
-          ExecutorArgs* args = out.mutable_args();
-          args->mutable_framework_id()->MergeFrom(framework->frameworkId);
-          args->set_name(framework->info.name());
-          args->mutable_slave_id()->MergeFrom(slaveId);
-          args->set_hostname(hostname);
-          args->set_data(framework->info.executor().data());
-          send(executor->pid, out);
-          sendQueuedTasks(framework, executor);
         } else {
-          // Framework is gone; tell the executor to exit
+          // Framework is gone; tell the executor to exit.
+          LOG(WARNING) << "Framework " << msg.framework_id()
+                       << " does not exist (it may have been killed),"
+                       << " telling executor to exit";
+
+          // TODO(benh): Shouldn't we also tell the isolation module
+          // to shut this executor down?
           send(from(), S2E_KILL_EXECUTOR);
         }
         break;
@@ -418,24 +466,30 @@ void Slave::operator () ()
 
         Framework *framework = getFramework(msg.framework_id());
         if (framework != NULL) {
-	  if (status.state() == TASK_FINISHED ||
-              status.state() == TASK_FAILED ||
-	      status.state() == TASK_KILLED ||
-              status.state() == TASK_LOST) {
-            framework->removeTask(status.task_id());
-            isolationModule->resourcesChanged(framework);
+          Executor* executor = framework->getExecutor(status.task_id());
+          if (executor != NULL) {
+            if (status.state() == TASK_FINISHED ||
+                status.state() == TASK_FAILED ||
+                status.state() == TASK_KILLED ||
+                status.state() == TASK_LOST) {
+              executor->removeTask(status.task_id());
+              isolationModule->resourcesChanged(framework, executor);
+            }
+
+            // Reliably send message and save sequence number for
+            // canceling later.
+            Message<S2M_STATUS_UPDATE> out;
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
+            out.mutable_status()->MergeFrom(status);
+            int seq = rsend(master, framework->pid, out);
+            seqs[msg.framework_id()].insert(seq);
+          } else {
+            LOG(WARNING) << "Status update error: couldn't lookup "
+                         << "executor for framework " << msg.framework_id();
           }
-
-	  // Reliably send message and save sequence number for
-	  // canceling later.
-          Message<S2M_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(msg.framework_id());
-          out.mutable_status()->MergeFrom(status);
-	  int seq = rsend(master, framework->pid, out);
-	  seqs[msg.framework_id()].insert(seq);
 	} else {
-          LOG(ERROR) << "Status update error: couldn't lookup "
-                     << "framework " << msg.framework_id();
+          LOG(WARNING) << "Status update error: couldn't lookup "
+                       << "framework " << msg.framework_id();
 	}
         break;
       }
@@ -482,8 +536,7 @@ void Slave::operator () ()
 
       case M2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by master: " << from();
-        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
-        foreachpair (_, Framework *framework, frameworksCopy) {
+        foreachpaircopy (_, Framework *framework, frameworks) {
           killFramework(framework);
         }
         return;
@@ -491,8 +544,7 @@ void Slave::operator () ()
 
       case S2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by " << from();
-        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
-        foreachpair (_, Framework *framework, frameworksCopy) {
+        foreachpaircopy (_, Framework *framework, frameworks) {
           killFramework(framework);
         }
         return;
@@ -508,21 +560,13 @@ void Slave::operator () ()
 }
 
 
-Framework * Slave::getFramework(const FrameworkID& frameworkId)
+Framework* Slave::getFramework(const FrameworkID& frameworkId)
 {
-  if (frameworks.count(frameworkId) > 0)
+  if (frameworks.count(frameworkId) > 0) {
     return frameworks[frameworkId];
-  else
-    return NULL;
-}
-
+  }
 
-Executor * Slave::getExecutor(const FrameworkID& frameworkId)
-{
-  if (executors.count(frameworkId) > 0)
-    return executors[frameworkId];
-  else
-    return NULL;
+  return NULL;
 }
 
 
@@ -533,7 +577,12 @@ void Slave::sendQueuedTasks(Framework* f
   LOG(INFO) << "Flushing queued tasks for framework "
             << framework->frameworkId;
 
-  foreach(const TaskDescription& task, framework->queuedTasks) {
+  CHECK(executor->pid != PID());
+
+  foreach (const TaskDescription& task, executor->queuedTasks) {
+    // Add the task to the executor.
+    executor->addTask(task);
+
     Message<S2E_RUN_TASK> out;
     out.mutable_framework()->MergeFrom(framework->info);
     out.mutable_framework_id()->MergeFrom(framework->frameworkId);
@@ -542,12 +591,12 @@ void Slave::sendQueuedTasks(Framework* f
     send(executor->pid, out);
   }
 
-  framework->queuedTasks.clear();
+  executor->queuedTasks.clear();
 }
 
 
 // Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutor)
+void Slave::killFramework(Framework *framework, bool killExecutors)
 {
   LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
 
@@ -558,29 +607,23 @@ void Slave::killFramework(Framework *fra
 
   seqs.erase(framework->frameworkId);
 
-  // Remove its allocated resources.
-  framework->resources = Resources();
+  // Shutdown all executors of this framework.
+  foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
+    if (killExecutors) {
+      LOG(INFO) << "Killing executor " << executorId
+                << " of framework " << framework->frameworkId;
 
-  // If an executor is running, tell it to exit and kill it.
+      send(executor->pid, S2E_KILL_EXECUTOR);
 
-  Executor *executor = getExecutor(framework->frameworkId);
-  if (executor != NULL) {
-    if (killExecutor) {
-      LOG(INFO) << "Killing executor for framework "
-                << framework->frameworkId;
       // TODO(benh): There really isn't ANY time between when an
       // executor gets a S2E_KILL_EXECUTOR message and the isolation
       // module goes and kills it. We should really think about making
       // the semantics of this better.
-      send(executor->pid, S2E_KILL_EXECUTOR);
-      isolationModule->killExecutor(framework);
-    }
 
-    LOG(INFO) << "Cleaning up executor for framework "
-              << framework->frameworkId;
+      isolationModule->killExecutor(framework, executor);
+    }
 
-    executors.erase(framework->frameworkId);
-    delete executor;
+    framework->destroyExecutor(executorId);
   }
 
   frameworks.erase(framework->frameworkId);
@@ -591,20 +634,39 @@ void Slave::killFramework(Framework *fra
 // Called by isolation module when an executor process exits
 // TODO(benh): Make this callback be a message so that we can avoid
 // race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, int status)
+void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status)
 {
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
-    LOG(INFO) << "Executor for framework " << frameworkId << " exited "
-              << "with status " << status;
-
-    Message<S2M_EXITED_EXECUTOR> out;
-    out.mutable_slave_id()->MergeFrom(slaveId);
-    out.mutable_framework_id()->MergeFrom(frameworkId);
-    out.set_status(status);
-    send(master, out);
-
-    killFramework(framework, false);
+    Executor* executor = framework->getExecutor(executorId);
+    if (executor != NULL) {
+      LOG(INFO) << "Exited executor " << executorId
+                << " of framework " << frameworkId
+                << " with status " << status;
+
+      Message<S2M_EXITED_EXECUTOR> out;
+      out.mutable_slave_id()->MergeFrom(slaveId);
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_executor_id()->MergeFrom(executorId);
+      out.set_status(status);
+      send(master, out);
+
+      framework->destroyExecutor(executorId);
+
+      // TODO(benh): When should we remove all state for an entire
+      // framework from a slave?
+      if (framework->executors.size() == 0) {
+        killFramework(framework);
+      }
+    } else {
+      LOG(WARNING) << "UNKNOWN executor " << executorId
+                   << " of framework " << frameworkId
+                   << " has exited with status " << status;
+    }
+  } else {
+    LOG(WARNING) << "UNKNOWN executor " << executorId
+                 << " of UNKNOWN framework " << frameworkId
+                 << " has exited with status " << status;
   }
 };
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun  5 09:06:18 2011
@@ -45,44 +45,16 @@
 
 namespace mesos { namespace internal { namespace slave {
 
-using namespace mesos;
-using namespace mesos::internal;
-
-using std::list;
-using std::pair;
-using std::make_pair;
-using std::ostringstream;
-using std::string;
-using std::vector;
-
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
 using foreach::_;
 
 
-// Information about a framework
-struct Framework
+// Information describing an executor (goes away if executor crashes).
+struct Executor
 {
-  FrameworkInfo info;
-  FrameworkID frameworkId;
-  PID pid;
-
-  list<TaskDescription> queuedTasks; // Holds tasks until executor starts
-  unordered_map<TaskID, Task *> tasks;
-
-  Resources resources;
+  Executor(const FrameworkID& _frameworkId, const ExecutorInfo& _info)
+    : frameworkId(_frameworkId), info(_info), pid(PID()) {}
 
-  // Information about the status of the executor for this framework, set by
-  // the isolation module. For example, this might include a PID, a VM ID, etc.
-  string executorStatus;
-  
-  Framework(const FrameworkInfo& _info, const FrameworkID& _frameworkId,
-            const PID& _pid)
-    : info(_info), frameworkId(_frameworkId), pid(_pid) {}
-
-  ~Framework()
+  ~Executor()
   {
     // Delete the tasks.
     foreachpair (_, Task *task, tasks) {
@@ -90,39 +62,28 @@ struct Framework
     }
   }
 
-  Task * lookupTask(const TaskID& taskId)
-  {
-    unordered_map<TaskID, Task *>::iterator it = tasks.find(taskId);
-    if (it != tasks.end())
-      return it->second;
-    else
-      return NULL;
-  }
-
-  Task * addTask(const TaskDescription& task)
+  Task* addTask(const TaskDescription& task)
   {
     // The master should enforce unique task IDs, but just in case
     // maybe we shouldn't make this a fatal error.
     CHECK(tasks.count(task.task_id()) == 0);
 
     Task *t = new Task();
+    t->mutable_framework_id()->MergeFrom(frameworkId);
+    t->set_state(TASK_STARTING);
     t->set_name(task.name());
     t->mutable_task_id()->MergeFrom(task.task_id());
-    t->mutable_framework_id()->MergeFrom(frameworkId);
     t->mutable_slave_id()->MergeFrom(task.slave_id());
     t->mutable_resources()->MergeFrom(task.resources());
-    t->set_state(TASK_STARTING);
 
     tasks[task.task_id()] = t;
     resources += task.resources();
-
-    return t;
   }
 
   void removeTask(const TaskID& taskId)
   {
     // Remove task from the queue if it's queued
-    for (list<TaskDescription>::iterator it = queuedTasks.begin();
+    for (std::list<TaskDescription>::iterator it = queuedTasks.begin();
 	 it != queuedTasks.end(); ++it) {
       if ((*it).task_id() == taskId) {
 	queuedTasks.erase(it);
@@ -140,17 +101,75 @@ struct Framework
       delete task;
     }
   }
+
+  const FrameworkID frameworkId;
+  const ExecutorInfo info;
+
+  PID pid;
+
+  std::list<TaskDescription> queuedTasks;
+  boost::unordered_map<TaskID, Task*> tasks;
+
+  Resources resources;
+
+  // Information about the status of the executor for this framework, set by
+  // the isolation module. For example, this might include a PID, a VM ID, etc.
+  std::string executorStatus;
 };
 
 
-// A connection to an executor (goes away if executor crashes)
-struct Executor
+// Information about a framework.
+struct Framework
 {
-  FrameworkID frameworkId;
+  Framework( const FrameworkID& _frameworkId, const FrameworkInfo& _info,
+            const PID& _pid)
+    : frameworkId(_frameworkId), info(_info), pid(_pid) {}
+
+  ~Framework() {}
+
+  Executor* createExecutor(const ExecutorInfo& info)
+  {
+    Executor* executor = new Executor(frameworkId, info);
+    CHECK(executors.count(info.executor_id()) == 0);
+    executors[info.executor_id()] = executor;
+    return executor;
+  }
+
+  void destroyExecutor(const ExecutorID& executorId)
+  {
+    if (executors.count(executorId) > 0) {
+      Executor* executor = executors[executorId];
+      executors.erase(executorId);
+      delete executor;
+    }
+  }
+
+  Executor* getExecutor(const ExecutorID& executorId)
+  {
+    if (executors.count(executorId) > 0) {
+      return executors[executorId];
+    }
+
+    return NULL;
+  }
+
+  Executor* getExecutor(const TaskID& taskId)
+  {
+    foreachpair (_, Executor* executor, executors) {
+      if (executor->tasks.count(taskId) > 0) {
+        return executor;
+      }
+    }
+
+    return NULL;
+  }
+
+  const FrameworkID frameworkId;
+  const FrameworkInfo info;
+
   PID pid;
-  
-  Executor(const FrameworkID& _frameworkId, const PID& _pid)
-    : frameworkId(_frameworkId), pid(_pid) {}
+
+  boost::unordered_map<ExecutorID, Executor*> executors;
 };
 
 
@@ -206,12 +225,12 @@ public:
   state::SlaveState *getState();
 
   // Callback used by isolation module to tell us when an executor exits.
-  void executorExited(const FrameworkID& frameworkId, int status);
+  void executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status);
 
   // Kill a framework (possibly killing its executor).
-  void killFramework(Framework *framework, bool killExecutor = true);
+  void killFramework(Framework *framework, bool killExecutors = true);
 
-  string getUniqueWorkDirectory(const FrameworkID& frameworkId);
+  std::string getUniqueWorkDirectory(const FrameworkID& frameworkId);
 
   const Configuration& getConfiguration();
 
@@ -223,9 +242,7 @@ public:
 protected:
   virtual void operator () ();
 
-  Framework * getFramework(const FrameworkID& frameworkId);
-
-  Executor * getExecutor(const FrameworkID& frameworkId);
+  Framework* getFramework(const FrameworkID& frameworkId);
 
   // Send any tasks queued up for the given framework to its executor
   // (needed if we received tasks while the executor was starting up).
@@ -238,14 +255,13 @@ private:
   Resources resources;
 
   // Invariant: framework will exist if executor exists.
-  unordered_map<FrameworkID, Framework*> frameworks;
-  unordered_map<FrameworkID, Executor*> executors;
+  boost::unordered_map<FrameworkID, Framework*> frameworks;
+
+  // Sequence numbers of reliable messages sent per framework.
+  boost::unordered_map<FrameworkID, boost::unordered_set<int> > seqs;
 
   IsolationModule *isolationModule;
   Heart* heart;
-
-  // Sequence numbers of reliable messages sent on behalf of framework.
-  unordered_map<FrameworkID, unordered_set<int> > seqs;
 };
 
 }}}

Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun  5 09:06:18 2011
@@ -24,8 +24,6 @@ using boost::lexical_cast;
 using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
-using mesos::internal::slave::Framework;
-using mesos::internal::slave::IsolationModule;
 using mesos::internal::slave::ProcessBasedIsolationModule;
 
 using std::string;
@@ -46,7 +44,7 @@ using testing::Sequence;
 using testing::StrEq;
 
 
-class LocalIsolationModule : public IsolationModule
+class LocalIsolationModule : public slave::IsolationModule
 {
 public:
   Executor* executor;
@@ -58,21 +56,22 @@ public:
 
   virtual ~LocalIsolationModule() {}
 
-  virtual void initialize(Slave* slave) {
+  virtual void initialize(slave::Slave* slave) {
     pid = slave->self();
   }
 
-  virtual void startExecutor(Framework* framework) {
+  virtual void launchExecutor(slave::Framework* f, slave::Executor* e) {
     // TODO(benh): Cleanup the way we launch local drivers!
     setenv("MESOS_LOCAL", "1", 1);
     setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
-    setenv("MESOS_FRAMEWORK_ID", framework->frameworkId.value().c_str(), 1);
+    setenv("MESOS_FRAMEWORK_ID", f->frameworkId.value().c_str(), 1);
+    setenv("MESOS_EXECUTOR_ID", e->info.executor_id().value().c_str(), 1);
 
     driver = new MesosExecutorDriver(executor);
     driver->start();
   }
 
-  virtual void killExecutor(Framework* framework) {
+  virtual void killExecutor(slave::Framework* f, slave::Executor* e) {
     driver->stop();
     driver->join();
     delete driver;
@@ -81,6 +80,7 @@ public:
     unsetenv("MESOS_LOCAL");
     unsetenv("MESOS_SLAVE_PID");
     unsetenv("MESOS_FRAMEWORK_ID");
+    unsetenv("MESOS_EXECUTOR_ID");
   }
 };
 
@@ -969,6 +969,7 @@ TEST(MasterTest, FrameworkMessage)
 
   FrameworkMessage hello;
   *hello.mutable_slave_id() = offers[0].slave_id();
+  hello.mutable_executor_id()->set_value("default"); // TODO(benh): No constant!
   hello.set_data("hello");
 
   schedDriver.sendFrameworkMessage(hello);
@@ -979,6 +980,7 @@ TEST(MasterTest, FrameworkMessage)
 
   FrameworkMessage reply;
   *reply.mutable_slave_id() = args.slave_id();
+  reply.mutable_executor_id()->set_value("default"); // TODO(benh): No constant!
   reply.set_data("reply");
 
   execDriver->sendFrameworkMessage(reply);
@@ -1113,6 +1115,7 @@ TEST(MasterTest, SchedulerFailoverFramew
 
   FrameworkMessage message;
   message.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  message.mutable_executor_id()->set_value("default"); // TODO(benh): No constant!
 
   execDriver->sendFrameworkMessage(message);
 

Modified: incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp Sun Jun  5 09:06:18 2011
@@ -56,6 +56,7 @@ public:
     // TODO(benh): The following line crashes some Linux compilers. :(
     // return DEFAULT_EXECUTOR_INFO;
     ExecutorInfo executor;
+    executor.mutable_executor_id()->set_value("default");
     executor.set_uri("noexecutor");
     return executor;
   }

Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Sun Jun  5 09:06:18 2011
@@ -45,8 +45,11 @@ void enterTestDirectory(const char* test
 /**
  * Macro to get a "default" dummy ExecutorInfo object for testing.
  */
-#define DEFAULT_EXECUTOR_INFO \
-  ({ ExecutorInfo executor; executor.set_uri("noexecutor"); executor; })
+#define DEFAULT_EXECUTOR_INFO                                           \
+      ({ ExecutorInfo executor;                                         \
+        executor.mutable_executor_id()->set_value("default");           \
+        executor.set_uri("noexecutor");                                 \
+        executor; })
 
 
 /**