You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:25:43 UTC
svn commit: r1132329 [6/6] - in /incubator/mesos/trunk: ./ src/ src/common/
src/config/ src/event_history/ src/examples/ src/local/ src/master/
src/messaging/ src/slave/ src/tests/ third_party/sqlite-3.6.23.1/
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:25:41 2011
@@ -6,8 +6,6 @@
#include <algorithm>
#include <fstream>
-#include <google/protobuf/descriptor.h>
-
#include "slave.hpp"
#include "webui.hpp"
@@ -16,69 +14,101 @@
#define gethostbyname2(name, _) gethostbyname(name)
#endif
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::slave;
-
-using boost::unordered_map;
-using boost::unordered_set;
-
-using process::Promise;
-using process::UPID;
-
using std::list;
using std::make_pair;
using std::ostringstream;
+using std::istringstream;
using std::pair;
using std::queue;
using std::string;
using std::vector;
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
-Slave::Slave(const Resources& _resources, bool _local,
- IsolationModule *_isolationModule)
- : resources(_resources), local(_local),
- isolationModule(_isolationModule)
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
+
+
+namespace {
+
+// Periodically sends heartbeats to the master
+class Heart : public MesosProcess
{
- initialize();
-}
+private:
+ PID master;
+ PID slave;
+ SlaveID sid;
+ double interval;
+
+protected:
+ void operator () ()
+ {
+ link(slave);
+ link(master);
+ do {
+ switch (receive(interval)) {
+ case PROCESS_TIMEOUT:
+ send(master, pack<SH2M_HEARTBEAT>(sid));
+ break;
+ case PROCESS_EXIT:
+ return;
+ }
+ } while (true);
+ }
+
+public:
+ Heart(const PID &_master, const PID &_slave, SlaveID _sid, double _interval)
+ : master(_master), slave(_slave), sid(_sid), interval(_interval) {}
+};
+
+// Default values for CPU cores and memory to include in configuration
+const int32_t DEFAULT_CPUS = 1;
+const int32_t DEFAULT_MEM = 1 * Gigabyte;
-Slave::Slave(const Configuration& _conf, bool _local,
- IsolationModule* _isolationModule)
- : conf(_conf), local(_local),
- isolationModule(_isolationModule)
-{
- resources =
- Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
-
- initialize();
+
+} /* namespace */
+
+
+Slave::Slave(Resources _resources, bool _local,
+ IsolationModule *_isolationModule)
+ : id(""), resources(_resources), local(_local),
+ isolationModule(_isolationModule) {}
+
+
+Slave::Slave(const Params& _conf, bool _local, IsolationModule *_module)
+ : id(""), conf(_conf), local(_local), isolationModule(_module)
+{
+ resources = Resources(conf.get<int32_t>("cpus", DEFAULT_CPUS),
+ conf.get<int32_t>("mem", DEFAULT_MEM));
}
-void Slave::registerOptions(Configurator* configurator)
-{
- // TODO(benh): Is there a way to specify units for the resources?
- configurator->addOption<string>("resources",
- "Total consumable resources per slave\n");
-// configurator->addOption<string>("attributes",
-// "Attributes of machine\n");
- configurator->addOption<string>("work_dir",
- "Where to place framework work directories\n"
- "(default: MESOS_HOME/work)");
- configurator->addOption<string>("hadoop_home",
- "Where to find Hadoop installed (for\n"
- "fetching framework executors from HDFS)\n"
- "(default: look for HADOOP_HOME in\n"
- "environment or find hadoop on PATH)");
- configurator->addOption<bool>("switch_user",
- "Whether to run tasks as the user who\n"
- "submitted them rather than the user running\n"
- "the slave (requires setuid permission)",
- true);
- configurator->addOption<string>("frameworks_home",
- "Directory prepended to relative executor\n"
- "paths (default: MESOS_HOME/frameworks)");
+void Slave::registerOptions(Configurator* conf)
+{
+ conf->addOption<int32_t>("cpus", 'c', "CPU cores for use by tasks",
+ DEFAULT_CPUS);
+ conf->addOption<int64_t>("mem", 'm', "Memory for use by tasks, in MB\n",
+ DEFAULT_MEM);
+ conf->addOption<string>("work_dir",
+ "Where to place framework work directories\n"
+ "(default: MESOS_HOME/work)");
+ conf->addOption<string>("hadoop_home",
+ "Where to find Hadoop installed (for fetching\n"
+ "framework executors from HDFS)\n"
+ "(default: look for HADOOP_HOME environment\n"
+ "variable or find hadoop on PATH)");
+ conf->addOption<bool>("switch_user",
+ "Whether to run tasks as the user who\n"
+ "submitted them rather than the user running\n"
+ "the slave (requires setuid permission)",
+ true);
+ conf->addOption<string>("frameworks_home",
+ "Directory prepended to relative executor\n"
+ "paths (default: MESOS_HOME/frameworks)");
}
@@ -88,63 +118,25 @@ Slave::~Slave()
}
-Promise<state::SlaveState*> Slave::getState()
+state::SlaveState *Slave::getState()
{
- Resources resources(resources);
- Resource::Scalar cpus;
- Resource::Scalar mem;
- cpus.set_value(-1);
- mem.set_value(-1);
- cpus = resources.getScalar("cpus", cpus);
- mem = resources.getScalar("mem", mem);
-
- state::SlaveState* state =
- new state::SlaveState(build::DATE, build::USER, slaveId.value(),
- cpus.value(), mem.value(), self(), master);
-
- foreachpair (_, Framework* f, frameworks) {
- foreachpair (_, Executor* e, f->executors) {
- Resources resources(e->resources);
- Resource::Scalar cpus;
- Resource::Scalar mem;
- cpus.set_value(-1);
- mem.set_value(-1);
- cpus = resources.getScalar("cpus", cpus);
- mem = resources.getScalar("mem", mem);
-
- // TOOD(benh): For now, we will add a state::Framework object
- // for each executor that the framework has. Therefore, we tweak
- // the framework ID to also include the associated executor ID
- // to differentiate them. This is so we don't have to make very
- // many changes to the webui right now. Note that this ID
- // construction must be identical to what we do for directory
- // suffix returned from Slave::getUniqueWorkDirectory.
-
- string id = f->frameworkId.value() + "-" + e->info.executor_id().value();
-
- state::Framework* framework =
- new state::Framework(id, f->info.name(),
- e->info.uri(), e->executorStatus,
- cpus.value(), mem.value());
-
- state->frameworks.push_back(framework);
-
- foreachpair (_, Task* t, e->tasks) {
- Resources resources(t->resources());
- Resource::Scalar cpus;
- Resource::Scalar mem;
- cpus.set_value(-1);
- mem.set_value(-1);
- cpus = resources.getScalar("cpus", cpus);
- mem = resources.getScalar("mem", mem);
-
- state::Task* task =
- new state::Task(t->task_id().value(), t->name(),
- TaskState_descriptor()->FindValueByNumber(t->state())->name(),
- cpus.value(), mem.value());
-
- framework->tasks.push_back(task);
- }
+ std::ostringstream my_pid;
+ my_pid << self();
+ std::ostringstream master_pid;
+ master_pid << master;
+ state::SlaveState *state =
+ new state::SlaveState(BUILD_DATE, BUILD_USER, id, resources.cpus,
+ resources.mem, my_pid.str(), master_pid.str());
+
+ foreachpair(_, Framework *f, frameworks) {
+ state::Framework *framework = new state::Framework(f->id, f->name,
+ f->executorInfo.uri, f->executorStatus, f->resources.cpus,
+ f->resources.mem);
+ state->frameworks.push_back(framework);
+ foreachpair(_, Task *t, f->tasks) {
+ state::Task *task = new state::Task(t->id, t->name, t->state,
+ t->resources.cpus, t->resources.mem);
+ framework->tasks.push_back(task);
}
}
@@ -155,556 +147,377 @@ Promise<state::SlaveState*> Slave::getSt
void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
- LOG(INFO) << "Slave resources: " << resources;
// Get our hostname
- char buf[256];
+ char buf[512];
gethostname(buf, sizeof(buf));
- hostent* he = gethostbyname2(buf, AF_INET);
+ hostent *he = gethostbyname2(buf, AF_INET);
string hostname = he->h_name;
- // Check and see if we have a different public DNS name. Normally
- // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
- // environment variable. This allows the master to display our
- // public name in its web UI.
- string public_hostname = hostname;
+ // Get our public DNS name. Normally this is our hostname, but on EC2
+ // we look for the MESOS_PUBLIC_DNS environment variable. This allows
+ // the master to display our public name in its web UI.
+ string publicDns = hostname;
if (getenv("MESOS_PUBLIC_DNS") != NULL) {
- public_hostname = getenv("MESOS_PUBLIC_DNS");
+ publicDns = getenv("MESOS_PUBLIC_DNS");
}
- // Initialize slave info.
- slave.set_hostname(hostname);
- slave.set_public_hostname(public_hostname);
- slave.mutable_resources()->MergeFrom(resources);
-
// Initialize isolation module.
isolationModule->initialize(this);
while (true) {
- serve(1);
- if (name() == process::TERMINATE) {
- LOG(INFO) << "Asked to shut down by " << from();
- foreachpaircopy (_, Framework* framework, frameworks) {
- killFramework(framework);
- }
- return;
- }
- }
-}
-
-
-void Slave::initialize()
-{
- install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
-
- install(M2S_REGISTER_REPLY, &Slave::registerReply,
- &SlaveRegisteredMessage::slave_id);
-
- install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
- &SlaveRegisteredMessage::slave_id);
-
- install(M2S_RUN_TASK, &Slave::runTask,
- &RunTaskMessage::framework,
- &RunTaskMessage::framework_id,
- &RunTaskMessage::pid,
- &RunTaskMessage::task);
-
- install(M2S_KILL_TASK, &Slave::killTask,
- &KillTaskMessage::framework_id,
- &KillTaskMessage::task_id);
-
- install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
- &KillFrameworkMessage::framework_id);
+ switch (receive()) {
+ case NEW_MASTER_DETECTED: {
+ string masterSeq;
+ PID masterPid;
+ tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
+
+ LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+
+ redirect(master, masterPid);
+ master = masterPid;
+ link(master);
+
+ if (id.empty()) {
+ // Slave started before master.
+ send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
+ } else {
+ // Reconnecting, so reconstruct resourcesInUse for the master.
+ Resources resourcesInUse;
+ vector<Task> taskVec;
+
+ foreachpair(_, Framework *framework, frameworks) {
+ foreachpair(_, Task *task, framework->tasks) {
+ resourcesInUse += task->resources;
+ Task ti = *task;
+ ti.slaveId = id;
+ taskVec.push_back(ti);
+ }
+ }
- install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
- &UpdateFrameworkMessage::framework_id,
- &UpdateFrameworkMessage::pid);
-
- install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
- &StatusUpdateAckMessage::framework_id,
- &StatusUpdateAckMessage::slave_id,
- &StatusUpdateAckMessage::task_id);
-
- install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
- &RegisterExecutorMessage::framework_id,
- &RegisterExecutorMessage::executor_id);
-
- install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
- &StatusUpdateMessage::framework_id,
- &StatusUpdateMessage::status);
-
- install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(PING, &Slave::ping);
-
- install(process::TIMEOUT, &Slave::timeout);
-
- install(process::EXITED, &Slave::exited);
-}
-
-
-void Slave::newMasterDetected(const string& pid)
-{
- LOG(INFO) << "New master detected at " << pid;
-
- master = pid;
- link(master);
-
- if (slaveId == "") {
- // Slave started before master.
- MSG<S2M_REGISTER_SLAVE> out;
- out.mutable_slave()->MergeFrom(slave);
- send(master, out);
- } else {
- // Re-registering, so send tasks running.
- MSG<S2M_REREGISTER_SLAVE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_slave()->MergeFrom(slave);
-
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (_, Executor* executor, framework->executors) {
- foreachpair (_, Task* task, executor->tasks) {
- out.add_tasks()->MergeFrom(*task);
+ send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
}
+ break;
+ }
+
+ case NO_MASTER_DETECTED: {
+ LOG(INFO) << "Lost master(s) ... waiting";
+ break;
}
- }
-
- send(master, out);
- }
-}
-
-
-void Slave::noMasterDetected()
-{
- LOG(INFO) << "Lost master(s) ... waiting";
-}
-
-
-void Slave::registerReply(const SlaveID& slaveId)
-{
- LOG(INFO) << "Registered with master; given slave ID " << slaveId;
- this->slaveId = slaveId;
-}
-
-
-void Slave::reregisterReply(const SlaveID& slaveId)
-{
- LOG(INFO) << "Re-registered with master";
-
- if (!(this->slaveId == slaveId)) {
- LOG(FATAL) << "Slave re-registered but got wrong ID";
- }
-}
-
-
-void Slave::runTask(const FrameworkInfo& frameworkInfo,
- const FrameworkID& frameworkId,
- const string& pid,
- const TaskDescription& task)
-{
- LOG(INFO) << "Got assigned task " << task.task_id()
- << " for framework " << frameworkId;
-
- Framework* framework = getFramework(frameworkId);
- if (framework == NULL) {
- framework = new Framework(frameworkId, frameworkInfo, pid);
- frameworks[frameworkId] = framework;
- }
-
- // Either send the task to an executor or start a new executor
- // and queue the task until the executor has started.
- Executor* executor = task.has_executor()
- ? framework->getExecutor(task.executor().executor_id())
- : framework->getExecutor(framework->info.executor().executor_id());
-
- if (executor != NULL) {
- if (!executor->pid) {
- // Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
- } else {
- // Add the task to the executor.
- executor->addTask(task);
-
- MSG<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
- isolationModule->resourcesChanged(framework, executor);
- }
- } else {
- // Launch an executor for this task.
- if (task.has_executor()) {
- executor = framework->createExecutor(task.executor());
- } else {
- executor = framework->createExecutor(framework->info.executor());
- }
-
- // Queue task until the executor starts up.
- executor->queuedTasks.push_back(task);
-
- // Tell the isolation module to launch the executor.
- isolationModule->launchExecutor(framework, executor);
- }
-}
-
-
-void Slave::killTask(const FrameworkID& frameworkId,
- const TaskID& taskId)
-{
- LOG(INFO) << "Asked to kill task " << taskId
- << " of framework " << frameworkId;
-
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- // Tell the executor to kill the task if it is up and
- // running, otherwise, consider the task lost.
- Executor* executor = framework->getExecutor(taskId);
- if (executor == NULL || !executor->pid) {
- // Update the resources locally, if an executor comes up
- // after this then it just won't receive this task.
- executor->removeTask(taskId);
- isolationModule->resourcesChanged(framework, executor);
-
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
- } else {
- // Otherwise, send a message to the executor and wait for
- // it to send us a status update.
- MSG<S2E_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_task_id()->MergeFrom(taskId);
- send(executor->pid, out);
- }
- } else {
- LOG(WARNING) << "Cannot kill task " << taskId
- << " of framework " << frameworkId
- << " because no such framework is running";
-
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status->task_id()] = *status;
- }
-}
-
-
-void Slave::killFramework(const FrameworkID& frameworkId)
-{
- LOG(INFO) << "Asked to kill framework " << frameworkId;
-
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- killFramework(framework);
- }
-}
-
-
-void Slave::schedulerMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const string& data)
-{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
- if (executor == NULL) {
- LOG(WARNING) << "Dropping message for executor '"
- << executorId << "' of framework " << frameworkId
- << " because executor does not exist";
- } else if (!executor->pid) {
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
- LOG(WARNING) << "Dropping message for executor '"
- << executorId << "' of framework " << frameworkId
- << " because executor is not running";
- } else {
- MSG<S2E_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(executor->pid, out);
- }
- } else {
- LOG(WARNING) << "Dropping message for framework "<< frameworkId
- << " because it does not exist";
- }
-}
-
-void Slave::updateFramework(const FrameworkID& frameworkId,
- const string& pid)
-{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Updating framework " << frameworkId
- << " pid to " <<pid;
- framework->pid = pid;
- }
-}
+ case M2S_REGISTER_REPLY: {
+ double interval = 0;
+ tie(this->id, interval) = unpack<M2S_REGISTER_REPLY>(body());
+ LOG(INFO) << "Registered with master; given slave ID " << this->id;
+ link(spawn(new Heart(master, self(), this->id, interval)));
+ break;
+ }
+
+ case M2S_REREGISTER_REPLY: {
+ SlaveID sid;
+ double interval = 0;
+ tie(sid, interval) = unpack<M2S_REREGISTER_REPLY>(body());
+ LOG(INFO) << "RE-registered with master; given slave ID " << sid << " had "<< this->id;
+ if (this->id == "")
+ this->id = sid;
+ CHECK(this->id == sid);
+ link(spawn(new Heart(master, self(), this->id, interval)));
+ break;
+ }
+
+ case M2S_RUN_TASK: {
+ FrameworkID fid;
+ TaskID tid;
+ string fwName, user, taskName, taskArg;
+ ExecutorInfo execInfo;
+ Params params;
+ PID pid;
+ tie(fid, tid, fwName, user, execInfo, taskName, taskArg, params, pid) =
+ unpack<M2S_RUN_TASK>(body());
+ LOG(INFO) << "Got assigned task " << fid << ":" << tid;
+ Resources res;
+ res.cpus = params.getInt32("cpus", -1);
+ res.mem = params.getInt64("mem", -1);
+ Framework *framework = getFramework(fid);
+ if (framework == NULL) {
+ // Framework not yet created on this node - create it.
+ framework = new Framework(fid, fwName, user, execInfo, pid);
+ frameworks[fid] = framework;
+ isolationModule->startExecutor(framework);
+ }
+ Task *task = framework->addTask(tid, taskName, res);
+ Executor *executor = getExecutor(fid);
+ if (executor) {
+ send(executor->pid,
+ pack<S2E_RUN_TASK>(tid, taskName, taskArg, params));
+ isolationModule->resourcesChanged(framework);
+ } else {
+ // Executor not yet registered; queue task for when it starts up
+ TaskDescription *td = new TaskDescription(
+ tid, taskName, taskArg, params.str());
+ framework->queuedTasks.push_back(td);
+ }
+ break;
+ }
+ case M2S_KILL_TASK: {
+ FrameworkID fid;
+ TaskID tid;
+ tie(fid, tid) = unpack<M2S_KILL_TASK>(body());
+ LOG(INFO) << "Killing task " << fid << ":" << tid;
+ if (Executor *ex = getExecutor(fid)) {
+ send(ex->pid, pack<S2E_KILL_TASK>(tid));
+ }
+ if (Framework *fw = getFramework(fid)) {
+ fw->removeTask(tid);
+ isolationModule->resourcesChanged(fw);
+ }
+ break;
+ }
-void Slave::statusUpdateAck(const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const TaskID& taskId)
-{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- foreachpair (double deadline, _, framework->statuses) {
- if (framework->statuses[deadline].count(taskId) > 0) {
- LOG(INFO) << "Got acknowledgement of status update"
- << " for task " << taskId
- << " of framework " << framework->frameworkId;
- framework->statuses[deadline].erase(taskId);
+ case M2S_KILL_FRAMEWORK: {
+ FrameworkID fid;
+ tie(fid) = unpack<M2S_KILL_FRAMEWORK>(body());
+ LOG(INFO) << "Asked to kill framework " << fid;
+ Framework *fw = getFramework(fid);
+ if (fw != NULL)
+ killFramework(fw);
break;
}
- }
- }
-}
+ case M2S_FRAMEWORK_MESSAGE: {
+ FrameworkID fid;
+ FrameworkMessage message;
+ tie(fid, message) = unpack<M2S_FRAMEWORK_MESSAGE>(body());
+ if (Executor *ex = getExecutor(fid)) {
+ VLOG(1) << "Relaying framework message for framework " << fid;
+ send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+ } else {
+ VLOG(1) << "Dropping framework message for framework " << fid
+ << " because its executor is not running";
+ }
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ break;
+ }
-void Slave::registerExecutor(const FrameworkID& frameworkId,
- const ExecutorID& executorId)
-{
- LOG(INFO) << "Got registration for executor '" << executorId
- << "' of framework " << frameworkId;
+ case M2S_UPDATE_FRAMEWORK_PID: {
+ FrameworkID fid;
+ PID pid;
+ tie(fid, pid) = unpack<M2S_UPDATE_FRAMEWORK_PID>(body());
+ Framework *framework = getFramework(fid);
+ if (framework != NULL) {
+ LOG(INFO) << "Updating framework " << fid << " pid to " << pid;
+ framework->pid = pid;
+ }
+ break;
+ }
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
-
- // Check the status of the executor.
- if (executor == NULL) {
- LOG(WARNING) << "Not expecting executor '" << executorId
- << "' of framework " << frameworkId;
- send(from(), S2E_KILL_EXECUTOR);
- } else if (executor->pid != UPID()) {
- LOG(WARNING) << "Not good, executor '" << executorId
- << "' of framework " << frameworkId
- << " is already running";
- send(from(), S2E_KILL_EXECUTOR);
- } else {
- // Save the pid for the executor.
- executor->pid = from();
-
- // Now that the executor is up, set its resource limits.
- isolationModule->resourcesChanged(framework, executor);
-
- // Tell executor it's registered and give it any queued tasks.
- MSG<S2E_REGISTER_REPLY> out;
- ExecutorArgs* args = out.mutable_args();
- args->mutable_framework_id()->MergeFrom(framework->frameworkId);
- args->mutable_executor_id()->MergeFrom(executor->info.executor_id());
- args->mutable_slave_id()->MergeFrom(slaveId);
- args->set_hostname(slave.hostname());
- args->set_data(framework->info.executor().data());
- send(executor->pid, out);
- sendQueuedTasks(framework, executor);
- }
- } else {
- // Framework is gone; tell the executor to exit.
- LOG(WARNING) << "Framework " << frameworkId
- << " does not exist (it may have been killed),"
- << " telling executor to exit";
-
- // TODO(benh): Don't we also want to tell the isolation
- // module to shut this guy down!
- send(from(), S2E_KILL_EXECUTOR);
- }
-}
+ case E2S_REGISTER_EXECUTOR: {
+ FrameworkID fid;
+ tie(fid) = unpack<E2S_REGISTER_EXECUTOR>(body());
+ LOG(INFO) << "Got executor registration for framework " << fid;
+ if (Framework *fw = getFramework(fid)) {
+ if (getExecutor(fid) != 0) {
+ LOG(ERROR) << "Executor for framework " << fid
+ << "already exists";
+ send(from(), pack<S2E_KILL_EXECUTOR>());
+ break;
+ }
+ Executor *executor = new Executor(fid, from());
+ executors[fid] = executor;
+ link(from());
+ // Now that the executor is up, set its resource limits
+ isolationModule->resourcesChanged(fw);
+ // Tell executor that it's registered and give it its queued tasks
+ send(from(), pack<S2E_REGISTER_REPLY>(this->id,
+ hostname,
+ fw->name,
+ fw->executorInfo.initArg));
+ sendQueuedTasks(fw);
+ } else {
+ // Framework is gone; tell the executor to exit
+ send(from(), pack<S2E_KILL_EXECUTOR>());
+ }
+ break;
+ }
+ case E2S_STATUS_UPDATE: {
+ FrameworkID fid;
+ TaskID tid;
+ TaskState taskState;
+ string data;
+ tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
+
+ Framework *framework = getFramework(fid);
+ if (framework != NULL) {
+ LOG(INFO) << "Got status update for task " << fid << ":" << tid;
+ if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
+ taskState == TASK_KILLED || taskState == TASK_LOST) {
+ LOG(INFO) << "Task " << fid << ":" << tid << " done";
+
+ framework->removeTask(tid);
+ isolationModule->resourcesChanged(framework);
+ }
+
+ // Reliably send message and save sequence number for
+ // canceling later.
+ int seq = rsend(master, framework->pid,
+ pack<S2M_STATUS_UPDATE>(id, fid, tid,
+ taskState, data));
+ seqs[fid].insert(seq);
+ } else {
+ LOG(WARNING) << "Got status update for UNKNOWN task "
+ << fid << ":" << tid;
+ }
+ break;
+ }
-void Slave::statusUpdate(const FrameworkID& frameworkId,
- const TaskStatus& status)
-{
- LOG(INFO) << "Status update: task " << status.task_id()
- << " of framework " << frameworkId
- << " is now in state "
- << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(status.task_id());
- if (executor != NULL) {
- executor->updateTaskState(status.task_id(), status.state());
- if (status.state() == TASK_FINISHED ||
- status.state() == TASK_FAILED ||
- status.state() == TASK_KILLED ||
- status.state() == TASK_LOST) {
- executor->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework, executor);
- }
-
- // Send message and record the status for possible resending.
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(master, out);
-
- double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
- framework->statuses[deadline][status.task_id()] = status;
- } else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "executor for framework " << frameworkId;
- }
- } else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "framework " << frameworkId;
- }
-}
+ case E2S_FRAMEWORK_MESSAGE: {
+ FrameworkID fid;
+ FrameworkMessage message;
+ tie(fid, message) = unpack<E2S_FRAMEWORK_MESSAGE>(body());
+
+ Framework *framework = getFramework(fid);
+ if (framework != NULL) {
+ LOG(INFO) << "Sending message for framework " << fid
+ << " to " << framework->pid;
+
+ // Set slave ID in case framework omitted it.
+ message.slaveId = this->id;
+ VLOG(1) << "Sending framework message to framework " << fid
+ << " with PID " << framework->pid;
+ send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+ }
+ break;
+ }
+ case S2S_GET_STATE: {
+ send(from(), pack<S2S_GET_STATE_REPLY>(getState()));
+ break;
+ }
-void Slave::executorMessage(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const string& data)
-{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Sending message for framework " << frameworkId
- << " to " << framework->pid;
-
- // TODO(benh): This is weird, sending an M2F message.
- MSG<M2F_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(framework->pid, out);
- }
-}
+ case PROCESS_EXIT: {
+ LOG(INFO) << "Process exited: " << from();
+ if (from() == master) {
+ LOG(WARNING) << "Master disconnected! "
+ << "Waiting for a new master to be elected.";
+ // TODO(benh): After so long waiting for a master, commit suicide.
+ } else {
+ // Check if an executor has exited (this is technically
+ // redundant because the isolation module should be doing
+ // this for us).
+ foreachpair (_, Executor *ex, executors) {
+ if (from() == ex->pid) {
+ LOG(INFO) << "Executor for framework " << ex->frameworkId
+ << " disconnected";
+ Framework *framework = getFramework(ex->frameworkId);
+ if (framework != NULL) {
+ send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
+ killFramework(framework);
+ }
+ break;
+ }
+ }
+ }
-void Slave::ping()
-{
- send(from(), PONG);
-}
+ break;
+ }
+ case M2S_SHUTDOWN: {
+ LOG(INFO) << "Asked to shut down by master: " << from();
+ unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+ foreachpair (_, Framework *framework, frameworksCopy) {
+ killFramework(framework);
+ }
+ return;
+ }
-void Slave::timeout()
-{
- // Check and see if we should re-send any status updates.
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (double deadline, _, framework->statuses) {
- if (deadline <= elapsed()) {
- foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
- LOG(WARNING) << "Resending status update"
- << " for task " << status.task_id()
- << " of framework " << framework->frameworkId;
- MSG<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(master, out);
+ case S2S_SHUTDOWN: {
+ LOG(INFO) << "Asked to shut down by " << from();
+ unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
+ foreachpair (_, Framework *framework, frameworksCopy) {
+ killFramework(framework);
}
+ return;
+ }
+
+ default: {
+ LOG(ERROR) << "Received unknown message ID " << msgid()
+ << " from " << from();
+ break;
}
}
}
}
-void Slave::exited()
-{
- LOG(INFO) << "Process exited: " << from();
- if (from() == master) {
- LOG(WARNING) << "Master disconnected! "
- << "Waiting for a new master to be elected.";
- // TODO(benh): After so long waiting for a master, commit suicide.
- }
+Framework * Slave::getFramework(FrameworkID frameworkId)
+{
+ FrameworkMap::iterator it = frameworks.find(frameworkId);
+ if (it == frameworks.end()) return NULL;
+ return it->second;
}
-
-
-Framework* Slave::getFramework(const FrameworkID& frameworkId)
+Executor * Slave::getExecutor(FrameworkID frameworkId)
{
- if (frameworks.count(frameworkId) > 0) {
- return frameworks[frameworkId];
- }
-
- return NULL;
+ ExecutorMap::iterator it = executors.find(frameworkId);
+ if (it == executors.end()) return NULL;
+ return it->second;
}
// Send any tasks queued up for the given framework to its executor
// (needed if we received tasks while the executor was starting up)
-void Slave::sendQueuedTasks(Framework* framework, Executor* executor)
+void Slave::sendQueuedTasks(Framework *framework)
{
- LOG(INFO) << "Flushing queued tasks for framework "
- << framework->frameworkId;
-
- CHECK(executor->pid != UPID());
-
- foreach (const TaskDescription& task, executor->queuedTasks) {
- // Add the task to the executor.
- executor->addTask(task);
-
- MSG<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
+ LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
+ Executor *executor = getExecutor(framework->id);
+ if (!executor) return;
+ foreach(TaskDescription *td, framework->queuedTasks) {
+ send(executor->pid,
+ pack<S2E_RUN_TASK>(td->tid, td->name, td->args, td->params));
+ delete td;
}
-
- executor->queuedTasks.clear();
+ framework->queuedTasks.clear();
}
// Kill a framework (including its executor if killExecutor is true).
-void Slave::killFramework(Framework *framework, bool killExecutors)
+void Slave::killFramework(Framework *framework, bool killExecutor)
{
- LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
+ LOG(INFO) << "Cleaning up framework " << framework->id;
+
+ // Cancel sending any reliable messages for this framework.
+ foreach (int seq, seqs[framework->id])
+ cancel(seq);
- // Shutdown all executors of this framework.
- foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
- if (killExecutors) {
- LOG(INFO) << "Killing executor '" << executorId
- << "' of framework " << framework->frameworkId;
+ seqs.erase(framework->id);
- send(executor->pid, S2E_KILL_EXECUTOR);
+ // Remove its allocated resources.
+ framework->resources = Resources();
+ // If an executor is running, tell it to exit and kill it.
+ if (Executor *ex = getExecutor(framework->id)) {
+ if (killExecutor) {
+ LOG(INFO) << "Killing executor for framework " << framework->id;
// TODO(benh): There really isn't ANY time between when an
// executor gets a S2E_KILL_EXECUTOR message and the isolation
// module goes and kills it. We should really think about making
// the semantics of this better.
-
- isolationModule->killExecutor(framework, executor);
+ send(ex->pid, pack<S2E_KILL_EXECUTOR>());
+ isolationModule->killExecutor(framework);
}
- framework->destroyExecutor(executorId);
+ LOG(INFO) << "Cleaning up executor for framework " << framework->id;
+ delete ex;
+ executors.erase(framework->id);
}
- frameworks.erase(framework->frameworkId);
+ frameworks.erase(framework->id);
delete framework;
}
@@ -712,45 +525,18 @@ void Slave::killFramework(Framework *fra
// Called by isolation module when an executor process exits
// TODO(benh): Make this callback be a message so that we can avoid
// race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result)
+void Slave::executorExited(FrameworkID fid, int status)
{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- Executor* executor = framework->getExecutor(executorId);
- if (executor != NULL) {
- LOG(INFO) << "Exited executor '" << executorId
- << "' of framework " << frameworkId
- << " with result " << result;
-
- MSG<S2M_EXITED_EXECUTOR> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_result(result);
- send(master, out);
-
- framework->destroyExecutor(executorId);
-
- // TODO(benh): When should we kill the presence of an entire
- // framework on a slave?
- if (framework->executors.size() == 0) {
- killFramework(framework);
- }
- } else {
- LOG(WARNING) << "UNKNOWN executor '" << executorId
- << "' of framework " << frameworkId
- << " has exited with result " << result;
- }
- } else {
- LOG(WARNING) << "UNKNOWN executor '" << executorId
- << "' of UNKNOWN framework " << frameworkId
- << " has exited with result " << result;
+ if (Framework *f = getFramework(fid)) {
+ LOG(INFO) << "Executor for framework " << fid << " exited "
+ << "with status " << status;
+ send(master, pack<S2M_LOST_EXECUTOR>(id, fid, status));
+ killFramework(f, false);
}
};
-string Slave::getUniqueWorkDirectory(const FrameworkID& frameworkId,
- const ExecutorID& executorId)
+string Slave::getUniqueWorkDirectory(FrameworkID fid)
{
string workDir;
if (conf.contains("work_dir")) {
@@ -762,8 +548,7 @@ string Slave::getUniqueWorkDirectory(con
}
ostringstream os(std::ios_base::app | std::ios_base::out);
- os << workDir << "/slave-" << slaveId
- << "/fw-" << frameworkId << "-" << executorId;
+ os << workDir << "/slave-" << id << "/fw-" << fid;
// Find a unique directory based on the path given by the slave
// (this is because we might launch multiple executors from the same
@@ -784,7 +569,7 @@ string Slave::getUniqueWorkDirectory(con
}
-const Configuration& Slave::getConfiguration()
+const Params& Slave::getConf()
{
return conf;
}
Modified: incubator/mesos/trunk/src/slave/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.cpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.cpp (original)
+++ incubator/mesos/trunk/src/slave/webui.cpp Sun Jun 5 09:25:41 2011
@@ -6,29 +6,28 @@
#include "state.hpp"
#include "webui.hpp"
-#include "configurator/configuration.hpp"
-
#ifdef MESOS_WEBUI
#include <Python.h>
-using process::PID;
-
using std::string;
extern "C" void init_slave(); // Initializer for the Python slave module
+namespace {
-namespace mesos { namespace internal { namespace slave {
+PID slave;
+string webuiPort;
+string logDir;
+string workDir;
-static PID<Slave> slave;
-static string webuiPort;
-static string logDir;
-static string workDir;
+}
+
+namespace mesos { namespace internal { namespace slave {
-void* runSlaveWebUI(void*)
+void *runSlaveWebUI(void *)
{
LOG(INFO) << "Web UI thread started";
Py_Initialize();
@@ -51,19 +50,19 @@ void* runSlaveWebUI(void*)
}
-void startSlaveWebUI(const PID<Slave>& _slave, const Configuration& conf)
+void startSlaveWebUI(const PID &slave, const Params &params)
{
// TODO(*): See the note in master/webui.cpp about having to
// determine default values. These should be set by now and can just
// be used! For example, what happens when the slave code changes
// their default location for the work directory, it might not get
// changed here!
- webuiPort = conf.get("webui_port", "8081");
- logDir = conf.get("log_dir", FLAGS_log_dir);
- if (conf.contains("work_dir")) {
- workDir = conf.get("work_dir", "");
- } else if (conf.contains("home")) {
- workDir = conf.get("home", "") + "/work";
+ webuiPort = params.get("webui_port", "8081");
+ logDir = params.get("log_dir", FLAGS_log_dir);
+ if (params.contains("work_dir")) {
+ workDir = params.get("work_dir", "");
+ } else if (params.contains("home")) {
+ workDir = params.get("home", "") + "/work";
} else {
workDir = "work";
}
@@ -72,7 +71,7 @@ void startSlaveWebUI(const PID<Slave>& _
LOG(INFO) << "Starting slave web UI on port " << webuiPort;
- slave = _slave;
+ ::slave = slave;
pthread_t thread;
pthread_create(&thread, 0, runSlaveWebUI, NULL);
}
@@ -80,15 +79,36 @@ void startSlaveWebUI(const PID<Slave>& _
namespace state {
-// From slave_state.hpp.
-SlaveState* get_slave()
+class StateGetter : public MesosProcess
+{
+public:
+ SlaveState *slaveState;
+
+ StateGetter() {}
+ ~StateGetter() {}
+
+ void operator () ()
+ {
+ send(::slave, pack<S2S_GET_STATE>());
+ receive();
+ CHECK(msgid() == S2S_GET_STATE_REPLY);
+ slaveState = unpack<S2S_GET_STATE_REPLY, 0>(body());
+ }
+};
+
+
+// From slave_state.hpp
+SlaveState *get_slave()
{
- return process::call(slave, &Slave::getState);
+ StateGetter getter;
+ PID pid = Process::spawn(&getter);
+ Process::wait(pid);
+ return getter.slaveState;
}
-} // namespace state {
+} /* namespace state { */
-}}} // namespace mesos { namespace internal { namespace slave {
+}}} /* namespace mesos { namespace internal { namespace slave { */
-#endif // MESOS_WEBUI
+#endif /* MESOS_WEBUI */
Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Sun Jun 5 09:25:41 2011
@@ -2,8 +2,6 @@
SHELL = '/bin/sh'
-SRCDIR = @srcdir@
-INCLUDEDIR = @top_builddir@/include
BINDIR = @top_builddir@/bin
LIBDIR = @top_builddir@/lib
@@ -20,11 +18,12 @@ WITH_ZOOKEEPER = @WITH_ZOOKEEPER@
WITH_INCLUDED_ZOOKEEPER = @WITH_INCLUDED_ZOOKEEPER@
LIBPROCESS = third_party/libprocess
+
LIBEV = $(LIBPROCESS)/third_party/libev-3.8
-BOOST = third_party/boost-1.37.0
-PROTOBUF = third_party/protobuf-2.3.0
+
GLOG = third_party/glog-0.3.1
GMOCK = third_party/gmock-1.5.0
+
ZOOKEEPER = third_party/zookeeper-3.3.1/src/c
# Ensure that we get better debugging info.
@@ -35,17 +34,13 @@ CXXFLAGS += -g
CFLAGS += -I@srcdir@/.. -I..
CXXFLAGS += -I@srcdir@/.. -I..
-# Add include and build include to CFLAGS and CXXFLAGS.
-CFLAGS += -I@top_srcdir@/include -I$(INCLUDEDIR)
-CXXFLAGS += -I@top_srcdir@/include -I$(INCLUDEDIR)
+# Add include to CFLAGS and CXXFLAGS.
+CFLAGS += -I@top_srcdir@/include
+CXXFLAGS += -I@top_srcdir@/include
# Add boost to CFLAGS and CXXFLAGS.
-CFLAGS += -I@top_srcdir@/$(BOOST)
-CXXFLAGS += -I@top_srcdir@/$(BOOST)
-
-# Add protobuf to include and lib paths.
-CXXFLAGS += -I@top_srcdir@/$(PROTOBUF)/src
-LDFLAGS += -L@top_builddir@/$(PROTOBUF)/src/.libs
+CFLAGS += -I@top_srcdir@/third_party/boost-1.37.0
+CXXFLAGS += -I@top_srcdir@/third_party/boost-1.37.0
# Add libprocess to CFLAGS, CXXFLAGS, and LDFLAGS.
CFLAGS += -I@top_srcdir@/$(LIBPROCESS)
@@ -69,12 +64,20 @@ endif
CFLAGS += -MMD -MP
CXXFLAGS += -MMD -MP
-# Add protobuf, glog, gmock, gtest, libev, libprocess, pthread, and dl to LIBS.
-LIBS += -lprotobuf -lglog -lgmock -lgtest -lprocess -lev -lpthread -ldl
+# Add build date to CFLAGS, CXXFLAGS
+CFLAGS += -DBUILD_DATE="\"$$(date '+%Y-%m-%d %H:%M:%S')\""
+CXXFLAGS += -DBUILD_DATE="\"$$(date '+%Y-%m-%d %H:%M:%S')\""
+
+# Add build user to CFLAGS, CXXFLAGS
+CFLAGS += -DBUILD_USER="\"$$USER\""
+CXXFLAGS += -DBUILD_USER="\"$$USER\""
+
+# Add glog, gmock, gtest, libev, libprocess, pthread, and dl to LIBS.
+LIBS += -lglog -lgmock -lgtest -lprocess -lev -lpthread -ldl
# Add ZooKeeper if necessary.
ifeq ($(WITH_ZOOKEEPER),1)
- LIBS += -lzookeeper_mt
+ LIBS += -lzookeeper_st
endif
SCHED_LIB = $(LIBDIR)/libmesos_sched.a
@@ -82,7 +85,7 @@ EXEC_LIB = $(LIBDIR)/libmesos_exec.a
TESTS_OBJ = main.o utils.o master_test.o offer_reply_errors_test.o \
resources_test.o external_test.o sample_frameworks_test.o \
- configurator_test.o string_utils_test.o multimap_test.o \
+ configurator_test.o string_utils_test.o \
lxc_isolation_test.o
ALLTESTS_EXE = $(BINDIR)/tests/all-tests
@@ -92,6 +95,7 @@ EXTERNAL_SCRIPTS = \
$(BINDIR)/tests/external/LxcIsolation/ScaleUpAndDown.sh \
$(BINDIR)/tests/external/LxcIsolation/TwoSeparateTasks.sh \
$(BINDIR)/tests/external/LxcIsolation/run_scheduled_memhog_test.sh \
+ $(BINDIR)/tests/external/SampleFrameworks/CFramework.sh \
$(BINDIR)/tests/external/SampleFrameworks/CFrameworkCmdlineParsing.sh \
$(BINDIR)/tests/external/SampleFrameworks/CFrameworkInvalidCmdline.sh \
$(BINDIR)/tests/external/SampleFrameworks/CFrameworkInvalidEnv.sh \
Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132329&r1=1132328&r2=1132329&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun 5 09:25:41 2011
@@ -5,8 +5,6 @@
#include <boost/lexical_cast.hpp>
-#include <detector/detector.hpp>
-
#include <local/local.hpp>
#include <master/master.hpp>
@@ -24,12 +22,10 @@ using namespace mesos::internal::test;
using boost::lexical_cast;
using mesos::internal::master::Master;
-
using mesos::internal::slave::Slave;
+using mesos::internal::slave::Framework;
+using mesos::internal::slave::IsolationModule;
using mesos::internal::slave::ProcessBasedIsolationModule;
-using mesos::internal::slave::STATUS_UPDATE_RETRY_TIMEOUT;
-
-using process::PID;
using std::string;
using std::map;
@@ -49,59 +45,42 @@ using testing::Sequence;
using testing::StrEq;
-class TestingIsolationModule : public slave::IsolationModule
+class LocalIsolationModule : public IsolationModule
{
public:
- TestingIsolationModule(const map<ExecutorID, Executor*>& _executors)
- : executors(_executors) {}
+ Executor *executor;
+ MesosExecutorDriver *driver;
+ string pid;
- virtual ~TestingIsolationModule() {}
+ LocalIsolationModule(Executor *_executor)
+ : executor(_executor), driver(NULL) {}
- virtual void initialize(Slave* _slave)
- {
- slave = _slave;
- }
+ virtual ~LocalIsolationModule() {}
- virtual void launchExecutor(slave::Framework* f, slave::Executor* e)
- {
- if (executors.count(e->info.executor_id()) > 0) {
- Executor* executor = executors[e->info.executor_id()];
- MesosExecutorDriver* driver = new MesosExecutorDriver(executor);
- drivers[e->info.executor_id()] = driver;
-
- setenv("MESOS_LOCAL", "1", 1);
- setenv("MESOS_SLAVE_PID", string(slave->self()).c_str(), 1);
- setenv("MESOS_FRAMEWORK_ID", f->frameworkId.value().c_str(), 1);
- setenv("MESOS_EXECUTOR_ID", e->info.executor_id().value().c_str(), 1);
-
- driver->start();
-
- unsetenv("MESOS_LOCAL");
- unsetenv("MESOS_SLAVE_PID");
- unsetenv("MESOS_FRAMEWORK_ID");
- unsetenv("MESOS_EXECUTOR_ID");
- } else {
- FAIL() << "Cannot launch executor";
- }
+ virtual void initialize(Slave *slave) {
+ pid = slave->self();
}
- virtual void killExecutor(slave::Framework* f, slave::Executor* e)
- {
- if (drivers.count(e->info.executor_id()) > 0) {
- MesosExecutorDriver* driver = drivers[e->info.executor_id()];
- driver->stop();
- driver->join();
- delete driver;
- drivers.erase(e->info.executor_id());
- } else {
- FAIL() << "Cannot kill executor";
- }
+ virtual void startExecutor(Framework *framework) {
+ // TODO(benh): Cleanup the way we launch local drivers!
+ setenv("MESOS_LOCAL", "1", 1);
+ setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
+ setenv("MESOS_FRAMEWORK_ID", framework->id.c_str(), 1);
+
+ driver = new MesosExecutorDriver(executor);
+ driver->start();
}
-private:
- map<ExecutorID, Executor*> executors;
- map<ExecutorID, MesosExecutorDriver*> drivers;
- Slave* slave;
+ virtual void killExecutor(Framework* framework) {
+ driver->stop();
+ driver->join();
+ delete driver;
+
+ // TODO(benh): Cleanup the way we launch local drivers!
+ unsetenv("MESOS_LOCAL");
+ unsetenv("MESOS_SLAVE_PID");
+ unsetenv("MESOS_FRAMEWORK_ID");
+ }
};
@@ -109,7 +88,7 @@ TEST(MasterTest, ResourceOfferWithMultip
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID<Master> master = local::launch(10, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, master);
@@ -122,14 +101,13 @@ TEST(MasterTest, ResourceOfferWithMultip
.WillOnce(Return(""));
EXPECT_CALL(sched, getExecutorInfo(&driver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched, registered(&driver, _))
.Times(1);
EXPECT_CALL(sched, resourceOffer(&driver, _, _))
- .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
+ .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)));
EXPECT_CALL(sched, offerRescinded(&driver, _))
.Times(AtMost(1));
@@ -141,9 +119,8 @@ TEST(MasterTest, ResourceOfferWithMultip
EXPECT_NE(0, offers.size());
EXPECT_GE(10, offers.size());
- Resources resources(offers[0].resources());
- EXPECT_EQ(2, resources.getScalar("cpus", Resource::Scalar()).value());
- EXPECT_EQ(1024, resources.getScalar("mem", Resource::Scalar()).value());
+ EXPECT_EQ("2", offers[0].params["cpus"]);
+ EXPECT_EQ("1024", offers[0].params["mem"]);
driver.stop();
driver.join();
@@ -156,7 +133,7 @@ TEST(MasterTest, ResourcesReofferedAfter
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
MockScheduler sched1;
MesosSchedulerDriver driver1(&sched1, master);
@@ -169,20 +146,19 @@ TEST(MasterTest, ResourcesReofferedAfter
.WillOnce(Return(""));
EXPECT_CALL(sched1, getExecutorInfo(&driver1))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched1, registered(&driver1, _))
.Times(1);
EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
- .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)))
- .WillRepeatedly(Return());
+ .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)));
driver1.start();
WAIT_UNTIL(sched1ResourceOfferCall);
- driver1.replyToOffer(offerId, vector<TaskDescription>());
+ driver1.replyToOffer(offerId, vector<TaskDescription>(), map<string, string>());
driver1.stop();
driver1.join();
@@ -196,14 +172,13 @@ TEST(MasterTest, ResourcesReofferedAfter
.WillOnce(Return(""));
EXPECT_CALL(sched2, getExecutorInfo(&driver2))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched2, registered(&driver2, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
- .WillOnce(Trigger(&sched2ResourceOfferCall))
- .WillRepeatedly(Return());
+ .WillOnce(Trigger(&sched2ResourceOfferCall));
EXPECT_CALL(sched2, offerRescinded(&driver2, _))
.Times(AtMost(1));
@@ -223,7 +198,7 @@ TEST(MasterTest, ResourcesReofferedAfter
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
MockScheduler sched1;
MesosSchedulerDriver driver1(&sched1, master);
@@ -237,15 +212,14 @@ TEST(MasterTest, ResourcesReofferedAfter
.WillOnce(Return(""));
EXPECT_CALL(sched1, getExecutorInfo(&driver1))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched1, registered(&driver1, _))
.Times(1);
EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
.WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&sched1ResourceOfferCall)))
- .WillRepeatedly(Return());
+ Trigger(&sched1ResourceOfferCall)));
driver1.start();
@@ -253,34 +227,22 @@ TEST(MasterTest, ResourcesReofferedAfter
EXPECT_NE(0, offers.size());
- TaskDescription task;
- task.set_name("");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-
- Resource* cpus = task.add_resources();
- cpus->set_name("cpus");
- cpus->set_type(Resource::SCALAR);
- cpus->mutable_scalar()->set_value(0);
-
- Resource* mem = task.add_resources();
- mem->set_name("mem");
- mem->set_type(Resource::SCALAR);
- mem->mutable_scalar()->set_value(1 * Gigabyte);
+ map<string, string> params;
+ params["cpus"] = "0";
+ params["mem"] = lexical_cast<string>(1 * Gigabyte);
vector<TaskDescription> tasks;
- tasks.push_back(task);
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", params, bytes()));
trigger sched1ErrorCall;
- EXPECT_CALL(sched1,
- error(&driver1, _, "Invalid resources for task"))
+ EXPECT_CALL(sched1, error(&driver1, _, "Invalid task size: <0 CPUs, 1024 MEM>"))
.WillOnce(Trigger(&sched1ErrorCall));
EXPECT_CALL(sched1, offerRescinded(&driver1, offerId))
.Times(AtMost(1));
- driver1.replyToOffer(offerId, tasks);
+ driver1.replyToOffer(offerId, tasks, map<string, string>());
WAIT_UNTIL(sched1ErrorCall);
@@ -296,14 +258,13 @@ TEST(MasterTest, ResourcesReofferedAfter
.WillOnce(Return(""));
EXPECT_CALL(sched2, getExecutorInfo(&driver2))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched2, registered(&driver2, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
- .WillOnce(Trigger(&sched2ResourceOfferCall))
- .WillRepeatedly(Return());
+ .WillOnce(Trigger(&sched2ResourceOfferCall));
EXPECT_CALL(sched2, offerRescinded(&driver2, _))
.Times(AtMost(1));
@@ -324,14 +285,11 @@ TEST(MasterTest, SlaveLost)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
+ PID master = Process::spawn(&m);
ProcessBasedIsolationModule isolationModule;
-
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -347,15 +305,14 @@ TEST(MasterTest, SlaveLost)
.WillOnce(Return(""));
EXPECT_CALL(sched, getExecutorInfo(&driver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched, registered(&driver, _))
.Times(1);
EXPECT_CALL(sched, resourceOffer(&driver, _, _))
.WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
+ Trigger(&resourceOfferCall)));
driver.start();
@@ -368,10 +325,10 @@ TEST(MasterTest, SlaveLost)
EXPECT_CALL(sched, offerRescinded(&driver, offerId))
.WillOnce(Trigger(&offerRescindedCall));
- EXPECT_CALL(sched, slaveLost(&driver, offers[0].slave_id()))
+ EXPECT_CALL(sched, slaveLost(&driver, offers[0].slaveId))
.WillOnce(Trigger(&slaveLostCall));
- process::post(slave, process::TERMINATE);
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
WAIT_UNTIL(offerRescindedCall);
WAIT_UNTIL(slaveLostCall);
@@ -379,10 +336,10 @@ TEST(MasterTest, SlaveLost)
driver.stop();
driver.join();
- process::wait(slave);
+ Process::wait(slave);
- process::post(master, process::TERMINATE);
- process::wait(master);
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
}
@@ -390,7 +347,7 @@ TEST(MasterTest, SchedulerFailover)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
// Launch the first (i.e., failing) scheduler and wait until
// registered gets called to launch the second (i.e., failover)
@@ -407,13 +364,13 @@ TEST(MasterTest, SchedulerFailover)
.WillOnce(Return(""));
EXPECT_CALL(sched1, getExecutorInfo(&driver1))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched1, registered(&driver1, _))
.WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&sched1RegisteredCall)));
EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
- .WillRepeatedly(Return());
+ .Times(AtMost(1));
EXPECT_CALL(sched1, offerRescinded(&driver1, _))
.Times(AtMost(1));
@@ -438,13 +395,13 @@ TEST(MasterTest, SchedulerFailover)
.WillOnce(Return(""));
EXPECT_CALL(sched2, getExecutorInfo(&driver2))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched2, registered(&driver2, frameworkId))
.WillOnce(Trigger(&sched2RegisteredCall));
EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
- .WillRepeatedly(Return());
+ .Times(AtMost(1));
EXPECT_CALL(sched2, offerRescinded(&driver2, _))
.Times(AtMost(1));
@@ -467,15 +424,15 @@ TEST(MasterTest, SlavePartitioned)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- process::Clock::pause();
+ Clock::pause();
MockFilter filter;
- process::filter(&filter);
+ Process::filter(&filter);
EXPECT_MSG(filter, _, _, _)
.WillRepeatedly(Return(false));
- PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, master);
@@ -486,13 +443,13 @@ TEST(MasterTest, SlavePartitioned)
.WillOnce(Return(""));
EXPECT_CALL(sched, getExecutorInfo(&driver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched, registered(&driver, _))
.Times(1);
EXPECT_CALL(sched, resourceOffer(&driver, _, _))
- .WillRepeatedly(Return());
+ .Times(AtMost(1));
EXPECT_CALL(sched, offerRescinded(&driver, _))
.Times(AtMost(1));
@@ -500,14 +457,12 @@ TEST(MasterTest, SlavePartitioned)
EXPECT_CALL(sched, slaveLost(&driver, _))
.WillOnce(Trigger(&slaveLostCall));
- EXPECT_MSG(filter, Eq(PONG), _, _)
+ EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
.WillRepeatedly(Return(true));
driver.start();
- double secs = master::SLAVE_PONG_TIMEOUT * master::MAX_SLAVE_TIMEOUTS;
-
- process::Clock::advance(secs);
+ Clock::advance(master::HEARTBEAT_TIMEOUT);
WAIT_UNTIL(slaveLostCall);
@@ -516,9 +471,9 @@ TEST(MasterTest, SlavePartitioned)
local::shutdown();
- process::filter(NULL);
+ Process::filter(NULL);
- process::Clock::resume();
+ Clock::resume();
}
@@ -527,9 +482,7 @@ TEST(MasterTest, TaskRunning)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
+ PID master = Process::spawn(&m);
MockExecutor exec;
@@ -542,13 +495,10 @@ TEST(MasterTest, TaskRunning)
EXPECT_CALL(exec, shutdown(_))
.Times(1);
- map<ExecutorID, Executor*> execs;
- execs[DEFAULT_EXECUTOR_ID] = &exec;
-
- TestingIsolationModule isolationModule(execs);
+ LocalIsolationModule isolationModule(&exec);
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -565,15 +515,14 @@ TEST(MasterTest, TaskRunning)
.WillOnce(Return(""));
EXPECT_CALL(sched, getExecutorInfo(&driver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched, registered(&driver, _))
.Times(1);
EXPECT_CALL(sched, resourceOffer(&driver, _, _))
.WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
+ Trigger(&resourceOfferCall)));
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
@@ -584,129 +533,23 @@ TEST(MasterTest, TaskRunning)
EXPECT_NE(0, offers.size());
- TaskDescription task;
- task.set_name("");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task.mutable_resources()->MergeFrom(offers[0].resources());
-
vector<TaskDescription> tasks;
- tasks.push_back(task);
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- driver.replyToOffer(offerId, tasks);
+ driver.replyToOffer(offerId, tasks, map<string, string>());
WAIT_UNTIL(statusUpdateCall);
- EXPECT_EQ(TASK_RUNNING, status.state());
+ EXPECT_EQ(TASK_RUNNING, status.state);
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
- process::wait(slave);
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+ Process::wait(slave);
- process::post(master, process::TERMINATE);
- process::wait(master);
-}
-
-
-TEST(MasterTest, KillTask)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
-
- MockExecutor exec;
-
- trigger killTaskCall;
-
- EXPECT_CALL(exec, init(_, _))
- .Times(1);
-
- EXPECT_CALL(exec, launchTask(_, _))
- .Times(1);
-
- EXPECT_CALL(exec, killTask(_, _))
- .WillOnce(Trigger(&killTaskCall));
-
- EXPECT_CALL(exec, shutdown(_))
- .Times(1);
-
- map<ExecutorID, Executor*> execs;
- execs[DEFAULT_EXECUTOR_ID] = &exec;
-
- TestingIsolationModule isolationModule(execs);
-
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
-
- BasicMasterDetector detector(master, slave, true);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(&sched, master);
-
- OfferID offerId;
- vector<SlaveOffer> offers;
- TaskStatus status;
-
- trigger resourceOfferCall, statusUpdateCall;
-
- EXPECT_CALL(sched, getFrameworkName(&driver))
- .WillOnce(Return(""));
-
- EXPECT_CALL(sched, getExecutorInfo(&driver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
-
- EXPECT_CALL(sched, registered(&driver, _))
- .Times(1);
-
- EXPECT_CALL(sched, resourceOffer(&driver, _, _))
- .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
-
- driver.start();
-
- WAIT_UNTIL(resourceOfferCall);
-
- EXPECT_NE(0, offers.size());
-
- TaskID taskId;
- taskId.set_value("1");
-
- TaskDescription task;
- task.set_name("");
- task.mutable_task_id()->MergeFrom(taskId);
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task.mutable_resources()->MergeFrom(offers[0].resources());
-
- vector<TaskDescription> tasks;
- tasks.push_back(task);
-
- driver.replyToOffer(offerId, tasks);
-
- WAIT_UNTIL(statusUpdateCall);
-
- EXPECT_EQ(TASK_RUNNING, status.state());
-
- driver.killTask(taskId);
-
- WAIT_UNTIL(killTaskCall);
-
- driver.stop();
- driver.join();
-
- process::post(slave, process::TERMINATE);
- process::wait(slave);
-
- process::post(master, process::TERMINATE);
- process::wait(master);
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
}
@@ -714,19 +557,14 @@ TEST(MasterTest, SchedulerFailoverStatus
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- process::Clock::pause();
+ Clock::pause();
MockFilter filter;
- process::filter(&filter);
+ Process::filter(&filter);
EXPECT_MSG(filter, _, _, _)
.WillRepeatedly(Return(false));
- Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
-
MockExecutor exec;
EXPECT_CALL(exec, init(_, _))
@@ -738,13 +576,13 @@ TEST(MasterTest, SchedulerFailoverStatus
EXPECT_CALL(exec, shutdown(_))
.Times(1);
- map<ExecutorID, Executor*> execs;
- execs[DEFAULT_EXECUTOR_ID] = &exec;
+ LocalIsolationModule isolationModule(&exec);
- TestingIsolationModule isolationModule(execs);
+ Master m;
+ PID master = Process::spawn(&m);
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -764,15 +602,14 @@ TEST(MasterTest, SchedulerFailoverStatus
.WillOnce(Return(""));
EXPECT_CALL(sched1, getExecutorInfo(&driver1))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched1, registered(&driver1, _))
.WillOnce(SaveArg<1>(&frameworkId));
EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
.WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
+ Trigger(&resourceOfferCall)));
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
.Times(0);
@@ -790,16 +627,10 @@ TEST(MasterTest, SchedulerFailoverStatus
EXPECT_NE(0, offers.size());
- TaskDescription task;
- task.set_name("");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task.mutable_resources()->MergeFrom(offers[0].resources());
-
vector<TaskDescription> tasks;
- tasks.push_back(task);
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- driver1.replyToOffer(offerId, tasks);
+ driver1.replyToOffer(offerId, tasks, map<string, string>());
WAIT_UNTIL(statusUpdateMsg);
@@ -817,7 +648,7 @@ TEST(MasterTest, SchedulerFailoverStatus
.WillOnce(Return(""));
EXPECT_CALL(sched2, getExecutorInfo(&driver2))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched2, registered(&driver2, frameworkId))
.WillOnce(Trigger(&registeredCall));
@@ -829,7 +660,7 @@ TEST(MasterTest, SchedulerFailoverStatus
WAIT_UNTIL(registeredCall);
- process::Clock::advance(STATUS_UPDATE_RETRY_TIMEOUT);
+ Clock::advance(RELIABLE_TIMEOUT);
WAIT_UNTIL(statusUpdateCall);
@@ -839,15 +670,15 @@ TEST(MasterTest, SchedulerFailoverStatus
driver1.join();
driver2.join();
- process::post(slave, process::TERMINATE);
- process::wait(slave);
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+ Process::wait(slave);
- process::post(master, process::TERMINATE);
- process::wait(master);
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
- process::filter(NULL);
+ Process::filter(NULL);
- process::Clock::resume();
+ Clock::resume();
}
@@ -855,16 +686,11 @@ TEST(MasterTest, FrameworkMessage)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
-
MockExecutor exec;
- ExecutorDriver* execDriver;
+ ExecutorDriver *execDriver;
ExecutorArgs args;
- string execData;
+ FrameworkMessage execMessage;
trigger execFrameworkMessageCall;
@@ -875,19 +701,19 @@ TEST(MasterTest, FrameworkMessage)
.Times(1);
EXPECT_CALL(exec, frameworkMessage(_, _))
- .WillOnce(DoAll(SaveArg<1>(&execData),
+ .WillOnce(DoAll(SaveArg<1>(&execMessage),
Trigger(&execFrameworkMessageCall)));
EXPECT_CALL(exec, shutdown(_))
.Times(1);
- map<ExecutorID, Executor*> execs;
- execs[DEFAULT_EXECUTOR_ID] = &exec;
+ LocalIsolationModule isolationModule(&exec);
- TestingIsolationModule isolationModule(execs);
+ Master m;
+ PID master = Process::spawn(&m);
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -900,7 +726,7 @@ TEST(MasterTest, FrameworkMessage)
OfferID offerId;
vector<SlaveOffer> offers;
TaskStatus status;
- string schedData;
+ FrameworkMessage schedMessage;
trigger resourceOfferCall, statusUpdateCall, schedFrameworkMessageCall;
@@ -908,21 +734,20 @@ TEST(MasterTest, FrameworkMessage)
.WillOnce(Return(""));
EXPECT_CALL(sched, getExecutorInfo(&schedDriver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched, registered(&schedDriver, _))
.Times(1);
EXPECT_CALL(sched, resourceOffer(&schedDriver, _, _))
.WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
+ Trigger(&resourceOfferCall)));
EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
.WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
- EXPECT_CALL(sched, frameworkMessage(&schedDriver, _, _, _))
- .WillOnce(DoAll(SaveArg<3>(&schedData),
+ EXPECT_CALL(sched, frameworkMessage(&schedDriver, _))
+ .WillOnce(DoAll(SaveArg<1>(&schedMessage),
Trigger(&schedFrameworkMessageCall)));
schedDriver.start();
@@ -931,47 +756,37 @@ TEST(MasterTest, FrameworkMessage)
EXPECT_NE(0, offers.size());
- TaskDescription task;
- task.set_name("");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task.mutable_resources()->MergeFrom(offers[0].resources());
-
vector<TaskDescription> tasks;
- tasks.push_back(task);
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- schedDriver.replyToOffer(offerId, tasks);
+ schedDriver.replyToOffer(offerId, tasks, map<string, string>());
WAIT_UNTIL(statusUpdateCall);
- EXPECT_EQ(TASK_RUNNING, status.state());
+ EXPECT_EQ(TASK_RUNNING, status.state);
- string hello = "hello";
-
- schedDriver.sendFrameworkMessage(offers[0].slave_id(),
- DEFAULT_EXECUTOR_ID,
- hello);
+ FrameworkMessage hello(offers[0].slaveId, 1, "hello");
+ schedDriver.sendFrameworkMessage(hello);
WAIT_UNTIL(execFrameworkMessageCall);
- EXPECT_EQ(hello, execData);
-
- string reply = "reply";
+ EXPECT_EQ("hello", execMessage.data);
+ FrameworkMessage reply(args.slaveId, 1, "reply");
execDriver->sendFrameworkMessage(reply);
WAIT_UNTIL(schedFrameworkMessageCall);
- EXPECT_EQ(reply, schedData);
+ EXPECT_EQ("reply", schedMessage.data);
schedDriver.stop();
schedDriver.join();
- process::post(slave, process::TERMINATE);
- process::wait(slave);
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+ Process::wait(slave);
- process::post(master, process::TERMINATE);
- process::wait(master);
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
}
@@ -979,14 +794,9 @@ TEST(MasterTest, SchedulerFailoverFramew
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
-
MockExecutor exec;
- ExecutorDriver* execDriver;
+ ExecutorDriver *execDriver;
EXPECT_CALL(exec, init(_, _))
.WillOnce(SaveArg<0>(&execDriver));
@@ -997,13 +807,13 @@ TEST(MasterTest, SchedulerFailoverFramew
EXPECT_CALL(exec, shutdown(_))
.Times(1);
- map<ExecutorID, Executor*> execs;
- execs[DEFAULT_EXECUTOR_ID] = &exec;
+ LocalIsolationModule isolationModule(&exec);
- TestingIsolationModule isolationModule(execs);
+ Master m;
+ PID master = Process::spawn(&m);
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
+ Slave s(Resources(2, 1 * Gigabyte), true, &isolationModule);
+ PID slave = Process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -1021,7 +831,7 @@ TEST(MasterTest, SchedulerFailoverFramew
.WillOnce(Return(""));
EXPECT_CALL(sched1, getExecutorInfo(&driver1))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched1, registered(&driver1, _))
.WillOnce(SaveArg<1>(&frameworkId));
@@ -1031,8 +841,7 @@ TEST(MasterTest, SchedulerFailoverFramew
EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
.WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&sched1ResourceOfferCall)))
- .WillRepeatedly(Return());
+ Trigger(&sched1ResourceOfferCall)));
EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
.Times(1);
@@ -1043,20 +852,14 @@ TEST(MasterTest, SchedulerFailoverFramew
EXPECT_NE(0, offers.size());
- TaskDescription task;
- task.set_name("");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task.mutable_resources()->MergeFrom(offers[0].resources());
-
vector<TaskDescription> tasks;
- tasks.push_back(task);
+ tasks.push_back(TaskDescription(1, offers[0].slaveId, "", offers[0].params, ""));
- driver1.replyToOffer(offerId, tasks);
+ driver1.replyToOffer(offerId, tasks, map<string, string>());
WAIT_UNTIL(sched1StatusUpdateCall);
- EXPECT_EQ(TASK_RUNNING, status.state());
+ EXPECT_EQ(TASK_RUNNING, status.state);
MockScheduler sched2;
MesosSchedulerDriver driver2(&sched2, master, frameworkId);
@@ -1067,19 +870,19 @@ TEST(MasterTest, SchedulerFailoverFramew
.WillOnce(Return(""));
EXPECT_CALL(sched2, getExecutorInfo(&driver2))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+ .WillOnce(Return(ExecutorInfo("noexecutor", "")));
EXPECT_CALL(sched2, registered(&driver2, frameworkId))
.WillOnce(Trigger(&sched2RegisteredCall));
- EXPECT_CALL(sched2, frameworkMessage(&driver2, _, _, _))
+ EXPECT_CALL(sched2, frameworkMessage(&driver2, _))
.WillOnce(Trigger(&sched2FrameworkMessageCall));
driver2.start();
WAIT_UNTIL(sched2RegisteredCall);
- execDriver->sendFrameworkMessage("");
+ execDriver->sendFrameworkMessage(FrameworkMessage());
WAIT_UNTIL(sched2FrameworkMessageCall);
@@ -1089,145 +892,9 @@ TEST(MasterTest, SchedulerFailoverFramew
driver1.join();
driver2.join();
- process::post(slave, process::TERMINATE);
- process::wait(slave);
-
- process::post(master, process::TERMINATE);
- process::wait(master);
-}
-
-
-TEST(MasterTest, MultipleExecutors)
-{
- ASSERT_TRUE(GTEST_IS_THREADSAFE);
-
- Master m;
- PID<Master> master = process::spawn(&m);
-
- Resources resources = Resources::parse("cpus:2;mem:1024");
-
- MockExecutor exec1;
- TaskDescription exec1Task;
- trigger exec1LaunchTaskCall;
-
- EXPECT_CALL(exec1, init(_, _))
- .Times(1);
-
- EXPECT_CALL(exec1, launchTask(_, _))
- .WillOnce(DoAll(SaveArg<1>(&exec1Task),
- Trigger(&exec1LaunchTaskCall)));
-
- EXPECT_CALL(exec1, shutdown(_))
- .Times(1);
-
- MockExecutor exec2;
- TaskDescription exec2Task;
- trigger exec2LaunchTaskCall;
-
- EXPECT_CALL(exec2, init(_, _))
- .Times(1);
-
- EXPECT_CALL(exec2, launchTask(_, _))
- .WillOnce(DoAll(SaveArg<1>(&exec2Task),
- Trigger(&exec2LaunchTaskCall)));
-
- EXPECT_CALL(exec2, shutdown(_))
- .Times(1);
-
- ExecutorID executorId1;
- executorId1.set_value("executor-1");
-
- ExecutorID executorId2;
- executorId2.set_value("executor-2");
-
- map<ExecutorID, Executor*> execs;
- execs[executorId1] = &exec1;
- execs[executorId2] = &exec2;
-
- TestingIsolationModule isolationModule(execs);
-
- Slave s(resources, true, &isolationModule);
- PID<Slave> slave = process::spawn(&s);
-
- BasicMasterDetector detector(master, slave, true);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(&sched, master);
-
- OfferID offerId;
- vector<SlaveOffer> offers;
- TaskStatus status1, status2;
-
- trigger resourceOfferCall, statusUpdateCall1, statusUpdateCall2;
-
- EXPECT_CALL(sched, getFrameworkName(&driver))
- .WillOnce(Return(""));
-
- EXPECT_CALL(sched, getExecutorInfo(&driver))
- .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
-
- EXPECT_CALL(sched, registered(&driver, _))
- .Times(1);
-
- EXPECT_CALL(sched, resourceOffer(&driver, _, _))
- .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
- Trigger(&resourceOfferCall)))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(DoAll(SaveArg<1>(&status1), Trigger(&statusUpdateCall1)))
- .WillOnce(DoAll(SaveArg<1>(&status2), Trigger(&statusUpdateCall2)));
-
- driver.start();
-
- WAIT_UNTIL(resourceOfferCall);
-
- ASSERT_NE(0, offers.size());
-
- TaskDescription task1;
- task1.set_name("");
- task1.mutable_task_id()->set_value("1");
- task1.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
- task1.mutable_executor()->mutable_executor_id()->MergeFrom(executorId1);
- task1.mutable_executor()->set_uri("noexecutor");
-
- TaskDescription task2;
- task2.set_name("");
- task2.mutable_task_id()->set_value("2");
- task2.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
- task2.mutable_executor()->mutable_executor_id()->MergeFrom(executorId2);
- task2.mutable_executor()->set_uri("noexecutor");
-
- vector<TaskDescription> tasks;
- tasks.push_back(task1);
- tasks.push_back(task2);
-
- driver.replyToOffer(offerId, tasks);
-
- WAIT_UNTIL(statusUpdateCall1);
-
- EXPECT_EQ(TASK_RUNNING, status1.state());
-
- WAIT_UNTIL(statusUpdateCall2);
-
- EXPECT_EQ(TASK_RUNNING, status2.state());
-
- WAIT_UNTIL(exec1LaunchTaskCall);
-
- EXPECT_EQ(task1.task_id(), exec1Task.task_id());
-
- WAIT_UNTIL(exec2LaunchTaskCall);
-
- EXPECT_EQ(task2.task_id(), exec2Task.task_id());
-
- driver.stop();
- driver.join();
-
- process::post(slave, process::TERMINATE);
- process::wait(slave);
+ MesosProcess::post(slave, pack<S2S_SHUTDOWN>());
+ Process::wait(slave);
- process::post(master, process::TERMINATE);
- process::wait(master);
+ MesosProcess::post(master, pack<M2M_SHUTDOWN>());
+ Process::wait(master);
}