You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/07 02:28:29 UTC
svn commit: r1370076 - in /incubator/mesos/trunk/src: linux/cgroups.cpp
linux/cgroups.hpp tests/cgroups_tests.cpp
Author: benh
Date: Tue Aug 7 00:28:29 2012
New Revision: 1370076
URL: http://svn.apache.org/viewvc?rev=1370076&view=rev
Log:
Add APIs for event notification in cgroups (contributed by Jie Yu,
https://reviews.apache.org/r/5395).
Modified:
incubator/mesos/trunk/src/linux/cgroups.cpp
incubator/mesos/trunk/src/linux/cgroups.hpp
incubator/mesos/trunk/src/tests/cgroups_tests.cpp
Modified: incubator/mesos/trunk/src/linux/cgroups.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/linux/cgroups.cpp?rev=1370076&r1=1370075&r2=1370076&view=diff
==============================================================================
--- incubator/mesos/trunk/src/linux/cgroups.cpp (original)
+++ incubator/mesos/trunk/src/linux/cgroups.cpp Tue Aug 7 00:28:29 2012
@@ -20,11 +20,20 @@
#include <fts.h>
#include <unistd.h>
+#include <sys/syscall.h>
+
+#include <glog/logging.h>
+
#include <fstream>
#include <map>
#include <sstream>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp>
+
#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/stringify.hpp>
@@ -33,20 +42,11 @@
#include "linux/cgroups.hpp"
#include "linux/fs.hpp"
-
+using namespace process;
using namespace mesos::internal;
-
namespace cgroups {
-
-//////////////////////////////////////////////////////////////////////////////
-// The following *internal* functions provide very basic controls over Linux
-// cgroups. Sanity checks are removed from these functions for performance.
-// Users can always wrap these functions to provide various checks if needed.
-//////////////////////////////////////////////////////////////////////////////
-
-
namespace internal {
@@ -296,15 +296,9 @@ static Try<bool> writeControl(const std:
return true;
}
-
} // namespace internal {
-//////////////////////////////////////////////////////////////////////////////
-// The following functions are visible to users.
-//////////////////////////////////////////////////////////////////////////////
-
-
bool enabled()
{
return os::exists("/proc/cgroups");
@@ -749,4 +743,205 @@ Try<bool> assignTask(const std::string&
}
+namespace internal {
+
+#ifndef __NR_eventfd2
+#error "The eventfd2 syscall is unavailable."
+#endif
+
+#define EFD_SEMAPHORE (1 << 0)
+#define EFD_CLOEXEC O_CLOEXEC
+#define EFD_NONBLOCK O_NONBLOCK
+
+static int eventfd(unsigned int initval, int flags)
+{
+ return ::syscall(__NR_eventfd2, initval, flags); // Invoke eventfd2 directly.
+}
+
+
+// Cgroups provide a mechanism for delivering notifications about changes in
+// the status of a cgroup. It is based on Linux eventfd. See the kernel
+// documentation ("Notification API") for more information. This function
+// creates an eventfd and writes to the appropriate control file to associate
+// the eventfd with a type of event, so that users can start polling on the
+// eventfd to get notified. It returns the eventfd (file descriptor) if the
+// notifier has been successfully opened. This function assumes all the
+// parameters are valid. The eventfd is set to be non-blocking and close-on-exec.
+// @param hierarchy Path to the hierarchy root.
+// @param cgroup Path to the cgroup relative to the hierarchy root.
+// @param control Name of the control file.
+// @param args Control specific arguments.
+// @return The eventfd if the operation succeeds.
+// Error if the operation fails.
+static Try<int> openNotifier(const std::string& hierarchy,
+ const std::string& cgroup,
+ const std::string& control,
+ const Option<std::string>& args =
+ Option<std::string>::none())
+{
+ int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+ if (efd < 0) {
+ return Try<int>::error(
+ "Create eventfd failed: " + std::string(strerror(errno)));
+ }
+
+ // Open the control file; the eventfd is closed again if this fails.
+ std::string path = hierarchy + "/" + cgroup + "/" + control;
+ Try<int> cfd = os::open(path, O_RDWR);
+ if (cfd.isError()) {
+ os::close(efd);
+ return Try<int>::error(cfd.error());
+ }
+
+ // Write "<eventfd> <control fd> [args]" to cgroup.event_control.
+ std::ostringstream out;
+ out << std::dec << efd << " " << cfd.get();
+ if (args.isSome()) {
+ out << " " << args.get();
+ }
+ Try<bool> write = internal::writeControl(hierarchy,
+ cgroup,
+ "cgroup.event_control",
+ out.str());
+ if (write.isError()) {
+ os::close(efd);
+ os::close(cfd.get());
+ return Try<int>::error(write.error());
+ }
+
+ os::close(cfd.get()); // Only the eventfd is returned to the caller.
+
+ return efd;
+}
+
+
+// Close a notifier by closing its eventfd with os::close.
+// @param fd The eventfd returned by openNotifier.
+// @return True if the operation succeeds.
+// Error if the operation fails.
+static Try<bool> closeNotifier(int fd)
+{
+ return os::close(fd);
+}
+
+
+// The process listening on an event notifier. This class is invisible to users.
+class EventListener : public Process<EventListener>
+{
+public:
+ EventListener(const std::string& _hierarchy,
+ const std::string& _cgroup,
+ const std::string& _control,
+ const Option<std::string>& _args)
+ : hierarchy(_hierarchy),
+ cgroup(_cgroup),
+ control(_control),
+ args(_args),
+ data(0) {}
+
+ virtual ~EventListener() {}
+
+ Future<uint64_t> future() { return promise.future(); }
+
+protected:
+ virtual void initialize()
+ {
+ // Stop the listener if no one cares. Note that here we explicitly specify
+ // the type of the terminate function because it is an overloaded function.
+ // The compiler complains if we do not do it.
+ promise.future().onDiscarded(lambda::bind(
+ static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
+
+ // Open the event file; on failure, fail the promise and stop the process.
+ Try<int> fd = internal::openNotifier(hierarchy, cgroup, control, args);
+ if (fd.isError()) {
+ promise.fail(fd.error());
+ terminate(self());
+ return;
+ }
+
+ // Remember the opened event file so that finalize() can close it.
+ eventfd = fd.get();
+
+ // Perform nonblocking read on the event file. The nonblocking read will
+ // start polling on the event file until it becomes readable. If we can
+ // successfully read 8 bytes (sizeof uint64_t) from the event file, it
+ // indicates an event has occurred.
+ reading = io::read(eventfd.get(), &data, sizeof(data));
+ reading.onAny(defer(self(), &EventListener::notified));
+ }
+
+ virtual void finalize()
+ {
+ // Discard the nonblocking read.
+ reading.discard();
+
+ // Close the eventfd if it was opened; a close failure is only logged.
+ if (eventfd.isSome()) {
+ Try<bool> close = internal::closeNotifier(eventfd.get());
+ if (close.isError()) {
+ LOG(ERROR) << "Closing eventfd " << eventfd.get()
+ << " failed: " << close.error();
+ }
+ }
+ }
+
+private:
+ // This function is called when the nonblocking read on the eventfd has a
+ // result, either because the event has happened, or an error has occurred.
+ void notified()
+ {
+ // Ignore this function if the promise is no longer pending.
+ if (!promise.future().isPending()) {
+ return;
+ }
+
+ // Since the future reading can only be discarded when the promise is no
+ // longer pending, we shall never see a discarded reading here because of
+ // the check in the beginning of the function.
+ CHECK(!reading.isDiscarded());
+
+ if (reading.isFailed()) {
+ promise.fail("Failed to read eventfd: " + reading.failure());
+ } else {
+ if (reading.get() == sizeof(data)) {
+ promise.set(data);
+ } else {
+ promise.fail("Read less than expected");
+ }
+ }
+
+ terminate(self());
+ }
+
+ std::string hierarchy;
+ std::string cgroup;
+ std::string control;
+ Option<std::string> args;
+ Promise<uint64_t> promise; // Set with the data read, or failed on error.
+ Future<size_t> reading; // The nonblocking read issued on the eventfd.
+ Option<int> eventfd; // The eventfd if opened.
+ uint64_t data; // The data read from the eventfd.
+};
+
+} // namespace internal {
+
+
+Future<uint64_t> listenEvent(const std::string& hierarchy,
+ const std::string& cgroup,
+ const std::string& control,
+ const Option<std::string>& args)
+{
+ Try<bool> check = checkControl(hierarchy, cgroup, control);
+ if (check.isError()) { // Report a failed check as a failed future.
+ return Future<uint64_t>::failed(check.error());
+ }
+
+ internal::EventListener* listener =
+ new internal::EventListener(hierarchy, cgroup, control, args);
+ Future<uint64_t> future = listener->future(); // Taken before spawning.
+ spawn(listener, true); // NOTE(review): presumably 'true' lets libprocess own and delete the listener -- confirm.
+ return future;
+}
+
} // namespace cgroups {
Modified: incubator/mesos/trunk/src/linux/cgroups.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/linux/cgroups.hpp?rev=1370076&r1=1370075&r2=1370076&view=diff
==============================================================================
--- incubator/mesos/trunk/src/linux/cgroups.hpp (original)
+++ incubator/mesos/trunk/src/linux/cgroups.hpp Tue Aug 7 00:28:29 2012
@@ -25,8 +25,10 @@
#include <sys/types.h>
-#include <stout/try.hpp>
+#include <process/future.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
namespace cgroups {
@@ -229,7 +231,22 @@ Try<bool> assignTask(const std::string&
pid_t pid);
-} // namespace cgroups {
+// Listen on an event notifier and return a future which will become ready when
+// the given event happens. This function will return a failed future if
+// something unexpected happens (e.g. the given hierarchy does not have the
+// proper subsystems attached).
+// @param hierarchy Path to the hierarchy root.
+// @param cgroup Path to the cgroup relative to the hierarchy root.
+// @param control Name of the control file.
+// @param args Control specific arguments.
+// @return A future which contains the value read from the file when ready.
+// A failed future if something unexpected happens.
+process::Future<uint64_t> listenEvent(const std::string& hierarchy,
+ const std::string& cgroup,
+ const std::string& control,
+ const Option<std::string>& args =
+ Option<std::string>::none());
+} // namespace cgroups {
#endif // __CGROUPS_HPP__
Modified: incubator/mesos/trunk/src/tests/cgroups_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/cgroups_tests.cpp?rev=1370076&r1=1370075&r2=1370076&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/cgroups_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/cgroups_tests.cpp Tue Aug 7 00:28:29 2012
@@ -16,9 +16,13 @@
* limitations under the License.
*/
+#include <assert.h>
+#include <signal.h>
+#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
+#include <sys/wait.h>
#include <gmock/gmock.h>
@@ -29,6 +33,8 @@
#include "linux/cgroups.hpp"
+using namespace process;
+
// Define the test fixture for the cgroups tests.
class CgroupsTest : public ::testing::Test
@@ -381,3 +387,75 @@ TEST_F(CgroupsTest, ROOT_CGROUPS_GetTask
EXPECT_NE(pids.find(1), pids.end());
EXPECT_NE(pids.find(::getpid()), pids.end());
}
+
+
+// Exercises cgroups::listenEvent with memory OOM notifications on the "/prof"
+// cgroup: first a listener is created and discarded (cancellation), then a
+// second listener is expected to become ready once a forked child placed in
+// the cgroup exceeds the configured memory limit.
+TEST_F(CgroupsTest, ROOT_CGROUPS_ListenEvent)
+{
+  // Disable oom killer.
+  Try<bool> disableResult = cgroups::writeControl(hierarchy,
+                                                  "/prof",
+                                                  "memory.oom_control",
+                                                  "1");
+  ASSERT_TRUE(disableResult.isSome());
+
+  // Limit the memory usage of "/prof" to 64MB.
+  size_t limit = 1024 * 1024 * 64;
+  Try<bool> writeResult = cgroups::writeControl(hierarchy,
+                                                "/prof",
+                                                "memory.limit_in_bytes",
+                                                stringify(limit));
+  ASSERT_TRUE(writeResult.isSome());
+
+  // Listen on oom events for "/prof" cgroup.
+  Future<uint64_t> future =
+    cgroups::listenEvent(hierarchy,
+                         "/prof",
+                         "memory.oom_control");
+  ASSERT_FALSE(future.isFailed());
+
+  // Test the cancellation.
+  future.discard();
+
+  // Test the normal operation below.
+  future = cgroups::listenEvent(hierarchy,
+                                "/prof",
+                                "memory.oom_control");
+  ASSERT_FALSE(future.isFailed());
+
+  pid_t pid = ::fork();
+  ASSERT_NE(-1, pid);
+
+  if (pid) {
+    // In parent process.
+    future.await(5.0); // Timeout in 5 seconds.
+
+    EXPECT_TRUE(future.isReady());
+
+    // Kill the child process.
+    EXPECT_NE(-1, ::kill(pid, SIGKILL));
+
+    // Reap exactly the child forked above rather than any child process.
+    int status;
+    EXPECT_EQ(pid, ::waitpid(pid, &status, 0));
+  } else {
+    // In child process. We try to trigger an oom here. The child must never
+    // return from this function: FAIL() would merely return into gtest and
+    // the child would go on running the fixture teardown and the remaining
+    // tests as a duplicate of the parent. Every failure path therefore
+    // reports the problem and then leaves through _exit().
+
+    // Put self into the "/prof" cgroup.
+    Try<bool> assignResult = cgroups::assignTask(hierarchy,
+                                                 "/prof",
+                                                 ::getpid());
+    if (assignResult.isError()) {
+      ADD_FAILURE() << "Failed to assign cgroup: " << assignResult.error();
+      ::_exit(1);
+    }
+
+    // Blow up the memory. The allocation is checked explicitly because
+    // assert() is compiled out when NDEBUG is defined.
+    size_t size = 1024 * 1024 * 512;
+    char* ptr = (char*) ::malloc(size);
+    if (ptr == NULL) {
+      ADD_FAILURE() << "Failed to allocate memory in the child process";
+      ::_exit(1);
+    }
+    for (size_t i = 0; i < size; i++) {
+      ptr[i] = '\1';
+    }
+
+    // Should not reach here.
+    ADD_FAILURE() << "OOM does not happen!";
+    ::_exit(1);
+  }
+}