You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/09 19:53:55 UTC
svn commit: r1466159 - in /incubator/mesos/trunk/src:
slave/cgroups_isolator.cpp slave/gc.cpp slave/gc.hpp tests/gc_tests.cpp
tests/utils.hpp
Author: bmahler
Date: Tue Apr 9 17:53:55 2013
New Revision: 1466159
URL: http://svn.apache.org/r1466159
Log:
Added 'unschedule' operation to the GarbageCollector, changed schedule
to discard the future when unscheduled/rescheduled, added
GarbageCollector unit tests.
Review: https://reviews.apache.org/r/10028
Modified:
incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
incubator/mesos/trunk/src/slave/gc.cpp
incubator/mesos/trunk/src/slave/gc.hpp
incubator/mesos/trunk/src/tests/gc_tests.cpp
incubator/mesos/trunk/src/tests/utils.hpp
Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.cpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.cpp Tue Apr 9 17:53:55 2013
@@ -893,15 +893,18 @@ Try<Nothing> CgroupsIsolator::cfsChanged
CHECK(resource.type() == Value::SCALAR);
Try<Nothing> write = cgroups::write(
- hierarchy, info->name(), "cpu.cfs_period_us", stringify(CPU_CFS_PERIOD.us()));
+ hierarchy,
+ info->name(),
+ "cpu.cfs_period_us",
+ stringify(CPU_CFS_PERIOD.us()));
if (write.isError()) {
return Error("Failed to update 'cpu.cfs_period_us': " + write.error());
}
double cpus = resource.scalar().value();
- size_t quota =
- std::max(CPU_CFS_PERIOD.us() * cpus, MIN_CPU_CFS_QUOTA.us());
+ size_t quota = static_cast<size_t>(
+ std::max(CPU_CFS_PERIOD.us() * cpus, MIN_CPU_CFS_QUOTA.us()));
write = cgroups::write(
hierarchy, info->name(), "cpu.cfs_quota_us", stringify(quota));
Modified: incubator/mesos/trunk/src/slave/gc.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.cpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.cpp (original)
+++ incubator/mesos/trunk/src/slave/gc.cpp Tue Apr 9 17:53:55 2013
@@ -16,17 +16,11 @@
* limitations under the License.
*/
-#include <map>
-#include <string>
-#include <vector>
+#include <list>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
-#include <process/future.hpp>
-#include <process/process.hpp>
-#include <process/timeout.hpp>
-#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/os.hpp>
@@ -38,21 +32,19 @@ using namespace process;
using process::wait; // Necessary on some OS's to disambiguate.
+using std::list;
using std::map;
using std::string;
-using std::vector;
namespace mesos {
namespace internal {
namespace slave {
+
GarbageCollectorProcess::~GarbageCollectorProcess()
{
- foreachvalue (const vector<PathInfo>& infos, paths) {
- foreach (const PathInfo& info, infos) {
- info.promise->future().discard();
- delete info.promise;
- }
+ foreachvalue (const PathInfo& info, paths) {
+ info.promise->future().discard();
}
}
@@ -61,13 +53,20 @@ Future<Nothing> GarbageCollectorProcess:
const Duration& d,
const string& path)
{
- LOG(INFO) << "Scheduling " << path << " for removal";
+ LOG(INFO) << "Scheduling '" << path << "' for removal";
- Promise<Nothing>* promise = new Promise<Nothing>();
+ // If there's an existing schedule for this path, we must remove
+ // it here in order to reschedule.
+ if (timeouts.contains(path)) {
+ CHECK(unschedule(path));
+ }
+
+ Owned<Promise<Nothing> > promise(new Promise<Nothing>());
Timeout removalTime(d);
- paths[removalTime].push_back(PathInfo(path, promise));
+ timeouts[path] = removalTime;
+ paths.put(removalTime, PathInfo(path, promise));
// If the timer is not yet initialized or the timeout is sooner than
// the currently active timer, update it.
@@ -80,6 +79,36 @@ Future<Nothing> GarbageCollectorProcess:
}
+bool GarbageCollectorProcess::unschedule(const string& path)
+{
+ LOG(INFO) << "Unscheduling '" << path << "' for removal";
+
+ if (!timeouts.contains(path)) {
+ return false;
+ }
+
+ Timeout timeout = timeouts[path]; // Make a copy, as we erase() below.
+ CHECK(paths.contains(timeout));
+
+ // Locate the path.
+ foreach (const PathInfo& info, paths.get(timeout)) {
+ if (info.path == path) {
+ // Discard the future.
+ info.promise->future().discard();
+
+ // Clean up the maps.
+ CHECK(paths.remove(timeout, info));
+ CHECK(timeouts.erase(path) > 0);
+
+ return true;
+ }
+ }
+
+ LOG(FATAL) << "Inconsistent state across 'paths' and 'timeouts'";
+ return false;
+}
+
+
// Fires a message to self for the next event. This also cancels any
// existing timer.
void GarbageCollectorProcess::reset()
@@ -97,29 +126,34 @@ void GarbageCollectorProcess::reset()
void GarbageCollectorProcess::remove(const Timeout& removalTime)
{
+ // TODO(bmahler): Other dispatches can block waiting for a removal
+ // operation. To fix this, the removal operation can be done
+ // asynchronously in another thread.
if (paths.count(removalTime) > 0) {
- foreach (const PathInfo& info, paths[removalTime]) {
- const string& path = info.path;
- Promise<Nothing>* promise = info.promise;
-
- LOG(INFO) << "Deleting " << path;
-
- Try<Nothing> result = os::rmdir(path);
- if (result.isError()) {
- LOG(WARNING) << "Failed to delete " << path << ": " << result.error();
- promise->fail(result.error());
+ foreach (const PathInfo& info, paths.get(removalTime)) {
+ LOG(INFO) << "Deleting " << info.path;
+
+ Try<Nothing> rmdir = os::rmdir(info.path);
+
+ if (rmdir.isError()) {
+ LOG(WARNING) << "Failed to delete '" << info.path << "': "
+ << rmdir.error();
+ info.promise->fail(rmdir.error());
} else {
- LOG(INFO) << "Deleted " << path;
- promise->set(result.get());
+ LOG(INFO) << "Deleted '" << info.path << "'";
+ info.promise->set(rmdir.get());
}
- delete promise;
+
+ timeouts.erase(info.path);
}
- paths.erase(removalTime);
+
+ paths.remove(removalTime);
} else {
- // This might happen if the directory(s) has already been removed
- // (e.g: by prune())
- LOG(WARNING) << "Ignoring gc event at " << removalTime.remaining()
- << " as the corresponding directories are already removed";
+ // This occurs when either:
+ // 1. The path(s) have already been removed (e.g. by prune()).
+ // 2. All paths under the removal time were unscheduled.
+ LOG(INFO) << "Ignoring gc event at " << removalTime.remaining()
+ << " as the paths were already removed, or were unscheduled";
}
reset(); // Schedule the timer for next event.
@@ -128,7 +162,7 @@ void GarbageCollectorProcess::remove(con
void GarbageCollectorProcess::prune(const Duration& d)
{
- foreachkey (const Timeout& removalTime, paths) {
+ foreach (const Timeout& removalTime, paths.keys()) {
if (removalTime.remaining() <= d) {
LOG(INFO) << "Pruning directories with remaining removal time "
<< removalTime.remaining();
@@ -161,6 +195,12 @@ Future<Nothing> GarbageCollector::schedu
}
+Future<bool> GarbageCollector::unschedule(const string& path)
+{
+ return dispatch(process, &GarbageCollectorProcess::unschedule, path);
+}
+
+
void GarbageCollector::prune(const Duration& d)
{
dispatch(process, &GarbageCollectorProcess::prune, d);
Modified: incubator/mesos/trunk/src/slave/gc.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.hpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.hpp (original)
+++ incubator/mesos/trunk/src/slave/gc.hpp Tue Apr 9 17:53:55 2013
@@ -19,16 +19,19 @@
#ifndef __SLAVE_GC_HPP__
#define __SLAVE_GC_HPP__
-#include <map>
#include <string>
#include <vector>
#include <process/future.hpp>
+#include <process/process.hpp>
#include <process/timeout.hpp>
#include <process/timer.hpp>
#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/multimap.hpp>
#include <stout/nothing.hpp>
+#include <stout/owned.hpp>
#include <stout/try.hpp>
namespace mesos {
@@ -52,15 +55,23 @@ public:
~GarbageCollector();
// Schedules the specified path for removal after the specified
- // duration of time has elapsed. The future will become ready when
- // the path has been removed. If the directory did not exist, or an
- // error occurred, the future will fail.
- process::Future<Nothing> schedule(
- const Duration& d,
- const std::string& path);
+ // duration of time has elapsed. If the path is already scheduled,
+ // this will reschedule the removal operation, and induce a discard
+ // on the previous future.
+ // The future will become ready when the path has been removed.
+ // The future will fail if the path did not exist, or on error.
+ // The future will be discarded if the path was unscheduled, or
+ // was rescheduled.
+ process::Future<Nothing> schedule(const Duration& d, const std::string& path);
+
+ // Unschedules the specified path for removal.
+ // The future will be true if the path has been unscheduled.
+ // The future will be false if the path is not scheduled for
+ // removal, or the path has already been removed.
+ process::Future<bool> unschedule(const std::string& path);
- // Deletes all the directories, whose scheduled garbage collection
- // time is within the next 'd' duration of time.
+ // Deletes all the directories, whose scheduled garbage collection time
+ // is within the next 'd' duration of time.
void prune(const Duration& d);
private:
@@ -74,34 +85,43 @@ class GarbageCollectorProcess :
public:
virtual ~GarbageCollectorProcess();
- // GarbageCollector implementation.
process::Future<Nothing> schedule(
const Duration& d,
const std::string& path);
+ bool unschedule(const std::string& path);
+
void prune(const Duration& d);
private:
+ void reset();
+
void remove(const process::Timeout& removalTime);
struct PathInfo
{
- PathInfo(
- const std::string& _path,
- process::Promise<Nothing>* _promise)
+ PathInfo(const std::string& _path,
+ Owned<process::Promise<Nothing> > _promise)
: path(_path), promise(_promise) {}
- std::string path;
- process::Promise<Nothing>* promise;
+ bool operator == (const PathInfo& that) const
+ {
+ return path == that.path && promise == that.promise;
+ }
+
+ const std::string path;
+ const Owned<process::Promise<Nothing> > promise;
};
- // Store all the paths that needed to be deleted after a given timeout.
- // NOTE: We are using std::map here instead of hashmap, because we
- // need the keys of the map (deletion time) to be sorted in
- // ascending order.
- std::map<process::Timeout, std::vector<PathInfo> > paths;
+ // Store all the timeouts and corresponding paths to delete.
+ // NOTE: We are using Multimap here instead of Multihashmap, because
+ // we need the keys of the map (deletion time) to be sorted.
+ Multimap<process::Timeout, PathInfo> paths;
+
+ // We also need efficient lookup for a path, to determine whether
+ // it exists in our paths mapping.
+ hashmap<std::string, process::Timeout> timeouts;
- void reset();
process::Timer timer;
};
Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Tue Apr 9 17:53:55 2013
@@ -59,10 +59,12 @@ using mesos::internal::master::Allocator
using mesos::internal::master::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
+using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Slave;
using process::Clock;
+using process::Future;
using process::PID;
using std::string;
@@ -76,7 +78,175 @@ using testing::Eq;
using testing::Return;
using testing::SaveArg;
-class GarbageCollectorTest : public MesosTest
+
+class GarbageCollectorTest : public TemporaryDirectoryTest {};
+
+
+TEST_F(GarbageCollectorTest, Schedule)
+{
+ GarbageCollector gc;
+
+ // Make some temporary files to gc.
+ const string& file1 = "file1";
+ const string& file2 = "file2";
+ const string& file3 = "file3";
+
+ ASSERT_SOME(os::touch(file1));
+ ASSERT_SOME(os::touch(file2));
+ ASSERT_SOME(os::touch(file3));
+
+ ASSERT_TRUE(os::exists(file1));
+ ASSERT_TRUE(os::exists(file2));
+ ASSERT_TRUE(os::exists(file3));
+
+ Clock::pause();
+
+ Future<Nothing> scheduleDispatch1 =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+ Future<Nothing> scheduleDispatch2 =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+ Future<Nothing> scheduleDispatch3 =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+ // Schedule the gc operations.
+ Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
+ Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
+ Future<Nothing> schedule3 = gc.schedule(Seconds(15), file3);
+
+ // Ensure the dispatches are completed before advancing the clock.
+ AWAIT_UNTIL(scheduleDispatch1);
+ AWAIT_UNTIL(scheduleDispatch2);
+ AWAIT_UNTIL(scheduleDispatch3);
+ Clock::settle();
+
+ // Advance the clock to trigger the GC of file1 and file2.
+ Clock::advance(Seconds(10).secs());
+ Clock::settle();
+
+ ASSERT_FUTURE_WILL_SUCCEED(schedule1);
+ ASSERT_FUTURE_WILL_SUCCEED(schedule2);
+ ASSERT_TRUE(schedule3.isPending());
+
+ EXPECT_FALSE(os::exists(file1));
+ EXPECT_FALSE(os::exists(file2));
+ EXPECT_TRUE(os::exists(file3));
+
+ // Trigger the GC of file3.
+ Clock::advance(Seconds(5).secs());
+ Clock::settle();
+
+ ASSERT_FUTURE_WILL_SUCCEED(schedule3);
+
+ EXPECT_FALSE(os::exists(file3));
+
+ Clock::resume();
+}
+
+
+TEST_F(GarbageCollectorTest, Unschedule)
+{
+ GarbageCollector gc;
+
+ // Attempt to unschedule a file that is not scheduled.
+ ASSERT_FUTURE_WILL_EQ(false, gc.unschedule("bogus"));
+
+ // Make some temporary files to gc.
+ const string& file1 = "file1";
+ const string& file2 = "file2";
+ const string& file3 = "file3";
+
+ ASSERT_SOME(os::touch(file1));
+ ASSERT_SOME(os::touch(file2));
+ ASSERT_SOME(os::touch(file3));
+
+ ASSERT_TRUE(os::exists(file1));
+ ASSERT_TRUE(os::exists(file2));
+ ASSERT_TRUE(os::exists(file3));
+
+ Clock::pause();
+
+ // Schedule the gc operations.
+ Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
+ Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
+ Future<Nothing> schedule3 = gc.schedule(Seconds(10), file3);
+
+ // Unschedule each operation.
+ ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file2));
+ ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file3));
+ ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file1));
+
+ // Advance the clock to ensure nothing was GCed.
+ Clock::advance(Seconds(10).secs());
+ Clock::settle();
+
+ // The unscheduling will have discarded the GC futures.
+ ASSERT_FUTURE_WILL_DISCARD(schedule1);
+ ASSERT_FUTURE_WILL_DISCARD(schedule2);
+ ASSERT_FUTURE_WILL_DISCARD(schedule3);
+
+ EXPECT_TRUE(os::exists(file1));
+ EXPECT_TRUE(os::exists(file2));
+ EXPECT_TRUE(os::exists(file3));
+
+ Clock::resume();
+}
+
+
+TEST_F(GarbageCollectorTest, Prune)
+{
+ GarbageCollector gc;
+
+ // Make some temporary files to prune.
+ const string& file1 = "file1";
+ const string& file2 = "file2";
+ const string& file3 = "file3";
+ const string& file4 = "file4";
+
+ ASSERT_SOME(os::touch(file1));
+ ASSERT_SOME(os::touch(file2));
+ ASSERT_SOME(os::touch(file3));
+ ASSERT_SOME(os::touch(file4));
+
+ ASSERT_TRUE(os::exists(file1));
+ ASSERT_TRUE(os::exists(file2));
+ ASSERT_TRUE(os::exists(file3));
+ ASSERT_TRUE(os::exists(file4));
+
+ Clock::pause();
+
+ Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
+ Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
+ Future<Nothing> schedule3 = gc.schedule(Seconds(15), file3);
+ Future<Nothing> schedule4 = gc.schedule(Seconds(15), file4);
+
+ ASSERT_FUTURE_WILL_EQ(true, gc.unschedule(file3));
+ ASSERT_FUTURE_WILL_DISCARD(schedule3);
+
+ // Prune file1 and file2.
+ gc.prune(Seconds(10));
+
+ ASSERT_FUTURE_WILL_SUCCEED(schedule1);
+ ASSERT_FUTURE_WILL_SUCCEED(schedule2);
+ ASSERT_TRUE(schedule4.isPending());
+
+ // Both file1 and file2 will have been removed.
+ EXPECT_FALSE(os::exists(file1));
+ EXPECT_FALSE(os::exists(file2));
+ EXPECT_TRUE(os::exists(file3));
+ EXPECT_TRUE(os::exists(file4));
+
+ // Prune file4.
+ gc.prune(Seconds(15));
+
+ ASSERT_FUTURE_WILL_SUCCEED(schedule4);
+
+ EXPECT_FALSE(os::exists(file4));
+
+ Clock::resume();
+}
+
+
+class GarbageCollectorIntegrationTest : public MesosTest
{
protected:
virtual void SetUp()
@@ -157,7 +327,7 @@ protected:
};
-TEST_F(GarbageCollectorTest, Restart)
+TEST_F(GarbageCollectorIntegrationTest, Restart)
{
// Messages expectations.
process::Message message;
@@ -255,7 +425,7 @@ TEST_F(GarbageCollectorTest, Restart)
}
-TEST_F(GarbageCollectorTest, ExitedExecutor)
+TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
{
// Executor expectations.
EXPECT_CALL(exec, registered(_, _, _, _))
@@ -333,7 +503,7 @@ TEST_F(GarbageCollectorTest, ExitedExecu
}
-TEST_F(GarbageCollectorTest, DiskUsage)
+TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
{
// Messages expectations.
trigger exitedExecutorMsg;
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1466159&r1=1466158&r2=1466159&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Tue Apr 9 17:53:55 2013
@@ -38,6 +38,7 @@
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
+#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>