You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:09:02 UTC
svn commit: r1468774 - in /incubator/mesos/trunk/src:
slave/process_isolator.cpp slave/slave.cpp slave/slave.hpp slave/state.cpp
tests/slave_recovery_tests.cpp
Author: vinodkone
Date: Wed Apr 17 07:09:01 2013
New Revision: 1468774
URL: http://svn.apache.org/r1468774
Log:
Fixed the slave so that, when shutting down, it waits for all executors
to exit and deletes the "latest" slave symlink.
Review: https://reviews.apache.org/r/10233
Modified:
incubator/mesos/trunk/src/slave/process_isolator.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/slave/state.cpp
incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp
Modified: incubator/mesos/trunk/src/slave/process_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.cpp Wed Apr 17 07:09:01 2013
@@ -455,8 +455,8 @@ void ProcessIsolator::processExited(pid_
ProcessInfo* info = infos[frameworkId][executorId];
if (info->pid.isSome() && info->pid.get() == pid) {
- LOG(INFO) << "Telling slave of lost executor " << executorId
- << " of framework " << frameworkId;
+ LOG(INFO) << "Telling slave of terminated executor '" << executorId
+ << "' of framework " << frameworkId;
dispatch(slave,
&Slave::executorTerminated,
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:09:01 2013
@@ -421,8 +421,13 @@ void Slave::_initialize(const Future<Not
}
}
- // Signal recovery.
- recovered.set(Nothing());
+ recovered.set(Nothing()); // Signal recovery.
+
+ // Terminate slave, if it has no active frameworks and is started
+ // in 'cleanup' mode.
+ if (frameworks.empty() && flags.recover == "cleanup") {
+ terminate(self());
+ }
}
@@ -439,16 +444,22 @@ void Slave::finalize()
// immediately). Of course, this still isn't sufficient
// because those status updates might get lost and we won't
// resend them unless we build that into the system.
- // NOTE: We shut down the framework if either
- // 1: The slave is asked to shutdown (halting = true) or
- // 2: The framework has disabled checkpointing.
- if (halting || !frameworks[frameworkId]->info.checkpoint()) {
+ // NOTE: We shut down the framework only if it has disabled
+ // checkpointing. This is because slave recovery tests terminate
+ // the slave to simulate slave restart.
+ if (!frameworks[frameworkId]->info.checkpoint()) {
shutdownFramework(frameworkId);
}
}
+ if (flags.checkpoint && (halting || flags.recover == "cleanup")) {
+ // We remove the "latest" symlink in meta directory, so that the
+ // slave doesn't recover the state when it restarts and registers
+ // as a new slave with the master.
+ CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
+ }
+
// Stop the isolator.
- // TODO(vinod): Wait until all the executors have terminated.
terminate(isolator);
wait(isolator);
}
@@ -470,7 +481,21 @@ void Slave::shutdown()
halting = true;
- terminate(self());
+ if (frameworks.empty()) { // Terminate slave if there are no frameworks.
+ terminate(self());
+ } else {
+ // NOTE: The slave will terminate after all
+ // executors have terminated.
+ // TODO(vinod): Wait until all updates have been acknowledged.
+ // This is tricky without persistent state at master because the
+ // slave might wait forever for status update acknowledgements,
+ // since it cannot reliably know when a framework has shut down.
+ // A short-term fix could be to wait for a certain time for ACKs
+ // and then shutdown.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(frameworks)) {
+ shutdownFramework(frameworkId);
+ }
+ }
}
@@ -903,15 +928,25 @@ void Slave::shutdownFramework(const Fram
framework->state = Framework::TERMINATING;
// Shut down all executors of this framework.
- // Note that the framework and its corresponding executors are
- // removed from the frameworks map by shutdownExecutorTimeout() or
- // executorTerminated().
- foreachvalue (Executor* executor, framework->executors) {
- shutdownExecutor(framework, executor);
+ foreachvalue (Executor* executor, utils::copy(framework->executors)) {
+ CHECK(executor->state == Executor::REGISTERING ||
+ executor->state == Executor::RUNNING ||
+ executor->state == Executor::TERMINATING ||
+ executor->state == Executor::TERMINATED)
+ << executor->state;
+
+ if (executor->state == Executor::REGISTERING ||
+ executor->state == Executor::RUNNING) {
+ shutdownExecutor(framework, executor);
+ } else if (executor->state == Executor::TERMINATED) {
+ // NOTE: We call remove here to ensure we can remove an
+ // executor (of a terminating framework) that is terminated
+ // but waiting for acknowledgements.
+ remove(framework, executor);
+ } else {
+ // Executor is terminating. Ignore.
+ }
}
-
- // Close all status update streams for this framework.
- statusUpdateManager->cleanup(frameworkId);
break;
}
default:
@@ -1101,8 +1136,11 @@ void Slave::_statusUpdateAcknowledgement
executor->updates.remove(taskId, uuid);
- // Cleanup the executor and framework, if possible.
- cleanup(framework, executor);
+ // Remove the executor if it has terminated and there are no more
+ // pending updates.
+ if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+ remove(framework, executor);
+ }
}
@@ -1464,7 +1502,7 @@ void Slave::_statusUpdate(
const Option<UPID>& pid)
{
if (!future.isReady()) {
- LOG(FATAL) << "Failed to handle status update " << update
+ LOG(FATAL) << "Failed to handle status update " << update << ": "
<< (future.isFailed() ? future.failure() : "future discarded");
return;
}
@@ -1541,11 +1579,11 @@ void Slave::ping(const UPID& from, const
void Slave::exited(const UPID& pid)
{
- LOG(INFO) << "Process exited: " << from;
+ LOG(INFO) << pid << " exited";
if (master == pid) {
- LOG(WARNING) << "WARNING! Master disconnected!"
- << " Waiting for a new master to be elected.";
+ LOG(WARNING) << "Master disconnected!"
+ << " Waiting for a new master to be elected";
// TODO(benh): After so long waiting for a master, commit suicide.
}
}
@@ -1659,10 +1697,11 @@ void Slave::executorTerminated(
<< "' of framework " << frameworkId
<< (WIFEXITED(status)
? " has exited with status "
- : " has terminated with signal ")
+ : " has terminated with signal '")
<< (WIFEXITED(status)
? stringify(WEXITSTATUS(status))
- : strsignal(WTERMSIG(status)));
+ : strsignal(WTERMSIG(status)))
+ << "'";
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
@@ -1694,6 +1733,8 @@ void Slave::executorTerminated(
monitor.unwatch(frameworkId, executorId)
.onAny(lambda::bind(_unwatch, lambda::_1, frameworkId, executorId));
+ // TODO(vinod): If there are no pending tasks or if the framework
+ // is terminating, this variable will not be properly set.
bool isCommandExecutor = false;
// Transition all live tasks to TASK_LOST/TASK_FAILED.
@@ -1770,8 +1811,12 @@ void Slave::executorTerminated(
send(master, message);
}
- // Cleanup the executor and framework, if possible.
- cleanup(framework, executor);
+ // Remove the executor if either there are no pending updates
+ // or the framework is terminating.
+ if (executor->updates.empty() ||
+ framework->state == Framework::TERMINATING) {
+ remove(framework, executor);
+ }
break;
}
default:
@@ -1783,109 +1828,104 @@ void Slave::executorTerminated(
}
-void Slave::cleanup(Framework* framework, Executor* executor)
+void Slave::remove(Framework* framework, Executor* executor)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
- // Cleanup this executor if it has terminated and either has no
+ LOG(INFO) << "Cleaning up executor '" << executor->id << "'"
+ << " of framework " << framework->id;
+
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING);
+
+ // Check that this executor has terminated and either has no
// pending updates or the framework is terminating. We don't
// care for pending updates when a framework is terminating
// because the framework cannot ACK them.
- // TODO(vinod): Use switch statement.
- if (executor->state == Executor::TERMINATED &&
- (executor->updates.empty() ||
- framework->state == Framework::TERMINATING)) {
-
- // Schedule the executor run work directory to get garbage collected.
- // TODO(vinod): Also schedule the top level executor work directory.
- gc.schedule(flags.gc_delay, executor->directory)
- .onAny(defer(self(), &Self::detachFile, params::_1, executor->directory));
-
- if (executor->checkpoint) {
- // Schedule the executor run meta directory to get garbage collected.
- // TODO(vinod): Also schedule the top level executor meta directory.
- const string& executorMetaDir = paths::getExecutorRunPath(
- metaDir,
- info.id(),
- framework->id,
- executor->id,
- executor->uuid);
+ CHECK(executor->state == Executor::TERMINATED &&
+ (executor->updates.empty() ||
+ framework->state == Framework::TERMINATING));
+
+ // Schedule the executor run work directory to get garbage collected.
+ // TODO(vinod): Also schedule the top level executor work directory.
+ gc.schedule(flags.gc_delay, executor->directory).onAny(
+ defer(self(), &Self::detachFile, params::_1, executor->directory));
- gc.schedule(flags.gc_delay, executorMetaDir)
- .onAny(defer(self(), &Self::detachFile, params::_1, executorMetaDir));
- }
+ if (executor->checkpoint) {
+ // Schedule the executor run meta directory to get garbage collected.
+ // TODO(vinod): Also schedule the top level executor meta directory.
+ const string& executorMetaDir = paths::getExecutorRunPath(
+ metaDir,
+ info.id(),
+ framework->id,
+ executor->id,
+ executor->uuid);
- framework->destroyExecutor(executor->id);
+ gc.schedule(flags.gc_delay, executorMetaDir).onAny(
+ defer(self(), &Self::detachFile, params::_1, executorMetaDir));
}
- // Cleanup if this framework has no executors running.
- if (framework->executors.empty()) {
- // Schedule the framework work directory to get garbage collected.
- const string& frameworkDir =
- paths::getFrameworkPath(flags.work_dir, info.id(), framework->id);
+ framework->destroyExecutor(executor->id);
- gc.schedule(flags.gc_delay, frameworkDir)
- .onAny(defer(self(), &Self::detachFile, params::_1, frameworkDir));
-
- if (framework->info.checkpoint()) {
- // Schedule the framework meta directory to get garbage collected.
- const string& frameworkMetaDir = paths::getFrameworkPath(
- metaDir, info.id(), framework->id);
-
- gc.schedule(flags.gc_delay, frameworkMetaDir)
- .onAny(defer(self(), &Self::detachFile, params::_1, frameworkMetaDir));
- }
-
- frameworks.erase(framework->id);
-
- // Pass ownership of the framework pointer.
- completedFrameworks.push_back(Owned<Framework>(framework));
- }
-
- // If this slave is in 'recover=cleanup' mode, exit after all
- // executors have been removed.
- if (flags.recover == "cleanup" && frameworks.size() == 0) {
- cleanup();
+ // Remove this framework if it has no executors running.
+ if (framework->executors.empty()) {
+ remove(framework);
}
}
-void Slave::cleanup()
+void Slave::remove(Framework* framework)
{
- CHECK(flags.recover == "cleanup");
+ CHECK_NOTNULL(framework);
- LOG(INFO) << "Slave is shutting down because it is started with "
- << " --recover==cleanup and all executors have terminated!";
+ LOG(INFO) << "Cleaning up framework " << framework->id;
- string archiveDir = paths::getArchiveDir(flags.work_dir);
- string metaDir = paths::getMetaRootDir(flags.work_dir);
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING);
- // Archive and delete the meta directory, to allow incompatible upgrades.
- LOG(INFO) << "Archiving and deleting the meta directory '"
- << metaDir << "' to allow incompatible upgrade!";
+ CHECK(framework->executors.empty());
- // Create the archive directory, if it doesn't exist.
- Try<Nothing> result = os::mkdir(archiveDir);
- if (result.isSome()) {
- result = os::tar(
- metaDir, path::join(archiveDir, info.id().value() + ".tar.gz"));
+ // Close all status update streams for this framework.
+ statusUpdateManager->cleanup(framework->id);
- if (result.isError()) {
- LOG(ERROR) << "Failed to archive meta directory '"
- << archiveDir << "': " << result.error();
+ // Schedule the framework work directory to get garbage collected.
+ const string& frameworkDir = paths::getFrameworkPath(
+ flags.work_dir,
+ info.id(),
+ framework->id);
+
+ gc.schedule(flags.gc_delay, frameworkDir).onAny(
+ defer(self(), &Self::detachFile, params::_1, frameworkDir));
+
+ if (framework->info.checkpoint()) {
+ // Schedule the framework meta directory to get garbage collected.
+ const string& frameworkMetaDir = paths::getFrameworkPath(
+ metaDir,
+ info.id(),
+ framework->id);
+
+ gc.schedule(flags.gc_delay, frameworkMetaDir).onAny(
+ defer(self(), &Self::detachFile, params::_1, frameworkMetaDir));
+ }
+
+ frameworks.erase(framework->id);
+
+ // Pass ownership of the framework pointer.
+ completedFrameworks.push_back(Owned<Framework>(framework));
+
+ if (frameworks.empty()) {
+ // Terminate the slave if
+ // 1) it's being shut down or
+ // 2) it's started in cleanup mode and recovery finished.
+ // TODO(vinod): Instead of doing it this way, shutdownFramework()
+ // and shutdownExecutor() could return Futures and a slave could
+ // shutdown when all the Futures are satisfied (e.g., collect()).
+ if (halting ||
+ (flags.recover == "cleanup" && !recovered.future().isPending()) ) {
+ terminate(self());
}
- } else {
- LOG(ERROR) << "Failed to create archive directory '"
- << archiveDir << ": " << result.error();
- }
-
- result = os::rmdir(metaDir);
- if (result.isError()) {
- LOG(ERROR) << "Failed to delete meta directory '" << metaDir << "'";
}
-
- shutdown();
}
@@ -1914,37 +1954,24 @@ void Slave::shutdownExecutor(Framework*
framework->state == Framework::TERMINATING)
<< framework->state;
- switch (executor->state) {
- case Executor::TERMINATING:
- case Executor::TERMINATED: {
- LOG(WARNING) << "Ignoring shutdown of executor '" << executor->id
- << "' of framework " << framework->id
- << " because the executor is terminating/terminated";
- break;
- }
- case Executor::REGISTERING:
- case Executor::RUNNING: {
- executor->state = Executor::TERMINATING;
- // If the executor hasn't yet registered, this message
- // will be dropped to the floor!
- send(executor->pid, ShutdownExecutorMessage());
+ CHECK(executor->state == Executor::REGISTERING ||
+ executor->state == Executor::RUNNING)
+ << executor->state;
- // Prepare for sending a kill if the executor doesn't comply.
- delay(flags.executor_shutdown_grace_period,
- self(),
- &Slave::shutdownExecutorTimeout,
- framework->id,
- executor->id,
- executor->uuid);
- break;
- }
- default:
- LOG(FATAL) << "Executor '" << executor->id
- << "' of framework " << framework->id
- << " is in unexpected state " << executor->state;
- break;
- }
+ executor->state = Executor::TERMINATING;
+
+ // If the executor hasn't yet registered, this message
+ // will be dropped to the floor!
+ send(executor->pid, ShutdownExecutorMessage());
+
+ // Prepare for sending a kill if the executor doesn't comply.
+ delay(flags.executor_shutdown_grace_period,
+ self(),
+ &Slave::shutdownExecutorTimeout,
+ framework->id,
+ executor->id,
+ executor->uuid);
}
@@ -2273,6 +2300,11 @@ void Slave::recoverFramework(const Frame
}
}
}
+
+ // Remove the framework in case we didn't recover any executors.
+ if (framework->executors.empty()) {
+ remove(framework);
+ }
}
@@ -2282,7 +2314,7 @@ Framework::Framework(
const FrameworkID& _id,
const FrameworkInfo& _info,
const UPID& _pid)
- : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
+ : state(RUNNING),
slave(_slave),
id(_id),
info(_info),
@@ -2382,7 +2414,11 @@ Executor* Framework::createExecutor(cons
// Create a directory for the executor.
const string& directory = paths::createExecutorDirectory(
- slave->flags.work_dir, slave->info.id(), id, executorInfo.executor_id(), uuid);
+ slave->flags.work_dir,
+ slave->info.id(),
+ id,
+ executorInfo.executor_id(),
+ uuid);
Executor* executor = new Executor(
slave, id, executorInfo, uuid, directory, info.checkpoint());
@@ -2509,7 +2545,7 @@ Executor::Executor(
const UUID& _uuid,
const string& _directory,
bool _checkpoint)
- : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
+ : state(REGISTERING),
slave(_slave),
id(_info.executor_id()),
info(_info),
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:09:01 2013
@@ -228,15 +228,11 @@ protected:
// Helper to recover a framework from the specified state.
void recoverFramework(const state::FrameworkState& state, bool reconnect);
- // Called when an executor terminates or a status update
- // acknowledgement is handled by the status update manager.
- // This potentially removes the executor and framework and
- // schedules them for garbage collection.
- void cleanup(Framework* framework, Executor* executor);
-
- // Called when the slave is started in 'cleanup' recovery mode and
- // all the executors have terminated.
- void cleanup();
+ // Removes and garbage collects the executor.
+ void remove(Framework* framework, Executor* executor);
+
+ // Removes and garbage collects the framework.
+ void remove(Framework* framework);
private:
Slave(const Slave&); // No copying.
@@ -289,6 +285,9 @@ private:
double startTime;
+ // TODO(Vinod): Add 'state' to slave instead of capturing the
+ // semantics of waiting for registration ('connecting') and
+ // shutting down ('halting') in boolean variables.
bool connected; // Flag to indicate if slave is registered.
GarbageCollector gc;
Modified: incubator/mesos/trunk/src/slave/state.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/state.cpp (original)
+++ incubator/mesos/trunk/src/slave/state.cpp Wed Apr 17 07:09:01 2013
@@ -36,15 +36,10 @@ Result<SlaveState> recover(const string&
// Check if the "latest" symlink to a slave directory exists.
if (!os::exists(latest)) {
- string message = "Failed to find the latest slave from '" + rootDir + "'";
- if (safe) {
- return Error(message);
- } else {
- // The slave likely died before it registered and had a chance
- // to create the "latest" symlink.
- LOG(WARNING) << message;
- return None();
- }
+ // The slave was asked to shutdown or died before it registered
+ // and had a chance to create the "latest" symlink.
+ LOG(INFO) << "Failed to find the latest slave from '" << rootDir << "'";
+ return None();
}
// Get the latest slave id.
Modified: incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp Wed Apr 17 07:09:01 2013
@@ -1073,3 +1073,93 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor
driver.stop();
driver.join();
}
+
+
+// The slave is asked to shutdown. When it comes back up, it should
+// register as a new slave.
+TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
+{
+ // Scheduler expectations.
+ MockScheduler sched;
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers1;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillOnce(Return()); // Ignore the offer when slave is shutting down.
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
+
+ driver.start();
+
+ AWAIT_READY(offers1);
+
+ EXPECT_NE(0u, offers1.get().size());
+
+ TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+
+ Future<Nothing> statusUpdate1;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureSatisfy(&statusUpdate1))
+ .WillOnce(Return()); // Ignore TASK_FAILED update.
+
+ Future<Message> registerExecutor =
+ FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+ driver.launchTasks(offers1.get()[0].id(), tasks);
+
+ // Capture the executor pid.
+ AWAIT_READY(registerExecutor);
+ UPID executorPid = registerExecutor.get().from;
+
+ AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+
+ Future<Nothing> executorTerminated =
+ FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+ // We shut down the executor here so that a shutting down slave
+ // does not spend too much time waiting for the executor to exit.
+ process::post(executorPid, ShutdownExecutorMessage());
+
+ Clock::pause();
+
+ // Now advance time until the reaper reaps the executor.
+ while (executorTerminated.isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_READY(executorTerminated);
+
+ Clock::resume();
+
+ // Shut down the slave.
+ this->stopSlave(true);
+
+ Future<vector<Offer> > offers2;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers2))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Now restart the slave.
+ this->startSlave();
+
+ // Ensure that the slave registered with a new id.
+ AWAIT_UNTIL(offers2);
+
+ EXPECT_NE(0u, offers2.get().size());
+
+ // Ensure the slave id is different.
+ ASSERT_NE(
+ offers1.get()[0].slave_id().value(), offers2.get()[0].slave_id().value());
+
+ driver.stop();
+ driver.join();
+}