You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 02:28:29 UTC

svn commit: r1370076 - in /incubator/mesos/trunk/src: linux/cgroups.cpp linux/cgroups.hpp tests/cgroups_tests.cpp

Author: benh
Date: Tue Aug  7 00:28:29 2012
New Revision: 1370076

URL: http://svn.apache.org/viewvc?rev=1370076&view=rev
Log:
Add APIs for event notification in cgroups (contributed by Jie Yu,
https://reviews.apache.org/r/5395).

Modified:
    incubator/mesos/trunk/src/linux/cgroups.cpp
    incubator/mesos/trunk/src/linux/cgroups.hpp
    incubator/mesos/trunk/src/tests/cgroups_tests.cpp

Modified: incubator/mesos/trunk/src/linux/cgroups.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/linux/cgroups.cpp?rev=1370076&r1=1370075&r2=1370076&view=diff
==============================================================================
--- incubator/mesos/trunk/src/linux/cgroups.cpp (original)
+++ incubator/mesos/trunk/src/linux/cgroups.cpp Tue Aug  7 00:28:29 2012
@@ -20,11 +20,20 @@
 #include <fts.h>
 #include <unistd.h>
 
+#include <sys/syscall.h>
+
+#include <glog/logging.h>
+
 #include <fstream>
 #include <map>
 #include <sstream>
 
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp>
+
 #include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
@@ -33,20 +42,11 @@
 #include "linux/cgroups.hpp"
 #include "linux/fs.hpp"
 
-
+using namespace process;
 using namespace mesos::internal;
 
-
 namespace cgroups {
 
-
-//////////////////////////////////////////////////////////////////////////////
-// The following *internal* functions provide very basic controls over Linux
-// cgroups. Sanity checks are removed from these functions for performance.
-// Users can always wrap these functions to provide various checks if needed.
-//////////////////////////////////////////////////////////////////////////////
-
-
 namespace internal {
 
 
@@ -296,15 +296,9 @@ static Try<bool> writeControl(const std:
   return true;
 }
 
-
 } // namespace internal {
 
 
-//////////////////////////////////////////////////////////////////////////////
-// The following functions are visible to users.
-//////////////////////////////////////////////////////////////////////////////
-
-
 bool enabled()
 {
   return os::exists("/proc/cgroups");
@@ -749,4 +743,205 @@ Try<bool> assignTask(const std::string& 
 }
 
 
+namespace internal {
+
+#ifndef __NR_eventfd2
+#error "The eventfd2 syscall is unavailable."
+#endif
+
+// Flags accepted by the eventfd2 syscall. They are defined locally
+// (presumably because <sys/eventfd.h> is not relied upon here -- TODO
+// confirm); the guards avoid clashing with the libc definitions should that
+// header be included anyway.
+#ifndef EFD_SEMAPHORE
+#define EFD_SEMAPHORE (1 << 0)
+#endif
+#ifndef EFD_CLOEXEC
+#define EFD_CLOEXEC O_CLOEXEC
+#endif
+#ifndef EFD_NONBLOCK
+#define EFD_NONBLOCK O_NONBLOCK
+#endif
+
+// Create an eventfd by invoking the eventfd2 syscall directly through
+// syscall(2). Returns the new file descriptor, or a negative value (with
+// errno set) on failure.
+static int eventfd(unsigned int initval, int flags)
+{
+  return static_cast<int>(::syscall(__NR_eventfd2, initval, flags));
+}
+
+
+// Cgroups provide a mechanism for getting notified about changes in the
+// status of a cgroup. It is based on Linux eventfd; see the "Notification
+// API" section of the kernel cgroups documentation. This function creates an
+// eventfd and registers it, together with the given control file, through
+// the cgroup's "cgroup.event_control" file so that callers can poll the
+// eventfd to get notified. It returns the eventfd (file descriptor) if the
+// notifier has been successfully opened. The eventfd is non-blocking and
+// close-on-exec. This function assumes all the parameters are valid.
+// @param   hierarchy   Path to the hierarchy root.
+// @param   cgroup      Path to the cgroup relative to the hierarchy root.
+// @param   control     Name of the control file.
+// @param   args        Control specific arguments.
+// @return  The eventfd if the operation succeeds.
+//          Error if the operation fails.
+static Try<int> openNotifier(const std::string& hierarchy,
+                             const std::string& cgroup,
+                             const std::string& control,
+                             const Option<std::string>& args =
+                               Option<std::string>::none())
+{
+  int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+  if (efd < 0) {
+    return Try<int>::error(
+        "Create eventfd failed: " + std::string(strerror(errno)));
+  }
+
+  // Open the control file to be monitored. The kernel only checks read
+  // permission on it when registering the event, and some monitored files
+  // are read-only, so open it read-only instead of O_RDWR.
+  std::string path = hierarchy + "/" + cgroup + "/" + control;
+  Try<int> cfd = os::open(path, O_RDONLY);
+  if (cfd.isError()) {
+    os::close(efd);
+    return Try<int>::error(cfd.error());
+  }
+
+  // Register the event by writing "<event_fd> <control_fd> [args]" to the
+  // event control file (cgroup.event_control).
+  std::ostringstream out;
+  out << std::dec << efd << " " << cfd.get();
+  if (args.isSome()) {
+    out << " " << args.get();
+  }
+  Try<bool> write = internal::writeControl(hierarchy,
+                                           cgroup,
+                                           "cgroup.event_control",
+                                           out.str());
+
+  // The control file descriptor is only needed for the registration write
+  // above, so it is closed on both the success and the failure path.
+  os::close(cfd.get());
+
+  if (write.isError()) {
+    os::close(efd);
+    return Try<int>::error(write.error());
+  }
+
+  return efd;
+}
+
+
+// Release a notifier created by openNotifier by closing its eventfd.
+// @param   fd      The eventfd that openNotifier returned.
+// @return  True when the eventfd was closed successfully.
+//          Error when closing it failed.
+static Try<bool> closeNotifier(int fd)
+{
+  Try<bool> closed = os::close(fd);
+  return closed;
+}
+
+
+// A libprocess process that opens an event notifier for a cgroup control
+// file, waits for the eventfd to become readable, and then completes its
+// promise with the 64-bit value read from the eventfd (or fails it). This
+// class is internal to this file; users go through listenEvent().
+class EventListener : public Process<EventListener>
+{
+public:
+  EventListener(const std::string& _hierarchy,
+                const std::string& _cgroup,
+                const std::string& _control,
+                const Option<std::string>& _args)
+    : hierarchy(_hierarchy),
+      cgroup(_cgroup),
+      control(_control),
+      args(_args),
+      data(0) {}
+
+  virtual ~EventListener() {}
+
+  // The future completed once the event is observed (or the listen fails).
+  Future<uint64_t> future()
+  {
+    return promise.future();
+  }
+
+protected:
+  virtual void initialize()
+  {
+    // When the caller discards the future, nobody is waiting for the event
+    // any more, so terminate this process. terminate is overloaded, hence
+    // the explicit function pointer type to pick the (UPID, bool) version.
+    typedef void (*TerminateFn)(const UPID&, bool);
+    promise.future().onDiscarded(
+        lambda::bind(static_cast<TerminateFn>(terminate), self(), true));
+
+    Try<int> opened = internal::openNotifier(hierarchy, cgroup, control, args);
+    if (opened.isError()) {
+      promise.fail(opened.error());
+      terminate(self());
+      return;
+    }
+
+    // Keep the descriptor around so finalize() can close it.
+    eventfd = opened.get();
+
+    // Start a nonblocking read of sizeof(data) bytes from the eventfd. It
+    // completes once the eventfd becomes readable (an event occurred) or the
+    // read fails; in either case notified() then runs on this process.
+    reading = io::read(eventfd.get(), &data, sizeof(data));
+    reading.onAny(defer(self(), &EventListener::notified));
+  }
+
+  virtual void finalize()
+  {
+    // Abandon any outstanding nonblocking read.
+    reading.discard();
+
+    // Nothing to close if the notifier was never opened.
+    if (eventfd.isNone()) {
+      return;
+    }
+
+    Try<bool> closed = internal::closeNotifier(eventfd.get());
+    if (closed.isError()) {
+      LOG(ERROR) << "Closing eventfd " << eventfd.get()
+                 << " failed: " << closed.error();
+    }
+  }
+
+private:
+  // Invoked when the nonblocking read on the eventfd finishes, either because
+  // the event happened or because the read failed.
+  void notified()
+  {
+    // The promise may already be settled (e.g. discarded by the caller).
+    if (!promise.future().isPending()) {
+      return;
+    }
+
+    // The read is only discarded in finalize(), which runs after the promise
+    // stops being pending, so the guard above rules out a discarded read.
+    CHECK(!reading.isDiscarded());
+
+    if (reading.isFailed()) {
+      promise.fail("Failed to read eventfd: " + reading.failure());
+    } else if (reading.get() != sizeof(data)) {
+      promise.fail("Read less than expected");
+    } else {
+      promise.set(data);
+    }
+
+    terminate(self());
+  }
+
+  std::string hierarchy;
+  std::string cgroup;
+  std::string control;
+  Option<std::string> args;
+  Promise<uint64_t> promise;
+  Future<size_t> reading;  // The outstanding nonblocking read.
+  Option<int> eventfd;     // The eventfd, once opened.
+  uint64_t data;           // Buffer the eventfd value is read into.
+};
+
+} // namespace internal {
+
+
+// Validate the control file, then spawn an EventListener for it and return
+// the listener's future. The `true` passed to spawn asks libprocess to manage
+// the listener's lifetime (presumably deleting it on termination -- confirm).
+Future<uint64_t> listenEvent(const std::string& hierarchy,
+                             const std::string& cgroup,
+                             const std::string& control,
+                             const Option<std::string>& args)
+{
+  Try<bool> valid = checkControl(hierarchy, cgroup, control);
+  if (valid.isError()) {
+    return Future<uint64_t>::failed(valid.error());
+  }
+
+  internal::EventListener* listener =
+    new internal::EventListener(hierarchy, cgroup, control, args);
+
+  // Grab the future before spawning: once spawned, the process owns itself.
+  const Future<uint64_t> result = listener->future();
+  spawn(listener, true);
+  return result;
+}
+
 } // namespace cgroups {

Modified: incubator/mesos/trunk/src/linux/cgroups.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/linux/cgroups.hpp?rev=1370076&r1=1370075&r2=1370076&view=diff
==============================================================================
--- incubator/mesos/trunk/src/linux/cgroups.hpp (original)
+++ incubator/mesos/trunk/src/linux/cgroups.hpp Tue Aug  7 00:28:29 2012
@@ -25,8 +25,10 @@
 
 #include <sys/types.h>
 
-#include <stout/try.hpp>
+#include <process/future.hpp>
 
+#include <stout/option.hpp>
+#include <stout/try.hpp>
 
 namespace cgroups {
 
@@ -229,7 +231,22 @@ Try<bool> assignTask(const std::string& 
                      pid_t pid);
 
 
-} // namespace cgroups {
+// Listen on an event notifier and return a future which will become ready when
+// the given event happens. This function will return a failed future if
+// something unexpected happens (e.g. the given hierarchy does not have the
+// proper subsystems attached). Discarding the returned future stops listening.
+// @param   hierarchy   Path to the hierarchy root.
+// @param   cgroup      Path to the cgroup relative to the hierarchy root.
+// @param   control     Name of the control file.
+// @param   args        Control specific arguments.
+// @return  A future which contains the value read from the eventfd when ready.
+//          A failed future if something unexpected happens.
+process::Future<uint64_t> listenEvent(const std::string& hierarchy,
+                                      const std::string& cgroup,
+                                      const std::string& control,
+                                      const Option<std::string>& args =
+                                        Option<std::string>::none());
 
+} // namespace cgroups {
 
 #endif // __CGROUPS_HPP__

Modified: incubator/mesos/trunk/src/tests/cgroups_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/cgroups_tests.cpp?rev=1370076&r1=1370075&r2=1370076&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/cgroups_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/cgroups_tests.cpp Tue Aug  7 00:28:29 2012
@@ -16,9 +16,13 @@
  * limitations under the License.
  */
 
+#include <assert.h>
+#include <signal.h>
+#include <stdlib.h>
 #include <unistd.h>
 
 #include <sys/types.h>
+#include <sys/wait.h>
 
 #include <gmock/gmock.h>
 
@@ -29,6 +33,8 @@
 
 #include "linux/cgroups.hpp"
 
+using namespace process;
+
 
 // Define the test fixture for the cgroups tests.
 class CgroupsTest : public ::testing::Test
@@ -381,3 +387,75 @@ TEST_F(CgroupsTest, ROOT_CGROUPS_GetTask
   EXPECT_NE(pids.find(1), pids.end());
   EXPECT_NE(pids.find(::getpid()), pids.end());
 }
+
+
+// Verifies that listenEvent() on "memory.oom_control" can be cancelled, and
+// that it becomes ready when a child process drives "/prof" out of memory.
+TEST_F(CgroupsTest, ROOT_CGROUPS_ListenEvent)
+{
+  // Disable oom killer.
+  Try<bool> disableResult = cgroups::writeControl(hierarchy,
+                                                  "/prof",
+                                                  "memory.oom_control",
+                                                  "1");
+  ASSERT_TRUE(disableResult.isSome());
+
+  // Limit the memory usage of "/prof" to 64MB.
+  size_t limit = 1024 * 1024 * 64;
+  Try<bool> writeResult = cgroups::writeControl(hierarchy,
+                                                "/prof",
+                                                "memory.limit_in_bytes",
+                                                stringify(limit));
+  ASSERT_TRUE(writeResult.isSome());
+
+  // Listen on oom events for "/prof" cgroup.
+  Future<uint64_t> future =
+    cgroups::listenEvent(hierarchy,
+                         "/prof",
+                         "memory.oom_control");
+  ASSERT_FALSE(future.isFailed());
+
+  // Test the cancellation.
+  future.discard();
+
+  // Test the normal operation below.
+  future = cgroups::listenEvent(hierarchy,
+                                "/prof",
+                                "memory.oom_control");
+  ASSERT_FALSE(future.isFailed());
+
+  pid_t pid = ::fork();
+  ASSERT_NE(-1, pid);
+
+  if (pid) {
+    // In parent process.
+    future.await(5.0); // Timeout in 5 seconds.
+
+    EXPECT_TRUE(future.isReady());
+
+    // Kill the child process.
+    EXPECT_NE(-1, ::kill(pid, SIGKILL));
+
+    // Reap exactly the child forked above, not an arbitrary child.
+    int status;
+    EXPECT_EQ(pid, ::waitpid(pid, &status, 0));
+  } else {
+    // In child process. We try to trigger an oom here. The child must never
+    // return into the test framework (it would go on running the remaining
+    // tests as a second copy of the test binary), so every path ends in
+    // _exit(). A failure here shows up in the parent as the oom future not
+    // becoming ready.
+
+    // Put self into the "/prof" cgroup.
+    Try<bool> assignResult = cgroups::assignTask(hierarchy,
+                                                 "/prof",
+                                                 ::getpid());
+    if (assignResult.isError()) {
+      ::_exit(EXIT_FAILURE);
+    }
+
+    // Blow up the memory, well beyond the 64MB limit set above. An explicit
+    // check is used instead of assert(), which NDEBUG would compile out.
+    size_t size = 1024 * 1024 * 512;
+    char* ptr = static_cast<char*>(::malloc(size));
+    if (ptr == NULL) {
+      ::_exit(EXIT_FAILURE);
+    }
+    for (size_t i = 0; i < size; i++) {
+      ptr[i] = '\1';
+    }
+
+    // Should not reach here: with the oom killer disabled the child is
+    // expected to stall in the loop above until the parent kills it.
+    ::_exit(EXIT_FAILURE);
+  }
+}