You are viewing a plain-text version of this content. The link to the canonical version (in the commits@mesos.apache.org mailing-list archive) did not survive the plain-text conversion.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:39:44 UTC
svn commit: r1131841 - in /incubator/mesos/trunk: include/ src/ src/tests/
Author: benh
Date: Sun Jun 5 05:39:43 2011
New Revision: 1131841
URL: http://svn.apache.org/viewvc?rev=1131841&view=rev
Log:
Make it possible to test Executors and NexusExecutorDriver.
Added:
incubator/mesos/trunk/src/isolation_module.cpp
Modified:
incubator/mesos/trunk/include/nexus_exec.hpp
incubator/mesos/trunk/include/nexus_sched.hpp
incubator/mesos/trunk/src/Makefile.in
incubator/mesos/trunk/src/isolation_module.hpp
incubator/mesos/trunk/src/lxc_isolation_module.cpp
incubator/mesos/trunk/src/lxc_isolation_module.hpp
incubator/mesos/trunk/src/nexus_exec.cpp
incubator/mesos/trunk/src/nexus_local.cpp
incubator/mesos/trunk/src/process_based_isolation_module.cpp
incubator/mesos/trunk/src/process_based_isolation_module.hpp
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/slave.hpp
incubator/mesos/trunk/src/slave_main.cpp
incubator/mesos/trunk/src/solaris_project_isolation_module.cpp
incubator/mesos/trunk/src/solaris_project_isolation_module.hpp
incubator/mesos/trunk/src/tests/test_master.cpp
Modified: incubator/mesos/trunk/include/nexus_exec.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_exec.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_exec.hpp (original)
+++ incubator/mesos/trunk/include/nexus_exec.hpp Sun Jun 5 05:39:43 2011
@@ -65,8 +65,11 @@ class ExecutorDriver
public:
virtual ~ExecutorDriver() {}
- // Connect to a slave and run the scheduler until it is shut down
- virtual int run() { return -1; }
+ // Lifecycle methods
+ virtual int start() { return -1; }
+ virtual int stop() { return -1; }
+ virtual int join() { return -1; }
+ virtual int run() { return -1; } // Start and then join driver
// Communication methods from executor to Nexus
virtual int sendStatusUpdate(const TaskStatus& status) { return -1; }
@@ -86,7 +89,12 @@ public:
NexusExecutorDriver(Executor* executor);
virtual ~NexusExecutorDriver();
- virtual int run();
+ // Lifecycle methods
+ virtual int start();
+ virtual int stop();
+ virtual int join();
+ virtual int run(); // Start and then join driver
+
virtual int sendStatusUpdate(const TaskStatus& status);
virtual int sendFrameworkMessage(const FrameworkMessage& message);
@@ -100,9 +108,15 @@ private:
// LibProcess process for communicating with slave
internal::ExecutorProcess* process;
+
+ // Are we currently registered with the slave
+ bool running;
// Mutex to enforce all non-callbacks are execute serially
pthread_mutex_t mutex;
+
+ // Condition variable for waiting until driver terminates
+ pthread_cond_t cond;
};
} /* namespace nexus { */
Modified: incubator/mesos/trunk/include/nexus_sched.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_sched.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_sched.hpp (original)
+++ incubator/mesos/trunk/include/nexus_sched.hpp Sun Jun 5 05:39:43 2011
@@ -55,9 +55,9 @@ public:
// Lifecycle methods
virtual int start() { return -1; }
- virtual int join() { return -1; }
virtual int stop() { return -1; }
- virtual int run() { return -1; } // Start and then join scheduler
+ virtual int join() { return -1; }
+ virtual int run() { return -1; } // Start and then join driver
// Communication methods
virtual int sendFrameworkMessage(const FrameworkMessage& message) { return -1; }
@@ -85,9 +85,9 @@ public:
// Lifecycle methods
virtual int start();
- virtual int join();
virtual int stop();
- virtual int run(); // Start and then join scheduler
+ virtual int join();
+ virtual int run(); // Start and then join driver
// Communication methods
virtual int sendFrameworkMessage(const FrameworkMessage& message);
@@ -121,7 +121,7 @@ private:
// Mutex to enforce all non-callbacks are execute serially
pthread_mutex_t mutex;
- // Condition variable for waiting until scheduler terminates
+ // Condition variable for waiting until driver terminates
pthread_cond_t cond;
};
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun 5 05:39:43 2011
@@ -108,7 +108,7 @@ NEXUS_LIB = libnexus++.a
NEXUS_LIBS = $(SCHED_LIB) $(EXEC_LIB) $(NEXUS_LIB)
MASTER_OBJ = master.o allocator_factory.o simple_allocator.o
-SLAVE_OBJ = slave.o launcher.o isolation_module_factory.o \
+SLAVE_OBJ = slave.o launcher.o isolation_module.o \
process_based_isolation_module.o
COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o master_detector.o url_processor.o zookeeper.o
EXEC_LIB_OBJ = nexus_exec.o
Added: incubator/mesos/trunk/src/isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/isolation_module.cpp?rev=1131841&view=auto
==============================================================================
--- incubator/mesos/trunk/src/isolation_module.cpp (added)
+++ incubator/mesos/trunk/src/isolation_module.cpp Sun Jun 5 05:39:43 2011
@@ -0,0 +1,39 @@
+#include "isolation_module.hpp"
+#include "process_based_isolation_module.hpp"
+#ifdef __sun__
+#include "solaris_project_isolation_module.hpp"
+#elif __linux__
+#include "lxc_isolation_module.hpp"
+#endif
+
+using std::string;
+
+using namespace nexus::internal::slave;
+
+
+// Factory for isolation modules. Returns a newly allocated module for the
+// given type name ("process" on every platform, plus "project" on Solaris
+// or "lxc" on Linux), or NULL if the type is not supported on this build.
+// The caller owns the returned module and may release it with destroy().
+IsolationModule * IsolationModule::create(const string &type)
+{
+  if (type == "process")
+    return new ProcessBasedIsolationModule();
+#ifdef __sun__
+  else if (type == "project")
+    return new SolarisProjectIsolationModule();
+#elif __linux__
+  else if (type == "lxc")
+    return new LxcIsolationModule();
+#endif
+
+  return NULL;
+}
+
+
+// Releases a module obtained from create(). Passing NULL is a no-op
+// (deleting a null pointer is well-defined in C++).
+void IsolationModule::destroy(IsolationModule *module)
+{
+  delete module;
+}
Modified: incubator/mesos/trunk/src/isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/isolation_module.hpp Sun Jun 5 05:39:43 2011
@@ -1,14 +1,25 @@
#ifndef __ISOLATION_MODULE_HPP__
#define __ISOLATION_MODULE_HPP__
-#include "slave.hpp"
+#include <string>
+
namespace nexus { namespace internal { namespace slave {
+class Framework;
+class Slave;
+
+
class IsolationModule {
public:
+ static IsolationModule * create(const std::string &type);
+ static void destroy(IsolationModule *module);
+
virtual ~IsolationModule() {}
+ // Called during slave initialization, before the slave starts handling messages.
+ virtual void initialize(Slave *slave) {}
+
// Called when a new framework is launched on the slave.
virtual void frameworkAdded(Framework *framework) {}
@@ -26,7 +37,7 @@ public:
// Update the resource limits for a given framework. This method will
// be called only after an executor for the framework is started.
- virtual void resourcesChanged(Framework *framework) = 0;
+ virtual void resourcesChanged(Framework *framework) {}
};
}}}
Modified: incubator/mesos/trunk/src/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/lxc_isolation_module.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/lxc_isolation_module.cpp Sun Jun 5 05:39:43 2011
@@ -1,8 +1,5 @@
#include "lxc_isolation_module.hpp"
-#include <stdlib.h>
-#include <unistd.h>
-
#include <algorithm>
#include "foreach.hpp"
@@ -27,68 +24,58 @@ using namespace nexus;
using namespace nexus::internal;
using namespace nexus::internal::slave;
-namespace {
-const int32_t CPU_SHARES_PER_CPU = 1024;
-const int32_t MIN_CPU_SHARES = 10;
-const int64_t MIN_RSS = 128 * Megabyte;
+LxcIsolationModule::LxcIsolationModule()
+ : initialized(false) {}
+
+LxcIsolationModule::~LxcIsolationModule()
+{
+ // We want to wait until the reaper has completed because it
+ // accesses 'this' in order to make callbacks ... deleting 'this'
+ // could thus lead to a seg fault!
+ if (initialized) {
+ CHECK(reaper != NULL);
+ Process::post(reaper->getPID(), SHUTDOWN_REAPER);
+ Process::wait(reaper);
+ delete reaper;
+ }
}
-LxcIsolationModule::LxcIsolationModule(Slave* slave)
+void LxcIsolationModule::initialize(Slave *slave)
{
this->slave = slave;
reaper = new Reaper(this);
Process::spawn(reaper);
-
- // Run a basic check to see whether Linux Container tools are available
- if (system("lxc-version > /dev/null") != 0) {
- LOG(FATAL) << "Could not run lxc-version; make sure Linux Container "
- << "tools are installed";
- }
- // Check that we are root (it might also be possible to create Linux
- // containers without being root, but we can support that later)
- if (getuid() != 0) {
- LOG(FATAL) << "LXC isolation module requires slave to run as root";
- }
+ initialized = true;
}
-LxcIsolationModule::~LxcIsolationModule()
-{
- // We want to wait until the reaper has completed because it
- // accesses 'this' in order to make callbacks ... deleting 'this'
- // could thus lead to a seg fault!
- Process::post(reaper->getPID(), SHUTDOWN_REAPER);
- Process::wait(reaper);
- delete reaper;
-}
-
-void LxcIsolationModule::frameworkAdded(Framework* fw)
+void LxcIsolationModule::frameworkAdded(Framework* framework)
{
- infos[fw->id] = new FrameworkInfo();
- infos[fw->id]->lxcExecutePid = -1;
- infos[fw->id]->container = "";
- fw->executorStatus = "No executor running";
+ lxcExecutePid[framework->id] = -1;
+ container[framework->id] = "";
+ framework->executorStatus = "No executor running";
}
-void LxcIsolationModule::frameworkRemoved(Framework* fw)
+void LxcIsolationModule::frameworkRemoved(Framework *framework)
{
- if (infos.find(fw->id) != infos.end()) {
- delete infos[fw->id];
- infos.erase(fw->id);
- }
+ lxcExecutePid.erase(framework->id);
+ container.erase(framework->id);
}
void LxcIsolationModule::startExecutor(Framework *fw)
{
+ if (!initialized)
+ LOG(FATAL) << "Cannot launch executors before initialization!";
+
LOG(INFO) << "Starting executor for framework " << fw->id << ": "
<< fw->executorInfo.uri;
- CHECK(infos[fw->id]->lxcExecutePid == -1 && infos[fw->id]->container == "");
+ CHECK(lxcExecutePid[fw->id] == -1 && container[fw->id] == "");
// Get location of Nexus install in order to find nexus-launcher.
const char *nexusHome = getenv("NEXUS_HOME");
@@ -101,7 +88,7 @@ void LxcIsolationModule::startExecutor(F
oss << "nexus.slave-" << slave->id << ".framework-" << fw->id;
string containerName = oss.str();
- infos[fw->id]->container = containerName;
+ container[fw->id] = containerName;
fw->executorStatus = "Container: " + containerName;
// Run lxc-execute nexus-launcher using a fork-exec (since lxc-execute
@@ -113,7 +100,7 @@ void LxcIsolationModule::startExecutor(F
if (pid) {
// In parent process
- infos[fw->id]->lxcExecutePid = pid;
+ lxcExecutePid[fw->id] = pid;
LOG(INFO) << "Started lxc-execute, pid = " << pid;
int status;
} else {
@@ -131,7 +118,7 @@ void LxcIsolationModule::startExecutor(F
setenv("NEXUS_EXECUTOR_URI", fw->executorInfo.uri.c_str(), 1);
setenv("NEXUS_USER", fw->user.c_str(), 1);
setenv("NEXUS_SLAVE_PID", lexical_cast<string>(slave->self()).c_str(), 1);
setenv("NEXUS_REDIRECT_IO", slave->local ? "0" : "1", 1);
setenv("NEXUS_WORK_DIRECTORY", slave->getWorkDirectory(fw->id).c_str(), 1);
// Run lxc-execute.
@@ -146,13 +133,12 @@ void LxcIsolationModule::startExecutor(F
void LxcIsolationModule::killExecutor(Framework* fw)
{
- string container = infos[fw->id]->container;
- if (container != "") {
- LOG(INFO) << "Stopping container " << container;
- int ret = shell("lxc-stop -n %s", container.c_str());
+ if (container[fw->id] != "") {
+ LOG(INFO) << "Stopping container " << container[fw->id];
+ int ret = shell("lxc-stop -n %s", container[fw->id].c_str());
if (ret != 0)
LOG(ERROR) << "lxc-stop returned " << ret;
- infos[fw->id]->container = "";
+ container[fw->id] = "";
fw->executorStatus = "No executor running";
}
}
@@ -160,45 +146,28 @@ void LxcIsolationModule::killExecutor(Fr
void LxcIsolationModule::resourcesChanged(Framework* fw)
{
- if (infos[fw->id]->container != "") {
- // For now, just try setting the CPUs and memory right away, and kill the
- // framework if this fails.
- // A smarter thing to do might be to only update them periodically in a
- // separate thread, and to give frameworks some time to scale down their
- // memory usage.
-
- int32_t cpuShares = max(CPU_SHARES_PER_CPU * fw->resources.cpus,
- MIN_CPU_SHARES);
- if (!setResourceLimit(fw, "cpu.shares", cpuShares)) {
- slave->removeExecutor(fw->id, true);
- return;
- }
-
- int64_t rssLimit = max(fw->resources.mem, MIN_RSS);
- if (!setResourceLimit(fw, "memory.limit_in_bytes", rssLimit)) {
- slave->removeExecutor(fw->id, true);
- return;
- }
- }
-}
-
+ if (container[fw->id] != "") {
+ // For now, just try setting the CPUs and mem right away.
+ // A slightly smarter thing might be to only update them periodically.
+ int ret;
+
+ int32_t cpuShares = max(1024 * fw->resources.cpus, 10);
+ LOG(INFO) << "Setting CPU shares for " << fw->id << " to " << cpuShares;
+ ret = shell("lxc-cgroup -n %s cpu.shares %d",
+ container[fw->id].c_str(), cpuShares);
+ if (ret != 0)
+ LOG(ERROR) << "lxc-cgroup returned " << ret;
-bool LxcIsolationModule::setResourceLimit(Framework* fw,
- const string& property,
- int64_t value)
-{
- LOG(INFO) << "Setting " << property << " for framework " << fw->id
- << " to " << value;
- int ret = shell("lxc-cgroup -n %s %s %lld",
- infos[fw->id]->container.c_str(),
- property.c_str(),
- value);
- if (ret != 0) {
- LOG(ERROR) << "Failed to set " << property << " for framework " << fw->id
- << ": lxc-cgroup returned " << ret;
- return false;
- } else {
- return true;
+ int64_t rssLimit = max(fw->resources.mem, 128 * Megabyte);
+ LOG(INFO) << "Setting RSS limit for " << fw->id << " to " << rssLimit;
+ ret = shell("lxc-cgroup -n %s memory.limit_in_bytes %lld",
+ container[fw->id].c_str(), rssLimit);
+ if (ret != 0)
+ LOG(ERROR) << "lxc-cgroup returned " << ret;
+
+ // TODO: Decreasing the RSS limit will fail if the current RSS is too
+ // large and memory can't be swapped out. In that case, we should
+ // either freeze the container before changing RSS, or just kill it.
}
}
@@ -238,10 +207,10 @@ void LxcIsolationModule::Reaper::operato
pid_t pid;
int status;
if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
- foreachpair (FrameworkID fid, FrameworkInfo* info, module->infos) {
- if (info->lxcExecutePid == pid) {
- info->container = "";
- info->lxcExecutePid = -1;
+ foreachpair (FrameworkID fid, pid_t& fwPid, module->lxcExecutePid) {
+ if (fwPid == pid) {
+ module->container[fid] = "";
+ module->lxcExecutePid[fid] = -1;
LOG(INFO) << "Telling slave of lost framework " << fid;
// TODO(benh): This is broken if/when libprocess is parallel!
module->slave->executorExited(fid, status);
Modified: incubator/mesos/trunk/src/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/lxc_isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/lxc_isolation_module.hpp Sun Jun 5 05:39:43 2011
@@ -35,15 +35,18 @@ public:
};
protected:
+ bool initialized;
Slave* slave;
unordered_map<FrameworkID, FrameworkInfo*> infos;
Reaper* reaper;
public:
- LxcIsolationModule(Slave* slave);
+ LxcIsolationModule();
virtual ~LxcIsolationModule();
+ virtual void initialize(Slave* slave);
+
virtual void frameworkAdded(Framework* framework);
virtual void frameworkRemoved(Framework* framework);
Modified: incubator/mesos/trunk/src/nexus_exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_exec.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_exec.cpp (original)
+++ incubator/mesos/trunk/src/nexus_exec.cpp Sun Jun 5 05:39:43 2011
@@ -41,13 +41,18 @@ protected:
Executor* executor;
FrameworkID fid;
SlaveID sid;
+ bool local;
+
+ volatile bool terminate;
public:
ExecutorProcess(const PID& _slave,
NexusExecutorDriver* _driver,
Executor* _executor,
- FrameworkID _fid)
- : slave(_slave), driver(_driver), executor(_executor), fid(_fid) {}
+ FrameworkID _fid,
+ bool _local)
+ : slave(_slave), driver(_driver), executor(_executor),
+ fid(_fid), local(_local), terminate(false) {}
protected:
void operator() ()
@@ -60,7 +65,13 @@ protected:
// process any other messages. This is especially tricky if a
// slave dies since we won't handle the PROCESS_EXIT message in
// a timely manner (if at all).
- switch(receive()) {
+
+ // Check for terminate in the same way as SchedulerProcess. See
+ // comments there for an explanation of why this is necessary.
+ if (terminate)
+ return;
+
+ switch(receive(2)) {
case S2E_REGISTER_REPLY: {
string host;
string fwName;
@@ -99,7 +110,9 @@ protected:
case S2E_KILL_EXECUTOR: {
invoke(bind(&Executor::shutdown, executor, driver));
- exit(0);
+ if (!local)
+ exit(0);
+ break;
}
case PROCESS_EXIT: {
@@ -111,7 +124,8 @@ protected:
// ourself) hoping to clean up any processes this executor
// launched itself.
// TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
- killpg(0, SIGKILL);
+ if (!local)
+ killpg(0, SIGKILL);
}
default: {
@@ -134,17 +148,16 @@ protected:
// Default implementation of error() that logs to stderr and exits
-void Executor::error(ExecutorDriver*, int code, const string &message)
+void Executor::error(ExecutorDriver* driver, int code, const string &message)
{
cerr << "Nexus error: " << message
<< " (error code: " << code << ")" << endl;
- // TODO(*): Don't exit here, let errors be recoverable. (?)
- exit(1);
+ driver->stop();
}
NexusExecutorDriver::NexusExecutorDriver(Executor* _executor)
- : executor(_executor)
+ : executor(_executor), process(NULL), running(false) // process stays NULL until start()
{
// Create mutex and condition variable
pthread_mutexattr_t attr;
@@ -152,28 +165,49 @@ NexusExecutorDriver::NexusExecutorDriver
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&mutex, &attr);
pthread_mutexattr_destroy(&attr);
+ pthread_cond_init(&cond, 0);
}
NexusExecutorDriver::~NexusExecutorDriver()
{
- pthread_mutex_destroy(&mutex);
+ if (process != NULL) Process::wait(process); // NULL if start() was never called
+ delete process; // deleting NULL is a no-op
+ pthread_mutex_destroy(&mutex); // only after the process is done: its executor callbacks may call driver methods that lock 'mutex'
+ pthread_cond_destroy(&cond);
}
-int NexusExecutorDriver::run()
+int NexusExecutorDriver::start()
{
+ Lock lock(&mutex);
+
+ if (running) {
+ return -1;
+ }
+
// Set stream buffering mode to flush on newlines so that we capture logs
// from user processes even when output is redirected to a file.
setvbuf(stdout, 0, _IOLBF, 0);
setvbuf(stderr, 0, _IOLBF, 0);
+ bool local;
+
PID slave;
FrameworkID fid;
char* value;
std::istringstream iss;
+ /* Check if this is local (for example, for testing). */
+ value = getenv("NEXUS_LOCAL");
+
+ if (value != NULL)
+ local = true;
+ else
+ local = false;
+
/* Get slave PID from environment. */
value = getenv("NEXUS_SLAVE_PID");
@@ -196,23 +230,56 @@ int NexusExecutorDriver::run()
if (!(iss >> fid))
fatal("cannot parse NEXUS_FRAMEWORK_ID");
- process = new ExecutorProcess(slave, this, executor, fid);
+ process = new ExecutorProcess(slave, this, executor, fid, local);
- Process::wait(Process::spawn(process));
+ Process::spawn(process);
- process = NULL;
+ running = true;
return 0;
}
+int NexusExecutorDriver::stop()
+{
+ Lock lock(&mutex);
+
+ if (!running) {
+ return -1;
+ }
+
+ process->terminate = true;
+
+ running = false;
+
+ pthread_cond_broadcast(&cond); // wake every thread blocked in join(), not just one
+
+ return 0;
+}
+
+
+int NexusExecutorDriver::join()
+{
+ Lock lock(&mutex);
+ while (running)
+ pthread_cond_wait(&cond, &mutex);
+
+ return 0;
+}
+
+
+int NexusExecutorDriver::run()
+{
+ int ret = start();
+ return ret != 0 ? ret : join();
+}
+
+
int NexusExecutorDriver::sendStatusUpdate(const TaskStatus &status)
{
Lock lock(&mutex);
- /* TODO(benh): Increment ref count on process. */
-
- if (!process) {
+ if (!running) {
//executor->error(this, EINVAL, "Executor has exited");
return -1;
}
@@ -223,8 +290,6 @@ int NexusExecutorDriver::sendStatusUpdat
status.state,
status.data));
- /* TODO(benh): Decrement ref count on process. */
-
return 0;
}
@@ -233,9 +298,7 @@ int NexusExecutorDriver::sendFrameworkMe
{
Lock lock(&mutex);
- /* TODO(benh): Increment ref count on process. */
-
- if (!process) {
+ if (!running) {
//executor->error(this, EINVAL, "Executor has exited");
return -1;
}
@@ -244,8 +307,6 @@ int NexusExecutorDriver::sendFrameworkMe
process->pack<E2S_FRAMEWORK_MESSAGE>(process->fid,
message));
- /* TODO(benh): Decrement ref count on process. */
-
return 0;
}
Modified: incubator/mesos/trunk/src/nexus_local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_local.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_local.cpp (original)
+++ incubator/mesos/trunk/src/nexus_local.cpp Sun Jun 5 05:39:43 2011
@@ -3,14 +3,20 @@
#include <tuple.hpp>
+#include <map>
#include <vector>
+#include "foreach.hpp"
#include "nexus_local.hpp"
+#include "process_based_isolation_module.hpp"
+using std::map;
using std::vector;
using nexus::internal::master::Master;
using nexus::internal::slave::Slave;
+using nexus::internal::slave::IsolationModule;
+using nexus::internal::slave::ProcessBasedIsolationModule;
using namespace nexus::internal;
@@ -30,7 +36,7 @@ void initialize_glog() {
namespace nexus { namespace internal { namespace local {
static Master *master = NULL;
-static vector<Slave*> *slaves = NULL;
+static map<IsolationModule*, Slave*> slaves;
static MasterDetector *detector = NULL;
@@ -47,15 +53,17 @@ PID launch(int numSlaves, int32_t cpus,
}
master = new Master();
- slaves = new vector<Slave*>();
PID pid = Process::spawn(master);
vector<PID> pids;
for (int i = 0; i < numSlaves; i++) {
- Slave* slave = new Slave(Resources(cpus, mem), true);
- slaves->push_back(slave);
+ // TODO(benh): Create a local isolation module?
+ ProcessBasedIsolationModule *isolationModule =
+ new ProcessBasedIsolationModule();
+ Slave* slave = new Slave(Resources(cpus, mem), true, isolationModule);
+ slaves[isolationModule] = slave;
pids.push_back(Process::spawn(slave));
}
@@ -72,15 +80,20 @@ void shutdown()
delete master;
master = NULL;
- for (int i = 0; i < slaves->size(); i++) {
- Slave *slave = slaves->at(i);
+ // TODO(benh): Ugh! Because the isolation module calls back into the
+ // slave (not the best design) we can't delete the slave until we
+ // have deleted the isolation module. But since the slave calls into
+ // the isolation module, we can't delete the isolation module until
+ // we have stopped the slave.
+
+ foreachpair (IsolationModule *isolationModule, Slave *slave, slaves) {
Process::post(slave->getPID(), S2S_SHUTDOWN);
Process::wait(slave);
+ delete isolationModule;
delete slave;
}
- delete slaves;
- slaves = NULL;
+ slaves.clear();
delete detector;
detector = NULL;
Modified: incubator/mesos/trunk/src/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/process_based_isolation_module.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/process_based_isolation_module.cpp Sun Jun 5 05:39:43 2011
@@ -22,22 +22,29 @@ using namespace nexus::internal;
using namespace nexus::internal::slave;
-ProcessBasedIsolationModule::ProcessBasedIsolationModule(Slave* slave)
-{
- this->slave = slave;
- reaper = new Reaper(this);
- Process::spawn(reaper);
-}
+ProcessBasedIsolationModule::ProcessBasedIsolationModule()
+ : initialized(false) {}
ProcessBasedIsolationModule::~ProcessBasedIsolationModule()
{
- // We want to wait until the reaper has completed because it
+ // We need to wait until the reaper has completed because it
// accesses 'this' in order to make callbacks ... deleting 'this'
// could thus lead to a seg fault!
- Process::post(reaper->getPID(), SHUTDOWN_REAPER);
- Process::wait(reaper);
- delete reaper;
+ if (initialized) {
+ CHECK(reaper != NULL);
+ Process::post(reaper->getPID(), SHUTDOWN_REAPER);
+ Process::wait(reaper);
+ delete reaper;
+ }
+}
+
+void ProcessBasedIsolationModule::initialize(Slave *slave)
+{
+ this->slave = slave;
+ reaper = new Reaper(this);
+ Process::spawn(reaper);
+ initialized = true;
}
@@ -56,6 +63,9 @@ void ProcessBasedIsolationModule::framew
void ProcessBasedIsolationModule::startExecutor(Framework* framework)
{
+ if (!initialized)
+ LOG(FATAL) << "Cannot launch executors before initialization!";
+
LOG(INFO) << "Starting executor for framework " << framework->id << ": "
<< framework->executorInfo.uri;
CHECK(pgids[framework->id] == -1);
Modified: incubator/mesos/trunk/src/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/process_based_isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/process_based_isolation_module.hpp Sun Jun 5 05:39:43 2011
@@ -1,18 +1,18 @@
#ifndef __PROCESS_BASED_ISOLATION_MODULE_HPP__
#define __PROCESS_BASED_ISOLATION_MODULE_HPP__
-#include <string>
-
#include <sys/types.h>
#include <boost/unordered_map.hpp>
-#include "launcher.hpp"
#include "isolation_module.hpp"
+#include "launcher.hpp"
+#include "messages.hpp"
+#include "slave.hpp"
+
namespace nexus { namespace internal { namespace slave {
-using std::string;
using boost::unordered_map;
using nexus::internal::launcher::ExecutorLauncher;
@@ -33,15 +33,18 @@ public:
enum { SHUTDOWN_REAPER = NEXUS_MESSAGES };
protected:
+ bool initialized;
Slave* slave;
unordered_map<FrameworkID, pid_t> pgids;
Reaper* reaper;
public:
- ProcessBasedIsolationModule(Slave* slave);
+ ProcessBasedIsolationModule();
virtual ~ProcessBasedIsolationModule();
+ virtual void initialize(Slave *slave);
+
virtual void frameworkAdded(Framework* framework);
virtual void frameworkRemoved(Framework* framework);
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 05:39:43 2011
@@ -1,9 +1,6 @@
-#include <getopt.h>
-
#include <fstream>
#include <algorithm>
-#include "isolation_module_factory.hpp"
#include "slave.hpp"
#include "slave_webui.hpp"
@@ -64,17 +61,14 @@ public:
} /* namespace */
-Slave::Slave(Resources _resources, bool _local, const string &_isolationType)
+Slave::Slave(Resources _resources, bool _local,
+ IsolationModule *_isolationModule)
: id(""), resources(_resources), local(_local),
- isolationType(_isolationType), isolationModule(NULL) {}
+ isolationModule(_isolationModule) {}
Slave::~Slave()
{
- // TODO(matei): Add support for factory style destroy of objects!
- if (isolationModule != NULL)
- delete isolationModule;
-
// TODO(benh): Shut down and free executors?
}
@@ -123,15 +117,11 @@ void Slave::operator () ()
publicDns = hostname;
}
- FrameworkID fid;
- TaskID tid;
- TaskState taskState;
- Params params;
- FrameworkMessage message;
- string data;
+ // Initialize isolation module.
+ isolationModule->initialize(this);
while (true) {
- switch (receive(2)) {
+ switch (receive()) {
case NEW_MASTER_DETECTED: {
string masterSeq;
PID masterPid;
@@ -175,9 +165,6 @@ void Slave::operator () ()
unpack<M2S_REGISTER_REPLY>(this->id, interval);
LOG(INFO) << "Registered with master; given slave ID " << this->id;
link(spawn(new Heart(master, this->getPID(), this->id, interval)));
- isolationModule = createIsolationModule();
- if (!isolationModule)
- LOG(FATAL) << "Unrecognized isolation type: " << isolationType;
break;
}
@@ -194,8 +181,10 @@ void Slave::operator () ()
case M2S_RUN_TASK: {
FrameworkID fid;
+ TaskID tid;
string fwName, user, taskName, taskArg, fwPidStr;
ExecutorInfo execInfo;
+ Params params;
unpack<M2S_RUN_TASK>(fid, tid, fwName, user, execInfo,
taskName, taskArg, params, fwPidStr);
LOG(INFO) << "Got assigned task " << fid << ":" << tid;
@@ -230,6 +219,8 @@ void Slave::operator () ()
}
case M2S_KILL_TASK: {
+ FrameworkID fid;
+ TaskID tid;
unpack<M2S_KILL_TASK>(fid, tid);
LOG(INFO) << "Killing task " << fid << ":" << tid;
if (Executor *ex = getExecutor(fid)) {
@@ -245,6 +236,8 @@ void Slave::operator () ()
}
case M2S_FRAMEWORK_MESSAGE: {
+ FrameworkID fid;
+ FrameworkMessage message;
unpack<M2S_FRAMEWORK_MESSAGE>(fid, message);
if (Executor *ex = getExecutor(fid)) {
send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
@@ -256,6 +249,7 @@ void Slave::operator () ()
}
case M2S_KILL_FRAMEWORK: {
+ FrameworkID fid;
unpack<M2S_KILL_FRAMEWORK>(fid);
LOG(INFO) << "Asked to kill framework " << fid;
Framework *fw = getFramework(fid);
@@ -265,6 +259,7 @@ void Slave::operator () ()
}
case E2S_REGISTER_EXECUTOR: {
+ FrameworkID fid;
unpack<E2S_REGISTER_EXECUTOR>(fid);
LOG(INFO) << "Got executor registration for framework " << fid;
if (Framework *fw = getFramework(fid)) {
@@ -292,6 +287,10 @@ void Slave::operator () ()
}
case E2S_STATUS_UPDATE: {
+ FrameworkID fid;
+ TaskID tid;
+ TaskState taskState;
+ string data;
unpack<E2S_STATUS_UPDATE>(fid, tid, taskState, data);
LOG(INFO) << "Got status update for task " << fid << ":" << tid;
if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
@@ -312,6 +311,8 @@ void Slave::operator () ()
}
case E2S_FRAMEWORK_MESSAGE: {
+ FrameworkID fid;
+ FrameworkMessage message;
unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
// Set slave ID in case framework omitted it
message.slaveId = this->id;
@@ -337,7 +338,7 @@ void Slave::operator () ()
if (from() == ex->pid) {
LOG(INFO) << "Executor for framework " << ex->frameworkId
<< " disconnected";
- Framework *framework = getFramework(fid);
+ Framework *framework = getFramework(ex->frameworkId);
if (framework != NULL) {
send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
killFramework(framework);
@@ -352,18 +353,22 @@ void Slave::operator () ()
case M2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by master: " << from();
+ unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+ foreachpair (_, Framework *framework, frameworksCopy) {
+ killFramework(framework);
+ }
return;
}
case S2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by " << from();
+ unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+ foreachpair (_, Framework *framework, frameworksCopy) {
+ killFramework(framework);
+ }
return;
}
- case PROCESS_TIMEOUT: {
- break;
- }
-
default: {
LOG(ERROR) << "Received unknown message ID " << msgid()
<< " from " << from();
@@ -374,13 +379,6 @@ void Slave::operator () ()
}
-IsolationModule * Slave::createIsolationModule()
-{
- LOG(INFO) << "Creating \"" << isolationType << "\" isolation module";
- return IsolationModuleFactory::instantiate(isolationType, this);
-}
-
-
Framework * Slave::getFramework(FrameworkID frameworkId)
{
FrameworkMap::iterator it = frameworks.find(frameworkId);
@@ -440,6 +438,10 @@ void Slave::killFramework(Framework *fw)
// If an executor is running, tell it to exit and kill it
if (Executor *ex = getExecutor(fw->id)) {
send(ex->pid, pack<S2E_KILL_EXECUTOR>());
+ // TODO(benh): There really isn't much time between when an
+ // executor gets a S2E_KILL_EXECUTOR message and the isolation
+ // module goes and kills it. We should really think about making
+ // the semantics of this better.
removeExecutor(fw->id, true);
}
frameworks.erase(fw->id);
Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun 5 05:39:43 2011
@@ -30,6 +30,7 @@
#include "fatal.hpp"
#include "foreach.hpp"
+#include "isolation_module.hpp"
#include "messages.hpp"
#include "params.hpp"
#include "resources.hpp"
@@ -54,10 +55,6 @@ using boost::unordered_map;
using boost::unordered_set;
-// Forward declarations
-class IsolationModule;
-
-
// A description of a task that is yet to be launched
struct TaskDescription
{
@@ -164,19 +161,16 @@ public:
typedef unordered_map<FrameworkID, Framework*> FrameworkMap;
typedef unordered_map<FrameworkID, Executor*> ExecutorMap;
- bool isFT;
PID master;
SlaveID id;
Resources resources;
bool local;
FrameworkMap frameworks;
ExecutorMap executors; // Invariant: framework will exist if executor exists
- string isolationType;
IsolationModule *isolationModule;
public:
- Slave(Resources resources, bool local,
- const string& isolationType = "process");
+ Slave(Resources resources, bool local, IsolationModule *isolationModule);
virtual ~Slave();
@@ -207,11 +201,6 @@ protected:
// Kill a framework (including its executor)
void killFramework(Framework *fw);
-
- // Create the slave's isolation module; this method is virtual so that
- // it is easy to override in tests
- virtual IsolationModule * createIsolationModule();
-
};
}}}
Modified: incubator/mesos/trunk/src/slave_main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave_main.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave_main.cpp (original)
+++ incubator/mesos/trunk/src/slave_main.cpp Sun Jun 5 05:39:43 2011
@@ -1,5 +1,6 @@
#include <getopt.h>
+#include "isolation_module_factory.hpp"
#include "slave.hpp"
#include "slave_webui.hpp"
@@ -85,10 +86,16 @@ int main(int argc, char **argv)
FLAGS_logbufsecs = 1;
google::InitGoogleLogging(argv[0]);
+ LOG(INFO) << "Creating \"" << isolation << "\" isolation module";
+ IsolationModule *isolationModule = IsolationModule::create(isolation);
+
+ if (isolationModule == NULL)
+ fatal("unrecognized isolation type: %s", isolation);
+
LOG(INFO) << "Build: " << BUILD_DATE << " by " << BUILD_USER;
LOG(INFO) << "Starting Nexus slave";
- Slave* slave = new Slave(resources, false, isolation);
+ Slave* slave = new Slave(resources, false, isolationModule);
PID pid = Process::spawn(slave);
MasterDetector *detector = MasterDetector::create(url, pid, false, quiet);
@@ -103,5 +110,9 @@ int main(int argc, char **argv)
MasterDetector::destroy(detector);
+ IsolationModule::destroy(isolationModule);
+
+ delete slave;
+
return 0;
}
Modified: incubator/mesos/trunk/src/solaris_project_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/solaris_project_isolation_module.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/solaris_project_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/solaris_project_isolation_module.cpp Sun Jun 5 05:39:43 2011
@@ -27,12 +27,12 @@ using namespace nexus::internal;
using namespace nexus::internal::slave;
-SolarisProjectIsolationModule::SolarisProjectIsolationModule(Slave* slave)
- : ProcessBasedIsolationModule(slave)
+// The slave is no longer passed to the constructor; it is supplied
+// later through initialize(Slave*).
+SolarisProjectIsolationModule::SolarisProjectIsolationModule()
{
// Launch the communicator module, which will start the projd's.
// TODO: It would be nice to not return from the constructor
// until the communicator is up and running.
+ // TODO(*): Handing `this` to the Communicator (which is spawned and
+ // starts running immediately) lets a pointer to an object whose
+ // construction has not finished escape the constructor. Consider
+ // creating and spawning the communicator from initialize() instead.
comm = new Communicator(this);
Process::spawn(comm);
}
@@ -44,6 +44,12 @@ SolarisProjectIsolationModule::~SolarisP
}
+// Receives the Slave this module works for (this replaces the Slave*
+// constructor argument the module used to take). No Solaris-specific
+// setup happens here; it simply forwards to
+// ProcessBasedIsolationModule::initialize.
+void SolarisProjectIsolationModule::initialize(Slave* slave)
+{
+ ProcessBasedIsolationModule::initialize(slave);
+}
+
+
void SolarisProjectIsolationModule::startExecutor(Framework* fw)
{
// Figure out which project to use.
Modified: incubator/mesos/trunk/src/solaris_project_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/solaris_project_isolation_module.hpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/solaris_project_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/solaris_project_isolation_module.hpp Sun Jun 5 05:39:43 2011
@@ -53,10 +53,12 @@ protected:
Communicator* comm;
public:
- SolarisProjectIsolationModule(Slave* slave);
+ SolarisProjectIsolationModule();
virtual ~SolarisProjectIsolationModule();
+ virtual void initialize(Slave* slave);
+
virtual void startExecutor(Framework* framework);
virtual void killExecutor(Framework* framework);
Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131841&r1=1131840&r2=1131841&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun 5 05:39:43 2011
@@ -3,8 +3,12 @@
#include <boost/lexical_cast.hpp>
#include "master.hpp"
+#include "slave.hpp"
+#include "nexus_exec.hpp"
#include "nexus_sched.hpp"
#include "nexus_local.hpp"
+#include "isolation_module.hpp"
+#include "process_based_isolation_module.hpp"
using std::string;
using std::vector;
@@ -16,6 +20,9 @@ using namespace nexus::internal;
using nexus::internal::master::Master;
using nexus::internal::slave::Slave;
+using nexus::internal::slave::Framework;
+using nexus::internal::slave::IsolationModule;
+using nexus::internal::slave::ProcessBasedIsolationModule;
class NoopScheduler : public Scheduler
@@ -324,7 +331,8 @@ TEST(MasterTest, SlaveLost)
Master m;
PID master = Process::spawn(&m);
- Slave s(Resources(2, 1 * Gigabyte), true);
+ ProcessBasedIsolationModule isolationModule;
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -443,7 +451,7 @@ public:
vector<TaskDescription> tasks;
ASSERT_TRUE(offers.size() == 1);
const SlaveOffer &offer = offers[0];
- TaskDescription desc(0, offer.slaveId, "", map<string, string>(), "");
+ TaskDescription desc(0, offer.slaveId, "", offer.params, "");
tasks.push_back(desc);
d->replyToOffer(id, tasks, map<string, string>());
Process::post(slave, S2S_SHUTDOWN);
@@ -476,7 +484,8 @@ TEST(MasterTest, OfferRescinded)
Master m;
PID master = Process::spawn(&m);
- Slave s(Resources(2, 1 * Gigabyte), true);
+ ProcessBasedIsolationModule isolationModule;
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -555,3 +564,108 @@ TEST(MasterTest, SlavePartitioned)
Process::filter(NULL);
}
+
+
+// Scheduler used by the TaskRunning test. It launches a single task on
+// the one slave offer it expects to receive, and on the first status
+// update it checks that the task is TASK_RUNNING, records that the
+// update arrived, and stops its driver.
+class TaskRunningScheduler : public Scheduler
+{
+public:
+ // Set once statusUpdate() has been invoked; the test reads it after
+ // the scheduler driver has stopped.
+ bool statusUpdateCalled;
+
+ TaskRunningScheduler()
+ : statusUpdateCalled(false) {}
+
+ virtual ~TaskRunningScheduler() {}
+
+ // "noexecutor" is only a placeholder: the test's isolation module runs
+ // an in-process executor rather than launching this path.
+ virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+ return ExecutorInfo("noexecutor", "");
+ }
+
+ // Accept the offer by launching one task (ID 0) on the offering slave,
+ // reusing the offer's own params (presumably the full offered
+ // resources).
+ virtual void resourceOffer(SchedulerDriver* d,
+ OfferID id,
+ const vector<SlaveOffer>& offers) {
+ LOG(INFO) << "TaskRunningScheduler got a slot offer";
+ // ASSERT_EQ (rather than ASSERT_TRUE(... == 1)) reports the actual
+ // number of offers when this fails.
+ ASSERT_EQ(1u, offers.size());
+ const SlaveOffer &offer = offers[0];
+ vector<TaskDescription> tasks;
+ tasks.push_back(TaskDescription(0, offer.slaveId, "", offer.params, ""));
+ d->replyToOffer(id, tasks, map<string, string>());
+ }
+
+ // Expected value first, per gtest's EXPECT_EQ(expected, actual)
+ // convention, so failure messages label the values correctly.
+ virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
+ EXPECT_EQ(TASK_RUNNING, status.state);
+ statusUpdateCalled = true;
+ d->stop();
+ }
+};
+
+
+// Executor with no overridden callbacks. TaskRunningIsolationModule
+// instantiates it directly, so every callback it receives falls through
+// to the Executor base-class implementation.
+class TaskRunningExecutor : public Executor {};
+
+
+// Test isolation module: instead of launching a separate executor
+// process, it runs a TaskRunningExecutor inside the test process through
+// a NexusExecutorDriver. It supports one executor at a time.
+class TaskRunningIsolationModule : public IsolationModule
+{
+public:
+ TaskRunningExecutor *executor; // Owned; NULL when no executor is running.
+ NexusExecutorDriver *driver; // Owned; NULL when no executor is running.
+ string pid; // Slave PID, captured in initialize().
+
+ TaskRunningIsolationModule() : executor(NULL), driver(NULL) {}
+
+ // Stop any executor that was started but never killed, so its driver
+ // is not left running after this module is gone.
+ virtual ~TaskRunningIsolationModule() {
+ shutdownLocalExecutor();
+ }
+
+ virtual void initialize(Slave *slave) {
+ pid = slave->getPID();
+ }
+
+ // Start an in-process executor driver. It is configured through the
+ // NEXUS_* environment variables set below (presumably read by
+ // NexusExecutorDriver when it starts).
+ virtual void startExecutor(Framework *framework) {
+ // TODO(benh): Cleanup the way we launch local drivers!
+ setenv("NEXUS_LOCAL", "1", 1);
+ setenv("NEXUS_SLAVE_PID", pid.c_str(), 1);
+ // NOTE(review): the framework ID is hard-coded; this assumes the
+ // test's framework is the first one registered ("0-0").
+ setenv("NEXUS_FRAMEWORK_ID", "0-0", 1);
+
+ executor = new TaskRunningExecutor();
+ driver = new NexusExecutorDriver(executor);
+ driver->start();
+ }
+
+ virtual void killExecutor(Framework* framework) {
+ shutdownLocalExecutor();
+ }
+
+private:
+ // Stop, join and free the current driver and executor, then clear the
+ // NEXUS_* variables set by startExecutor(). A no-op when nothing is
+ // running; pointers are reset to NULL so repeated calls (e.g. from
+ // killExecutor() and then the destructor) never double-delete.
+ void shutdownLocalExecutor() {
+ if (driver == NULL && executor == NULL)
+ return;
+
+ if (driver != NULL) {
+ driver->stop();
+ driver->join();
+ }
+ delete driver;
+ delete executor;
+ driver = NULL;
+ executor = NULL;
+
+ // TODO(benh): Cleanup the way we launch local drivers!
+ unsetenv("NEXUS_LOCAL");
+ unsetenv("NEXUS_SLAVE_PID");
+ unsetenv("NEXUS_FRAMEWORK_ID");
+ }
+};
+
+
+// End-to-end check that a task launched through the master reaches
+// TASK_RUNNING. A local master and slave are spawned; the slave runs
+// executors in-process via TaskRunningIsolationModule, and the scheduler
+// stops its driver on the first status update.
+TEST(MasterTest, TaskRunning)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ Master m;
+ PID master = Process::spawn(&m);
+
+ TaskRunningIsolationModule isolationModule;
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
+
+ // Presumably informs the slave of the master's PID so it can register.
+ BasicMasterDetector detector(master, slave, true);
+
+ TaskRunningScheduler sched;
+ NexusSchedulerDriver driver(&sched, master);
+
+ // Expected to return once TaskRunningScheduler::statusUpdate() calls
+ // stop(). NOTE(review): no timeout is visible here, so a missing status
+ // update would presumably hang the test.
+ driver.run();
+
+ EXPECT_TRUE(sched.statusUpdateCalled);
+
+ // Shut down the slave first; on S2S_SHUTDOWN it kills every framework,
+ // which also removes that framework's executor.
+ Process::post(slave, S2S_SHUTDOWN);
+ Process::wait(slave);
+
+ Process::post(master, M2M_SHUTDOWN);
+ Process::wait(master);
+}