You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 02:28:43 UTC

svn commit: r1370079 - in /incubator/mesos/trunk: src/linux/cgroups.cpp src/linux/cgroups.hpp src/tests/cgroups_tests.cpp third_party/libprocess/include/process/future.hpp

Author: benh
Date: Tue Aug  7 00:28:43 2012
New Revision: 1370079

URL: http://svn.apache.org/viewvc?rev=1370079&view=rev
Log:
Add an API to destroy a cgroup (and all its sub-cgroups) (contributed
by Jie Yu, https://reviews.apache.org/r/5840).

Modified:
    incubator/mesos/trunk/src/linux/cgroups.cpp
    incubator/mesos/trunk/src/linux/cgroups.hpp
    incubator/mesos/trunk/src/tests/cgroups_tests.cpp
    incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp

Modified: incubator/mesos/trunk/src/linux/cgroups.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/linux/cgroups.cpp?rev=1370079&r1=1370078&r2=1370079&view=diff
==============================================================================
--- incubator/mesos/trunk/src/linux/cgroups.cpp (original)
+++ incubator/mesos/trunk/src/linux/cgroups.cpp Tue Aug  7 00:28:43 2012
@@ -30,6 +30,7 @@
 #include <map>
 #include <sstream>
 
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/io.hpp>
@@ -1346,4 +1347,122 @@ Future<bool> killTasks(const std::string
   return future;
 }
 
+
+namespace internal {
+
+// Process that kills all tasks in the given cgroups and then removes them.
+class Destroyer : public Process<Destroyer>
+{
+public:
+  Destroyer(const std::string& _hierarchy,
+            const std::vector<std::string>& _cgroups,
+            const seconds& _interval)
+    : hierarchy(_hierarchy),
+      cgroups(_cgroups),
+      interval(_interval) {}
+
+  virtual ~Destroyer() {}
+
+  // Returns the future that is set to true on success or failed on error.
+  Future<bool> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Terminate this process if the caller discards the returned future.
+    promise.future().onDiscarded(lambda::bind(
+          static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
+
+    if (interval.value < 0) {
+      promise.fail("Invalid interval: " + stringify(interval.value));
+      terminate(self());
+      return;
+    }
+
+    // Start killing the tasks of every cgroup (one killTasks call per cgroup)
+    // and use collect to get a single future covering all of them.
+    foreach (const std::string& cgroup, cgroups) {
+      Future<bool> killer = killTasks(hierarchy, cgroup, interval);
+      killers.push_back(killer);
+    }
+
+    Future<std::list<bool> > kill = collect(killers);
+    kill.onAny(defer(self(), &Destroyer::killed, kill));
+  }
+
+  virtual void finalize()
+  {
+    // On discard by the caller, also discard the per-cgroup kill futures.
+    if (promise.future().isDiscarded()) {
+      discard<bool>(killers);
+    }
+  }
+
+private:
+  void killed(const Future<std::list<bool> >& kill)
+  {
+    if (kill.isReady()) {
+      remove();
+    } else if (kill.isFailed()) {
+      promise.fail(kill.failure());
+      terminate(self());
+    } else { // Neither ready nor failed, e.g. discarded.
+      LOG(FATAL) << "Invalid kill state";
+    }
+  }
+
+  void remove()
+  {
+    foreach (const std::string& cgroup, cgroups) {
+      Try<bool> remove = internal::removeCgroup(hierarchy, cgroup);
+      if (remove.isError()) {
+        promise.fail(remove.error());
+        terminate(self());
+        return;
+      }
+    }
+
+    promise.set(true);
+    terminate(self());
+  }
+
+  std::string hierarchy;
+  std::vector<std::string> cgroups;
+  const seconds interval;
+  Promise<bool> promise;
+
+  // Futures returned by killTasks, one per cgroup (filled in initialize).
+  std::list<Future<bool> > killers;
+};
+
+} // namespace internal {
+
+
+Future<bool> destroyCgroup(const std::string& hierarchy,
+                           const std::string& cgroup,
+                           const seconds& interval)
+{
+  Try<bool> cgroupCheck = checkCgroup(hierarchy, cgroup);
+  if (cgroupCheck.isError()) {
+    return Future<bool>::failed(cgroupCheck.error());
+  }
+
+  // Build the list to destroy, starting from what getCgroups returns.
+  Try<std::vector<std::string> > cgroups = getCgroups(hierarchy, cgroup);
+  if (cgroups.isError()) {
+    return Future<bool>::failed(cgroups.error());
+  }
+
+  std::vector<std::string> toDestroy = cgroups.get();
+  if (cgroup != "/") { // The root cgroup itself is never added to the list.
+    toDestroy.push_back(cgroup);
+  }
+
+  internal::Destroyer* destroyer =
+    new internal::Destroyer(hierarchy, toDestroy, interval);
+  Future<bool> future = destroyer->future();
+  spawn(destroyer, true); // 'true': presumably libprocess deletes it - confirm.
+  return future;
+}
+
 } // namespace cgroups {

Modified: incubator/mesos/trunk/src/linux/cgroups.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/linux/cgroups.hpp?rev=1370079&r1=1370078&r2=1370079&view=diff
==============================================================================
--- incubator/mesos/trunk/src/linux/cgroups.hpp (original)
+++ incubator/mesos/trunk/src/linux/cgroups.hpp Tue Aug  7 00:28:43 2012
@@ -303,6 +303,24 @@ process::Future<bool> killTasks(const st
                                 const std::string& cgroup,
                                 const seconds& interval = seconds(0.1));
 
+
+// Destroy a cgroup under a given hierarchy. This function is different from
+// removeCgroup in that it tries to kill all tasks in the given cgroup so that
+// the cgroup can be removed. It also recursively removes any sub-cgroups.
+// The given cgroup itself is destroyed as well, unless it is the root cgroup
+// (a root cgroup cannot be destroyed). The function returns a future
+// indicating the state of the destroy process. The future becomes ready
+// when the destroy operation finishes.
+// @param   hierarchy   Path to the hierarchy root.
+// @param   cgroup      Path to the cgroup relative to the hierarchy root.
+// @param   interval    The time interval in seconds between two state check
+//                      requests (default: 0.1 seconds).
+// @return  A future which will become ready when the operation is done.
+//          Error if something unexpected happens.
+process::Future<bool> destroyCgroup(const std::string& hierarchy,
+                                    const std::string& cgroup = "/",
+                                    const seconds& interval = seconds(0.1));
+
 } // namespace cgroups {
 
 #endif // __CGROUPS_HPP__

Modified: incubator/mesos/trunk/src/tests/cgroups_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/cgroups_tests.cpp?rev=1370079&r1=1370078&r2=1370079&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/cgroups_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/cgroups_tests.cpp Tue Aug  7 00:28:43 2012
@@ -575,3 +575,64 @@ TEST_F(CgroupsTest, ROOT_CGROUPS_KillTas
     FAIL() << "Reach an unreachable statement!";
   }
 }
+
+
+TEST_F(CgroupsTest, ROOT_CGROUPS_DestroyCgroup)
+{
+  Future<bool> future = cgroups::destroyCgroup(hierarchy, "/stu/under");
+  future.await(5.0);
+  ASSERT_TRUE(future.isReady());
+  EXPECT_TRUE(future.get());
+
+  int pipes[2];
+  int dummy; // Only used as pipe payload; its value is never inspected.
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  pid_t pid = ::fork();
+  ASSERT_NE(-1, pid);
+
+  if (pid) {
+    // In the parent process.
+    ::close(pipes[1]);
+
+    // Block until each of the 4 child processes has reported via the pipe.
+    ASSERT_NE(-1, ::read(pipes[0], &dummy, sizeof(dummy)));
+    ASSERT_NE(-1, ::read(pipes[0], &dummy, sizeof(dummy)));
+    ASSERT_NE(-1, ::read(pipes[0], &dummy, sizeof(dummy)));
+    ASSERT_NE(-1, ::read(pipes[0], &dummy, sizeof(dummy)));
+    ::close(pipes[0]);
+
+    Future<bool> future = cgroups::destroyCgroup(hierarchy, "/");
+    future.await(5.0);
+    ASSERT_TRUE(future.isReady());
+    EXPECT_TRUE(future.get());
+
+    int status;
+    EXPECT_NE(-1, ::waitpid((pid_t) -1, &status, 0));
+  } else {
+    // In the child process.
+    // Two consecutive forks yield 4 processes in total, so that the cgroup
+    // being destroyed contains multiple active processes.
+    ::fork();
+    ::fork();
+
+    // Move the current process into the "/prof" cgroup.
+    Try<bool> assign = cgroups::assignTask(hierarchy, "/prof", ::getpid());
+    if (assign.isError()) {
+      FAIL() << "Failed to assign cgroup: " << assign.error();
+    }
+
+    // Tell the parent that this process has been assigned to the cgroup.
+    ::close(pipes[0]);
+    if (::write(pipes[1], &dummy, sizeof(dummy)) != sizeof(dummy)) {
+      FAIL() << "Failed to notify the parent";
+    }
+    ::close(pipes[1]);
+
+    // Busy-wait until this process is killed (by the destroy in the parent).
+    while (true) ;
+
+    // Should not reach here.
+    FAIL() << "Reach an unreachable statement!";
+  }
+}

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp?rev=1370079&r1=1370078&r2=1370079&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp Tue Aug  7 00:28:43 2012
@@ -4,6 +4,7 @@
 #include <assert.h>
 #include <stdlib.h> // For abort.
 
+#include <list>
 #include <queue>
 #include <set>
 
@@ -336,7 +337,18 @@ Future<Future<T> > select(const std::set
 template <typename T>
 void discard(const std::set<Future<T> >& futures)
 {
-  typename std::set<Future<T> >::iterator iterator;
+  typename std::set<Future<T> >::const_iterator iterator;
+  for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
+    Future<T> future = *iterator; // Need a non-const copy to discard.
+    future.discard();
+  }
+}
+
+
+template <typename T>
+void discard(const std::list<Future<T> >& futures)
+{
+  typename std::list<Future<T> >::const_iterator iterator;
   for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
     Future<T> future = *iterator; // Need a non-const copy to discard.
     future.discard();