You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/18 19:00:16 UTC

mesos git commit: Refactored cgroups event listener to allow continuous monitoring.

Repository: mesos
Updated Branches:
  refs/heads/master 0d2c29b9f -> e56625892


Refactored cgroups event listener to allow continuous monitoring.

Review: https://reviews.apache.org/r/31008


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5662589
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5662589
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5662589

Branch: refs/heads/master
Commit: e5662589232aedb0f694546acb669d850937e6a5
Parents: 0d2c29b
Author: Chi Zhang <ch...@gmail.com>
Authored: Wed Feb 18 09:37:36 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 18 09:59:47 2015 -0800

----------------------------------------------------------------------
 src/linux/cgroups.cpp | 146 +++++++++++++++++++++++++++++----------------
 src/linux/cgroups.hpp |   4 ++
 2 files changed, 99 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e5662589/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index b6f75b1..a307e27 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1082,7 +1082,7 @@ Try<Nothing> assign(const string& hierarchy, const string& cgroup, pid_t pid)
 }
 
 
-namespace internal {
+namespace event {
 
 #ifndef EFD_SEMAPHORE
 #define EFD_SEMAPHORE (1 << 0)
@@ -1147,7 +1147,7 @@ static Try<int> registerNotifier(
     const string& control,
     const Option<string>& args = None())
 {
-  int efd = internal::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+  int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
   if (efd < 0) {
     return ErrnoError("Failed to create an eventfd");
   }
@@ -1191,50 +1191,64 @@ static Try<Nothing> unregisterNotifier(int fd)
 }
 
 
-// The process listening on event notifier. This class is invisible to users.
-class EventListener : public Process<EventListener>
+// The process listening on an event notifier. This class is internal
+// to the cgroup code and assumes parameters are valid. See the
+// comments of the public interface 'listen' for its usage.
+class Listener : public Process<Listener>
 {
 public:
-  EventListener(const string& _hierarchy,
-                const string& _cgroup,
-                const string& _control,
-                const Option<string>& _args)
+  Listener(const string& _hierarchy,
+           const string& _cgroup,
+           const string& _control,
+           const Option<string>& _args)
     : hierarchy(_hierarchy),
       cgroup(_cgroup),
       control(_control),
       args(_args),
       data(0) {}
 
-  virtual ~EventListener() {}
+  virtual ~Listener() {}
+
+  // Waits for the next event to occur, at which point the future
+  // becomes ready. Returns a failure if an error occurs. If any
+  // previous call to 'listen' returns a failure, all subsequent calls
+  // to 'listen' will return failures as well (in that case, the user
+  // should consider terminating this process and creating a new one
+  // if he/she still wants to monitor the events).
+  // TODO(chzhcn): If the user discards the returned future, currently
+  // we do not do anything. Consider better discard semantics here.
+  Future<uint64_t> listen()
+  {
+    if (error.isSome()) {
+      return Failure(error.get());
+    }
 
-  Future<uint64_t> future() { return promise.future(); }
+    if (promise.isNone()) {
+      promise = Owned<Promise<uint64_t>>(new Promise<uint64_t>());
+
+      // Perform nonblocking read on the event file. The nonblocking
+      // read will start polling on the event file until it becomes
+      // readable. If we can successfully read 8 bytes (sizeof
+      // uint64_t) from the event file, it indicates that an event has
+      // occurred.
+      reading = io::read(eventfd.get(), &data, sizeof(data));
+      reading.onAny(defer(self(), &Listener::_listen));
+    }
+
+    return promise.get()->future();
+  }
 
 protected:
   virtual void initialize()
   {
-    // Stop the listener if no one cares. Note that here we explicitly specify
-    // the type of the terminate function because it is an overloaded function.
-    // The compiler complains if we do not do it.
-    promise.future().onDiscard(lambda::bind(
-        static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
-
     // Register an eventfd "notifier" for the given control.
-    Try<int> fd = internal::registerNotifier(hierarchy, cgroup, control, args);
+    Try<int> fd = registerNotifier(hierarchy, cgroup, control, args);
     if (fd.isError()) {
-      promise.fail("Failed to register notification eventfd: " + fd.error());
-      terminate(self());
-      return;
+      error = Error("Failed to register notification eventfd: " + fd.error());
+    } else {
+      // Remember the opened event file descriptor.
+      eventfd = fd.get();
     }
-
-    // Remember the opened event file descriptor.
-    eventfd = fd.get();
-
-    // Perform nonblocking read on the event file. The nonblocking read will
-    // start polling on the event file until it becomes readable. If we can
-    // successfully read 8 bytes (sizeof uint64_t) from the event file, it
-    // indicates an event has occurred.
-    reading = io::read(eventfd.get(), &data, sizeof(data));
-    reading.onAny(defer(self(), &EventListener::notified, lambda::_1));
   }
 
   virtual void finalize()
@@ -1244,47 +1258,60 @@ protected:
 
     // Unregister the eventfd if needed.
     if (eventfd.isSome()) {
-      Try<Nothing> unregister = internal::unregisterNotifier(eventfd.get());
+      Try<Nothing> unregister = unregisterNotifier(eventfd.get());
       if (unregister.isError()) {
-        LOG(ERROR) << "Failed to unregistering eventfd: " << unregister.error();
+        LOG(ERROR) << "Failed to unregister eventfd: " << unregister.error();
       }
     }
 
-    // TODO(benh): Discard our promise only after 'reading' has
-    // completed (ready, failed, or discarded).
-    promise.discard();
+    // TODO(chzhcn): Fail our promise only after 'reading' has
+    // completed (ready, failed or discarded).
+    if (promise.isSome()) {
+      promise.get()->fail("Event listener is terminating");
+    }
   }
 
 private:
   // This function is called when the nonblocking read on the eventfd has
   // result, either because the event has happened, or an error has occurred.
-  void notified(const Future<size_t>&)
+  void _listen()
   {
+    CHECK_SOME(promise);
+
+    if (reading.isReady() && reading.get() == sizeof(data)) {
+      promise.get()->set(data);
+
+      // After fulfilling the promise, reset to get ready for the next one.
+      promise = None();
+      return;
+    }
+
     if (reading.isDiscarded()) {
-      promise.discard();
+      error = Error("Reading eventfd stopped unexpectedly");
     } else if (reading.isFailed()) {
-      promise.fail("Failed to read eventfd: " + reading.failure());
-    } else if (reading.get() == sizeof(data)) {
-      promise.set(data);
+      error = Error("Failed to read eventfd: " + reading.failure());
     } else {
-      promise.fail("Read less than expected");
+      error = Error("Read less than expected. Expect " +
+                    stringify(sizeof(data)) + " bytes; actual " +
+                    stringify(reading.get()) + " bytes");
     }
 
-    terminate(self());
+    // Report the failure; later 'listen' calls will fail too.
+    promise.get()->fail(error.get().message);
   }
 
   const string hierarchy;
   const string cgroup;
   const string control;
   const Option<string> args;
-  Promise<uint64_t> promise;
+
+  Option<Owned<Promise<uint64_t>>> promise;
   Future<size_t> reading;
-  Option<int> eventfd;  // The eventfd if opened.
-  uint64_t data; // The data read from the eventfd.
+  Option<Error> error;
+  Option<int> eventfd;
+  uint64_t data;                // The data read from the eventfd last time.
 };
 
-} // namespace internal {
-
 
 Future<uint64_t> listen(
     const string& hierarchy,
@@ -1297,13 +1324,30 @@ Future<uint64_t> listen(
     return Failure(error.get());
   }
 
-  internal::EventListener* listener =
-    new internal::EventListener(hierarchy, cgroup, control, args);
-  Future<uint64_t> future = listener->future();
+  Listener* listener = new Listener(hierarchy, cgroup, control, args);
+
   spawn(listener, true);
+
+  Future<uint64_t> future = dispatch(listener, &Listener::listen);
+
+  // If the user doesn't care any more, or listening has had a result,
+  // terminate the listener.
+  future
+    .onDiscard(lambda::bind(
+        static_cast<void (*)(const UPID&, bool)>(terminate),
+        listener->self(),
+        true))
+    .onAny(lambda::bind(
+        static_cast<void (*)(const UPID&, bool)>(terminate),
+        listener->self(),
+        true));
+
   return future;
 }
 
+} // namespace event {
+
+
 namespace internal {
 
 namespace freezer {
@@ -2113,7 +2157,7 @@ Nothing _nothing() { return Nothing(); }
 
 Future<Nothing> listen(const string& hierarchy, const string& cgroup)
 {
-  return cgroups::listen(hierarchy, cgroup, "memory.oom_control")
+  return cgroups::event::listen(hierarchy, cgroup, "memory.oom_control")
     .then(lambda::bind(&_nothing));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5662589/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index bf8efee..e07772f 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -315,6 +315,8 @@ Try<Nothing> assign(
     pid_t pid);
 
 
+namespace event {
+
 // Listen on an event notifier and return a future which will become ready when
 // the certain event happens. This function will return a future failure if some
 // expected happens (e.g. the given hierarchy does not have the proper
@@ -331,6 +333,8 @@ process::Future<uint64_t> listen(
     const std::string& control,
     const Option<std::string>& args = Option<std::string>::none());
 
+} // namespace event {
+
 
 // Destroy a cgroup under a given hierarchy. It will also recursively
 // destroy any sub-cgroups. If the freezer subsystem is attached to