You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:09:06 UTC
svn commit: r1468775 - in /incubator/mesos/trunk/src/slave:
cgroups_isolator.hpp process_isolator.hpp slave.cpp slave.hpp
Author: vinodkone
Date: Wed Apr 17 07:09:05 2013
New Revision: 1468775
URL: http://svn.apache.org/r1468775
Log:
Added states to Slave struct.
Review: https://reviews.apache.org/r/10268
Modified:
incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
incubator/mesos/trunk/src/slave/process_isolator.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.hpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.hpp Wed Apr 17 07:09:05 2013
@@ -48,12 +48,6 @@
namespace mesos {
namespace internal {
namespace slave {
-namespace state {
-
-class State; // Forward declaration.
-
-} // namespace state {
-
// TODO(bmahler): Migrate this into its own file, along with moving
// all cgroups code inside of a 'cgroups' directory.
Modified: incubator/mesos/trunk/src/slave/process_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.hpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.hpp Wed Apr 17 07:09:05 2013
@@ -40,12 +40,6 @@
namespace mesos {
namespace internal {
namespace slave {
-namespace state {
-
-class State; // Forward declaration.
-
-} // namespace state {
-
class ProcessIsolator : public Isolator, public ProcessExitedListener
{
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:09:05 2013
@@ -79,6 +79,7 @@ Slave::Slave(const Resources& _resources
Isolator* _isolator,
Files* _files)
: ProcessBase(ID::generate("slave")),
+ state(RECOVERING),
flags(),
local(_local),
resources(_resources),
@@ -95,6 +96,7 @@ Slave::Slave(const slave::Flags& _flags,
Isolator* _isolator,
Files* _files)
: ProcessBase(ID::generate("slave")),
+ state(RECOVERING),
flags(_flags),
local(_local),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
@@ -272,10 +274,6 @@ void Slave::initialize()
startTime = Clock::now();
- connected = false;
-
- halting = false;
-
// Install protobuf handlers.
install<NewMasterDetectedMessage>(
&Slave::newMasterDetected,
@@ -427,6 +425,12 @@ void Slave::_initialize(const Future<Not
// in 'cleanup' mode.
if (frameworks.empty() && flags.recover == "cleanup") {
terminate(self());
+ } else {
+ // Register with the master.
+ state = DISCONNECTED;
+ if (master) {
+ doReliableRegistration();
+ }
}
}
@@ -452,7 +456,8 @@ void Slave::finalize()
}
}
- if (flags.checkpoint && (halting || flags.recover == "cleanup")) {
+ if (flags.checkpoint &&
+ (state == TERMINATING || flags.recover == "cleanup")) {
// We remove the "latest" symlink in meta directory, so that the
// slave doesn't recover the state when it restarts and registers
// as a new slave with the master.
@@ -479,7 +484,7 @@ void Slave::shutdown()
LOG(INFO) << "Slave asked to shut down by " << from;
- halting = true;
+ state = TERMINATING;
if (frameworks.empty()) { // Terminate slave if there are no frameworks.
terminate(self());
@@ -525,71 +530,132 @@ void Slave::newMasterDetected(const UPID
master = pid;
link(master);
- connected = false;
+ // Inform the status updates manager about the new master.
+ statusUpdateManager->newMasterDetected(master);
- // Do registration after recovery is complete.
- // NOTE: Slave only registers with master when it is in "reconnect" mode.
- // This ensures that master doesn't offer resources of a slave in "cleanup"
- // mode.
- if (flags.recover == "reconnect") {
- recovered.future()
- .onReady(defer(self(), &Self::doReliableRegistration, params::_1));
- } else {
- LOG(INFO)
- << "Skipping registration because slave is started in 'cleanup' mode";
+ if (flags.recover == "cleanup") {
+ LOG(INFO) << "Skipping registration because slave is in 'cleanup' mode";
+ return;
}
- // Inform the status updates manager about the new master.
- statusUpdateManager->newMasterDetected(master);
+ switch (state) {
+ case RECOVERING:
+ LOG(INFO) << "Postponing registration until recovery is complete";
+ break;
+ case DISCONNECTED:
+ case RUNNING:
+ state = DISCONNECTED;
+ doReliableRegistration();
+ break;
+ case TERMINATING:
+ LOG(INFO) << "Skipping registration because slave is terminating";
+ break;
+ default:
+ LOG(FATAL) << "Unexpected slave state " << state;
+ break;
+ }
}
void Slave::noMasterDetected()
{
LOG(INFO) << "Lost master(s) ... waiting";
- connected = false;
master = UPID();
+
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ // We only change state if the slave is in RUNNING state because
+ // if the slave is in:
+ // RECOVERING: Slave needs to finish recovery before changing states.
+ // DISCONNECTED: Redundant.
+ // TERMINATING: Slave is shutting down.
+ // TODO(vinod): Subscribe to master detector after recovery.
+ // Similarly, unsubscribe from master detector during termination.
+ // Currently it is tricky because master detector is injected into
+ // the slave from outside.
+ if (state == RUNNING) {
+ state = DISCONNECTED;
+ }
}
void Slave::registered(const SlaveID& slaveId)
{
- LOG(INFO) << "Registered with master; given slave ID " << slaveId;
- info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
- connected = true;
-
- if (flags.checkpoint) {
- // Create the slave meta directory.
- paths::createSlaveDirectory(paths::getMetaRootDir(flags.work_dir), slaveId);
-
- // Checkpoint slave info.
- const string& path = paths::getSlaveInfoPath(
- paths::getMetaRootDir(flags.work_dir), slaveId);
+ switch(state) {
+ case DISCONNECTED: {
+ LOG(INFO) << "Registered with master " << master
+ << "; given slave ID " << slaveId;
+
+ state = RUNNING;
+ info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
+
+ if (flags.checkpoint) {
+ // Create the slave meta directory.
+ paths::createSlaveDirectory(paths::getMetaRootDir(flags.work_dir), slaveId);
+
+ // Checkpoint slave info.
+ const string& path = paths::getSlaveInfoPath(
+ paths::getMetaRootDir(flags.work_dir), slaveId);
- CHECK_SOME(state::checkpoint(path, info));
+ CHECK_SOME(state::checkpoint(path, info));
+ }
+ break;
+ }
+ case RUNNING:
+ // Already registered. Ignore registration.
+ break;
+ case TERMINATING:
+ LOG(WARNING) << "Ignoring registration because slave is terminating";
+ break;
+ case RECOVERING:
+ default:
+ LOG(FATAL) << "Unexpected slave state " << state;
+ break;
}
}
void Slave::reregistered(const SlaveID& slaveId)
{
- LOG(INFO) << "Re-registered with master";
-
- if (!(info.id() == slaveId)) {
- LOG(FATAL) << "Slave re-registered but got wrong ID";
+ switch(state) {
+ case DISCONNECTED:
+ LOG(INFO) << "Re-registered with master " << master;
+
+ state = RUNNING;
+ if (!(info.id() == slaveId)) {
+ LOG(FATAL) << "Slave re-registered but got wrong id: " << slaveId
+ << "(expected: " << info.id() << ")";
+ }
+ break;
+ case RUNNING:
+ // Already registered. Ignore registration.
+ break;
+ case TERMINATING:
+ LOG(WARNING) << "Ignoring re-registration because slave is terminating";
+ break;
+ case RECOVERING:
+ default:
+ LOG(FATAL) << "Unexpected slave state " << state;
+ break;
}
- connected = true;
}
-void Slave::doReliableRegistration(const Future<Nothing>& future)
+void Slave::doReliableRegistration()
{
- CHECK(future.isReady());
+ if (!master) {
+ LOG(INFO) << "Skipping registration because no master present";
+ return;
+ }
- if (connected || !master) {
+ if (state == RUNNING) { // Slave (re-)registered with the master.
return;
}
+ CHECK(state == DISCONNECTED || state == TERMINATING) << state;
+
if (info.id() == "") {
// Slave started before master.
// (Vinod): Is the above comment true?
@@ -628,7 +694,7 @@ void Slave::doReliableRegistration(const
}
// Retry registration if necessary.
- delay(Seconds(1.0), self(), &Slave::doReliableRegistration, future);
+ delay(Seconds(1.0), self(), &Slave::doReliableRegistration);
}
@@ -643,6 +709,26 @@ void Slave::runTask(
LOG(INFO) << "Got assigned task " << task.task_id()
<< " for framework " << frameworkId;
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RUNNING) {
+ LOG(WARNING) << "Cannot run task " << task.task_id()
+ << " of framework " << frameworkId
+ << " because the slave is in " << state << " state";
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ "Slave is not in RUNNING state");
+
+ statusUpdate(update);
+ return;
+ }
+
// TODO(vinod): Do this check in the master instead.
if (frameworkInfo.checkpoint() && !flags.checkpoint) {
LOG(WARNING) << "Asked to checkpoint framework " << frameworkId
@@ -741,7 +827,7 @@ void Slave::runTask(
statusUpdate(update);
break;
}
- case Executor::REGISTERING: {
+ case Executor::REGISTERING:
// Checkpoint the task before we do anything else (this is a no-op
// if the framework doesn't have checkpointing enabled).
executor->checkpointTask(task);
@@ -755,7 +841,6 @@ void Slave::runTask(
executor->queuedTasks[task.task_id()] = task;
break;
- }
case Executor::RUNNING: {
// Checkpoint the task before we do anything else (this is a no-op
// if the framework doesn't have checkpointing enabled).
@@ -801,6 +886,26 @@ void Slave::killTask(const FrameworkID&
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RUNNING) {
+ LOG(WARNING) << "Cannot kill task " << taskId
+ << " of framework " << frameworkId
+ << " because the slave is in " << state << " state";
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ taskId,
+ TASK_LOST,
+ "Slave is not in RUNNING state");
+
+ statusUpdate(update);
+ return;
+ }
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring kill task " << taskId
@@ -866,13 +971,12 @@ void Slave::killTask(const FrameworkID&
break;
}
case Executor::TERMINATING:
- case Executor::TERMINATED: {
+ case Executor::TERMINATED:
LOG(WARNING) << "Ignoring kill task " << taskId
<< " of framework " << frameworkId
<< " because the executor '" << executor->id
<< "' is terminating/terminated";
break;
- }
case Executor::RUNNING: {
// Send a message to the executor and wait for
// it to send us a status update.
@@ -902,13 +1006,24 @@ void Slave::shutdownFramework(const Fram
// it's called directly (e.g. Slave::finalize()) or
// it's a message from the currently registered master.
if (from && from != master) {
- LOG(WARNING) << "Ignoring shutdown framework message from " << from
- << " because it is not from the registered master ("
- << master << ")";
+ LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
+ << " from " << from << "because it is not from the registered "
+ << "master (" << master << ")";
return;
}
- LOG(INFO) << "Asked to shut down framework " << frameworkId << " by " << from;
+ LOG(INFO) << "Asked to shut down framework " << frameworkId
+ << " by " << from;
+
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state == RECOVERING || state == DISCONNECTED) {
+ LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
+ << " because the slave has not yet registered with the master";
+ return;
+ }
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
@@ -917,12 +1032,11 @@ void Slave::shutdownFramework(const Fram
}
switch (framework->state) {
- case Framework::TERMINATING: {
+ case Framework::TERMINATING:
LOG(WARNING) << "Ignoring shutdown framework " << framework->id
<< " because it is terminating";
break;
- }
- case Framework::RUNNING: {
+ case Framework::RUNNING:
LOG(INFO) << "Shutting down framework " << framework->id;
framework->state = Framework::TERMINATING;
@@ -948,7 +1062,6 @@ void Slave::shutdownFramework(const Fram
}
}
break;
- }
default:
LOG(FATAL) << "Framework " << frameworkId
<< " is in unexpected state " << framework->state;
@@ -963,9 +1076,21 @@ void Slave::schedulerMessage(
const ExecutorID& executorId,
const string& data)
{
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RUNNING) {
+ LOG(WARNING) << "Dropping message from framework "<< frameworkId
+ << " because the slave is in " << state << " state";
+ stats.invalidFrameworkMessages++;
+ return;
+ }
+
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- LOG(WARNING) << "Dropping message for framework "<< frameworkId
+ LOG(WARNING) << "Dropping message from framework "<< frameworkId
<< " because framework does not exist";
stats.invalidFrameworkMessages++;
return;
@@ -976,7 +1101,7 @@ void Slave::schedulerMessage(
<< framework->state;
if (framework->state == Framework::TERMINATING) {
- LOG(WARNING) << "Dropping message for framework "<< frameworkId
+ LOG(WARNING) << "Dropping message from framework "<< frameworkId
<< " because framework is terminating";
stats.invalidFrameworkMessages++;
return;
@@ -994,7 +1119,7 @@ void Slave::schedulerMessage(
switch (executor->state) {
case Executor::REGISTERING:
case Executor::TERMINATING:
- case Executor::TERMINATED: {
+ case Executor::TERMINATED:
// TODO(*): If executor is not yet registered, queue framework
// message? It's probably okay to just drop it since frameworks
// can have the executor send a message to the master to say when
@@ -1004,7 +1129,6 @@ void Slave::schedulerMessage(
<< " because executor is not running";
stats.invalidFrameworkMessages++;
break;
- }
case Executor::RUNNING: {
FrameworkToExecutorMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
@@ -1026,6 +1150,17 @@ void Slave::schedulerMessage(
void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
{
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RUNNING) {
+ LOG(WARNING) << "Dropping updateFramework message for "<< frameworkId
+ << " because the slave is in " << state << " state";
+ stats.invalidFrameworkMessages++;
+ return;
+ }
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
@@ -1034,11 +1169,10 @@ void Slave::updateFramework(const Framew
}
switch (framework->state) {
- case Framework::TERMINATING: {
+ case Framework::TERMINATING:
LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
<< " because it is terminating";
break;
- }
case Framework::RUNNING: {
LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
@@ -1109,6 +1243,10 @@ void Slave::_statusUpdateAcknowledgement
<< " acknowledgement for task " << taskId
<< " of framework " << frameworkId;
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(ERROR) << "Status update acknowledgement for task " << taskId
@@ -1151,6 +1289,26 @@ void Slave::registerExecutor(
LOG(INFO) << "Got registration for executor '" << executorId
<< "' of framework " << frameworkId;
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state == RECOVERING) {
+ LOG(WARNING) << "Shutting down executor '" << executorId
+ << "' of framework " << frameworkId
+ << " because the slave is still recovering";
+ reply(ShutdownExecutorMessage());
+ return;
+ }
+
+ if (state == TERMINATING) {
+ LOG(WARNING) << "Shutting down executor '" << executorId
+ << "' of framework " << frameworkId
+ << " because the slave is terminating";
+ reply(ShutdownExecutorMessage());
+ return;
+ }
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << " Shutting down executor '" << executorId
@@ -1189,13 +1347,12 @@ void Slave::registerExecutor(
case Executor::TERMINATED:
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
- case Executor::RUNNING: {
+ case Executor::RUNNING:
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' of framework " << frameworkId
<< " because it is in unexpected state " << executor->state;
reply(ShutdownExecutorMessage());
break;
- }
case Executor::REGISTERING: {
executor->state = Executor::RUNNING;
@@ -1274,6 +1431,18 @@ void Slave::reregisterExecutor(
const vector<TaskInfo>& tasks,
const vector<StatusUpdate>& updates)
{
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RECOVERING) {
+ LOG(WARNING) << "Shutting down executor '" << executorId
+ << "' of framework " << frameworkId
+ << " because the slave is not in recovery mode";
+ reply(ShutdownExecutorMessage());
+ return;
+ }
+
LOG(INFO) << "Re-registering executor " << executorId
<< " of framework " << frameworkId;
@@ -1301,13 +1470,12 @@ void Slave::reregisterExecutor(
case Executor::TERMINATED:
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
- case Executor::RUNNING: {
+ case Executor::RUNNING:
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' of framework " << frameworkId
<< " because it is in unexpected state " << executor->state;
reply(ShutdownExecutorMessage());
break;
- }
case Executor::REGISTERING: {
executor->state = Executor::RUNNING;
@@ -1368,6 +1536,8 @@ void Slave::reregisterExecutor(
void Slave::reregisterExecutorTimeout()
{
+ CHECK(state == RECOVERING || state == TERMINATING) << state;
+
LOG(INFO) << "Cleaning up un-reregistered executors";
foreachvalue (Framework* framework, frameworks) {
@@ -1379,10 +1549,9 @@ void Slave::reregisterExecutorTimeout()
switch (executor->state) {
case Executor::RUNNING: // Executor re-registered.
case Executor::TERMINATING:
- case Executor::TERMINATED: {
+ case Executor::TERMINATED:
break;
- }
- case Executor::REGISTERING: {
+ case Executor::REGISTERING:
// If we are here, the executor must have been hung and not
// exited! This is because if the executor properly exited,
// it should have already been identified by the isolator
@@ -1395,7 +1564,6 @@ void Slave::reregisterExecutorTimeout()
dispatch(
isolator, &Isolator::killExecutor, framework->id, executor->id);
break;
- }
default:
LOG(FATAL) << "Executor '" << executor->id
<< "' of framework " << framework->id
@@ -1415,6 +1583,10 @@ void Slave::reregisterExecutorTimeout()
// 2) When slave generates task updates (e.g. LOST/KILLED/FAILED).
void Slave::statusUpdate(const StatusUpdate& update)
{
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
const TaskStatus& status = update.status();
Framework* framework = getFramework(update.framework_id());
@@ -1535,10 +1707,22 @@ void Slave::executorMessage(
const ExecutorID& executorId,
const string& data)
{
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state != RUNNING) {
+ LOG(WARNING) << "Dropping framework message from executor "
+ << executorId << " to framework " << frameworkId
+ << " because the slave is in " << state << " state";
+ stats.invalidFrameworkMessages++;
+ return;
+ }
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- LOG(WARNING) << "Cannot send framework message from slave "
- << slaveId << " to framework " << frameworkId
+ LOG(WARNING) << "Cannot send framework message from executor "
+ << executorId << " to framework " << frameworkId
<< " because framework does not exist";
stats.invalidFrameworkMessages++;
return;
@@ -1549,8 +1733,8 @@ void Slave::executorMessage(
<< framework->state;
if (framework->state == Framework::TERMINATING) {
- LOG(WARNING) << "Ignoring framework message from slave "
- << slaveId << " to framework " << frameworkId
+ LOG(WARNING) << "Ignoring framework message from executor "
+ << executorId << " to framework " << frameworkId
<< " because framework is terminating";
stats.invalidFrameworkMessages++;
return;
@@ -1640,14 +1824,13 @@ void Slave::executorStarted(
}
switch (executor->state) {
- case Executor::TERMINATING: {
+ case Executor::TERMINATING:
LOG(WARNING) << "Executor '" << executorId
<< "' of framework " << frameworkId
<< " is terminating";
break;
- }
case Executor::REGISTERING:
- case Executor::RUNNING: {
+ case Executor::RUNNING:
monitor.watch(
frameworkId,
executorId,
@@ -1655,7 +1838,6 @@ void Slave::executorStarted(
flags.resource_monitoring_interval)
.onAny(lambda::bind(_watch, lambda::_1, frameworkId, executorId));
break;
- }
case Executor::TERMINATED:
default:
LOG(FATAL) << " Executor '" << executorId
@@ -1921,7 +2103,7 @@ void Slave::remove(Framework* framework)
// TODO(vinod): Instead of doing it this way, shutdownFramework()
// and shutdownExecutor() could return Futures and a slave could
// shutdown when all the Futures are satisfied (e.g., collect()).
- if (halting ||
+ if (state == TERMINATING ||
(flags.recover == "cleanup" && !recovered.future().isPending()) ) {
terminate(self());
}
@@ -2010,19 +2192,17 @@ void Slave::shutdownExecutorTimeout(
}
switch (executor->state) {
- case Executor::TERMINATED: {
+ case Executor::TERMINATED:
LOG(INFO) << "Executor '" << executorId
<< "' of framework " << frameworkId
<< " has already terminated";
break;
- }
- case Executor::TERMINATING: {
+ case Executor::TERMINATING:
LOG(INFO) << "Killing executor '" << executor->id
<< "' of framework " << framework->id;
dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
break;
- }
default:
LOG(FATAL) << "Executor '" << executor->id
<< "' of framework " << framework->id
@@ -2074,18 +2254,16 @@ void Slave::registerExecutorTimeout(
}
switch (executor->state) {
- case Executor::RUNNING: {
+ case Executor::RUNNING:
// Executor has registered. Ignore the registration timeout.
break;
- }
case Executor::TERMINATING:
- case Executor::TERMINATED: {
+ case Executor::TERMINATED:
LOG(INFO) << "Ignoring registration timeout for executor '" << executorId
<< "' of framework " << frameworkId
<< " because the executor is terminating/terminated";
break;
- }
- case Executor::REGISTERING: {
+ case Executor::REGISTERING:
LOG(INFO) << "Terminating executor " << executor->id
<< " of framework " << framework->id
<< " because it did not register within "
@@ -2096,7 +2274,6 @@ void Slave::registerExecutorTimeout(
// Immediately kill the executor.
dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
break;
- }
default:
LOG(FATAL) << "Executor '" << executor->id
<< "' of framework " << framework->id
@@ -2321,7 +2498,7 @@ Framework::Framework(
pid(_pid),
completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
{
- if (info.checkpoint()) {
+ if (info.checkpoint() && slave->state != slave->RECOVERING) {
// Checkpoint the framework info.
string path = paths::getFrameworkInfoPath(
paths::getMetaRootDir(slave->flags.work_dir),
@@ -2557,9 +2734,8 @@ Executor::Executor(
resources(_info.resources()),
completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
{
- if (checkpoint) {
- CHECK_NOTNULL(slave);
-
+ CHECK_NOTNULL(slave);
+ if (checkpoint && slave->state != slave->RECOVERING) {
// Checkpoint the executor info.
const string& path = paths::getExecutorInfoPath(
paths::getMetaRootDir(slave->flags.work_dir),
@@ -2709,6 +2885,17 @@ std::ostream& operator << (std::ostream&
}
}
+
+std::ostream& operator << (std::ostream& stream, Slave::State state) {
+ switch (state) {
+ case Slave::RECOVERING: return stream << "RECOVERING";
+ case Slave::DISCONNECTED: return stream << "DISCONNECTED";
+ case Slave::RUNNING: return stream << "RUNNING";
+ case Slave::TERMINATING: return stream << "TERMINATING";
+ default: return stream << "UNKNOWN";
+ }
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:09:05 2013
@@ -91,7 +91,7 @@ public:
void masterDetectionFailure();
void registered(const SlaveID& slaveId);
void reregistered(const SlaveID& slaveId);
- void doReliableRegistration(const Future<Nothing>& future);
+ void doReliableRegistration();
void runTask(
const FrameworkInfo& frameworkInfo,
@@ -180,6 +180,13 @@ public:
// exited.
void shutdownExecutor(Framework* framework, Executor* executor);
+ enum State {
+ RECOVERING, // Slave is doing recovery.
+ DISCONNECTED, // Slave is not connected to the master.
+ RUNNING, // Slave has (re-)registered.
+ TERMINATING, // Slave is shutting down.
+ } state;
+
protected:
virtual void initialize();
virtual void finalize();
@@ -285,24 +292,15 @@ private:
double startTime;
- // TODO(Vinod): Add 'state' to slave instead of capturing the
- // semantics of waiting for registration ('connecting') and
- // shutting down ('halting') in boolean variables.
- bool connected; // Flag to indicate if slave is registered.
-
GarbageCollector gc;
ResourceMonitor monitor;
- state::SlaveState state;
-
StatusUpdateManager* statusUpdateManager;
// Flag to indicate if recovery, including reconciling (i.e., reconnect/kill)
// with executors is finished.
Promise<Nothing> recovered;
- bool halting; // Flag to indicate if the slave is shutting down.
-
// Root meta directory containing checkpointed data.
const std::string metaDir;
};