You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/09/18 00:27:03 UTC

svn commit: r1386882 - in /incubator/mesos/trunk: src/slave/ src/tests/ third_party/libprocess/include/process/ third_party/libprocess/include/stout/

Author: benh
Date: Mon Sep 17 22:27:02 2012
New Revision: 1386882

URL: http://svn.apache.org/viewvc?rev=1386882&view=rev
Log:
Updated slave garbage collection to do it based on disk usage
(contributed by Vinod Kone, https://reviews.apache.org/r/6704).

Modified:
    incubator/mesos/trunk/src/slave/constants.hpp
    incubator/mesos/trunk/src/slave/flags.hpp
    incubator/mesos/trunk/src/slave/gc.cpp
    incubator/mesos/trunk/src/slave/gc.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/gc_tests.cpp
    incubator/mesos/trunk/src/tests/utils.hpp
    incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
    incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp
    incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp

Modified: incubator/mesos/trunk/src/slave/constants.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/constants.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/constants.hpp (original)
+++ incubator/mesos/trunk/src/slave/constants.hpp Mon Sep 17 22:27:02 2012
@@ -28,6 +28,7 @@ namespace slave {
 const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5.0);
 const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10.0);
 const Duration GC_DELAY = Weeks(1.0);
+const Duration DISK_WATCH_INTERVAL = Minutes(1.0);
 
 } // namespace slave {
 } // namespace internal {

Modified: incubator/mesos/trunk/src/slave/flags.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/flags.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/flags.hpp (original)
+++ incubator/mesos/trunk/src/slave/flags.hpp Mon Sep 17 22:27:02 2012
@@ -94,10 +94,18 @@ public:
 
     add(&Flags::gc_delay,
         "gc_delay",
-        "Amount of time to wait before cleaning up\n"
-        "executor directories (e.g., 3days, 2weeks, etc)",
+        "Maximum amount of time to wait before cleaning up\n"
+        "executor directories (e.g., 3days, 2weeks, etc).\n"
+        "Note that this delay may be shorter depending on\n"
+        "the available disk usage.",
         GC_DELAY);
 
+    add(&Flags::disk_watch_interval,
+        "disk_watch_interval",
+        "Periodic time interval (e.g., 10secs, 2mins, etc)\n"
+        "to check the disk usage",
+        DISK_WATCH_INTERVAL);
+
 #ifdef __linux__
     add(&Flags::cgroups_hierarchy_root,
         "cgroups_hierarchy_root",
@@ -117,6 +125,7 @@ public:
   std::string frameworks_home;  // TODO(benh): Make an Option.
   Duration executor_shutdown_grace_period;
   Duration gc_delay;
+  Duration disk_watch_interval;
 #ifdef __linux__
   std::string cgroups_hierarchy_root;
 #endif

Modified: incubator/mesos/trunk/src/slave/gc.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.cpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.cpp (original)
+++ incubator/mesos/trunk/src/slave/gc.cpp Mon Sep 17 22:27:02 2012
@@ -16,14 +16,18 @@
  * limitations under the License.
  */
 
+#include <map>
 #include <string>
+#include <vector>
 
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/process.hpp>
+#include <process/timeout.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/foreach.hpp>
 #include <stout/os.hpp>
 
 #include "logging/logging.hpp"
@@ -34,23 +38,58 @@ using namespace process;
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
+using std::map;
 using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
 namespace slave {
 
+
 class GarbageCollectorProcess : public Process<GarbageCollectorProcess>
 {
 public:
+  virtual ~GarbageCollectorProcess();
+
   // GarbageCollector implementation.
   Future<bool> schedule(const Duration& d, const string& path);
 
+  void prune(const Duration& d);
+
 private:
-  void remove(const string& path, Promise<bool>* promise);
+  void remove(const Timeout& removalTime);
+
+  struct PathInfo
+  {
+    PathInfo(const string& _path, Promise<bool>* _promise)
+      : path(_path), promise(_promise) {}
+
+    string path;
+    Promise<bool>* promise;
+  };
+
+  // Store all the paths that need to be deleted after a given timeout.
+  // NOTE: We are using std::map here instead of hashmap, because we
+  // need the keys of the map (deletion time) to be sorted in ascending order.
+  map<Timeout, vector<PathInfo> > paths;
+
+  void reset();
+  Timer timer;
 };
 
 
+GarbageCollectorProcess::~GarbageCollectorProcess()
+{
+  foreachvalue (const vector<PathInfo>& infos, paths) {
+    foreach (const PathInfo& info, infos) {
+      info.promise->future().discard();
+      delete info.promise;
+    }
+  }
+}
+
+
 Future<bool> GarbageCollectorProcess::schedule(
     const Duration& d,
     const string& path)
@@ -59,24 +98,77 @@ Future<bool> GarbageCollectorProcess::sc
 
   Promise<bool>* promise = new Promise<bool>();
 
-  delay(d, self(), &Self::remove, path, promise);
+  Timeout removalTime(d);
+
+  paths[removalTime].push_back(PathInfo(path, promise));
+
+  // If the timer is not yet initialized or the timeout is sooner than
+  // the currently active timer, update it.
+  if (timer.timeout().remaining() == Seconds(0) ||
+      removalTime < timer.timeout()) {
+    reset(); // Schedule the timer for next event.
+  }
 
   return promise->future();
 }
 
 
-void GarbageCollectorProcess::remove(
-    const string& path,
-    Promise<bool>* promise)
-{
-  LOG(INFO) << "Removing " << path;
-
-  // TODO(benh): Check error conditions of 'rmdir', e.g., permission
-  // denied, file no longer exists, etc.
-  bool result = os::rmdir(path);
+// Fires a message to self for the next event. This also cancels any
+// existing timer.
+void GarbageCollectorProcess::reset()
+{
+  Timer::cancel(timer); // Cancel the existing timer, if any.
+  if (!paths.empty()) {
+    Timeout removalTime = (*paths.begin()).first; // Get the first entry.
+    Duration d = removalTime.remaining();
+
+    VLOG(1) << "Scheduling GC removal event to fire after " << d;
+    timer = delay(d, self(), &Self::remove, removalTime);
+  } else {
+    timer = Timer(); // Reset the timer.
+  }
+}
 
-  promise->set(result);
-  delete promise;
+
+void GarbageCollectorProcess::remove(const Timeout& removalTime)
+{
+  if (paths.count(removalTime) > 0) {
+    foreach (const PathInfo& info, paths[removalTime]) {
+      const string& path = info.path;
+      Promise<bool>* promise = info.promise;
+
+      LOG(INFO) << "Deleting " << path;
+
+      // TODO(benh): Check error conditions of 'rmdir', e.g., permission
+      // denied, file no longer exists, etc.
+      // TODO(vinod): Consider invoking rmdir via async.
+      bool result = os::rmdir(path);
+
+      VLOG(1) << "Deleted " << path;
+      promise->set(result);
+      delete promise;
+    }
+    paths.erase(removalTime);
+  } else {
+    // This might happen if the directories have already been removed
+    // (e.g., by an earlier remove() dispatched from prune()).
+    LOG(WARNING) << "Ignoring gc event at " << removalTime.remaining()
+                 << " as the corresponding directories are already removed";
+  }
+
+  reset(); // Schedule the timer for next event.
+}
+
+
+void GarbageCollectorProcess::prune(const Duration& d)
+{
+  foreachkey (const Timeout& removalTime, paths) {
+    if (removalTime.remaining() <= d) {
+      LOG(INFO) << "Pruning directories with remaining removal time "
+                << removalTime.remaining();
+      dispatch(self(), &GarbageCollectorProcess::remove, removalTime);
+    }
+  }
 }
 
 
@@ -91,6 +183,7 @@ GarbageCollector::~GarbageCollector()
 {
   terminate(process);
   wait(process);
+  delete process;
 }
 
 
@@ -101,6 +194,12 @@ Future<bool> GarbageCollector::schedule(
   return dispatch(process, &GarbageCollectorProcess::schedule, d, path);
 }
 
+
+void GarbageCollector::prune(const Duration& d)
+{
+  return dispatch(process, &GarbageCollectorProcess::prune, d);
+}
+
 } // namespace mesos {
 } // namespace internal {
 } // namespace slave {

Modified: incubator/mesos/trunk/src/slave/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.hpp (original)
+++ incubator/mesos/trunk/src/slave/gc.hpp Mon Sep 17 22:27:02 2012
@@ -51,6 +51,10 @@ public:
   // error, e.g., permission denied).
   process::Future<bool> schedule(const Duration& d, const std::string& path);
 
+  // Deletes all the directories, whose scheduled garbage collection time
+  // is within the next 'd' duration of time.
+  void prune(const Duration& d);
+
 private:
   GarbageCollectorProcess* process;
 };

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Mon Sep 17 22:27:02 2012
@@ -31,6 +31,7 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
+#include <stout/numify.hpp>
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 #include <stout/utils.hpp>
@@ -210,6 +211,12 @@ void Slave::initialize()
            &IsolationModule::initialize,
            flags, local, self());
 
+  // Start disk monitoring.
+  // NOTE: We send a delayed message here instead of directly calling
+  // checkDiskUsage, to make disabling this feature easy (e.g by specifying
+  // a very large disk_watch_interval).
+  delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
+
   // Start all the statistics at 0.
   stats.tasks[TASK_STAGING] = 0;
   stats.tasks[TASK_STARTING] = 0;
@@ -520,6 +527,10 @@ void Slave::runTask(const FrameworkInfo&
                &IsolationModule::resourcesChanged,
                framework->id, executor->id, executor->resources);
 
+      LOG(INFO) << "Sending task '" << task.task_id()
+                << "' to executor '" << executorId
+                << "' of framework " << framework->id;
+
       RunTaskMessage message;
       message.mutable_framework()->MergeFrom(framework->info);
       message.mutable_framework_id()->MergeFrom(framework->id);
@@ -1542,25 +1553,75 @@ string Slave::createUniqueWorkDirectory(
   // Find a unique directory based on the path given by the slave
   // (this is because we might launch multiple executors from the same
   // framework on this slave).
+  // NOTE: The run number of the new directory will be the highest of
+  // all the existing run directories for this executor.
   out << "/runs/";
 
-  const string& prefix = out.str();
+  int maxrun = 0;
+  foreach (const string& runStr, os::ls(out.str())) {
+    Try<int> run = numify<int>(runStr);
+    if (run.isError()) {
+      LOG(ERROR) << "Ignoring invalid run directory " << runStr;
+      continue;
+    }
+    // Use one past the highest existing run so the new directory is unique.
+    maxrun = std::max(maxrun, run.get() + 1);
+  }
+  out << maxrun;
+
+  bool created = os::mkdir(out.str());
+  CHECK(created) << "Error creating work directory " << out.str();
+
+  return out.str();
+}
+
+
+// TODO(vinod): Figure out a way to express this function via cmd line.
+Duration Slave::age(double usage)
+{
+ return Weeks(flags.gc_delay.weeks() * (1.0 - usage));
+}
+
 
-  for (int i = 0; i < INT_MAX; i++) {
-    out << i;
-    VLOG(1) << "Checking if " << out.str() << " already exists";
-    if (!os::exists(out.str())) {
-      bool created = os::mkdir(out.str());
-      CHECK(created) << "Error creating work directory: " << out.str();
-      return out.str();
+void Slave::checkDiskUsage()
+{
+  VLOG(1) << "Checking disk usage";
+
+  // TODO(vinod): We are making usage a Future, so that we can plug-in
+  // os::usage() into async.
+  Future<Try<double> > usage = os::usage();
+
+  usage.onAny(
+      defer(self(),
+            &Slave::_checkDiskUsage,
+            usage));
+}
+
+
+void Slave::_checkDiskUsage(const Future<Try<double> >& usage)
+{
+  if (!usage.isReady()) {
+    LOG(WARNING) << "Error getting disk usage";
+  } else {
+    Try<double> result = usage.get();
+
+    if (result.isSome()) {
+      double use = result.get();
+
+      LOG(INFO) << "Current disk usage " << std::setiosflags(std::ios::fixed)
+                << std::setprecision(2) << 100 * use << "%."
+                << " Max allowed age: " << age(use);
+
+      // We prune all directories whose deletion time is within
+      // the next 'gc_delay - age'. Since a directory is always
+      // scheduled for deletion 'gc_delay' into the future, only directories
+      // that are at least 'age' old are deleted.
+      gc.prune(Weeks(flags.gc_delay.weeks() - age(use).weeks()));
     } else {
-      out.str(prefix); // Try with prefix again.
+      LOG(WARNING) << "Unable to get disk usage: " << result.error();
     }
   }
-
-  LOG(FATAL) << "Could not create work directory for executor '"
-             << executorId << "' of framework" << frameworkId;
-  return NULL;
+  delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
 }
 
 } // namespace slave {

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Mon Sep 17 22:27:02 2012
@@ -124,6 +124,14 @@ public:
                           const FrameworkID& frameworkId,
                           bool command_executor,
                           int status);
+
+  // NOTE: Pulled this to public to make it visible for testing.
+  // Garbage collects directories based on the given disk 'usage' fraction.
+  // TODO(vinod): Instead of making this function public, we need to
+  // mock both GarbageCollector (and pass it through slave's constructor)
+  // and os calls.
+  void _checkDiskUsage(const Future<Try<double> >& usage);
+
 protected:
   virtual void initialize();
   virtual void finalize();
@@ -159,6 +167,14 @@ protected:
   std::string createUniqueWorkDirectory(const FrameworkID& frameworkId,
                                         const ExecutorID& executorId);
 
+
+  // This function returns the max age of executor/slave directories allowed,
+  // given a disk usage. This value could be used to tune gc.
+  Duration age(double usage);
+
+  // Checks the current disk usage and schedules for gc as necessary.
+  void checkDiskUsage();
+
 private:
   // HTTP handlers, friends of the slave in order to access state,
   // they get invoked from within the slave so there is no need to

Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Mon Sep 17 22:27:02 2012
@@ -27,6 +27,8 @@
 #include <stout/duration.hpp>
 #include <stout/os.hpp>
 
+#include "common/resources.hpp"
+
 #include "detector/detector.hpp"
 
 #include "logging/logging.hpp"
@@ -70,6 +72,12 @@ protected:
   static void SetUpTestCase()
   {
     flags.work_dir = "/tmp/mesos-tests";
+    flags.resources = Option<string>::some("cpus:2;mem:1024");
+
+    Resources resources = Resources::parse(flags.resources.get());
+    Value::Scalar none;
+    cpus = resources.get("cpus", none).value();
+    mem = resources.get("mem", none).value();
   }
 
   virtual void SetUp()
@@ -87,17 +95,6 @@ protected:
     master = process::spawn(m);
 
     execs[DEFAULT_EXECUTOR_ID] = &exec;
-
-    EXPECT_CALL(exec, registered(_, _, _, _))
-      .WillRepeatedly(Return());
-
-    EXPECT_CALL(exec, launchTask(_, _))
-      .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
-
-    EXPECT_CALL(exec, shutdown(_))
-      .WillRepeatedly(Return());
-
-    driver = new MesosSchedulerDriver(&sched, DEFAULT_FRAMEWORK_INFO, master);
   }
 
   virtual void TearDown()
@@ -141,30 +138,6 @@ protected:
     startSlave();
   }
 
-  // Launches a task based on the received offer.
-  void launchTask(const ExecutorInfo& executorInfo)
-  {
-    EXPECT_NE(0u, offers.size());
-
-    TaskInfo task;
-    task.set_name("");
-    task.mutable_task_id()->set_value(UUID::random().toString());
-    task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
-    task.mutable_resources()->MergeFrom(offers[0].resources());
-    task.mutable_executor()->MergeFrom(executorInfo);
-
-    tasks.push_back(task);
-
-    driver->launchTasks(offers[0].id(), tasks);
-
-    tasks.clear();
-  }
-
-  void launchTask()
-  {
-    launchTask(DEFAULT_EXECUTOR_INFO);
-  }
-
   TestAllocatorProcess* a;
   Master* m;
   TestingIsolationModule* isolationModule;
@@ -174,25 +147,26 @@ protected:
   MockExecutor exec, exec1;
   map<ExecutorID, Executor*> execs;
   MockScheduler sched;
-  MesosSchedulerDriver* driver;
-  trigger resourceOffersCall;
   SlaveRegisteredMessage registeredMsg;
-  vector<Offer> offers;
   TaskStatus status;
-  vector<TaskInfo> tasks;
   MockFilter filter;
   PID<Master> master;
   PID<Slave> slave;
   static flags::Flags<logging::Flags, slave::Flags> flags;
+  static double cpus;
+  static double mem;
 };
 
 
 // Initialize static members here.
 flags::Flags<logging::Flags, slave::Flags> GarbageCollectorTest::flags;
+double GarbageCollectorTest::cpus;
+double GarbageCollectorTest::mem;
 
 
 TEST_F(GarbageCollectorTest, Restart)
 {
+  // Messages expectations.
   process::Message message;
   trigger slaveRegisteredMsg1, slaveRegisteredMsg2;
   EXPECT_MESSAGE(filter, Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
@@ -206,11 +180,23 @@ TEST_F(GarbageCollectorTest, Restart)
   EXPECT_MESSAGE(filter, Eq(LostSlaveMessage().GetTypeName()), _, _)
     .WillRepeatedly(DoAll(Trigger(&lostSlaveMsg), Return(false)));
 
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Return());
+
+  // Scheduler expectations.
   EXPECT_CALL(sched, registered(_, _, _))
     .Times(1);
 
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillRepeatedly(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)));
+    .WillOnce(LaunchTasks(1, cpus, mem))
+    .WillRepeatedly(Return());
 
   trigger statusUpdateCall;
   EXPECT_CALL(sched, statusUpdate(_, _))
@@ -228,11 +214,9 @@ TEST_F(GarbageCollectorTest, Restart)
   registeredMsg.ParseFromString(message.body);
   SlaveID slaveId = registeredMsg.slave_id();
 
-  driver->start();
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
-  WAIT_UNTIL(resourceOffersCall);
-
-  launchTask();
+  driver.start();
 
   WAIT_UNTIL(statusUpdateCall);
 
@@ -269,23 +253,36 @@ TEST_F(GarbageCollectorTest, Restart)
 
   Clock::resume();
 
-  driver->stop();
-  driver->join();
+  driver.stop();
+  driver.join();
 }
 
 
 TEST_F(GarbageCollectorTest, ExitedExecutor)
 {
+  // Messages expectations.
   trigger exitedExecutorMsg;
   EXPECT_MESSAGE(filter, Eq(ExitedExecutorMessage().GetTypeName()), _, _)
     .WillOnce(DoAll(Trigger(&exitedExecutorMsg), Return(false)));
 
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Return());
+
+  // Scheduler expectations.
   FrameworkID frameworkId;
   EXPECT_CALL(sched, registered(_, _, _))
     .WillOnce(SaveArg<1>(&frameworkId));
 
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillRepeatedly(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)));
+    .WillOnce(LaunchTasks(1, cpus, mem))
+    .WillRepeatedly(Return());
 
   trigger statusUpdateCall;
   EXPECT_CALL(sched, statusUpdate(_, _))
@@ -295,11 +292,9 @@ TEST_F(GarbageCollectorTest, ExitedExecu
 
   startSlave();
 
-  driver->start();
-
-  WAIT_UNTIL(resourceOffersCall);
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
-  launchTask();
+  driver.start();
 
   WAIT_UNTIL(statusUpdateCall);
 
@@ -331,11 +326,92 @@ TEST_F(GarbageCollectorTest, ExitedExecu
 
   Clock::settle();
 
-  // First executor's directory should be gc'ed by now.
+  // Executor's directory should be gc'ed by now.
+  ASSERT_FALSE(os::exists(executorDir));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
+TEST_F(GarbageCollectorTest, DiskUsage)
+{
+  // Messages expectations.
+  trigger exitedExecutorMsg;
+  EXPECT_MESSAGE(filter, Eq(ExitedExecutorMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(Trigger(&exitedExecutorMsg), Return(false)));
+
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Return());
+
+  // Scheduler expectations.
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(1, cpus, mem))
+    .WillRepeatedly(Return());
+
+  trigger statusUpdateCall;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)))
+    .WillOnce(Return()) // Ignore the TASK_LOST update.
+    .WillRepeatedly(Trigger(&statusUpdateCall));
+
+  startSlave();
+
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
+
+  driver.start();
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  const std::string& executorDir =
+    isolationModule->directories[DEFAULT_EXECUTOR_ID];
+
+  ASSERT_TRUE(os::exists(executorDir));
+
+  Clock::pause();
+
+  // Kill the executor and inform the slave.
+  isolationModule->killExecutor(frameworkId, DEFAULT_EXECUTOR_ID);
+
+  process::dispatch(slave, &Slave::executorExited, frameworkId,
+                    DEFAULT_EXECUTOR_ID, 0);
+
+  // In order to make sure the slave has scheduled the executor
+  // directory to get garbage collected we need to wait until the
+  // slave has sent the ExecutorExited message. TODO(benh): We really
+  // need to wait until the GarbageCollectorProcess has dispatched a
+  // message back to itself.
+  WAIT_UNTIL(exitedExecutorMsg);
+
+  // Simulate a disk full message to the slave.
+  process::dispatch(slave, &Slave::_checkDiskUsage, Try<double>::some(1));
+
+  // TODO(vinod): As above, we need to wait until GarbageCollectorProcess has
+  // dispatched remove message back to itself.
+  sleep(1);
+
+  Clock::settle();
+
+  // Executor's directory should be gc'ed by now.
   ASSERT_FALSE(os::exists(executorDir));
 
   Clock::resume();
 
-  driver->stop();
-  driver->join();
+  driver.stop();
+  driver.join();
 }

Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Mon Sep 17 22:27:02 2012
@@ -275,6 +275,7 @@ ACTION_P3(LaunchTasks, tasks, cpus, mem)
   }
 }
 
+
 // Like LaunchTasks, but decline the entire offer and
 // don't launch any tasks.
 ACTION(DeclineOffers)

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp Mon Sep 17 22:27:02 2012
@@ -40,6 +40,21 @@ public:
     return *this;
   }
 
+  bool operator == (const Timeout& that) const
+  {
+      return timeout == that.timeout;
+  }
+
+  bool operator < (const Timeout& that) const
+  {
+      return timeout < that.timeout;
+  }
+
+  bool operator <= (const Timeout& that) const
+  {
+    return timeout <= that.timeout;
+  }
+
   // Returns the value of the timeout as the number of seconds elapsed
   // since the epoch.
   double value() const

Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/duration.hpp Mon Sep 17 22:27:02 2012
@@ -73,6 +73,8 @@ public:
   bool operator == (const Duration& that) const { return value == that.value; }
   bool operator != (const Duration& that) const { return value != that.value; }
 
+  // TODO(vinod): Overload arithmetic operators.
+
 protected:
   static const uint64_t NANOSECONDS = 1;
   static const uint64_t MICROSECONDS = 1000 * NANOSECONDS;

Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp?rev=1386882&r1=1386881&r2=1386882&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp Mon Sep 17 22:27:02 2012
@@ -18,6 +18,7 @@
 #include <glog/logging.h>
 
 #include <sys/stat.h>
+#include <sys/statvfs.h>
 #ifdef __linux__
 #include <sys/sysinfo.h>
 #endif
@@ -165,7 +166,8 @@ inline Try<bool> isNonblock(int fd)
 
 inline Try<bool> touch(const std::string& path)
 {
-  Try<int> fd = open(path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+  Try<int> fd =
+      open(path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
 
   if (fd.isError()) {
     return Try<bool>::error("Failed to open file " + path);
@@ -586,7 +588,8 @@ inline Try<std::list<std::string> > find
   std::list<std::string> results;
 
   if (!isdir(directory)) {
-    return Try<std::list<std::string> >::error("Directory " + directory + " doesn't exist!");
+    return Try<std::list<std::string> >::error(
+        "Directory " + directory + " doesn't exist!");
   }
 
   foreach (const std::string& entry, ls(directory)) {
@@ -597,7 +600,8 @@ inline Try<std::list<std::string> > find
     // This is just a hack to check whether this path is a regular file or
     // a (sub) directory.
     if (isdir(result)) { // If its a directory recurse.
-      CHECK(find(result, pattern).isSome()) << "Directory " << directory << " doesn't exist";
+      CHECK(find(result, pattern).isSome())
+        << "Directory " << directory << " doesn't exist";
       foreach (const std::string& path, find(result, pattern).get()) {
         results.push_back(path);
       }
@@ -612,6 +616,17 @@ inline Try<std::list<std::string> > find
 }
 
 
+// Returns relative disk usage of the file system mounted at the given path.
+inline Try<double> usage(const std::string& fs = "/")
+{
+  struct statvfs buf;
+  if (statvfs(fs.c_str(), &buf) < 0) {
+    return Try<double>::error(strerror(errno));
+  }
+  return (double) (buf.f_blocks - buf.f_bfree) / buf.f_blocks;
+}
+
+
 inline std::string user()
 {
   passwd* passwd;