You are viewing a plain-text version of this content. The canonical (HTML) version is available in the Apache mailing-list archive.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:39:44 UTC

svn commit: r1131841 - in /incubator/mesos/trunk: include/ src/ src/tests/

Author: benh
Date: Sun Jun  5 05:39:43 2011
New Revision: 1131841

URL: http://svn.apache.org/viewvc?rev=1131841&view=rev
Log:
Make it possible to test Executors and the NexusExecutorDriver.

Added:
    incubator/mesos/trunk/src/isolation_module.cpp
Modified:
    incubator/mesos/trunk/include/nexus_exec.hpp
    incubator/mesos/trunk/include/nexus_sched.hpp
    incubator/mesos/trunk/src/Makefile.in
    incubator/mesos/trunk/src/isolation_module.hpp
    incubator/mesos/trunk/src/lxc_isolation_module.cpp
    incubator/mesos/trunk/src/lxc_isolation_module.hpp
    incubator/mesos/trunk/src/nexus_exec.cpp
    incubator/mesos/trunk/src/nexus_local.cpp
    incubator/mesos/trunk/src/process_based_isolation_module.cpp
    incubator/mesos/trunk/src/process_based_isolation_module.hpp
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/slave.hpp
    incubator/mesos/trunk/src/slave_main.cpp
    incubator/mesos/trunk/src/solaris_project_isolation_module.cpp
    incubator/mesos/trunk/src/solaris_project_isolation_module.hpp
    incubator/mesos/trunk/src/tests/test_master.cpp

Modified: incubator/mesos/trunk/include/nexus_exec.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_exec.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_exec.hpp (original)
+++ incubator/mesos/trunk/include/nexus_exec.hpp Sun Jun  5 05:39:43 2011
@@ -65,8 +65,11 @@ class ExecutorDriver
 public:
   virtual ~ExecutorDriver() {}
 
-  // Connect to a slave and run the scheduler until it is shut down
-  virtual int run() { return -1; }
+  // Lifecycle methods
+  virtual int start() { return -1; }
+  virtual int stop() { return -1; }
+  virtual int join() { return -1; }
+  virtual int run() { return -1; } // Start and then join driver
 
   // Communication methods from executor to Nexus
   virtual int sendStatusUpdate(const TaskStatus& status) { return -1; }
@@ -86,7 +89,12 @@ public:
   NexusExecutorDriver(Executor* executor);
   virtual ~NexusExecutorDriver();
 
-  virtual int run();
+  // Lifecycle methods
+  virtual int start();
+  virtual int stop();
+  virtual int join();
+  virtual int run(); // Start and then join driver
+
   virtual int sendStatusUpdate(const TaskStatus& status);
   virtual int sendFrameworkMessage(const FrameworkMessage& message);
 
@@ -100,9 +108,15 @@ private:
 
   // LibProcess process for communicating with slave
   internal::ExecutorProcess* process;
+
+  // Are we currently registered with the slave
+  bool running;
   
   // Mutex to enforce all non-callbacks are execute serially
   pthread_mutex_t mutex;
+
+  // Condition variable for waiting until driver terminates
+  pthread_cond_t cond;
 };
 
 } /* namespace nexus { */

Modified: incubator/mesos/trunk/include/nexus_sched.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_sched.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_sched.hpp (original)
+++ incubator/mesos/trunk/include/nexus_sched.hpp Sun Jun  5 05:39:43 2011
@@ -55,9 +55,9 @@ public:
 
   // Lifecycle methods
   virtual int start() { return -1; }
-  virtual int join() { return -1; }
   virtual int stop() { return -1; }
-  virtual int run() { return -1; } // Start and then join scheduler
+  virtual int join() { return -1; }
+  virtual int run() { return -1; } // Start and then join driver
 
   // Communication methods
   virtual int sendFrameworkMessage(const FrameworkMessage& message) { return -1; }
@@ -85,9 +85,9 @@ public:
 
   // Lifecycle methods
   virtual int start();
-  virtual int join();
   virtual int stop();
-  virtual int run(); // Start and then join scheduler
+  virtual int join();
+  virtual int run(); // Start and then join driver
 
   // Communication methods
   virtual int sendFrameworkMessage(const FrameworkMessage& message);
@@ -121,7 +121,7 @@ private:
   // Mutex to enforce all non-callbacks are execute serially
   pthread_mutex_t mutex;
 
-  // Condition variable for waiting until scheduler terminates
+  // Condition variable for waiting until driver terminates
   pthread_cond_t cond;
 
 };

Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun  5 05:39:43 2011
@@ -108,7 +108,7 @@ NEXUS_LIB = libnexus++.a
 NEXUS_LIBS = $(SCHED_LIB) $(EXEC_LIB) $(NEXUS_LIB)
 
 MASTER_OBJ = master.o allocator_factory.o simple_allocator.o
-SLAVE_OBJ = slave.o launcher.o isolation_module_factory.o \
+SLAVE_OBJ = slave.o launcher.o isolation_module.o \
 	    process_based_isolation_module.o
 COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o master_detector.o url_processor.o zookeeper.o
 EXEC_LIB_OBJ = nexus_exec.o

Added: incubator/mesos/trunk/src/isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/isolation_module.cpp?rev=1131841&view=auto
==============================================================================
--- incubator/mesos/trunk/src/isolation_module.cpp (added)
+++ incubator/mesos/trunk/src/isolation_module.cpp Sun Jun  5 05:39:43 2011
@@ -0,0 +1,34 @@
+#include "isolation_module.hpp"
+#include "process_based_isolation_module.hpp"
+#ifdef __sun__
+#include "solaris_project_isolation_module.hpp"
+#elif __linux__
+#include "lxc_isolation_module.hpp"
+#endif
+
+using std::string;
+
+using namespace nexus::internal::slave;
+
+
+IsolationModule * IsolationModule::create(const string &type)
+{
+  if (type == "process")
+    return new ProcessBasedIsolationModule();
+#ifdef __sun__
+  else if (type == "project")
+    return new SolarisProjectIsolationModule();
+#elif __linux__
+  else if (type == "lxc")
+    return new LxcIsolationModule>();
+#endif
+
+  return NULL;
+}
+
+
+void IsolationModule::destroy(IsolationModule *module)
+{
+  if (module != NULL)
+    delete module;
+}

Modified: incubator/mesos/trunk/src/isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/isolation_module.hpp Sun Jun  5 05:39:43 2011
@@ -1,14 +1,25 @@
 #ifndef __ISOLATION_MODULE_HPP__
 #define __ISOLATION_MODULE_HPP__
 
-#include "slave.hpp"
+#include <string>
+
 
 namespace nexus { namespace internal { namespace slave {
 
+class Framework;
+class Slave;
+
+
 class IsolationModule {
 public:
+  static IsolationModule * create(const std::string &type);
+  static void destroy(IsolationModule *module);
+
   virtual ~IsolationModule() {}
 
+  // Called when during slave initialization.
+  virtual void initialize(Slave *slave) {}
+
   // Called when a new framework is launched on the slave.
   virtual void frameworkAdded(Framework *framework) {}
 
@@ -26,7 +37,7 @@ public:
 
   // Update the resource limits for a given framework. This method will
   // be called only after an executor for the framework is started.
-  virtual void resourcesChanged(Framework *framework) = 0;
+  virtual void resourcesChanged(Framework *framework) {}
 };
 
 }}}

Modified: incubator/mesos/trunk/src/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/lxc_isolation_module.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/lxc_isolation_module.cpp Sun Jun  5 05:39:43 2011
@@ -1,8 +1,5 @@
 #include "lxc_isolation_module.hpp"
 
-#include <stdlib.h>
-#include <unistd.h>
-
 #include <algorithm>
 
 #include "foreach.hpp"
@@ -27,68 +24,58 @@ using namespace nexus;
 using namespace nexus::internal;
 using namespace nexus::internal::slave;
 
-namespace {
 
-const int32_t CPU_SHARES_PER_CPU = 1024;
-const int32_t MIN_CPU_SHARES = 10;
-const int64_t MIN_RSS = 128 * Megabyte;
+LxcIsolationModule::LxcIsolationModule()
+  : initialized(false) {}
+
 
+LxcIsolationModule::~LxcIsolationModule()
+{
+  // We want to wait until the reaper has completed because it
+  // accesses 'this' in order to make callbacks ... deleting 'this'
+  // could thus lead to a seg fault!
+  if (initialized) {
+    CHECK(reaper != NULL);
+    Process::post(reaper->getPID(), SHUTDOWN_REAPER);
+    Process::wait(reaper);
+    delete reaper;
+  }
 }
 
 
-LxcIsolationModule::LxcIsolationModule(Slave* slave)
+void LxcIsolationModule::initialize(Slave *slave)
 {
   this->slave = slave;
   reaper = new Reaper(this);
   Process::spawn(reaper);
-
-  // Run a basic check to see whether Linux Container tools are available
-  if (system("lxc-version > /dev/null") != 0) {
-    LOG(FATAL) << "Could not run lxc-version; make sure Linux Container "
-                << "tools are installed";
-  }
-  // Check that we are root (it might also be possible to create Linux
-  // containers without being root, but we can support that later)
-  if (getuid() != 0) {
-    LOG(FATAL) << "LXC isolation module requires slave to run as root";
-  }
+  initialized = true;
 }
 
 
-LxcIsolationModule::~LxcIsolationModule()
-{
-  // We want to wait until the reaper has completed because it
-  // accesses 'this' in order to make callbacks ... deleting 'this'
-  // could thus lead to a seg fault!
-  Process::post(reaper->getPID(), SHUTDOWN_REAPER);
-  Process::wait(reaper);
-  delete reaper;
-}
-
 
-void LxcIsolationModule::frameworkAdded(Framework* fw)
+void LxcIsolationModule::frameworkAdded(Framework* framework)
 {
-  infos[fw->id] = new FrameworkInfo();
-  infos[fw->id]->lxcExecutePid = -1;
-  infos[fw->id]->container = "";
-  fw->executorStatus = "No executor running";
+  lxcExecutePid[framework->id] = -1;
+  container[framework->id] = "";
+  framework->executorStatus = "No executor running";
 }
 
 
-void LxcIsolationModule::frameworkRemoved(Framework* fw)
+void LxcIsolationModule::frameworkRemoved(Framework *framework)
 {
-  if (infos.find(fw->id) != infos.end()) {
-    delete infos[fw->id];
-    infos.erase(fw->id);
-  }
+  lxcExecutePid.erase(framework->id);
+  container.erase(framework->id);
 }
 
 
 void LxcIsolationModule::startExecutor(Framework *fw)
 {
+  if (!initialized)
+    LOG(FATAL) << "Cannot launch executors before initialization!";
+
   LOG(INFO) << "Starting executor for framework " << fw->id << ": "
             << fw->executorInfo.uri;
-  CHECK(infos[fw->id]->lxcExecutePid == -1 && infos[fw->id]->container == "");
+  CHECK(lxcExecutePid[fw->id] == -1 && container[fw->id] == "");
 
   // Get location of Nexus install in order to find nexus-launcher.
   const char *nexusHome = getenv("NEXUS_HOME");
@@ -101,7 +88,7 @@ void LxcIsolationModule::startExecutor(F
   oss << "nexus.slave-" << slave->id << ".framework-" << fw->id;
   string containerName = oss.str();
 
-  infos[fw->id]->container = containerName;
+  container[fw->id] = containerName;
   fw->executorStatus = "Container: " + containerName;
 
   // Run lxc-execute nexus-launcher using a fork-exec (since lxc-execute
@@ -113,7 +100,7 @@ void LxcIsolationModule::startExecutor(F
 
   if (pid) {
     // In parent process
-    infos[fw->id]->lxcExecutePid = pid;
+    lxcExecutePid[fw->id] = pid;
     LOG(INFO) << "Started lxc-execute, pid = " << pid;
     int status;
   } else {
@@ -131,7 +118,7 @@ void LxcIsolationModule::startExecutor(F
     setenv("NEXUS_EXECUTOR_URI", fw->executorInfo.uri.c_str(), 1);
     setenv("NEXUS_USER", fw->user.c_str(), 1);
     setenv("NEXUS_SLAVE_PID", lexical_cast<string>(slave->self()).c_str(), 1);
-    setenv("NEXUS_REDIRECT_IO", slave->local ? "0" : "1", 1);
+    setenv("NEXUS_REDIRECT_IO", slave->local ? "1" : "0", 1);
     setenv("NEXUS_WORK_DIRECTORY", slave->getWorkDirectory(fw->id).c_str(), 1);
 
     // Run lxc-execute.
@@ -146,13 +133,12 @@ void LxcIsolationModule::startExecutor(F
 
 void LxcIsolationModule::killExecutor(Framework* fw)
 {
-  string container = infos[fw->id]->container;
-  if (container != "") {
-    LOG(INFO) << "Stopping container " << container;
-    int ret = shell("lxc-stop -n %s", container.c_str());
+  if (container[fw->id] != "") {
+    LOG(INFO) << "Stopping container " << container[fw->id];
+    int ret = shell("lxc-stop -n %s", container[fw->id].c_str());
     if (ret != 0)
       LOG(ERROR) << "lxc-stop returned " << ret;
-    infos[fw->id]->container = "";
+    container[fw->id] = "";
     fw->executorStatus = "No executor running";
   }
 }
@@ -160,45 +146,28 @@ void LxcIsolationModule::killExecutor(Fr
 
 void LxcIsolationModule::resourcesChanged(Framework* fw)
 {
-  if (infos[fw->id]->container != "") {
-    // For now, just try setting the CPUs and memory right away, and kill the
-    // framework if this fails.
-    // A smarter thing to do might be to only update them periodically in a
-    // separate thread, and to give frameworks some time to scale down their
-    // memory usage.
-
-    int32_t cpuShares = max(CPU_SHARES_PER_CPU * fw->resources.cpus,
-                            MIN_CPU_SHARES);
-    if (!setResourceLimit(fw, "cpu.shares", cpuShares)) {
-      slave->removeExecutor(fw->id, true);
-      return;
-    }
-
-    int64_t rssLimit = max(fw->resources.mem, MIN_RSS);
-    if (!setResourceLimit(fw, "memory.limit_in_bytes", rssLimit)) {
-      slave->removeExecutor(fw->id, true);
-      return;
-    }
-  }
-}
-
+  if (container[fw->id] != "") {
+    // For now, just try setting the CPUs and mem right away.
+    // A slightly smarter thing might be to only update them periodically.
+    int ret;
+    
+    int32_t cpuShares = max(1024 * fw->resources.cpus, 10);
+    LOG(INFO) << "Setting CPU shares for " << fw->id << " to " << cpuShares;
+    ret = shell("lxc-cgroup -n %s cpu.shares %d",
+                container[fw->id].c_str(), cpuShares);
+    if (ret != 0)
+      LOG(ERROR) << "lxc-cgroup returned " << ret;
 
-bool LxcIsolationModule::setResourceLimit(Framework* fw,
-                                          const string& property,
-                                          int64_t value)
-{
-  LOG(INFO) << "Setting " << property << " for framework " << fw->id
-            << " to " << value;
-  int ret = shell("lxc-cgroup -n %s %s %lld",
-                  infos[fw->id]->container.c_str(),
-                  property.c_str(),
-                  value);
-  if (ret != 0) {
-    LOG(ERROR) << "Failed to set " << property << " for framework " << fw->id
-               << ": lxc-cgroup returned " << ret;
-    return false;
-  } else {
-    return true;
+    int64_t rssLimit = max(fw->resources.mem, 128 * Megabyte);
+    LOG(INFO) << "Setting RSS limit for " << fw->id << " to " << rssLimit;
+    ret = shell("lxc-cgroup -n %s memory.limit_in_bytes %lld",
+                container[fw->id].c_str(), rssLimit);
+    if (ret != 0)
+      LOG(ERROR) << "lxc-cgroup returned " << ret;
+    
+    // TODO: Decreasing the RSS limit will fail if the current RSS is too
+    // large and memory can't be swapped out. In that case, we should
+    // either freeze the container before changing RSS, or just kill it.
   }
 }
 
@@ -238,10 +207,10 @@ void LxcIsolationModule::Reaper::operato
       pid_t pid;
       int status;
       if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
-        foreachpair (FrameworkID fid, FrameworkInfo* info, module->infos) {
-          if (info->lxcExecutePid == pid) {
-            info->container = "";
-            info->lxcExecutePid = -1;
+        foreachpair (FrameworkID fid, pid_t& fwPid, module->lxcExecutePid) {
+          if (fwPid == pid) {
+            module->container[fid] = "";
+            module->lxcExecutePid[fid] = -1;
             LOG(INFO) << "Telling slave of lost framework " << fid;
             // TODO(benh): This is broken if/when libprocess is parallel!
             module->slave->executorExited(fid, status);

Modified: incubator/mesos/trunk/src/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/lxc_isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/lxc_isolation_module.hpp Sun Jun  5 05:39:43 2011
@@ -35,15 +35,18 @@ public:
   };
 
 protected:
+  bool initialized;
   Slave* slave;
   unordered_map<FrameworkID, FrameworkInfo*> infos;
   Reaper* reaper;
 
 public:
-  LxcIsolationModule(Slave* slave);
+  LxcIsolationModule();
 
   virtual ~LxcIsolationModule();
 
+  virtual void initialize(Slave* slave);
+
   virtual void frameworkAdded(Framework* framework);
 
   virtual void frameworkRemoved(Framework* framework);

Modified: incubator/mesos/trunk/src/nexus_exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_exec.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_exec.cpp (original)
+++ incubator/mesos/trunk/src/nexus_exec.cpp Sun Jun  5 05:39:43 2011
@@ -41,13 +41,18 @@ protected:
   Executor* executor;
   FrameworkID fid;
   SlaveID sid;
+  bool local;
+
+  volatile bool terminate;
 
 public:
   ExecutorProcess(const PID& _slave,
                   NexusExecutorDriver* _driver,
                   Executor* _executor,
-                  FrameworkID _fid)
-    : slave(_slave), driver(_driver), executor(_executor), fid(_fid) {}
+                  FrameworkID _fid,
+                  bool _local)
+    : slave(_slave), driver(_driver), executor(_executor),
+      fid(_fid), local(_local), terminate(false) {}
 
 protected:
   void operator() ()
@@ -60,7 +65,13 @@ protected:
       // process any other messages. This is especially tricky if a
       // slave dies since we won't handle the PROCESS_EXIT message in
       // a timely manner (if at all).
-      switch(receive()) {
+
+      // Check for terminate in the same way as SchedulerProcess. See
+      // comments there for an explanation of why this is necessary.
+      if (terminate)
+        return;
+
+      switch(receive(2)) {
         case S2E_REGISTER_REPLY: {
           string host;
           string fwName;
@@ -99,7 +110,9 @@ protected:
 
         case S2E_KILL_EXECUTOR: {
           invoke(bind(&Executor::shutdown, executor, driver));
-          exit(0);
+          if (!local)
+            exit(0);
+          break;
         }
 
         case PROCESS_EXIT: {
@@ -111,7 +124,8 @@ protected:
 	  // ourself) hoping to clean up any processes this executor
 	  // launched itself.
 	  // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
-	  killpg(0, SIGKILL);
+          if (!local)
+            killpg(0, SIGKILL);
         }
 
         default: {
@@ -134,17 +148,16 @@ protected:
 
 
 // Default implementation of error() that logs to stderr and exits
-void Executor::error(ExecutorDriver*, int code, const string &message)
+void Executor::error(ExecutorDriver* driver, int code, const string &message)
 {
   cerr << "Nexus error: " << message
        << " (error code: " << code << ")" << endl;
-  // TODO(*): Don't exit here, let errors be recoverable. (?)
-  exit(1);
+  driver->stop();
 }
 
 
 NexusExecutorDriver::NexusExecutorDriver(Executor* _executor)
-  : executor(_executor)
+  : executor(_executor), running(false)
 {
   // Create mutex and condition variable
   pthread_mutexattr_t attr;
@@ -152,28 +165,49 @@ NexusExecutorDriver::NexusExecutorDriver
   pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
   pthread_mutex_init(&mutex, &attr);
   pthread_mutexattr_destroy(&attr);
+  pthread_cond_init(&cond, 0);
 }
 
 
 NexusExecutorDriver::~NexusExecutorDriver()
 {
   pthread_mutex_destroy(&mutex);
+  pthread_cond_destroy(&cond);
+
+  Process::wait(process);
+  delete process;
 }
 
 
-int NexusExecutorDriver::run()
+int NexusExecutorDriver::start()
 {
+  Lock lock(&mutex);
+
+  if (running) {
+    return -1;
+  }
+
   // Set stream buffering mode to flush on newlines so that we capture logs
   // from user processes even when output is redirected to a file.
   setvbuf(stdout, 0, _IOLBF, 0);
   setvbuf(stderr, 0, _IOLBF, 0);
 
+  bool local;
+
   PID slave;
   FrameworkID fid;
 
   char* value;
   std::istringstream iss;
 
+  /* Check if this is local (for example, for testing). */
+  value = getenv("NEXUS_LOCAL");
+
+  if (value != NULL)
+    local = true;
+  else
+    local = false;
+
   /* Get slave PID from environment. */
   value = getenv("NEXUS_SLAVE_PID");
 
@@ -196,23 +230,56 @@ int NexusExecutorDriver::run()
   if (!(iss >> fid))
     fatal("cannot parse NEXUS_FRAMEWORK_ID");
 
-  process = new ExecutorProcess(slave, this, executor, fid);
+  process = new ExecutorProcess(slave, this, executor, fid, local);
 
-  Process::wait(Process::spawn(process));
+  Process::spawn(process);
 
-  process = NULL;
+  running = true;
 
   return 0;
 }
 
 
+int NexusExecutorDriver::stop()
+{
+  Lock lock(&mutex);
+
+  if (!running) {
+    return -1;
+  }
+
+  process->terminate = true;
+
+  running = false;
+
+  pthread_cond_signal(&cond);
+
+  return 0;
+}
+
+
+int NexusExecutorDriver::join()
+{
+  Lock lock(&mutex);
+  while (running)
+    pthread_cond_wait(&cond, &mutex);
+
+  return 0;
+}
+
+
+int NexusExecutorDriver::run()
+{
+  int ret = start();
+  return ret != 0 ? ret : join();
+}
+
+
 int NexusExecutorDriver::sendStatusUpdate(const TaskStatus &status)
 {
   Lock lock(&mutex);
 
-  /* TODO(benh): Increment ref count on process. */
-  
-  if (!process) {
+  if (!running) {
     //executor->error(this, EINVAL, "Executor has exited");
     return -1;
   }
@@ -223,8 +290,6 @@ int NexusExecutorDriver::sendStatusUpdat
                                                  status.state,
                                                  status.data));
 
-  /* TODO(benh): Decrement ref count on process. */
-
   return 0;
 }
 
@@ -233,9 +298,7 @@ int NexusExecutorDriver::sendFrameworkMe
 {
   Lock lock(&mutex);
 
-  /* TODO(benh): Increment ref count on process. */
-  
-  if (!process) {
+  if (!running) {
     //executor->error(this, EINVAL, "Executor has exited");
     return -1;
   }
@@ -244,8 +307,6 @@ int NexusExecutorDriver::sendFrameworkMe
                 process->pack<E2S_FRAMEWORK_MESSAGE>(process->fid,
                                                      message));
 
-  /* TODO(benh): Decrement ref count on process. */
-
   return 0;
 }
 

Modified: incubator/mesos/trunk/src/nexus_local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_local.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_local.cpp (original)
+++ incubator/mesos/trunk/src/nexus_local.cpp Sun Jun  5 05:39:43 2011
@@ -3,14 +3,20 @@
 
 #include <tuple.hpp>
 
+#include <map>
 #include <vector>
 
+#include "foreach.hpp"
 #include "nexus_local.hpp"
+#include "process_based_isolation_module.hpp"
 
+using std::map;
 using std::vector;
 
 using nexus::internal::master::Master;
 using nexus::internal::slave::Slave;
+using nexus::internal::slave::IsolationModule;
+using nexus::internal::slave::ProcessBasedIsolationModule;
 
 using namespace nexus::internal;
 
@@ -30,7 +36,7 @@ void initialize_glog() {
 namespace nexus { namespace internal { namespace local {
 
 static Master *master = NULL;
-static vector<Slave*> *slaves = NULL;
+static map<IsolationModule*, Slave*> slaves;
 static MasterDetector *detector = NULL;
 
 
@@ -47,15 +53,17 @@ PID launch(int numSlaves, int32_t cpus, 
   }
 
   master = new Master();
-  slaves = new vector<Slave*>();
 
   PID pid = Process::spawn(master);
 
   vector<PID> pids;
 
   for (int i = 0; i < numSlaves; i++) {
-    Slave* slave = new Slave(Resources(cpus, mem), true);
-    slaves->push_back(slave);
+    // TODO(benh): Create a local isolation module?
+    ProcessBasedIsolationModule *isolationModule =
+      new ProcessBasedIsolationModule();
+    Slave* slave = new Slave(Resources(cpus, mem), true, isolationModule);
+    slaves[isolationModule] = slave;
     pids.push_back(Process::spawn(slave));
   }
 
@@ -72,15 +80,20 @@ void shutdown()
   delete master;
   master = NULL;
 
-  for (int i = 0; i < slaves->size(); i++) {
-    Slave *slave = slaves->at(i);
+  // TODO(benh): Ugh! Because the isolation module calls back into the
+  // slave (not the best design) we can't delete the slave until we
+  // have deleted the isolation module. But since the slave calls into
+  // the isolation module, we can't delete the isolation module until
+  // we have stopped the slave.
+
+  foreachpair (IsolationModule *isolationModule, Slave *slave, slaves) {
     Process::post(slave->getPID(), S2S_SHUTDOWN);
     Process::wait(slave);
+    delete isolationModule;
     delete slave;
   }
 
-  delete slaves;
-  slaves = NULL;
+  slaves.clear();
 
   delete detector;
   detector = NULL;

Modified: incubator/mesos/trunk/src/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/process_based_isolation_module.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/process_based_isolation_module.cpp Sun Jun  5 05:39:43 2011
@@ -22,22 +22,29 @@ using namespace nexus::internal;
 using namespace nexus::internal::slave;
 
 
-ProcessBasedIsolationModule::ProcessBasedIsolationModule(Slave* slave)
-{
-  this->slave = slave;
-  reaper = new Reaper(this);
-  Process::spawn(reaper);
-}
+ProcessBasedIsolationModule::ProcessBasedIsolationModule()
+  : initialized(false) {}
 
 
 ProcessBasedIsolationModule::~ProcessBasedIsolationModule()
 {
-  // We want to wait until the reaper has completed because it
+  // We need to wait until the reaper has completed because it
   // accesses 'this' in order to make callbacks ... deleting 'this'
   // could thus lead to a seg fault!
-  Process::post(reaper->getPID(), SHUTDOWN_REAPER);
-  Process::wait(reaper);
-  delete reaper;
+  if (initialized) {
+    CHECK(reaper != NULL);
+    Process::post(reaper->getPID(), SHUTDOWN_REAPER);
+    Process::wait(reaper);
+    delete reaper;
+  }
+}
+
+void ProcessBasedIsolationModule::initialize(Slave *slave)
+{
+  this->slave = slave;
+  reaper = new Reaper(this);
+  Process::spawn(reaper);
+  initialized = true;
 }
 
 
@@ -56,6 +63,9 @@ void ProcessBasedIsolationModule::framew
 
 void ProcessBasedIsolationModule::startExecutor(Framework* framework)
 {
+  if (!initialized)
+    LOG(FATAL) << "Cannot launch executors before initialization!";
+
   LOG(INFO) << "Starting executor for framework " << framework->id << ": "
             << framework->executorInfo.uri;
   CHECK(pgids[framework->id] == -1);

Modified: incubator/mesos/trunk/src/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/process_based_isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/process_based_isolation_module.hpp Sun Jun  5 05:39:43 2011
@@ -1,18 +1,18 @@
 #ifndef __PROCESS_BASED_ISOLATION_MODULE_HPP__
 #define __PROCESS_BASED_ISOLATION_MODULE_HPP__
 
-#include <string>
-
 #include <sys/types.h>
 
 #include <boost/unordered_map.hpp>
 
-#include "launcher.hpp"
 #include "isolation_module.hpp"
+#include "launcher.hpp"
+#include "messages.hpp"
+#include "slave.hpp"
+
 
 namespace nexus { namespace internal { namespace slave {
 
-using std::string;
 using boost::unordered_map;
 using nexus::internal::launcher::ExecutorLauncher;
 
@@ -33,15 +33,18 @@ public:
   enum { SHUTDOWN_REAPER = NEXUS_MESSAGES };
 
 protected:
+  bool initialized;
   Slave* slave;
   unordered_map<FrameworkID, pid_t> pgids;
   Reaper* reaper;
 
 public:
-  ProcessBasedIsolationModule(Slave* slave);
+  ProcessBasedIsolationModule();
 
   virtual ~ProcessBasedIsolationModule();
 
+  virtual void initialize(Slave *slave);
+
   virtual void frameworkAdded(Framework* framework);
 
   virtual void frameworkRemoved(Framework* framework);

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 05:39:43 2011
@@ -1,9 +1,6 @@
-#include <getopt.h>
-
 #include <fstream>
 #include <algorithm>
 
-#include "isolation_module_factory.hpp"
 #include "slave.hpp"
 #include "slave_webui.hpp"
 
@@ -64,17 +61,14 @@ public:
 } /* namespace */
 
 
-Slave::Slave(Resources _resources, bool _local, const string &_isolationType)
+Slave::Slave(Resources _resources, bool _local,
+             IsolationModule *_isolationModule)
   : id(""), resources(_resources), local(_local),
-    isolationType(_isolationType), isolationModule(NULL) {}
+    isolationModule(_isolationModule) {}
 
 
 Slave::~Slave()
 {
-  // TODO(matei): Add support for factory style destroy of objects!
-  if (isolationModule != NULL)
-    delete isolationModule;
-
   // TODO(benh): Shut down and free executors?
 }
 
@@ -123,15 +117,11 @@ void Slave::operator () ()
     publicDns = hostname;
   }
 
-  FrameworkID fid;
-  TaskID tid;
-  TaskState taskState;
-  Params params;
-  FrameworkMessage message;
-  string data;
+  // Initialize isolation module.
+  isolationModule->initialize(this);
 
   while (true) {
-    switch (receive(2)) {
+    switch (receive()) {
       case NEW_MASTER_DETECTED: {
 	string masterSeq;
 	PID masterPid;
@@ -175,9 +165,6 @@ void Slave::operator () ()
         unpack<M2S_REGISTER_REPLY>(this->id, interval);
         LOG(INFO) << "Registered with master; given slave ID " << this->id;
         link(spawn(new Heart(master, this->getPID(), this->id, interval)));
-        isolationModule = createIsolationModule();
-        if (!isolationModule)
-          LOG(FATAL) << "Unrecognized isolation type: " << isolationType;
         break;
       }
       
@@ -194,8 +181,10 @@ void Slave::operator () ()
       
       case M2S_RUN_TASK: {
 	FrameworkID fid;
+        TaskID tid;
         string fwName, user, taskName, taskArg, fwPidStr;
         ExecutorInfo execInfo;
+        Params params;
         unpack<M2S_RUN_TASK>(fid, tid, fwName, user, execInfo,
                              taskName, taskArg, params, fwPidStr);
         LOG(INFO) << "Got assigned task " << fid << ":" << tid;
@@ -230,6 +219,8 @@ void Slave::operator () ()
       }
 
       case M2S_KILL_TASK: {
+        FrameworkID fid;
+        TaskID tid;
         unpack<M2S_KILL_TASK>(fid, tid);
         LOG(INFO) << "Killing task " << fid << ":" << tid;
         if (Executor *ex = getExecutor(fid)) {
@@ -245,6 +236,8 @@ void Slave::operator () ()
       }
 
       case M2S_FRAMEWORK_MESSAGE: {
+        FrameworkID fid;
+        FrameworkMessage message;
         unpack<M2S_FRAMEWORK_MESSAGE>(fid, message);
         if (Executor *ex = getExecutor(fid)) {
           send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
@@ -256,6 +249,7 @@ void Slave::operator () ()
       }
 
       case M2S_KILL_FRAMEWORK: {
+        FrameworkID fid;
         unpack<M2S_KILL_FRAMEWORK>(fid);
         LOG(INFO) << "Asked to kill framework " << fid;
         Framework *fw = getFramework(fid);
@@ -265,6 +259,7 @@ void Slave::operator () ()
       }
 
       case E2S_REGISTER_EXECUTOR: {
+        FrameworkID fid;
         unpack<E2S_REGISTER_EXECUTOR>(fid);
         LOG(INFO) << "Got executor registration for framework " << fid;
         if (Framework *fw = getFramework(fid)) {
@@ -292,6 +287,10 @@ void Slave::operator () ()
       }
 
       case E2S_STATUS_UPDATE: {
+        FrameworkID fid;
+        TaskID tid;
+        TaskState taskState;
+        string data;
         unpack<E2S_STATUS_UPDATE>(fid, tid, taskState, data);
         LOG(INFO) << "Got status update for task " << fid << ":" << tid;
         if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
@@ -312,6 +311,8 @@ void Slave::operator () ()
       }
 
       case E2S_FRAMEWORK_MESSAGE: {
+        FrameworkID fid;
+        FrameworkMessage message;
         unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
         // Set slave ID in case framework omitted it
         message.slaveId = this->id;
@@ -337,7 +338,7 @@ void Slave::operator () ()
 	    if (from() == ex->pid) {
 	      LOG(INFO) << "Executor for framework " << ex->frameworkId
 			<< " disconnected";
-	      Framework *framework = getFramework(fid);
+	      Framework *framework = getFramework(ex->frameworkId);
 	      if (framework != NULL) {
 		send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
 		killFramework(framework);
@@ -352,18 +353,22 @@ void Slave::operator () ()
 
       case M2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by master: " << from();
+        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+        foreachpair (_, Framework *framework, frameworksCopy) {
+          killFramework(framework);
+        }
         return;
       }
 
       case S2S_SHUTDOWN: {
         LOG(INFO) << "Asked to shut down by " << from();
+        unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+        foreachpair (_, Framework *framework, frameworksCopy) {
+          killFramework(framework);
+        }
         return;
       }
 
-      case PROCESS_TIMEOUT: {
-	break;
-      }
-
       default: {
         LOG(ERROR) << "Received unknown message ID " << msgid()
                    << " from " << from();
@@ -374,13 +379,6 @@ void Slave::operator () ()
 }
 
 
-IsolationModule * Slave::createIsolationModule()
-{
-  LOG(INFO) << "Creating \"" << isolationType << "\" isolation module";
-  return IsolationModuleFactory::instantiate(isolationType, this);
-}
-
-
 Framework * Slave::getFramework(FrameworkID frameworkId)
 {
   FrameworkMap::iterator it = frameworks.find(frameworkId);
@@ -440,6 +438,10 @@ void Slave::killFramework(Framework *fw)
   // If an executor is running, tell it to exit and kill it
   if (Executor *ex = getExecutor(fw->id)) {
     send(ex->pid, pack<S2E_KILL_EXECUTOR>());
+    // TODO(benh): There really isn't much time between when an
+    // executor gets a S2E_KILL_EXECUTOR message and the isolation
+    // module goes and kills it. We should really think about making
+    // the semantics of this better.
     removeExecutor(fw->id, true);
   }
   frameworks.erase(fw->id);

Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun  5 05:39:43 2011
@@ -30,6 +30,7 @@
 
 #include "fatal.hpp"
 #include "foreach.hpp"
+#include "isolation_module.hpp"
 #include "messages.hpp"
 #include "params.hpp"
 #include "resources.hpp"
@@ -54,10 +55,6 @@ using boost::unordered_map;
 using boost::unordered_set;
 
 
-// Forward declarations
-class IsolationModule;
-
-
 // A description of a task that is yet to be launched
 struct TaskDescription
 {
@@ -164,19 +161,16 @@ public:
   typedef unordered_map<FrameworkID, Framework*> FrameworkMap;
   typedef unordered_map<FrameworkID, Executor*> ExecutorMap;
   
-  bool isFT;
   PID master;
   SlaveID id;
   Resources resources;
   bool local;
   FrameworkMap frameworks;
   ExecutorMap executors;  // Invariant: framework will exist if executor exists
-  string isolationType;
   IsolationModule *isolationModule;
 
 public:
-  Slave(Resources resources, bool local,
-	const string& isolationType = "process");
+  Slave(Resources resources, bool local, IsolationModule *isolationModule);
 
   virtual ~Slave();
 
@@ -207,11 +201,6 @@ protected:
 
   // Kill a framework (including its executor)
   void killFramework(Framework *fw);
-
-  // Create the slave's isolation module; this method is virtual so that
-  // it is easy to override in tests
-  virtual IsolationModule * createIsolationModule();
-
 };
 
 }}}

Modified: incubator/mesos/trunk/src/slave_main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave_main.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave_main.cpp (original)
+++ incubator/mesos/trunk/src/slave_main.cpp Sun Jun  5 05:39:43 2011
@@ -1,5 +1,6 @@
 #include <getopt.h>
 
+#include "isolation_module_factory.hpp"
 #include "slave.hpp"
 #include "slave_webui.hpp"
 
@@ -85,10 +86,16 @@ int main(int argc, char **argv)
   FLAGS_logbufsecs = 1;
   google::InitGoogleLogging(argv[0]);
 
+  LOG(INFO) << "Creating \"" << isolation << "\" isolation module";
+  IsolationModule *isolationModule = IsolationModule::create(isolation);
+
+  if (isolationModule == NULL)
+    fatal("unrecognized isolation type: %s", isolation);
+
   LOG(INFO) << "Build: " << BUILD_DATE << " by " << BUILD_USER;
   LOG(INFO) << "Starting Nexus slave";
 
-  Slave* slave = new Slave(resources, false, isolation);
+  Slave* slave = new Slave(resources, false, isolationModule);
   PID pid = Process::spawn(slave);
 
   MasterDetector *detector = MasterDetector::create(url, pid, false, quiet);
@@ -103,5 +110,9 @@ int main(int argc, char **argv)
 
   MasterDetector::destroy(detector);
 
+  IsolationModule::destroy(isolationModule);
+
+  delete slave;
+
   return 0;
 }

Modified: incubator/mesos/trunk/src/solaris_project_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/solaris_project_isolation_module.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/solaris_project_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/solaris_project_isolation_module.cpp Sun Jun  5 05:39:43 2011
@@ -27,12 +27,12 @@ using namespace nexus::internal;
 using namespace nexus::internal::slave;
 
 
-SolarisProjectIsolationModule::SolarisProjectIsolationModule(Slave* slave)
-  : ProcessBasedIsolationModule(slave)
+SolarisProjectIsolationModule::SolarisProjectIsolationModule()
 {
   // Launch the communicator module, which will start the projd's.
   // TODO: It would be nice to not return from the constructor
   // until the communicator is up and running.
+  // TODO(*): Not great to let this escape from constructor.
   comm = new Communicator(this);
   Process::spawn(comm);
 }
@@ -44,6 +44,12 @@ SolarisProjectIsolationModule::~SolarisP
 }
 
 
+void SolarisProjectIsolationModule::initialize(Slave* slave)
+{
+  ProcessBasedIsolationModule::initialize(slave);
+}
+
+
 void SolarisProjectIsolationModule::startExecutor(Framework* fw)
 {
   // Figure out which project to use.

Modified: incubator/mesos/trunk/src/solaris_project_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/solaris_project_isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/solaris_project_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/solaris_project_isolation_module.hpp Sun Jun  5 05:39:43 2011
@@ -53,10 +53,12 @@ protected:
   Communicator* comm;
 
 public:
-  SolarisProjectIsolationModule(Slave* slave);
+  SolarisProjectIsolationModule();
 
   virtual ~SolarisProjectIsolationModule();
 
+  virtual void initialize(Slave* slave);
+
   virtual void startExecutor(Framework* framework);
 
   virtual void killExecutor(Framework* framework);

Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun  5 05:39:43 2011
@@ -3,8 +3,12 @@
 #include <boost/lexical_cast.hpp>
 
 #include "master.hpp"
+#include "slave.hpp"
+#include "nexus_exec.hpp"
 #include "nexus_sched.hpp"
 #include "nexus_local.hpp"
+#include "isolation_module.hpp"
+#include "process_based_isolation_module.hpp"
 
 using std::string;
 using std::vector;
@@ -16,6 +20,9 @@ using namespace nexus::internal;
 
 using nexus::internal::master::Master;
 using nexus::internal::slave::Slave;
+using nexus::internal::slave::Framework;
+using nexus::internal::slave::IsolationModule;
+using nexus::internal::slave::ProcessBasedIsolationModule;
 
 
 class NoopScheduler : public Scheduler
@@ -324,7 +331,8 @@ TEST(MasterTest, SlaveLost)
   Master m;
   PID master = Process::spawn(&m);
 
-  Slave s(Resources(2, 1 * Gigabyte), true);
+  ProcessBasedIsolationModule isolationModule;
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
   PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
@@ -443,7 +451,7 @@ public:
     vector<TaskDescription> tasks;
     ASSERT_TRUE(offers.size() == 1);
     const SlaveOffer &offer = offers[0];
-    TaskDescription desc(0, offer.slaveId, "", map<string, string>(), "");
+    TaskDescription desc(0, offer.slaveId, "", offer.params, "");
     tasks.push_back(desc);
     d->replyToOffer(id, tasks, map<string, string>());
     Process::post(slave, S2S_SHUTDOWN);
@@ -476,7 +484,8 @@ TEST(MasterTest, OfferRescinded)
   Master m;
   PID master = Process::spawn(&m);
 
-  Slave s(Resources(2, 1 * Gigabyte), true);
+  ProcessBasedIsolationModule isolationModule;
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
   PID slave = Process::spawn(&s);
 
   BasicMasterDetector detector(master, slave, true);
@@ -555,3 +564,137 @@ TEST(MasterTest, SlavePartitioned)
 
   Process::filter(NULL);
 }
+
+
+// Scheduler that launches a single task on the first offer it receives
+// and stops its driver on the first status update, recording that the
+// update arrived.
+class TaskRunningScheduler : public Scheduler
+{
+public:
+  bool statusUpdateCalled;
+  
+  TaskRunningScheduler()
+    : statusUpdateCalled(false) {}
+
+  virtual ~TaskRunningScheduler() {}
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+    return ExecutorInfo("noexecutor", "");
+  }
+
+  virtual void resourceOffer(SchedulerDriver* d,
+                             OfferID id,
+                             const vector<SlaveOffer>& offers) {
+    LOG(INFO) << "TaskRunningScheduler got a slot offer";
+    vector<TaskDescription> tasks;
+    ASSERT_EQ(1u, offers.size());
+    const SlaveOffer &offer = offers[0];
+    // Launch one task that reuses the offer's params.
+    TaskDescription desc(0, offer.slaveId, "", offer.params, "");
+    tasks.push_back(desc);
+    d->replyToOffer(id, tasks, map<string, string>());
+  }
+
+  virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
+    // gtest's EXPECT_EQ takes (expected, actual); keep that order so a
+    // failure message labels the two values correctly.
+    EXPECT_EQ(TASK_RUNNING, status.state);
+    statusUpdateCalled = true;
+    d->stop();
+  }
+};
+
+
+// Executor that overrides nothing; it relies entirely on the defaults
+// inherited from Executor.
+class TaskRunningExecutor : public Executor {};
+
+
+// Isolation module that, instead of launching an executor process, runs
+// a TaskRunningExecutor in-process behind a NexusExecutorDriver aimed at
+// the slave under test. It tracks at most one executor at a time.
+class TaskRunningIsolationModule : public IsolationModule
+{
+public:
+  TaskRunningExecutor *executor;
+  NexusExecutorDriver *driver;
+  string pid;
+
+  TaskRunningIsolationModule() : executor(NULL), driver(NULL) {}
+
+  virtual ~TaskRunningIsolationModule() {}
+
+  virtual void initialize(Slave *slave) {
+    // Remember the slave's PID so startExecutor can point the driver at it.
+    pid = slave->getPID();
+  }
+
+  virtual void startExecutor(Framework *framework) {
+    // TODO(benh): Cleanup the way we launch local drivers!
+    // These variables are set before the driver is constructed,
+    // presumably because NexusExecutorDriver reads its config from them.
+    // NOTE(review): "0-0" assumes this is the first framework the master
+    // registered; confirm it matches framework->id.
+    setenv("NEXUS_LOCAL", "1", 1);
+    setenv("NEXUS_SLAVE_PID", pid.c_str(), 1);
+    setenv("NEXUS_FRAMEWORK_ID", "0-0", 1);
+
+    executor = new TaskRunningExecutor();
+    driver = new NexusExecutorDriver(executor);
+    driver->start();
+  }
+
+  virtual void killExecutor(Framework* framework) {
+    // Tolerate a kill with no preceding start, or a repeated kill: only
+    // tear down a driver we still own, and clear the pointers so nothing
+    // is stopped or deleted twice.
+    if (driver != NULL) {
+      driver->stop();
+      driver->join();
+      delete driver;
+      driver = NULL;
+    }
+    delete executor;
+    executor = NULL;
+
+    // TODO(benh): Cleanup the way we launch local drivers!
+    unsetenv("NEXUS_LOCAL");
+    unsetenv("NEXUS_SLAVE_PID");
+    unsetenv("NEXUS_FRAMEWORK_ID");
+  }
+};
+
+
+// Runs one task through a real master and slave (using the in-process
+// isolation module above) and checks that the scheduler receives a
+// status update, which it expects to be TASK_RUNNING.
+TEST(MasterTest, TaskRunning)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID master = Process::spawn(&m);
+
+  TaskRunningIsolationModule isolationModule;
+  Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+  PID slave = Process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  TaskRunningScheduler sched;
+  NexusSchedulerDriver driver(&sched, master);
+
+  // statusUpdate() stops the driver, which should make run() return.
+  driver.run();
+
+  EXPECT_TRUE(sched.statusUpdateCalled);
+
+  // The slave kills its frameworks on S2S_SHUTDOWN, which presumably
+  // reaches TaskRunningIsolationModule::killExecutor.
+  Process::post(slave, S2S_SHUTDOWN);
+  Process::wait(slave);
+
+  Process::post(master, M2M_SHUTDOWN);
+  Process::wait(master);
+}