Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/19 03:35:09 UTC
svn commit: r1469660 - in /incubator/mesos/trunk/src: slave/slave.cpp
slave/slave.hpp tests/allocator_zookeeper_tests.cpp
Author: vinodkone
Date: Fri Apr 19 01:35:08 2013
New Revision: 1469660
URL: http://svn.apache.org/r1469660
Log:
Fixed slave to properly schedule executor directories
for garbage collection.
Review: https://reviews.apache.org/r/10604
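
For context, the fix leans on the slave's existing idiom of chaining "unschedule" operations on a libprocess future, so that the continuation only runs after every directory previously scheduled for garbage collection has been unscheduled. Below is a minimal standalone sketch of that chaining idiom; the class, method, and path handling are hypothetical stand-ins, not the slave's actual code, and only illustrate the pattern visible in the diff that follows.

#include <string>
#include <vector>

#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/process.hpp>

#include <stout/lambda.hpp>
#include <stout/os.hpp>

using process::Future;
using process::defer;

// Hypothetical process illustrating the chaining idiom; not the
// actual Slave class.
class SketchProcess : public process::Process<SketchProcess>
{
public:
  void run(const std::vector<std::string>& paths)
  {
    // Start from an already-satisfied future and chain one
    // unschedule() per directory that exists on disk.
    Future<bool> unschedule = true;

    for (size_t i = 0; i < paths.size(); i++) {
      if (os::exists(paths[i])) {
        unschedule =
          unschedule.then(defer(self(), &Self::unschedule, paths[i]));
      }
    }

    // Continue only after every unschedule attempt has completed,
    // mirroring how runTask() hands off to _runTask().
    unschedule.onAny(defer(self(), &Self::_run, lambda::_1));
  }

private:
  Future<bool> unschedule(const std::string& path)
  {
    // The real slave asks the gc process to cancel a previously
    // scheduled deletion here; the sketch just succeeds.
    return true;
  }

  void _run(const Future<bool>& future)
  {
    if (!future.isReady()) {
      // Mirrors _runTask(): report the failure (e.g. via a status
      // update) instead of launching the task.
    }
  }
};
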
Modified:
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1469660&r1=1469659&r2=1469660&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Fri Apr 19 01:35:08 2013
@@ -719,16 +719,35 @@ void Slave::runTask(
LOG(INFO) << "Got assigned task " << task.task_id()
<< " for framework " << frameworkId;
- Future<bool> unschedule = true;
+ // TODO(vinod): These ignored tasks should be consolidated by
+ // the master when the slave re-registers.
- Executor* executor = NULL;
- ExecutorID executorId = getExecutorInfo(task).executor_id();
+ if (!(task.slave_id() == info.id())) {
+ LOG(WARNING) << "Ignoring task " << task.task_id()
+ << " because it was intended for the old slave " << info.id();
+ return;
+ }
+
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state == RECOVERING || state == TERMINATING) {
+ LOG(WARNING) << "Ignoring task " << task.task_id()
+ << " because the slave is " << state;
+ // TODO(vinod): Consider sending a TASK_LOST here.
+ // Currently it is tricky because 'statusUpdate()'
+ // ignores updates for unknown frameworks.
+ return;
+ }
+
+ Future<bool> unschedule = true;
- Framework* framework = getFramework(frameworkId);
// If we are about to create a new framework, unschedule the work
// and meta directories from getting gc'ed.
+ Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- // Framework work directory.
+ // Unschedule framework work directory.
string path = paths::getFrameworkPath(
flags.work_dir, info.id(), frameworkId);
@@ -736,24 +755,30 @@ void Slave::runTask(
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
- // Framework meta directory.
+ // Unschedule framework meta directory.
path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
if (os::exists(path)) {
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
- } else {
- // We add the corresponding executor to 'pending' to ensure the
- // framework and top level executor directories are not scheduled
- // for deletion before '_runTask()' is called.
- framework->pending.insert(executorId);
- executor = framework->getExecutor(executorId);
+ framework = new Framework(this, frameworkId, frameworkInfo, pid);
+ frameworks[frameworkId] = framework;
}
+ const ExecutorInfo& executorInfo = getExecutorInfo(task);
+ const ExecutorID& executorId = executorInfo.executor_id();
+
+ // We add the task to 'pending' to ensure the framework is not
+ // removed and the framework and top level executor directories
+ // are not scheduled for deletion before '_runTask()' is called.
+ CHECK_NOTNULL(framework);
+ framework->pending.put(executorId, task.task_id());
+
// If we are about to create a new executor, unschedule the top
// level work and meta directories from getting gc'ed.
+ Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
- // Executor work directory.
+ // Unschedule executor work directory.
string path = paths::getExecutorPath(
flags.work_dir, info.id(), frameworkId, executorId);
@@ -761,7 +786,7 @@ void Slave::runTask(
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
- // Executor meta directory.
+ // Unschedule executor meta directory.
path = paths::getExecutorPath(
metaDir, info.id(), frameworkId, executorId);
@@ -792,6 +817,29 @@ void Slave::_runTask(
LOG(INFO) << "Launching task " << task.task_id()
<< " for framework " << frameworkId;
+ const ExecutorInfo& executorInfo = getExecutorInfo(task);
+ const ExecutorID& executorId = executorInfo.executor_id();
+
+ // Remove the pending task from framework.
+ Framework* framework = getFramework(frameworkId);
+ CHECK_NOTNULL(framework);
+
+ framework->pending.remove(executorId, task.task_id());
+
+ // We don't send a status update here because a terminating
+ // framework cannot send acknowledgements.
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Ignoring run task " << task.task_id()
+ << " of framework " << frameworkId
+ << " because the framework is terminating";
+
+ if (framework->executors.empty() && framework->pending.empty()) {
+ remove(framework);
+ }
+ return;
+ }
+
+
if (!future.isReady()) {
LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
<< (future.isFailed() ? future.failure() : "future discarded");
@@ -804,34 +852,34 @@ void Slave::_runTask(
"Could not launch the task because we failed to unschedule directories"
" scheduled for gc");
+ // TODO(vinod): Ensure that the status update manager reliably
+ // delivers this update. Currently, we don't guarantee this
+ // because removal of the framework causes the status update
+ // manager to stop retrying for its un-acked updates.
statusUpdate(update);
+
+ if (framework->executors.empty() && framework->pending.empty()) {
+ remove(framework);
+ }
+
return;
}
- CHECK(state == RECOVERING || state == DISCONNECTED ||
- state == RUNNING || state == TERMINATING)
+ // NOTE: The slave cannot be in 'RECOVERING' because the task would
+ // have been rejected in 'runTask()' in that case.
+ CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
<< state;
- // This could happen if the runTask message was intended for the
- // old slave, but the new (restarted) slave received it because
- // it has the same pid.
- if (state != RUNNING) {
- LOG(WARNING) << "Cannot run task " << task.task_id()
+ if (state == TERMINATING) {
+ LOG(WARNING) << "Ignoring run task " << task.task_id()
<< " of framework " << frameworkId
- << " because the slave is in " << state << " state";
-
- const StatusUpdate& update = protobuf::createStatusUpdate(
- frameworkId,
- info.id(),
- task.task_id(),
- TASK_LOST,
- "Slave is not in RUNNING state");
+ << " because the slave is terminating";
- statusUpdate(update);
+ // We don't send a TASK_LOST here because the slave is
+ // terminating.
return;
}
- Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
framework = new Framework(this, frameworkId, frameworkInfo, pid);
frameworks[frameworkId] = framework;
@@ -839,56 +887,14 @@ void Slave::_runTask(
CHECK_NOTNULL(framework);
- CHECK(framework->state == Framework::RUNNING ||
- framework->state == Framework::TERMINATING)
- << framework->state;
-
- // We don't send a status update here because a terminating
- // framework cannot send acknowledgements.
- if (framework->state == Framework::TERMINATING) {
- LOG(WARNING) << "Ignoring run task " << task.task_id()
- << " of framework " << frameworkId
- << " because the framework is terminating";
- return;
- }
-
- const ExecutorInfo& executorInfo = getExecutorInfo(task);
- const ExecutorID& executorId = executorInfo.executor_id();
+ CHECK(framework->state == Framework::RUNNING) << framework->state;
// Either send the task to an executor or start a new executor
// and queue the task until the executor has started.
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
- // Launch an executor for this task.
- executor = framework->createExecutor(executorInfo);
-
- files->attach(executor->directory, executor->directory)
- .onAny(defer(self(),
- &Self::fileAttached,
- params::_1,
- executor->directory));
-
- // Tell the isolator to launch the executor.
- dispatch(isolator,
- &Isolator::launchExecutor,
- info.id(),
- framework->id,
- framework->info,
- executor->info,
- executor->uuid,
- executor->directory,
- executor->resources);
-
- // Make sure the executor registers within the given timeout.
- // NOTE: We send this message before dispatching the launchExecutor to
- // the isolator, to make writing tests easier.
- delay(flags.executor_registration_timeout,
- self(),
- &Slave::registerExecutorTimeout,
- framework->id,
- executor->id,
- executor->uuid);
+ executor = framework->launchExecutor(executorInfo);
}
CHECK_NOTNULL(executor);
@@ -974,19 +980,13 @@ void Slave::killTask(const FrameworkID&
state == RUNNING || state == TERMINATING)
<< state;
- if (state != RUNNING) {
+ if (state == RECOVERING || state == TERMINATING) {
LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
- << " because the slave is in " << state << " state";
-
- const StatusUpdate& update = protobuf::createStatusUpdate(
- frameworkId,
- info.id(),
- taskId,
- TASK_LOST,
- "Slave is not in RUNNING state");
-
- statusUpdate(update);
+ << " because the slave is " << state;
+ // TODO(vinod): Consider sending a TASK_LOST here.
+ // Currently it is tricky because 'statusUpdate()'
+ // ignores updates for unknown frameworks.
return;
}
@@ -1126,6 +1126,9 @@ void Slave::shutdownFramework(const Fram
framework->state = Framework::TERMINATING;
// Shut down all executors of this framework.
+ // NOTE: If there are no executors but 'pending' tasks, the
+ // framework will be removed and all its tasks are appropriately
+ // handled in '_runTask()'.
foreachvalue (Executor* executor, utils::copy(framework->executors)) {
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING ||
@@ -1490,9 +1493,11 @@ void Slave::registerExecutor(
message.mutable_slave_info()->MergeFrom(info);
send(executor->pid, message);
- LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
-
foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+ LOG(INFO) << "Flushing queued task " << task.task_id()
+ << " for executor '" << executor->id << "'"
+ << " of framework " << framework->id;
+
stats.tasks[TASK_STAGING]++;
RunTaskMessage message;
@@ -1952,25 +1957,27 @@ void Slave::executorStarted(
<< framework->state;
if (framework->state == Framework::TERMINATING) {
- LOG(WARNING) << "Framework " << frameworkId
- << " for executor '" << executorId
- << "' is terminating";
+ LOG(WARNING) << "Killing executor '" << executorId
+ << "' of framework '" << frameworkId
+ << "' because the framework is terminating";
+ dispatch(isolator, &Isolator::killExecutor, frameworkId, executorId);
return;
}
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
- LOG(WARNING) << "Invalid executor '" << executorId
- << "' of framework " << frameworkId
- << " has started";
+ LOG(WARNING) << "Killing unknown executor '" << executorId
+ << "' of framework " << frameworkId;
+ dispatch(isolator, &Isolator::killExecutor, frameworkId, executorId);
return;
}
switch (executor->state) {
case Executor::TERMINATING:
- LOG(WARNING) << "Executor '" << executorId
+ LOG(WARNING) << "Killing executor '" << executorId
<< "' of framework " << frameworkId
- << " is terminating";
+ << " because the executor is terminating";
+ dispatch(isolator, &Isolator::killExecutor, frameworkId, executorId);
break;
case Executor::REGISTERING:
case Executor::RUNNING:
@@ -2157,15 +2164,16 @@ void Slave::remove(Framework* framework,
<< "' of framework " << framework->id;
CHECK(framework->state == Framework::RUNNING ||
- framework->state == Framework::TERMINATING);
+ framework->state == Framework::TERMINATING)
+ << framework->state;
// Check that this executor has terminated and either has no
// pending updates or the framework is terminating. We don't
// care for pending updates when a framework is terminating
// because the framework cannot ACK them.
- CHECK(executor->state == Executor::TERMINATED &&
- (executor->updates.empty() ||
- framework->state == Framework::TERMINATING));
+ CHECK(executor->state == Executor::TERMINATED) << executor->state;
+ CHECK(executor->updates.empty() ||
+ framework->state == Framework::TERMINATING);
// TODO(vinod): Move the responsibility of gc'ing to the
// Executor struct.
@@ -2205,8 +2213,8 @@ void Slave::remove(Framework* framework,
framework->destroyExecutor(executor->id);
- // Remove this framework if it has no executors running.
- if (framework->executors.empty()) {
+ // Remove this framework if it has no pending executors and tasks.
+ if (framework->executors.empty() && framework->pending.empty()) {
remove(framework);
}
}
@@ -2221,29 +2229,30 @@ void Slave::remove(Framework* framework)
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING);
+ // The invariant here is that a framework should not be removed
+ // if it has either pending executors or pending tasks.
CHECK(framework->executors.empty());
+ CHECK(framework->pending.empty());
// Close all status update streams for this framework.
statusUpdateManager->cleanup(framework->id);
-
// Schedule the framework work and meta directories for garbage
- // collection, only if it has no pending tasks.
+ // collection.
// TODO(vinod): Move the responsibility of gc'ing to the
// Framework struct.
- if (framework->pending.empty()) {
- const string& path = paths::getFrameworkPath(
- flags.work_dir, info.id(), framework->id);
- gc.schedule(flags.gc_delay, path);
+ const string& path = paths::getFrameworkPath(
+ flags.work_dir, info.id(), framework->id);
- if (framework->info.checkpoint()) {
- // Schedule the framework meta directory to get garbage collected.
- const string& path = paths::getFrameworkPath(
- metaDir, info.id(), framework->id);
+ gc.schedule(flags.gc_delay, path);
- gc.schedule(flags.gc_delay, path);
- }
+ if (framework->info.checkpoint()) {
+ // Schedule the framework meta directory to get garbage collected.
+ const string& path = paths::getFrameworkPath(
+ metaDir, info.id(), framework->id);
+
+ gc.schedule(flags.gc_delay, path);
}
frameworks.erase(framework->id);
@@ -2682,7 +2691,8 @@ Framework::~Framework()
}
-Executor* Framework::createExecutor(const ExecutorInfo& executorInfo)
+// Create and launch an executor.
+Executor* Framework::launchExecutor(const ExecutorInfo& executorInfo)
{
// We create a UUID for the new executor. The UUID uniquely
// identifies this new instance of the executor across executors
@@ -2704,6 +2714,34 @@ Executor* Framework::createExecutor(cons
CHECK(!executors.contains(executorInfo.executor_id()));
executors[executorInfo.executor_id()] = executor;
+
+ CHECK_NOTNULL(executor);
+
+ slave->files->attach(executor->directory, executor->directory)
+ .onAny(defer(slave,
+ &Slave::fileAttached,
+ params::_1,
+ executor->directory));
+
+ // Tell the isolator to launch the executor.
+ dispatch(slave->isolator,
+ &Isolator::launchExecutor,
+ slave->info.id(),
+ id,
+ info,
+ executor->info,
+ executor->uuid,
+ executor->directory,
+ executor->resources);
+
+ // Make sure the executor registers within the given timeout.
+ delay(slave->flags.executor_registration_timeout,
+ slave,
+ &Slave::registerExecutorTimeout,
+ id,
+ executor->id,
+ executor->uuid);
+
return executor;
}
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1469660&r1=1469659&r2=1469660&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Fri Apr 19 01:35:08 2013
@@ -390,7 +390,7 @@ struct Framework
~Framework();
- Executor* createExecutor(const ExecutorInfo& executorInfo);
+ Executor* launchExecutor(const ExecutorInfo& executorInfo);
void destroyExecutor(const ExecutorID& executorId);
Executor* getExecutor(const ExecutorID& executorId);
Executor* getExecutor(const TaskID& taskId);
@@ -411,7 +411,7 @@ struct Framework
UPID pid;
- hashset<ExecutorID> pending; // Executors with pending tasks.
+ multihashmap<ExecutorID, TaskID> pending; // Executors with pending tasks.
// Current running executors.
hashmap<ExecutorID, Executor*> executors;
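
The slave.hpp change above replaces the 'pending' hashset with a multihashmap so that individual tasks, not just executor IDs, are tracked while '_runTask()' is in flight. Here is a minimal standalone sketch of that bookkeeping using stout's multihashmap; the string keys and values stand in for ExecutorID and TaskID and are made up purely for illustration.

#include <iostream>
#include <string>

#include <stout/multihashmap.hpp>

int main()
{
  // Stand-in for 'Framework::pending'.
  multihashmap<std::string, std::string> pending;

  // runTask(): record the task before the asynchronous unschedule
  // chain runs, so the framework/executor directories are not gc'ed.
  pending.put("executor-1", "task-1");
  pending.put("executor-1", "task-2");

  // _runTask(): the task is no longer pending once we decide to
  // launch (or drop) it.
  pending.remove("executor-1", "task-1");

  // remove(Framework*): only safe when nothing is pending.
  if (pending.empty()) {
    std::cout << "framework can be removed" << std::endl;
  } else {
    std::cout << pending.get("executor-1").size()
              << " task(s) still pending for executor-1" << std::endl;
  }

  return 0;
}
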
Modified: incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp?rev=1469660&r1=1469659&r2=1469660&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp Fri Apr 19 01:35:08 2013
@@ -144,10 +144,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
EXPECT_CALL(exec, shutdown(_))
.WillRepeatedly(DoDefault());
- Future<Nothing> resourcesChanged;
- EXPECT_CALL(isolator, resourcesChanged(_, _, _))
- .WillOnce(FutureSatisfy(&resourcesChanged));
-
driver.start();
AWAIT_READY(registered);
@@ -162,10 +158,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
EXPECT_EQ(TASK_RUNNING, statusUpdate.get().state());
- // Ensures that the task has been fully launched before we kill the
- // first master.
- AWAIT_READY(resourcesChanged);
-
// Stop the failing master from telling the slave to shut down when
// it is killed.
Future<process::Message> shutdownMsg =
@@ -289,10 +281,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
EXPECT_CALL(exec, shutdown(_))
.WillRepeatedly(DoDefault());
- Future<Nothing> resourcesChanged;
- EXPECT_CALL(isolator, resourcesChanged(_, _, _))
- .WillOnce(FutureSatisfy(&resourcesChanged));
-
driver.start();
AWAIT_READY(registered);
@@ -307,10 +295,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
EXPECT_EQ(TASK_RUNNING, statusUpdate.get().state());
- // Ensures that the task has been fully launched before we kill the
- // first master.
- AWAIT_READY(resourcesChanged);
-
// Stop the failing master from telling the slave to shut down when
// it is killed.
Future<process::Message> shutdownMsg =