You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/18 19:00:16 UTC
mesos git commit: Refactored cgroups event listener to allow
continuous monitoring.
Repository: mesos
Updated Branches:
refs/heads/master 0d2c29b9f -> e56625892
Refactored cgroups event listener to allow continuous monitoring.
Review: https://reviews.apache.org/r/31008
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5662589
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5662589
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5662589
Branch: refs/heads/master
Commit: e5662589232aedb0f694546acb669d850937e6a5
Parents: 0d2c29b
Author: Chi Zhang <ch...@gmail.com>
Authored: Wed Feb 18 09:37:36 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 18 09:59:47 2015 -0800
----------------------------------------------------------------------
src/linux/cgroups.cpp | 146 +++++++++++++++++++++++++++++----------------
src/linux/cgroups.hpp | 4 ++
2 files changed, 99 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e5662589/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index b6f75b1..a307e27 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1082,7 +1082,7 @@ Try<Nothing> assign(const string& hierarchy, const string& cgroup, pid_t pid)
}
-namespace internal {
+namespace event {
#ifndef EFD_SEMAPHORE
#define EFD_SEMAPHORE (1 << 0)
@@ -1147,7 +1147,7 @@ static Try<int> registerNotifier(
const string& control,
const Option<string>& args = None())
{
- int efd = internal::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+ int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
return ErrnoError("Failed to create an eventfd");
}
@@ -1191,50 +1191,64 @@ static Try<Nothing> unregisterNotifier(int fd)
}
-// The process listening on event notifier. This class is invisible to users.
-class EventListener : public Process<EventListener>
+// The process listening on an event notifier. This class is internal
+// to the cgroup code and assumes parameters are valid. See the
+// comments of the public interface 'listen' for its usage.
+class Listener : public Process<Listener>
{
public:
- EventListener(const string& _hierarchy,
- const string& _cgroup,
- const string& _control,
- const Option<string>& _args)
+ Listener(const string& _hierarchy,
+ const string& _cgroup,
+ const string& _control,
+ const Option<string>& _args)
: hierarchy(_hierarchy),
cgroup(_cgroup),
control(_control),
args(_args),
data(0) {}
- virtual ~EventListener() {}
+ virtual ~Listener() {}
+
+ // Waits for the next event to occur, at which point the future
+ // becomes ready. Returns a failure if an error occurs. If any previous
+ // call to 'listen' returns a failure, all subsequent calls to
+ // 'listen' will return failures as well (in that case, the user
+ // should consider terminating this process and creating a new one if
+ // he/she still wants to monitor the events).
+ // TODO(chzhcn): If the user discards the returned future, currently
+ // we do not do anything. Consider better discard semantics here.
+ Future<uint64_t> listen()
+ {
+ if (error.isSome()) {
+ return Failure(error.get());
+ }
- Future<uint64_t> future() { return promise.future(); }
+ if (promise.isNone()) {
+ promise = Owned<Promise<uint64_t>>(new Promise<uint64_t>());
+
+ // Perform nonblocking read on the event file. The nonblocking
+ // read will start polling on the event file until it becomes
+ // readable. If we can successfully read 8 bytes (sizeof
+ // uint64_t) from the event file, it indicates that an event has
+ // occurred.
+ reading = io::read(eventfd.get(), &data, sizeof(data));
+ reading.onAny(defer(self(), &Listener::_listen));
+ }
+
+ return promise.get()->future();
+ }
protected:
virtual void initialize()
{
- // Stop the listener if no one cares. Note that here we explicitly specify
- // the type of the terminate function because it is an overloaded function.
- // The compiler complains if we do not do it.
- promise.future().onDiscard(lambda::bind(
- static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
-
// Register an eventfd "notifier" for the given control.
- Try<int> fd = internal::registerNotifier(hierarchy, cgroup, control, args);
+ Try<int> fd = registerNotifier(hierarchy, cgroup, control, args);
if (fd.isError()) {
- promise.fail("Failed to register notification eventfd: " + fd.error());
- terminate(self());
- return;
+ error = Error("Failed to register notification eventfd: " + fd.error());
+ } else {
+ // Remember the opened event file descriptor.
+ eventfd = fd.get();
}
-
- // Remember the opened event file descriptor.
- eventfd = fd.get();
-
- // Perform nonblocking read on the event file. The nonblocking read will
- // start polling on the event file until it becomes readable. If we can
- // successfully read 8 bytes (sizeof uint64_t) from the event file, it
- // indicates an event has occurred.
- reading = io::read(eventfd.get(), &data, sizeof(data));
- reading.onAny(defer(self(), &EventListener::notified, lambda::_1));
}
virtual void finalize()
@@ -1244,47 +1258,60 @@ protected:
// Unregister the eventfd if needed.
if (eventfd.isSome()) {
- Try<Nothing> unregister = internal::unregisterNotifier(eventfd.get());
+ Try<Nothing> unregister = unregisterNotifier(eventfd.get());
if (unregister.isError()) {
- LOG(ERROR) << "Failed to unregistering eventfd: " << unregister.error();
+ LOG(ERROR) << "Failed to unregister eventfd: " << unregister.error();
}
}
- // TODO(benh): Discard our promise only after 'reading' has
- // completed (ready, failed, or discarded).
- promise.discard();
+ // TODO(chzhcn): Fail our promise only after 'reading' has
+ // completed (ready, failed or discarded).
+ if (promise.isSome()) {
+ promise.get()->fail("Event listener is terminating");
+ }
}
private:
// This function is called when the nonblocking read on the eventfd has
// result, either because the event has happened, or an error has occurred.
- void notified(const Future<size_t>&)
+ void _listen()
{
+ CHECK_SOME(promise);
+
+ if (reading.isReady() && reading.get() == sizeof(data)) {
+ promise.get()->set(data);
+
+ // After fulfilling the promise, reset to get ready for the next one.
+ promise = None();
+ return;
+ }
+
if (reading.isDiscarded()) {
- promise.discard();
+ error = Error("Reading eventfd stopped unexpectedly");
} else if (reading.isFailed()) {
- promise.fail("Failed to read eventfd: " + reading.failure());
- } else if (reading.get() == sizeof(data)) {
- promise.set(data);
+ error = Error("Failed to read eventfd: " + reading.failure());
} else {
- promise.fail("Read less than expected");
+ error = Error("Read less than expected. Expect " +
+ stringify(sizeof(data)) + " bytes; actual " +
+ stringify(reading.get()) + " bytes");
}
- terminate(self());
+ // Report the failure and do not listen again.
+ promise.get()->fail(error.get().message);
}
const string hierarchy;
const string cgroup;
const string control;
const Option<string> args;
- Promise<uint64_t> promise;
+
+ Option<Owned<Promise<uint64_t>>> promise;
Future<size_t> reading;
- Option<int> eventfd; // The eventfd if opened.
- uint64_t data; // The data read from the eventfd.
+ Option<Error> error;
+ Option<int> eventfd;
+ uint64_t data; // The data read from the eventfd last time.
};
-} // namespace internal {
-
Future<uint64_t> listen(
const string& hierarchy,
@@ -1297,13 +1324,30 @@ Future<uint64_t> listen(
return Failure(error.get());
}
- internal::EventListener* listener =
- new internal::EventListener(hierarchy, cgroup, control, args);
- Future<uint64_t> future = listener->future();
+ Listener* listener = new Listener(hierarchy, cgroup, control, args);
+
spawn(listener, true);
+
+ Future<uint64_t> future = dispatch(listener, &Listener::listen);
+
+ // If the user doesn't care any more, or listening has had a result,
+ // terminate the listener.
+ future
+ .onDiscard(lambda::bind(
+ static_cast<void (*)(const UPID&, bool)>(terminate),
+ listener->self(),
+ true))
+ .onAny(lambda::bind(
+ static_cast<void (*)(const UPID&, bool)>(terminate),
+ listener->self(),
+ true));
+
return future;
}
+} // namespace event {
+
+
namespace internal {
namespace freezer {
@@ -2113,7 +2157,7 @@ Nothing _nothing() { return Nothing(); }
Future<Nothing> listen(const string& hierarchy, const string& cgroup)
{
- return cgroups::listen(hierarchy, cgroup, "memory.oom_control")
+ return cgroups::event::listen(hierarchy, cgroup, "memory.oom_control")
.then(lambda::bind(&_nothing));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e5662589/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index bf8efee..e07772f 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -315,6 +315,8 @@ Try<Nothing> assign(
pid_t pid);
+namespace event {
+
// Listen on an event notifier and return a future which will become ready when
// the certain event happens. This function will return a future failure if some
// expected happens (e.g. the given hierarchy does not have the proper
@@ -331,6 +333,8 @@ process::Future<uint64_t> listen(
const std::string& control,
const Option<std::string>& args = Option<std::string>::none());
+} // namespace event {
+
// Destroy a cgroup under a given hierarchy. It will also recursively
// destroy any sub-cgroups. If the freezer subsystem is attached to