Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:09:12 UTC
svn commit: r1468777 - in /incubator/mesos/trunk/src: slave/slave.cpp slave/slave.hpp tests/gc_tests.cpp
Author: vinodkone
Date: Wed Apr 17 07:09:12 2013
New Revision: 1468777
URL: http://svn.apache.org/r1468777
Log:
Fixed slave to properly schedule/unschedule top level executor
directories for garbage collection.
Review: https://reviews.apache.org/r/10413
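
[Editor's context for the diff below: the heart of the fix is in runTask(), which now chains any needed GC unschedule operations onto a single Future<bool> and defers the actual task launch until the whole chain completes. A minimal sketch of that chaining pattern, using a toy synchronous ToyFuture stand-in rather than libprocess's Future; all names below are illustrative, not from the commit:

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Toy synchronous stand-in for process::Future<bool>; real libprocess
// futures defer the continuation until the future becomes ready.
struct ToyFuture {
  bool value;
  ToyFuture then(const std::function<ToyFuture(bool)>& f) const {
    return f(value);
  }
};

// Stand-in for GarbageCollector::unschedule(path).
ToyFuture unschedule(const std::string& path) {
  std::cout << "unscheduling " << path << std::endl;
  return ToyFuture{true};
}

int main() {
  // Mirrors 'Future<bool> unschedule = true;' in the commit: start with
  // an already-satisfied future and conditionally chain work onto it.
  ToyFuture chain{true};

  const std::vector<std::string> paths = {
    "work/framework-1", "meta/framework-1"};

  for (const std::string& path : paths) {
    chain = chain.then([path](bool) { return unschedule(path); });
  }

  // Only after the whole chain completes does the task get launched;
  // the commit does this via unschedule.onAny(defer(..., &_runTask, ...)).
  std::cout << "launching task" << std::endl;
  return 0;
}

In the real code each continuation runs via defer(self(), ...), so the slave actor is never blocked while the garbage collector is consulted.]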
Modified:
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/gc_tests.cpp
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468777&r1=1468776&r2=1468777&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:09:12 2013
@@ -384,7 +384,8 @@ void Slave::initialize()
void Slave::_initialize(const Future<Nothing>& future)
{
if (!future.isReady()) {
- LOG(FATAL) << "Recovery failure: " << future.failure();
+ LOG(FATAL) << "Recovery failure: "
+ << (future.isFailed() ? future.failure() : "future discarded");
}
LOG(INFO) << "Finished recovery";
@@ -516,10 +517,11 @@ void Slave::fileAttached(const Future<No
}
-void Slave::detachFile(const Future<Nothing>& result, const string& path)
+// TODO(vinod/bmahler): Get rid of this helper.
+Nothing Slave::detachFile(const string& path)
{
- CHECK(!result.isDiscarded());
files->detach(path);
+ return Nothing();
}
@@ -698,6 +700,14 @@ void Slave::doReliableRegistration()
}
+// Helper to unschedule the path.
+// TODO(vinod): Can we avoid this helper?
+Future<bool> Slave::unschedule(const string& path)
+{
+ return gc.unschedule(path);
+}
+
+
// TODO(vinod): Instead of crashing the slave on checkpoint errors,
// send TASK_LOST to the framework.
void Slave::runTask(
@@ -709,10 +719,102 @@ void Slave::runTask(
LOG(INFO) << "Got assigned task " << task.task_id()
<< " for framework " << frameworkId;
+ Future<bool> unschedule = true;
+
+ Executor* executor = NULL;
+ ExecutorID executorId = getExecutorInfo(task).executor_id();
+
+ Framework* framework = getFramework(frameworkId);
+ // If we are about to create a new framework, unschedule the work
+ // and meta directories from getting gc'ed.
+ if (framework == NULL) {
+ // Framework work directory.
+ string path = paths::getFrameworkPath(
+ flags.work_dir, info.id(), frameworkId);
+
+ if (os::exists(path)) {
+ unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+ }
+
+ // Framework meta directory.
+ path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
+ if (os::exists(path)) {
+ unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+ }
+ } else {
+ // We add the corresponding executor to 'pending' to ensure the
+ // framework and top level executor directories are not scheduled
+ // for deletion before '_runTask()' is called.
+ framework->pending.insert(executorId);
+
+ executor = framework->getExecutor(executorId);
+ }
+
+ // If we are about to create a new executor, unschedule the top
+ // level work and meta directories from getting gc'ed.
+ if (executor == NULL) {
+ // Executor work directory.
+ string path = paths::getExecutorPath(
+ flags.work_dir, info.id(), frameworkId, executorId);
+
+ if (os::exists(path)) {
+ unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+ }
+
+ // Executor meta directory.
+ path = paths::getExecutorPath(
+ metaDir, info.id(), frameworkId, executorId);
+
+ if (os::exists(path)) {
+ unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+ }
+ }
+
+ // Run the task after the unschedules are done.
+ unschedule.onAny(
+ defer(self(),
+ &Self::_runTask,
+ params::_1,
+ frameworkInfo,
+ frameworkId,
+ pid,
+ task));
+}
+
+
+void Slave::_runTask(
+ const Future<bool>& future,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const string& pid,
+ const TaskInfo& task)
+{
+ LOG(INFO) << "Launching task " << task.task_id()
+ << " for framework " << frameworkId;
+
+ if (!future.isReady()) {
+ LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
+ << (future.isFailed() ? future.failure() : "future discarded");
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ task.task_id(),
+ TASK_LOST,
+ "Could not launch the task because we failed to unschedule directories"
+ " scheduled for gc");
+
+ statusUpdate(update);
+ return;
+ }
+
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
+ // This could happen if the runTask message was intended for the
+ // old slave, but the new (restarted) slave received it because
+ // it has the same pid.
if (state != RUNNING) {
LOG(WARNING) << "Cannot run task " << task.task_id()
<< " of framework " << frameworkId
@@ -768,7 +870,7 @@ void Slave::runTask(
return;
}
- const ExecutorInfo& executorInfo = framework->getExecutorInfo(task);
+ const ExecutorInfo& executorInfo = getExecutorInfo(task);
const ExecutorID& executorId = executorInfo.executor_id();
// Either send the task to an executor or start a new executor
@@ -1783,6 +1885,59 @@ Framework* Slave::getFramework(const Fra
}
+ExecutorInfo Slave::getExecutorInfo(const TaskInfo& task)
+{
+ CHECK(task.has_executor() != task.has_command());
+
+ if (task.has_command()) {
+ ExecutorInfo executor;
+
+ // Command executors share the same id as the task.
+ executor.mutable_executor_id()->set_value(task.task_id().value());
+
+ // Prepare an executor name which includes information on the
+ // command being launched.
+ string name =
+ "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
+
+ if (task.command().value().length() > 15) {
+ name += task.command().value().substr(0, 12) + "...')";
+ } else {
+ name += task.command().value() + "')";
+ }
+
+ executor.set_name("Command Executor " + name);
+ executor.set_source(task.task_id().value());
+
+ // Copy the CommandInfo to get the URIs and environment, but
+ // update it to invoke 'mesos-executor' (unless we couldn't
+ // resolve 'mesos-executor' via 'realpath', in which case just
+ // echo the error and exit).
+ executor.mutable_command()->MergeFrom(task.command());
+
+ Try<string> path = os::realpath(
+ path::join(flags.launcher_dir, "mesos-executor"));
+
+ if (path.isSome()) {
+ executor.mutable_command()->set_value(path.get());
+ } else {
+ executor.mutable_command()->set_value(
+ "echo '" + path.error() + "'; exit 1");
+ }
+
+ // TODO(benh): Set some resources for the executor so that a task
+ // doesn't end up getting killed because the amount of resources
+ // of the executor went over those allocated. Note that this might
+ // mean that the number of resources on the machine will actually
+ // be slightly oversubscribed, so we'll need to reevaluate with
+ // respect to resources that can't be oversubscribed.
+ return executor;
+ }
+
+ return task.executor();
+}
+
+
void _watch(
const Future<Nothing>& watch,
const FrameworkID& frameworkId,
@@ -2010,8 +2165,8 @@ void Slave::remove(Framework* framework,
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
- LOG(INFO) << "Cleaning up executor '" << executor->id << "'"
- << " of framework " << framework->id;
+ LOG(INFO) << "Cleaning up executor '" << executor->id
+ << "' of framework " << framework->id;
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING);
@@ -2024,23 +2179,40 @@ void Slave::remove(Framework* framework,
(executor->updates.empty() ||
framework->state == Framework::TERMINATING));
+ // TODO(vinod): Move the responsibility of gc'ing to the
+ // Executor struct.
+
// Schedule the executor run work directory to get garbage collected.
- // TODO(vinod): Also schedule the top level executor work directory.
- gc.schedule(flags.gc_delay, executor->directory).onAny(
- defer(self(), &Self::detachFile, params::_1, executor->directory));
+ const string& path = paths::getExecutorRunPath(
+ flags.work_dir, info.id(), framework->id, executor->id, executor->uuid);
+
+ gc.schedule(flags.gc_delay, path)
+ .then(defer(self(), &Self::detachFile, path));
+
+ // Schedule the top level executor work directory, only if the
+ // framework doesn't have any 'pending' tasks for this executor.
+ if (!framework->pending.contains(executor->id)) {
+ const string& path = paths::getExecutorPath(
+ flags.work_dir, info.id(), framework->id, executor->id);
+
+ gc.schedule(flags.gc_delay, path);
+ }
if (executor->checkpoint) {
// Schedule the executor run meta directory to get garbage collected.
- // TODO(vinod): Also schedule the top level executor meta directory.
- const string& executorMetaDir = paths::getExecutorRunPath(
- metaDir,
- info.id(),
- framework->id,
- executor->id,
- executor->uuid);
+ const string& path = paths::getExecutorRunPath(
+ metaDir, info.id(), framework->id, executor->id, executor->uuid);
- gc.schedule(flags.gc_delay, executorMetaDir).onAny(
- defer(self(), &Self::detachFile, params::_1, executorMetaDir));
+ gc.schedule(flags.gc_delay, path);
+
+ // Schedule the top level executor meta directory, only if the
+ // framework doesn't have any 'pending' tasks for this executor.
+ if (!framework->pending.contains(executor->id)) {
+ const string& path = paths::getExecutorPath(
+ metaDir, info.id(), framework->id, executor->id);
+
+ gc.schedule(flags.gc_delay, path);
+ }
}
framework->destroyExecutor(executor->id);
@@ -2056,7 +2228,7 @@ void Slave::remove(Framework* framework)
{
CHECK_NOTNULL(framework);
- LOG(INFO) << "Cleaning up framework " << framework->id;
+ LOG(INFO)<< "Cleaning up framework " << framework->id;
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING);
@@ -2066,24 +2238,24 @@ void Slave::remove(Framework* framework)
// Close all status update streams for this framework.
statusUpdateManager->cleanup(framework->id);
- // Schedule the framework work directory to get garbage collected.
- const string& frameworkDir = paths::getFrameworkPath(
- flags.work_dir,
- info.id(),
- framework->id);
-
- gc.schedule(flags.gc_delay, frameworkDir).onAny(
- defer(self(), &Self::detachFile, params::_1, frameworkDir));
-
- if (framework->info.checkpoint()) {
- // Schedule the framework meta directory to get garbage collected.
- const string& frameworkMetaDir = paths::getFrameworkPath(
- metaDir,
- info.id(),
- framework->id);
- gc.schedule(flags.gc_delay, frameworkMetaDir).onAny(
- defer(self(), &Self::detachFile, params::_1, frameworkMetaDir));
+ // Schedule the framework work and meta directories for garbage
+ // collection, only if it has no pending tasks.
+ // TODO(vinod): Move the responsibility of gc'ing to the
+ // Framework struct.
+ if (framework->pending.empty()) {
+ const string& path = paths::getFrameworkPath(
+ flags.work_dir, info.id(), framework->id);
+
+ gc.schedule(flags.gc_delay, path);
+
+ if (framework->info.checkpoint()) {
+ // Schedule the framework meta directory to get garbage collected.
+ const string& path = paths::getFrameworkPath(
+ metaDir, info.id(), framework->id);
+
+ gc.schedule(flags.gc_delay, path);
+ }
}
frameworks.erase(framework->id);
@@ -2099,7 +2271,7 @@ void Slave::remove(Framework* framework)
// and shutdownExecutor() could return Futures and a slave could
// shutdown when all the Futures are satisfied (e.g., collect()).
if (state == TERMINATING ||
- (flags.recover == "cleanup" && !recovered.future().isPending()) ) {
+ (flags.recover == "cleanup" && !recovered.future().isPending())) {
terminate(self());
}
}
@@ -2522,59 +2694,6 @@ Framework::~Framework()
}
-ExecutorInfo Framework::getExecutorInfo(const TaskInfo& task)
-{
- CHECK(task.has_executor() != task.has_command());
-
- if (task.has_command()) {
- ExecutorInfo executor;
-
- // Command executors share the same id as the task.
- executor.mutable_executor_id()->set_value(task.task_id().value());
-
- // Prepare an executor name which includes information on the
- // command being launched.
- string name =
- "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
-
- if (task.command().value().length() > 15) {
- name += task.command().value().substr(0, 12) + "...')";
- } else {
- name += task.command().value() + "')";
- }
-
- executor.set_name("Command Executor " + name);
- executor.set_source(task.task_id().value());
-
- // Copy the CommandInfo to get the URIs and environment, but
- // update it to invoke 'mesos-executor' (unless we couldn't
- // resolve 'mesos-executor' via 'realpath', in which case just
- // echo the error and exit).
- executor.mutable_command()->MergeFrom(task.command());
-
- Try<string> path = os::realpath(
- path::join(slave->flags.launcher_dir, "mesos-executor"));
-
- if (path.isSome()) {
- executor.mutable_command()->set_value(path.get());
- } else {
- executor.mutable_command()->set_value(
- "echo '" + path.error() + "'; exit 1");
- }
-
- // TODO(benh): Set some resources for the executor so that a task
- // doesn't end up getting killed because the amount of resources
- // of the executor went over those allocated. Note that this might
- // mean that the number of resources on the machine will actually
- // be slightly oversubscribed, so we'll need to reevaluate with
- // respect to resources that can't be oversubscribed.
- return executor;
- }
-
- return task.executor();
-}
-
-
Executor* Framework::createExecutor(const ExecutorInfo& executorInfo)
{
// We create a UUID for the new executor. The UUID uniquely
@@ -2648,11 +2767,11 @@ Executor* Framework::recoverExecutor(con
<< "' of framework " << id
<< " because its latest run cannot be recovered";
- // GC the executor work directory.
+ // GC the top level executor work directory.
slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorPath(
slave->flags.work_dir, slave->info.id(), id, state.id));
- // GC the executor meta directory.
+ // GC the top level executor meta directory.
slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorPath(
slave->metaDir, slave->info.id(), id, state.id));
@@ -2665,13 +2784,17 @@ Executor* Framework::recoverExecutor(con
foreachvalue (const RunState& run, state.runs) {
CHECK_SOME(run.id);
if (uuid != run.id.get()) {
- // GC the run's work directory.
+ // GC the executor run's work directory.
slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorRunPath(
slave->flags.work_dir, slave->info.id(), id, state.id, run.id.get()));
- // GC the run's meta directory.
+ // GC the executor run's meta directory.
slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorRunPath(
slave->metaDir, slave->info.id(), id, state.id, run.id.get()));
+
+ // NOTE: We don't schedule the top level executor work and meta
+ // directories for GC here, because they will be scheduled when
+ // the latest executor run terminates.
}
}
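
[Editor's aside on the hunk above that adds Slave::getExecutorInfo(): for command tasks it synthesizes a command executor whose name embeds a truncated copy of the shell command. A standalone rendering of just that naming rule; buildName is a hypothetical helper written for illustration, not part of the commit:

#include <iostream>
#include <string>

// Hypothetical helper mirroring the name-building logic in
// Slave::getExecutorInfo(); not part of the commit itself.
std::string buildName(const std::string& taskId, const std::string& command) {
  std::string name = "(Task: " + taskId + ") " + "(Command: sh -c '";
  if (command.length() > 15) {
    name += command.substr(0, 12) + "...')";
  } else {
    name += command + "')";
  }
  return "Command Executor " + name;
}

int main() {
  std::cout << buildName("task-1", "sleep 1") << std::endl;
  std::cout << buildName("task-2", "echo a-very-long-command") << std::endl;
  return 0;
}]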
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468777&r1=1468776&r2=1468777&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:09:12 2013
@@ -32,6 +32,7 @@
#include <process/protobuf.hpp>
#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
#include <stout/multihashmap.hpp>
#include <stout/os.hpp>
#include <stout/owned.hpp>
@@ -99,6 +100,15 @@ public:
const std::string& pid,
const TaskInfo& task);
+ void _runTask(
+ const Future<bool>& future,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task);
+
+ Future<bool> unschedule(const std::string& path);
+
void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
void shutdownFramework(const FrameworkID& frameworkId);
@@ -197,11 +207,15 @@ protected:
void fileAttached(const Future<Nothing>& result, const std::string& path);
- void detachFile(const Future<Nothing>& result, const std::string& path);
+ Nothing detachFile(const std::string& path);
// Helper routine to lookup a framework.
Framework* getFramework(const FrameworkID& frameworkId);
+ // Returns an ExecutorInfo for a TaskInfo (possibly
+ // constructing one if the task has a CommandInfo).
+ ExecutorInfo getExecutorInfo(const TaskInfo& task);
+
// Handle the second phase of shutting down an executor for those
// executors that have not properly shutdown within a timeout.
void shutdownExecutorTimeout(
@@ -376,9 +390,6 @@ struct Framework
~Framework();
- // Returns an ExecutorInfo for a TaskInfo (possibly
- // constructing one if the task has a CommandInfo).
- ExecutorInfo getExecutorInfo(const TaskInfo& task);
Executor* createExecutor(const ExecutorInfo& executorInfo);
void destroyExecutor(const ExecutorID& executorId);
Executor* getExecutor(const ExecutorID& executorId);
@@ -400,7 +411,7 @@ struct Framework
UPID pid;
- std::vector<TaskInfo> pending; // Pending tasks (used while INITIALIZING).
+ hashset<ExecutorID> pending; // Executors with pending tasks.
// Current running executors.
hashmap<ExecutorID, Executor*> executors;
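
[Editor's note on the slave.hpp hunk above: the framework's pending list of TaskInfos becomes a hashset of ExecutorIDs, and remove() in slave.cpp consults it before collecting the shared top-level directories. A rough sketch of that guard, with std::set and the SimpleGC type standing in for stout's hashset and the slave's GarbageCollector; all names here are illustrative:

#include <iostream>
#include <set>
#include <string>

// Illustrative stand-in for the slave's GarbageCollector.
struct SimpleGC {
  void schedule(const std::string& path) {
    std::cout << "scheduled for gc: " << path << std::endl;
  }
};

void removeExecutor(SimpleGC& gc,
                    const std::set<std::string>& pending,
                    const std::string& executorId,
                    const std::string& runPath,
                    const std::string& topLevelPath) {
  // The executor run directory is per-run and always collectible.
  gc.schedule(runPath);

  // The top-level executor directory is shared across runs, so it is
  // only collected when no pending task is about to reuse it.
  if (pending.count(executorId) == 0) {
    gc.schedule(topLevelPath);
  }
}

int main() {
  SimpleGC gc;
  std::set<std::string> pending = {"executor-1"};

  // 'executor-1' has a pending task: only its run directory is gc'ed.
  removeExecutor(gc, pending, "executor-1",
                 "work/executor-1/runs/abc", "work/executor-1");

  // 'executor-2' has none: both directories are gc'ed.
  removeExecutor(gc, pending, "executor-2",
                 "work/executor-2/runs/def", "work/executor-2");
  return 0;
}

The design point is that run directories can always go, while the top-level executor and framework directories must survive as long as a pending task may still land in them.]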
Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1468777&r1=1468776&r2=1468777&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Wed Apr 17 07:09:12 2013
@@ -51,6 +51,7 @@
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/gc.hpp"
+#include "slave/isolator.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
@@ -64,6 +65,7 @@ using mesos::internal::master::Master;
using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::GarbageCollectorProcess;
+using mesos::internal::slave::Isolator;
using mesos::internal::slave::Slave;
using process::Clock;
@@ -637,3 +639,128 @@ TEST_F(GarbageCollectorIntegrationTest,
cluster.shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
+
+
+// This test verifies that launching a new executor unschedules the
+// framework and executor work directories created by an old executor
+// (with the same id) from garbage collection.
+TEST_F(GarbageCollectorIntegrationTest, Unschedule)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegistered =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ MockExecutor exec;
+
+ TestingIsolator isolator(DEFAULT_EXECUTOR_ID, &exec);
+
+ Try<PID<Slave> > slave = cluster.slaves.start(&isolator);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegistered);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Resources resources = Resources::parse(cluster.slaves.flags.resources.get());
+ double cpus = resources.get("cpus", Value::Scalar()).value();
+ double mem = resources.get("mem", Value::Scalar()).value();
+
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(1, cpus, mem));
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ // TODO(benh/vinod): Would've been great to match the dispatch
+ // against arguments here.
+ // NOTE: Since Google Mock selects the last matching expectation
+ // that is still active, the order of the (un)schedule expectations
+ // below is the reverse of the actual (un)schedule call order.
+
+ // Schedule framework work directory.
+ Future<Nothing> scheduleFrameworkWork =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+ // Schedule top level executor work directory.
+ Future<Nothing> scheduleExecutorWork =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+ // Schedule executor run work directory.
+ Future<Nothing> scheduleExecutorRunWork =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+ // Unschedule top level executor work directory.
+ Future<Nothing> unscheduleExecutorWork =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::unschedule);
+
+ // Unschedule framework work directory.
+ Future<Nothing> unscheduleFrameworkWork =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::unschedule);
+
+ // Launch the next run of the executor on receipt of the next offer.
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(1, cpus, mem));
+
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(exec, shutdown(_))
+ .WillRepeatedly(Return());
+
+ Clock::pause();
+
+ // Kill the first executor.
+ process::dispatch(
+ isolator,
+ &Isolator::killExecutor,
+ frameworkId.get(),
+ DEFAULT_EXECUTOR_ID);
+
+ AWAIT_READY(scheduleExecutorRunWork);
+ AWAIT_READY(scheduleExecutorWork);
+ AWAIT_READY(scheduleFrameworkWork);
+
+ // Speed up the allocator.
+ while (unscheduleFrameworkWork.isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ AWAIT_READY(unscheduleFrameworkWork);
+ AWAIT_READY(unscheduleExecutorWork);
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ cluster.shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
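
[Editor's closing note on the NOTE in the test above: Google Mock searches expectations in reverse declaration order and uses the last one that is still active, which is why the FUTURE_DISPATCH interceptors are declared in the reverse of the expected dispatch order. A minimal, Mesos-free illustration of that rule; assumes linking against gtest_main, and RetiresOnSaturation() makes a consumed expectation step aside for the earlier one:

#include <gmock/gmock.h>
#include <gtest/gtest.h>

using ::testing::_;
using ::testing::Return;

struct Collector {
  virtual ~Collector() {}
  virtual int schedule(int id) = 0;
};

struct MockCollector : public Collector {
  MOCK_METHOD1(schedule, int(int));
};

TEST(LastMatchWins, ExpectationsDeclaredInReverse) {
  MockCollector mock;

  // Declared first, so it services the *second* call.
  EXPECT_CALL(mock, schedule(_))
    .WillOnce(Return(2));

  // Declared last, so it services the *first* call, then retires.
  EXPECT_CALL(mock, schedule(_))
    .WillOnce(Return(1))
    .RetiresOnSaturation();

  EXPECT_EQ(1, mock.schedule(7));
  EXPECT_EQ(2, mock.schedule(7));
}]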