You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/09 19:53:55 UTC

svn commit: r1466159 - in /incubator/mesos/trunk/src: slave/cgroups_isolator.cpp slave/gc.cpp slave/gc.hpp tests/gc_tests.cpp tests/utils.hpp

Author: bmahler
Date: Tue Apr  9 17:53:55 2013
New Revision: 1466159

URL: http://svn.apache.org/r1466159
Log:
Added an 'unschedule' operation to the GarbageCollector, changed
'schedule' to discard the previous future when a path is unscheduled
or rescheduled, and added GarbageCollector unit tests.

Review: https://reviews.apache.org/r/10028

Modified:
    incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
    incubator/mesos/trunk/src/slave/gc.cpp
    incubator/mesos/trunk/src/slave/gc.hpp
    incubator/mesos/trunk/src/tests/gc_tests.cpp
    incubator/mesos/trunk/src/tests/utils.hpp

Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.cpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.cpp Tue Apr  9 17:53:55 2013
@@ -893,15 +893,18 @@ Try<Nothing> CgroupsIsolator::cfsChanged
   CHECK(resource.type() == Value::SCALAR);
 
   Try<Nothing> write = cgroups::write(
-      hierarchy, info->name(), "cpu.cfs_period_us", stringify(CPU_CFS_PERIOD.us()));
+      hierarchy,
+      info->name(),
+      "cpu.cfs_period_us",
+      stringify(CPU_CFS_PERIOD.us()));
 
   if (write.isError()) {
     return Error("Failed to update 'cpu.cfs_period_us': " + write.error());
   }
 
   double cpus = resource.scalar().value();
-  size_t quota =
-    std::max(CPU_CFS_PERIOD.us() * cpus, MIN_CPU_CFS_QUOTA.us());
+  size_t quota = static_cast<size_t>(
+    std::max(CPU_CFS_PERIOD.us() * cpus, MIN_CPU_CFS_QUOTA.us()));
 
   write = cgroups::write(
       hierarchy, info->name(), "cpu.cfs_quota_us", stringify(quota));

Modified: incubator/mesos/trunk/src/slave/gc.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.cpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.cpp (original)
+++ incubator/mesos/trunk/src/slave/gc.cpp Tue Apr  9 17:53:55 2013
@@ -16,17 +16,11 @@
  * limitations under the License.
  */
 
-#include <map>
-#include <string>
-#include <vector>
+#include <list>
 
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
-#include <process/future.hpp>
-#include <process/process.hpp>
-#include <process/timeout.hpp>
 
-#include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/os.hpp>
 
@@ -38,21 +32,19 @@ using namespace process;
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
+using std::list;
 using std::map;
 using std::string;
-using std::vector;
 
 namespace mesos {
 namespace internal {
 namespace slave {
 
+
 GarbageCollectorProcess::~GarbageCollectorProcess()
 {
-  foreachvalue (const vector<PathInfo>& infos, paths) {
-    foreach (const PathInfo& info, infos) {
-      info.promise->future().discard();
-      delete info.promise;
-    }
+  foreachvalue (const PathInfo& info, paths) { // Still-scheduled paths.
+    info.promise->future().discard(); // No delete: Owned<> frees it.
   }
 }
 
@@ -61,13 +53,20 @@ Future<Nothing> GarbageCollectorProcess:
     const Duration& d,
     const string& path)
 {
-  LOG(INFO) << "Scheduling " << path << " for removal";
+  LOG(INFO) << "Scheduling '" << path << "' for removal";
 
-  Promise<Nothing>* promise = new Promise<Nothing>();
+  // If there's an existing schedule for this path, we must remove
+  // it here in order to reschedule.
+  if (timeouts.contains(path)) {
+    CHECK(unschedule(path));
+  }
+
+  Owned<Promise<Nothing> > promise(new Promise<Nothing>());
 
   Timeout removalTime(d);
 
-  paths[removalTime].push_back(PathInfo(path, promise));
+  timeouts[path] = removalTime;
+  paths.put(removalTime, PathInfo(path, promise));
 
   // If the timer is not yet initialized or the timeout is sooner than
   // the currently active timer, update it.
@@ -80,6 +79,36 @@ Future<Nothing> GarbageCollectorProcess:
 }
 
 
+bool GarbageCollectorProcess::unschedule(const string& path)
+{
+  LOG(INFO) << "Unscheduling '" << path << "' for removal";
+
+  if (!timeouts.contains(path)) { // Not scheduled, or already removed.
+    return false;
+  }
+
+  Timeout timeout = timeouts[path]; // Copied, as the entry is erased below.
+  CHECK(paths.contains(timeout));
+
+  // Find this path's entry among the paths sharing its timeout.
+  foreach (const PathInfo& info, paths.get(timeout)) {
+    if (info.path == path) {
+      // Discard the future that schedule() handed out for this path.
+      info.promise->future().discard();
+
+      // Remove the entry from both 'paths' and 'timeouts'.
+      CHECK(paths.remove(timeout, info));
+      CHECK(timeouts.erase(path) > 0);
+
+      return true;
+    }
+  }
+
+  LOG(FATAL) << "Inconsistent state across 'paths' and 'timeouts'";
+  return false; // Not reached: LOG(FATAL) aborts.
+}
+
+
 // Fires a message to self for the next event. This also cancels any
 // existing timer.
 void GarbageCollectorProcess::reset()
@@ -97,29 +126,34 @@ void GarbageCollectorProcess::reset()
 
 void GarbageCollectorProcess::remove(const Timeout& removalTime)
 {
+  // TODO(bmahler): Other dispatches can block waiting for a removal
+  // operation. To fix this, the removal operation can be done
+  // asynchronously in another thread.
   if (paths.count(removalTime) > 0) {
-    foreach (const PathInfo& info, paths[removalTime]) {
-      const string& path = info.path;
-      Promise<Nothing>* promise = info.promise;
-
-      LOG(INFO) << "Deleting " << path;
-
-      Try<Nothing> result = os::rmdir(path);
-      if (result.isError()) {
-        LOG(WARNING) << "Failed to delete " << path << ": " << result.error();
-        promise->fail(result.error());
+    foreach (const PathInfo& info, paths.get(removalTime)) {
+      LOG(INFO) << "Deleting " << info.path;
+
+      Try<Nothing> rmdir = os::rmdir(info.path);
+
+      if (rmdir.isError()) {
+        LOG(WARNING) << "Failed to delete '" << info.path << "': "
+                     << rmdir.error();
+        info.promise->fail(rmdir.error());
       } else {
-        LOG(INFO) << "Deleted " << path;
-        promise->set(result.get());
+        LOG(INFO) << "Deleted '" << info.path << "'";
+        info.promise->set(rmdir.get());
       }
-      delete promise;
+
+      timeouts.erase(info.path);
     }
-    paths.erase(removalTime);
+
+    paths.remove(removalTime);
   } else {
-    // This might happen if the directory(s) has already been removed
-    // (e.g: by prune())
-    LOG(WARNING) << "Ignoring gc event at " << removalTime.remaining()
-                 << " as the corresponding directories are already removed";
+    // This occurs when either:
+    //   1. The path(s) has already been removed (e.g. by prune()).
+    //   2. All paths under the removal time were unscheduled.
+    LOG(INFO) << "Ignoring gc event at " << removalTime.remaining()
+              << " as the paths were already removed, or were unscheduled";
   }
 
   reset(); // Schedule the timer for next event.
@@ -128,7 +162,7 @@ void GarbageCollectorProcess::remove(con
 
 void GarbageCollectorProcess::prune(const Duration& d)
 {
-  foreachkey (const Timeout& removalTime, paths) {
+  foreach (const Timeout& removalTime, paths.keys()) {
     if (removalTime.remaining() <= d) {
       LOG(INFO) << "Pruning directories with remaining removal time "
                 << removalTime.remaining();
@@ -161,6 +195,12 @@ Future<Nothing> GarbageCollector::schedu
 }
 
 
+Future<bool> GarbageCollector::unschedule(const string& path)
+{ // Runs GarbageCollectorProcess::unschedule() on the GC process.
+  return dispatch(process, &GarbageCollectorProcess::unschedule, path);
+}
+
+
 void GarbageCollector::prune(const Duration& d)
 {
   dispatch(process, &GarbageCollectorProcess::prune, d);

Modified: incubator/mesos/trunk/src/slave/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.hpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.hpp (original)
+++ incubator/mesos/trunk/src/slave/gc.hpp Tue Apr  9 17:53:55 2013
@@ -19,16 +19,19 @@
 #ifndef __SLAVE_GC_HPP__
 #define __SLAVE_GC_HPP__
 
-#include <map>
 #include <string>
 #include <vector>
 
 #include <process/future.hpp>
+#include <process/process.hpp>
 #include <process/timeout.hpp>
 #include <process/timer.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/multimap.hpp>
 #include <stout/nothing.hpp>
+#include <stout/owned.hpp>
 #include <stout/try.hpp>
 
 namespace mesos {
@@ -52,15 +55,23 @@ public:
   ~GarbageCollector();
 
   // Schedules the specified path for removal after the specified
-  // duration of time has elapsed. The future will become ready when
-  // the path has been removed. If the directory did not exist, or an
-  // error occurred, the future will fail.
-  process::Future<Nothing> schedule(
-      const Duration& d,
-      const std::string& path);
+  // duration of time has elapsed. If the path is already scheduled,
+  // its removal is rescheduled and the previously returned future is
+  // discarded.
+  // The future will become ready when the path has been removed.
+  // The future will fail if the path did not exist, or on error.
+  // The future will be discarded if the path is later unscheduled
+  // or rescheduled.
+  process::Future<Nothing> schedule(const Duration& d, const std::string& path);
+
+  // Unschedules the removal of the specified path, discarding the
+  // future previously returned by schedule() for it.
+  // The future will be true if the path was unscheduled, and false if
+  // the path was not scheduled or has already been removed.
+  process::Future<bool> unschedule(const std::string& path);
 
-  // Deletes all the directories, whose scheduled garbage collection
-  // time is within the next 'd' duration of time.
+  // Deletes all the directories whose scheduled garbage collection time
+  // is within the next 'd' duration of time (i.e. remaining <= d).
   void prune(const Duration& d);
 
 private:
@@ -74,34 +85,43 @@ class GarbageCollectorProcess :
 public:
   virtual ~GarbageCollectorProcess();
 
-  // GarbageCollector implementation.
   process::Future<Nothing> schedule(
       const Duration& d,
       const std::string& path);
 
+  bool unschedule(const std::string& path); // False if not scheduled.
+
   void prune(const Duration& d);
 
 private:
+  void reset(); // Re-arms the timer for the next removal event.
+
   void remove(const process::Timeout& removalTime);
 
   struct PathInfo
   {
-    PathInfo(
-        const std::string& _path,
-        process::Promise<Nothing>* _promise)
+    PathInfo(const std::string& _path,
+             Owned<process::Promise<Nothing> > _promise)
       : path(_path), promise(_promise) {}
 
-    std::string path;
-    process::Promise<Nothing>* promise;
+    bool operator == (const PathInfo& that) const // Needed by paths.remove().
+    {
+      return path == that.path && promise == that.promise;
+    }
+
+    const std::string path; // The path to delete.
+    const Owned<process::Promise<Nothing> > promise; // schedule()'s future.
   };
 
-  // Store all the paths that needed to be deleted after a given timeout.
-  // NOTE: We are using std::map here instead of hashmap, because we
-  // need the keys of the map (deletion time) to be sorted in
-  // ascending order.
-  std::map<process::Timeout, std::vector<PathInfo> > paths;
+  // Paths to delete, keyed by removal timeout (several may share one).
+  // NOTE: We use Multimap rather than Multihashmap because the keys
+  // (deletion times) must be kept sorted.
+  Multimap<process::Timeout, PathInfo> paths;
+
+  // Reverse index from each scheduled path to its timeout in 'paths',
+  // for efficient lookup when unscheduling or rescheduling a path.
+  hashmap<std::string, process::Timeout> timeouts;
 
-  void reset();
   process::Timer timer;
 };
 

Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Tue Apr  9 17:53:55 2013
@@ -59,10 +59,12 @@ using mesos::internal::master::Allocator
 using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
+using mesos::internal::slave::GarbageCollector;
 using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::Slave;
 
 using process::Clock;
+using process::Future;
 using process::PID;
 
 using std::string;
@@ -76,7 +78,175 @@ using testing::Eq;
 using testing::Return;
 using testing::SaveArg;
 
-class GarbageCollectorTest : public MesosTest
+
+class GarbageCollectorTest : public TemporaryDirectoryTest {}; // Unit tests.
+
+
+TEST_F(GarbageCollectorTest, Schedule)
+{
+  GarbageCollector gc;
+
+  // Create three files to be garbage collected.
+  const string& file1 = "file1";
+  const string& file2 = "file2";
+  const string& file3 = "file3";
+
+  ASSERT_SOME(os::touch(file1));
+  ASSERT_SOME(os::touch(file2));
+  ASSERT_SOME(os::touch(file3));
+
+  ASSERT_TRUE(os::exists(file1));
+  ASSERT_TRUE(os::exists(file2));
+  ASSERT_TRUE(os::exists(file3));
+
+  Clock::pause(); // Time only moves via Clock::advance() below.
+
+  Future<Nothing> scheduleDispatch1 =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+  Future<Nothing> scheduleDispatch2 =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+  Future<Nothing> scheduleDispatch3 =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+  // Schedule file1 and file2 for removal in 10s, and file3 in 15s.
+  Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
+  Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
+  Future<Nothing> schedule3 = gc.schedule(Seconds(15), file3);
+
+  // Ensure the dispatches are completed before advancing the clock.
+  AWAIT_UNTIL(scheduleDispatch1);
+  AWAIT_UNTIL(scheduleDispatch2);
+  AWAIT_UNTIL(scheduleDispatch3);
+  Clock::settle();
+
+  // Advance the clock to trigger the GC of file1 and file2.
+  Clock::advance(Seconds(10).secs());
+  Clock::settle();
+
+  ASSERT_FUTURE_WILL_SUCCEED(schedule1);
+  ASSERT_FUTURE_WILL_SUCCEED(schedule2);
+  ASSERT_TRUE(schedule3.isPending()); // file3 is not due until 15s.
+
+  EXPECT_FALSE(os::exists(file1));
+  EXPECT_FALSE(os::exists(file2));
+  EXPECT_TRUE(os::exists(file3));
+
+  // Advance the remaining 5s to trigger the GC of file3.
+  Clock::advance(Seconds(5).secs());
+  Clock::settle();
+
+  ASSERT_FUTURE_WILL_SUCCEED(schedule3);
+
+  EXPECT_FALSE(os::exists(file3));
+
+  Clock::resume();
+}
+
+
+TEST_F(GarbageCollectorTest, Unschedule)
+{
+  GarbageCollector gc;
+
+  // Unscheduling a path that was never scheduled yields false.
+  ASSERT_FUTURE_WILL_EQ(false, gc.unschedule("bogus"));
+
+  // Create three files that are scheduled and then unscheduled.
+  const string& file1 = "file1";
+  const string& file2 = "file2";
+  const string& file3 = "file3";
+
+  ASSERT_SOME(os::touch(file1));
+  ASSERT_SOME(os::touch(file2));
+  ASSERT_SOME(os::touch(file3));
+
+  ASSERT_TRUE(os::exists(file1));
+  ASSERT_TRUE(os::exists(file2));
+  ASSERT_TRUE(os::exists(file3));
+
+  Clock::pause();
+
+  // Schedule all three files for removal in 10s.
+  Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
+  Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
+  Future<Nothing> schedule3 = gc.schedule(Seconds(10), file3);
+
+  // Unschedule each file, in a different order than scheduled.
+  ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file2));
+  ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file3));
+  ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file1));
+
+  // Advance past the 10s deadline to ensure nothing was GCed.
+  Clock::advance(Seconds(10).secs());
+  Clock::settle();
+
+  // The unscheduling will have discarded the GC futures.
+  ASSERT_FUTURE_WILL_DISCARD(schedule1);
+  ASSERT_FUTURE_WILL_DISCARD(schedule2);
+  ASSERT_FUTURE_WILL_DISCARD(schedule3);
+
+  EXPECT_TRUE(os::exists(file1));
+  EXPECT_TRUE(os::exists(file2));
+  EXPECT_TRUE(os::exists(file3));
+
+  Clock::resume();
+}
+
+
+TEST_F(GarbageCollectorTest, Prune)
+{
+  GarbageCollector gc;
+
+  // Create four files to be pruned.
+  const string& file1 = "file1";
+  const string& file2 = "file2";
+  const string& file3 = "file3";
+  const string& file4 = "file4";
+
+  ASSERT_SOME(os::touch(file1));
+  ASSERT_SOME(os::touch(file2));
+  ASSERT_SOME(os::touch(file3));
+  ASSERT_SOME(os::touch(file4));
+
+  ASSERT_TRUE(os::exists(file1));
+  ASSERT_TRUE(os::exists(file2));
+  ASSERT_TRUE(os::exists(file3));
+  ASSERT_TRUE(os::exists(file4));
+
+  Clock::pause();
+
+  Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
+  Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
+  Future<Nothing> schedule3 = gc.schedule(Seconds(15), file3);
+  Future<Nothing> schedule4 = gc.schedule(Seconds(15), file4);
+
+  ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file3)); // file3 must survive.
+  ASSERT_FUTURE_WILL_DISCARD(schedule3);
+
+  // Prune paths due within 10s: file1 and file2.
+  gc.prune(Seconds(10));
+
+  ASSERT_FUTURE_WILL_SUCCEED(schedule1);
+  ASSERT_FUTURE_WILL_SUCCEED(schedule2);
+  ASSERT_TRUE(schedule4.isPending());
+
+  // file1 and file2 are removed; file3 (unscheduled) and file4 remain.
+  EXPECT_FALSE(os::exists(file1));
+  EXPECT_FALSE(os::exists(file2));
+  EXPECT_TRUE(os::exists(file3));
+  EXPECT_TRUE(os::exists(file4));
+
+  // Prune paths due within 15s: only file4 is still scheduled.
+  gc.prune(Seconds(15));
+
+  ASSERT_FUTURE_WILL_SUCCEED(schedule4);
+
+  EXPECT_FALSE(os::exists(file4));
+
+  Clock::resume();
+}
+
+
+class GarbageCollectorIntegrationTest : public MesosTest
 {
 protected:
   virtual void SetUp()
@@ -157,7 +327,7 @@ protected:
 };
 
 
-TEST_F(GarbageCollectorTest, Restart)
+TEST_F(GarbageCollectorIntegrationTest, Restart)
 {
   // Messages expectations.
   process::Message message;
@@ -255,7 +425,7 @@ TEST_F(GarbageCollectorTest, Restart)
 }
 
 
-TEST_F(GarbageCollectorTest, ExitedExecutor)
+TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
 {
   // Executor expectations.
   EXPECT_CALL(exec, registered(_, _, _, _))
@@ -333,7 +503,7 @@ TEST_F(GarbageCollectorTest, ExitedExecu
 }
 
 
-TEST_F(GarbageCollectorTest, DiskUsage)
+TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
 {
   // Messages expectations.
   trigger exitedExecutorMsg;

Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Tue Apr  9 17:53:55 2013
@@ -38,6 +38,7 @@
 
 #include <stout/duration.hpp>
 #include <stout/gtest.hpp>
+#include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>