You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/09/18 00:27:03 UTC
svn commit: r1386882 - in /incubator/mesos/trunk: src/slave/ src/tests/
third_party/libprocess/include/process/ third_party/libprocess/include/stout/
Author: benh
Date: Mon Sep 17 22:27:02 2012
New Revision: 1386882
URL: http://svn.apache.org/viewvc?rev=1386882&view=rev
Log:
Updated slave garbage collection to do it based on disk usage
(contributed by Vinod Kone, https://reviews.apache.org/r/6704).
Modified:
incubator/mesos/trunk/src/slave/constants.hpp
incubator/mesos/trunk/src/slave/flags.hpp
incubator/mesos/trunk/src/slave/gc.cpp
incubator/mesos/trunk/src/slave/gc.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/gc_tests.cpp
incubator/mesos/trunk/src/tests/utils.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp
incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp
Modified: incubator/mesos/trunk/src/slave/constants.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/constants.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/constants.hpp (original)
+++ incubator/mesos/trunk/src/slave/constants.hpp Mon Sep 17 22:27:02 2012
@@ -28,6 +28,7 @@ namespace slave {
const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5.0);
const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10.0);
const Duration GC_DELAY = Weeks(1.0);
+const Duration DISK_WATCH_INTERVAL = Minutes(1.0);
} // namespace slave {
} // namespace internal {
Modified: incubator/mesos/trunk/src/slave/flags.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/flags.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/flags.hpp (original)
+++ incubator/mesos/trunk/src/slave/flags.hpp Mon Sep 17 22:27:02 2012
@@ -94,10 +94,18 @@ public:
add(&Flags::gc_delay,
"gc_delay",
- "Amount of time to wait before cleaning up\n"
- "executor directories (e.g., 3days, 2weeks, etc)",
+ "Maximum amount of time to wait before cleaning up\n"
+ "executor directories (e.g., 3days, 2weeks, etc).\n"
+ "Note that this delay may be shorter depending on\n"
+ "the available disk usage.",
GC_DELAY);
+ add(&Flags::disk_watch_interval,
+ "disk_watch_interval",
+ "Periodic time interval (e.g., 10secs, 2mins, etc)\n"
+ "to check the disk usage",
+ DISK_WATCH_INTERVAL);
+
#ifdef __linux__
add(&Flags::cgroups_hierarchy_root,
"cgroups_hierarchy_root",
@@ -117,6 +125,7 @@ public:
std::string frameworks_home; // TODO(benh): Make an Option.
Duration executor_shutdown_grace_period;
Duration gc_delay;
+ Duration disk_watch_interval;
#ifdef __linux__
std::string cgroups_hierarchy_root;
#endif
Modified: incubator/mesos/trunk/src/slave/gc.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.cpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.cpp (original)
+++ incubator/mesos/trunk/src/slave/gc.cpp Mon Sep 17 22:27:02 2012
@@ -16,14 +16,18 @@
* limitations under the License.
*/
+#include <map>
#include <string>
+#include <vector>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/process.hpp>
+#include <process/timeout.hpp>
#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
#include <stout/os.hpp>
#include "logging/logging.hpp"
@@ -34,23 +38,58 @@ using namespace process;
using process::wait; // Necessary on some OS's to disambiguate.
+using std::map;
using std::string;
+using std::vector;
namespace mesos {
namespace internal {
namespace slave {
+
class GarbageCollectorProcess : public Process<GarbageCollectorProcess>
{
public:
+ virtual ~GarbageCollectorProcess();
+
// GarbageCollector implementation.
Future<bool> schedule(const Duration& d, const string& path);
+ void prune(const Duration& d);
+
private:
- void remove(const string& path, Promise<bool>* promise);
+ void remove(const Timeout& removalTime);
+
+ struct PathInfo
+ {
+ PathInfo(const string& _path, Promise<bool>* _promise)
+ : path(_path), promise(_promise) {}
+
+ string path;
+ Promise<bool>* promise;
+ };
+
+ // Stores all the paths that need to be deleted after a given timeout.
+ // NOTE: We are using std::map here instead of hashmap, because we
+ // need the keys of the map (deletion time) to be sorted in ascending order.
+ map<Timeout, vector<PathInfo> > paths;
+
+ void reset();
+ Timer timer;
};
+GarbageCollectorProcess::~GarbageCollectorProcess()
+{
+ foreachvalue (const vector<PathInfo>& infos, paths) {
+ foreach (const PathInfo& info, infos) {
+ info.promise->future().discard();
+ delete info.promise;
+ }
+ }
+}
+
+
Future<bool> GarbageCollectorProcess::schedule(
const Duration& d,
const string& path)
@@ -59,24 +98,77 @@ Future<bool> GarbageCollectorProcess::sc
Promise<bool>* promise = new Promise<bool>();
- delay(d, self(), &Self::remove, path, promise);
+ Timeout removalTime(d);
+
+ paths[removalTime].push_back(PathInfo(path, promise));
+
+ // If the timer is not yet initialized or the timeout is sooner than
+ // the currently active timer, update it.
+ if (timer.timeout().remaining() == Seconds(0) ||
+ removalTime < timer.timeout()) {
+ reset(); // Schedule the timer for next event.
+ }
return promise->future();
}
-void GarbageCollectorProcess::remove(
- const string& path,
- Promise<bool>* promise)
-{
- LOG(INFO) << "Removing " << path;
-
- // TODO(benh): Check error conditions of 'rmdir', e.g., permission
- // denied, file no longer exists, etc.
- bool result = os::rmdir(path);
+// Fires a message to self for the next event. This also cancels any
+// existing timer.
+void GarbageCollectorProcess::reset()
+{
+ Timer::cancel(timer); // Cancel the existing timer, if any.
+ if (!paths.empty()) {
+ Timeout removalTime = (*paths.begin()).first; // Get the first entry.
+ Duration d = removalTime.remaining();
+
+ VLOG(1) << "Scheduling GC removal event to fire after " << d;
+ timer = delay(d, self(), &Self::remove, removalTime);
+ } else {
+ timer = Timer(); // Reset the timer.
+ }
+}
- promise->set(result);
- delete promise;
+
+void GarbageCollectorProcess::remove(const Timeout& removalTime)
+{
+ if (paths.count(removalTime) > 0) {
+ foreach (const PathInfo& info, paths[removalTime]) {
+ const string& path = info.path;
+ Promise<bool>* promise = info.promise;
+
+ LOG(INFO) << "Deleting " << path;
+
+ // TODO(benh): Check error conditions of 'rmdir', e.g., permission
+ // denied, file no longer exists, etc.
+ // TODO(vinod): Consider invoking rmdir via async.
+ bool result = os::rmdir(path);
+
+ VLOG(1) << "Deleted " << path;
+ promise->set(result);
+ delete promise;
+ }
+ paths.erase(removalTime);
+ } else {
+ // This might happen if the directories have already been removed
+ // (e.g., by prune()).
+ LOG(WARNING) << "Ignoring gc event at " << removalTime.remaining()
+ << " as the corresponding directories are already removed";
+ }
+
+ reset(); // Schedule the timer for next event.
+}
+
+
+void GarbageCollectorProcess::prune(const Duration& d)
+{
+ foreachkey (const Timeout& removalTime, paths) {
+ if (removalTime.remaining() <= d) {
+ LOG(INFO) << "Pruning directories with remaining removal time "
+ << removalTime.remaining();
+ dispatch(self(), &GarbageCollectorProcess::remove, removalTime);
+ }
+ }
}
@@ -91,6 +183,7 @@ GarbageCollector::~GarbageCollector()
{
terminate(process);
wait(process);
+ delete process;
}
@@ -101,6 +194,12 @@ Future<bool> GarbageCollector::schedule(
return dispatch(process, &GarbageCollectorProcess::schedule, d, path);
}
+
+void GarbageCollector::prune(const Duration& d)
+{
+ return dispatch(process, &GarbageCollectorProcess::prune, d);
+}
+
} // namespace mesos {
} // namespace internal {
} // namespace slave {
Modified: incubator/mesos/trunk/src/slave/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.hpp (original)
+++ incubator/mesos/trunk/src/slave/gc.hpp Mon Sep 17 22:27:02 2012
@@ -51,6 +51,10 @@ public:
// error, e.g., permission denied).
process::Future<bool> schedule(const Duration& d, const std::string& path);
+ // Deletes all the directories whose scheduled garbage collection time
+ // is within the next 'd' duration of time.
+ void prune(const Duration& d);
+
private:
GarbageCollectorProcess* process;
};
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Mon Sep 17 22:27:02 2012
@@ -31,6 +31,7 @@
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
+#include <stout/numify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/utils.hpp>
@@ -210,6 +211,12 @@ void Slave::initialize()
&IsolationModule::initialize,
flags, local, self());
+ // Start disk monitoring.
+ // NOTE: We send a delayed message here instead of directly calling
+ // checkDiskUsage, to make disabling this feature easy (e.g., by
+ // specifying a very large disk_watch_interval).
+ delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
+
// Start all the statistics at 0.
stats.tasks[TASK_STAGING] = 0;
stats.tasks[TASK_STARTING] = 0;
@@ -520,6 +527,10 @@ void Slave::runTask(const FrameworkInfo&
&IsolationModule::resourcesChanged,
framework->id, executor->id, executor->resources);
+ LOG(INFO) << "Sending task '" << task.task_id()
+ << "' to executor '" << executorId
+ << "' of framework " << framework->id;
+
RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
message.mutable_framework_id()->MergeFrom(framework->id);
@@ -1542,25 +1553,75 @@ string Slave::createUniqueWorkDirectory(
// Find a unique directory based on the path given by the slave
// (this is because we might launch multiple executors from the same
// framework on this slave).
+ // NOTE: The run number of the new directory will be the highest of
+ // all the existing run directories for this executor.
out << "/runs/";
- const string& prefix = out.str();
+ int maxrun = 0;
+ foreach (const string& runStr, os::ls(out.str())) {
+ Try<int> run = numify<int>(runStr);
+ if (run.isError()) {
+ LOG(ERROR) << "Ignoring invalid run directory " << runStr;
+ continue;
+ }
+
+ maxrun = std::max(maxrun, run.get());
+ }
+ out << maxrun;
+
+ bool created = os::mkdir(out.str());
+ CHECK(created) << "Error creating work directory " << out.str();
+
+ return out.str();
+}
+
+
+// TODO(vinod): Figure out a way to express this function via cmd line.
+Duration Slave::age(double usage)
+{
+ return Weeks(flags.gc_delay.weeks() * (1.0 - usage));
+}
+
- for (int i = 0; i < INT_MAX; i++) {
- out << i;
- VLOG(1) << "Checking if " << out.str() << " already exists";
- if (!os::exists(out.str())) {
- bool created = os::mkdir(out.str());
- CHECK(created) << "Error creating work directory: " << out.str();
- return out.str();
+void Slave::checkDiskUsage()
+{
+ VLOG(1) << "Checking disk usage";
+
+ // TODO(vinod): We are making usage a Future, so that we can plug-in
+ // os::usage() into async.
+ Future<Try<double> > usage = os::usage();
+
+ usage.onAny(
+ defer(self(),
+ &Slave::_checkDiskUsage,
+ usage));
+}
+
+
+void Slave::_checkDiskUsage(const Future<Try<double> >& usage)
+{
+ if (!usage.isReady()) {
+ LOG(WARNING) << "Error getting disk usage";
+ } else {
+ Try<double> result = usage.get();
+
+ if (result.isSome()) {
+ double use = result.get();
+
+ LOG(INFO) << "Current disk usage " << std::setiosflags(std::ios::fixed)
+ << std::setprecision(2) << 100 * use << "%."
+ << " Max allowed age: " << age(use);
+
+ // We prune all directories whose deletion time is within
+ // the next 'gc_delay - age'. Since a directory is always
+ // scheduled for deletion 'gc_delay' into the future, only directories
+ // that are at least 'age' old are deleted.
+ gc.prune(Weeks(flags.gc_delay.weeks() - age(use).weeks()));
} else {
- out.str(prefix); // Try with prefix again.
+ LOG(WARNING) << "Unable to get disk usage: " << result.error();
}
}
-
- LOG(FATAL) << "Could not create work directory for executor '"
- << executorId << "' of framework" << frameworkId;
- return NULL;
+ delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
}
} // namespace slave {
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Mon Sep 17 22:27:02 2012
@@ -124,6 +124,14 @@ public:
const FrameworkID& frameworkId,
bool command_executor,
int status);
+
+ // NOTE: Pulled this to public to make it visible for testing.
+ // Garbage collects the directories based on the current disk usage.
+ // TODO(vinod): Instead of making this function public, we need to
+ // mock both GarbageCollector (and pass it through slave's constructor)
+ // and os calls.
+ void _checkDiskUsage(const Future<Try<double> >& capacity);
+
protected:
virtual void initialize();
virtual void finalize();
@@ -159,6 +167,14 @@ protected:
std::string createUniqueWorkDirectory(const FrameworkID& frameworkId,
const ExecutorID& executorId);
+
+ // This function returns the max age of executor/slave directories allowed,
+ // given a disk usage. This value could be used to tune gc.
+ Duration age(double usage);
+
+ // Checks the current disk usage and schedules for gc as necessary.
+ void checkDiskUsage();
+
private:
// HTTP handlers, friends of the slave in order to access state,
// they get invoked from within the slave so there is no need to
Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Mon Sep 17 22:27:02 2012
@@ -27,6 +27,8 @@
#include <stout/duration.hpp>
#include <stout/os.hpp>
+#include "common/resources.hpp"
+
#include "detector/detector.hpp"
#include "logging/logging.hpp"
@@ -70,6 +72,12 @@ protected:
static void SetUpTestCase()
{
flags.work_dir = "/tmp/mesos-tests";
+ flags.resources = Option<string>::some("cpus:2;mem:1024");
+
+ Resources resources = Resources::parse(flags.resources.get());
+ Value::Scalar none;
+ cpus = resources.get("cpus", none).value();
+ mem = resources.get("mem", none).value();
}
virtual void SetUp()
@@ -87,17 +95,6 @@ protected:
master = process::spawn(m);
execs[DEFAULT_EXECUTOR_ID] = &exec;
-
- EXPECT_CALL(exec, registered(_, _, _, _))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(exec, launchTask(_, _))
- .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
-
- EXPECT_CALL(exec, shutdown(_))
- .WillRepeatedly(Return());
-
- driver = new MesosSchedulerDriver(&sched, DEFAULT_FRAMEWORK_INFO, master);
}
virtual void TearDown()
@@ -141,30 +138,6 @@ protected:
startSlave();
}
- // Launches a task based on the received offer.
- void launchTask(const ExecutorInfo& executorInfo)
- {
- EXPECT_NE(0u, offers.size());
-
- TaskInfo task;
- task.set_name("");
- task.mutable_task_id()->set_value(UUID::random().toString());
- task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
- task.mutable_resources()->MergeFrom(offers[0].resources());
- task.mutable_executor()->MergeFrom(executorInfo);
-
- tasks.push_back(task);
-
- driver->launchTasks(offers[0].id(), tasks);
-
- tasks.clear();
- }
-
- void launchTask()
- {
- launchTask(DEFAULT_EXECUTOR_INFO);
- }
-
TestAllocatorProcess* a;
Master* m;
TestingIsolationModule* isolationModule;
@@ -174,25 +147,26 @@ protected:
MockExecutor exec, exec1;
map<ExecutorID, Executor*> execs;
MockScheduler sched;
- MesosSchedulerDriver* driver;
- trigger resourceOffersCall;
SlaveRegisteredMessage registeredMsg;
- vector<Offer> offers;
TaskStatus status;
- vector<TaskInfo> tasks;
MockFilter filter;
PID<Master> master;
PID<Slave> slave;
static flags::Flags<logging::Flags, slave::Flags> flags;
+ static double cpus;
+ static double mem;
};
// Initialize static members here.
flags::Flags<logging::Flags, slave::Flags> GarbageCollectorTest::flags;
+double GarbageCollectorTest::cpus;
+double GarbageCollectorTest::mem;
TEST_F(GarbageCollectorTest, Restart)
{
+ // Messages expectations.
process::Message message;
trigger slaveRegisteredMsg1, slaveRegisteredMsg2;
EXPECT_MESSAGE(filter, Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
@@ -206,11 +180,23 @@ TEST_F(GarbageCollectorTest, Restart)
EXPECT_MESSAGE(filter, Eq(LostSlaveMessage().GetTypeName()), _, _)
.WillRepeatedly(DoAll(Trigger(&lostSlaveMsg), Return(false)));
+ // Executor expectations.
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(exec, shutdown(_))
+ .WillRepeatedly(Return());
+
+ // Scheduler expectations.
EXPECT_CALL(sched, registered(_, _, _))
.Times(1);
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillRepeatedly(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)));
+ .WillOnce(LaunchTasks(1, cpus, mem))
+ .WillRepeatedly(Return());
trigger statusUpdateCall;
EXPECT_CALL(sched, statusUpdate(_, _))
@@ -228,11 +214,9 @@ TEST_F(GarbageCollectorTest, Restart)
registeredMsg.ParseFromString(message.body);
SlaveID slaveId = registeredMsg.slave_id();
- driver->start();
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
- WAIT_UNTIL(resourceOffersCall);
-
- launchTask();
+ driver.start();
WAIT_UNTIL(statusUpdateCall);
@@ -269,23 +253,36 @@ TEST_F(GarbageCollectorTest, Restart)
Clock::resume();
- driver->stop();
- driver->join();
+ driver.stop();
+ driver.join();
}
TEST_F(GarbageCollectorTest, ExitedExecutor)
{
+ // Messages expectations.
trigger exitedExecutorMsg;
EXPECT_MESSAGE(filter, Eq(ExitedExecutorMessage().GetTypeName()), _, _)
.WillOnce(DoAll(Trigger(&exitedExecutorMsg), Return(false)));
+ // Executor expectations.
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(exec, shutdown(_))
+ .WillRepeatedly(Return());
+
+ // Scheduler expectations.
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillRepeatedly(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)));
+ .WillOnce(LaunchTasks(1, cpus, mem))
+ .WillRepeatedly(Return());
trigger statusUpdateCall;
EXPECT_CALL(sched, statusUpdate(_, _))
@@ -295,11 +292,9 @@ TEST_F(GarbageCollectorTest, ExitedExecu
startSlave();
- driver->start();
-
- WAIT_UNTIL(resourceOffersCall);
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
- launchTask();
+ driver.start();
WAIT_UNTIL(statusUpdateCall);
@@ -331,11 +326,92 @@ TEST_F(GarbageCollectorTest, ExitedExecu
Clock::settle();
- // First executor's directory should be gc'ed by now.
+ // Executor's directory should be gc'ed by now.
+ ASSERT_FALSE(os::exists(executorDir));
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+}
+
+
+TEST_F(GarbageCollectorTest, DiskUsage)
+{
+ // Messages expectations.
+ trigger exitedExecutorMsg;
+ EXPECT_MESSAGE(filter, Eq(ExitedExecutorMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(Trigger(&exitedExecutorMsg), Return(false)));
+
+ // Executor expectations.
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(exec, shutdown(_))
+ .WillRepeatedly(Return());
+
+ // Scheduler expectations.
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(1, cpus, mem))
+ .WillRepeatedly(Return());
+
+ trigger statusUpdateCall;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)))
+ .WillOnce(Return()) // Ignore the TASK_LOST update.
+ .WillRepeatedly(Trigger(&statusUpdateCall));
+
+ startSlave();
+
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+
+ driver.start();
+
+ WAIT_UNTIL(statusUpdateCall);
+
+ EXPECT_EQ(TASK_RUNNING, status.state());
+
+ const std::string& executorDir =
+ isolationModule->directories[DEFAULT_EXECUTOR_ID];
+
+ ASSERT_TRUE(os::exists(executorDir));
+
+ Clock::pause();
+
+ // Kill the executor and inform the slave.
+ isolationModule->killExecutor(frameworkId, DEFAULT_EXECUTOR_ID);
+
+ process::dispatch(slave, &Slave::executorExited, frameworkId,
+ DEFAULT_EXECUTOR_ID, 0);
+
+ // In order to make sure the slave has scheduled the executor
+ // directory to get garbage collected we need to wait until the
+ // slave has sent the ExitedExecutor message. TODO(benh): We really
+ // need to wait until the GarbageCollectorProcess has dispatched a
+ // message back to itself.
+ WAIT_UNTIL(exitedExecutorMsg);
+
+ // Simulate a disk full message to the slave.
+ process::dispatch(slave, &Slave::_checkDiskUsage, Try<double>::some(1));
+
+ // TODO(vinod): As above, we need to wait until GarbageCollectorProcess has
+ // dispatched remove message back to itself.
+ sleep(1);
+
+ Clock::settle();
+
+ // Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
Clock::resume();
- driver->stop();
- driver->join();
+ driver.stop();
+ driver.join();
}
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Mon Sep 17 22:27:02 2012
@@ -275,6 +275,7 @@ ACTION_P3(LaunchTasks, tasks, cpus, mem)
}
}
+
// Like LaunchTasks, but decline the entire offer and
// don't launch any tasks.
ACTION(DeclineOffers)
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp Mon Sep 17 22:27:02 2012
@@ -40,6 +40,21 @@ public:
return *this;
}
+ bool operator == (const Timeout& that) const
+ {
+ return timeout == that.timeout;
+ }
+
+ bool operator < (const Timeout& that) const
+ {
+ return timeout < that.timeout;
+ }
+
+ bool operator <= (const Timeout& that) const
+ {
+ return timeout <= that.timeout;
+ }
+
// Returns the value of the timeout as the number of seconds elapsed
// since the epoch.
double value() const
Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp Mon Sep 17 22:27:02 2012
@@ -73,6 +73,8 @@ public:
bool operator == (const Duration& that) const { return value == that.value; }
bool operator != (const Duration& that) const { return value != that.value; }
+ // TODO(vinod): Overload arithmetic operators.
+
protected:
static const uint64_t NANOSECONDS = 1;
static const uint64_t MICROSECONDS = 1000 * NANOSECONDS;
Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp Mon Sep 17 22:27:02 2012
@@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <sys/stat.h>
+#include <sys/statvfs.h>
#ifdef __linux__
#include <sys/sysinfo.h>
#endif
@@ -165,7 +166,8 @@ inline Try<bool> isNonblock(int fd)
inline Try<bool> touch(const std::string& path)
{
- Try<int> fd = open(path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+ Try<int> fd =
+ open(path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
if (fd.isError()) {
return Try<bool>::error("Failed to open file " + path);
@@ -586,7 +588,8 @@ inline Try<std::list<std::string> > find
std::list<std::string> results;
if (!isdir(directory)) {
- return Try<std::list<std::string> >::error("Directory " + directory + " doesn't exist!");
+ return Try<std::list<std::string> >::error(
+ "Directory " + directory + " doesn't exist!");
}
foreach (const std::string& entry, ls(directory)) {
@@ -597,7 +600,8 @@ inline Try<std::list<std::string> > find
// This is just a hack to check whether this path is a regular file or
// a (sub) directory.
if (isdir(result)) { // If its a directory recurse.
- CHECK(find(result, pattern).isSome()) << "Directory " << directory << " doesn't exist";
+ CHECK(find(result, pattern).isSome())
+ << "Directory " << directory << " doesn't exist";
foreach (const std::string& path, find(result, pattern).get()) {
results.push_back(path);
}
@@ -612,6 +616,17 @@ inline Try<std::list<std::string> > find
}
+// Returns relative disk usage of the file system mounted at the given path.
+inline Try<double> usage(const std::string& fs = "/")
+{
+ struct statvfs buf;
+ if (statvfs(fs.c_str(), &buf) < 0) {
+ return Try<double>::error(strerror(errno));
+ }
+ return (double) (buf.f_blocks - buf.f_bfree) / buf.f_blocks;
+}
+
+
inline std::string user()
{
passwd* passwd;