You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/13 09:30:28 UTC

git commit: Fixed bug in io::poll.

Repository: mesos
Updated Branches:
  refs/heads/master 4aa3ec22c -> 849fc4d36


Fixed bug in io::poll.

When a future returned from process::io::poll is discarded we need to
fully remove the ev_io watchers from libev; otherwise the reuse of
those file descriptors can cause unintended side effects because they're
still being used by the kernel/libev.

Review: https://reviews.apache.org/r/24194


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/849fc4d3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/849fc4d3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/849fc4d3

Branch: refs/heads/master
Commit: 849fc4d361e40062073324153ba97e98e294fdf2
Parents: 4aa3ec2
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 1 13:01:15 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Aug 13 00:30:08 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 186 ++++++++++++++++++++++++-------
 1 file changed, 147 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/849fc4d3/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 14cf317..c2bee98 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -560,10 +560,18 @@ static ev_timer timeouts_watcher;
 // Server watcher for accepting connections.
 static ev_io server_watcher;
 
-// Queue of I/O watchers.
+// Queue of I/O watchers to be asynchronously added to the event loop
+// (protected by 'watchers' below).
+// TODO(benh): Replace this queue with functions that we put in
+// 'functions' below that perform the ev_io_start themselves.
 static queue<ev_io*>* watchers = new queue<ev_io*>();
 static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
 
+// Queue of functions to be invoked asynchronously within the event
+// loop (protected by 'watchers' above).
+static queue<lambda::function<void(void)> >* functions =
+  new queue<lambda::function<void(void)> >();
+
 // We store the timers in a map of lists indexed by the timeout of the
 // timer so that we can have two timers that have the same timeout. We
 // exploit that the map is SORTED!
@@ -876,6 +884,40 @@ static Message* parse(Request* request)
   return message;
 }
 
+// Wrapper around the function 'f' we want to run in the event loop; its result is handed to 'promise'.
+template <typename T>
+void _run_in_event_loop(
+    const lambda::function<Future<T>(void)>& f,
+    const Owned<Promise<T> >& promise)
+{
+  // Skip 'f' entirely if a discard was already requested on the future.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+  } else {
+    promise->set(f());
+  }
+}
+
+
+// Helper for running a function in the event loop; returns a future for its result.
+template <typename T>
+Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
+{
+  Owned<Promise<T> > promise(new Promise<T>());
+
+  Future<T> future = promise->future();
+
+  // Enqueue the function ('functions' is protected by 'watchers').
+  synchronized (watchers) {
+    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+  }
+
+  // Interrupt the loop by signaling 'async_watcher'.
+  ev_async_send(loop, &async_watcher);
+
+  return future;
+}
+
 
 void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 {
@@ -886,6 +928,11 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
       watchers->pop();
       ev_io_start(loop, watcher);
     }
+
+    while (!functions->empty()) {
+      (functions->front())();
+      functions->pop();
+    }
   }
 
   synchronized (timeouts) {
@@ -1344,14 +1391,66 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
+// Per-poll state used by the 'polled' and 'discard_poll' callbacks
+// (the one that runs deletes it) so polling can actually be stopped.
+struct Poll
+{
+  Poll()
+  {
+    // Need to explicitly allocate the watchers held by the pointers below.
+    watcher.io.reset(new ev_io());
+    watcher.async.reset(new ev_async());
+  }
+
+  // An I/O watcher for checking for readability or writability and
+  // an async watcher for being able to discard the polling.
+  struct {
+    memory::shared_ptr<ev_io> io;
+    memory::shared_ptr<ev_async> async;
+  } watcher;
+
+  Promise<short> promise;
+};
+
+
+// Event loop callback invoked when I/O is ready on a polled file descriptor; sets the promise and deletes 'poll'.
 void polled(struct ev_loop* loop, ev_io* watcher, int revents)
 {
-  Promise<short>* promise = (Promise<short>*) watcher->data;
-  promise->set(revents);
-  delete promise;
+  Poll* poll = (Poll*) watcher->data;
+
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  // Stop the async watcher (this also clears it if pending, so
+  // 'discard_poll' will not get invoked and we can delete 'poll' here).
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  poll->promise.set(revents);
+
+  delete poll;
+}
+
+
+// Event loop callback invoked when the future associated with a
+// polled file descriptor has been discarded.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  // Check whether a 'polled' callback is already pending and, if so,
+  // let it "win" (it will stop both watchers and delete 'poll').
+  if (ev_is_pending(poll->watcher.io.get())) {
+    return;
+  }
 
-  ev_io_stop(loop, watcher);
-  delete watcher;
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  // Stop the I/O watcher (we checked above that it is not pending) so
+  // 'polled' won't get invoked and we can delete 'poll' here.
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  poll->promise.discard();
+
+  delete poll;
 }
 
 
@@ -3586,6 +3685,44 @@ namespace io {
 
 namespace internal {
 
+// Helper/continuation of 'poll' on future discard: signals the given async watcher.
+void _poll(const memory::shared_ptr<ev_async>& async)
+{
+  ev_async_send(loop, async.get());
+}
+
+
+Future<short> poll(int fd, short events)
+{
+  Poll* poll = new Poll();
+
+  // Have the watchers' data point back to the struct.
+  poll->watcher.async->data = poll;
+  poll->watcher.io->data = poll;
+
+  // Get a copy of the future to avoid any races with the event loop.
+  Future<short> future = poll->promise.future();
+
+  // Initialize and start the async watcher.
+  ev_async_init(poll->watcher.async.get(), discard_poll);
+  ev_async_start(loop, poll->watcher.async.get());
+
+  // Make sure we stop polling if a discard occurs on our future.
+  // Note that '_poll' may be invoked by a discard even after the
+  // polling has already completed. In that case we still interrupt
+  // the event loop, but since the async watcher has already been
+  // stopped 'discard_poll' will not get invoked. The bound shared
+  // pointer keeps the async watcher alive after 'poll' is deleted.
+  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+  // Initialize and start the I/O watcher.
+  ev_io_init(poll->watcher.io.get(), polled, fd, events);
+  ev_io_start(loop, poll->watcher.io.get());
+
+  return future;
+}
+
+
 void read(
     int fd,
     void* data,
@@ -3595,6 +3732,7 @@ void read(
 {
   // Ignore this function if the read operation has been discarded.
   if (promise->future().hasDiscard()) {
+    CHECK(!future.isPending());
     promise->discard();
     return;
   }
@@ -3614,7 +3752,7 @@ void read(
       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
         // Restart the read operation.
         Future<short> future =
-          poll(fd, process::io::READ).onAny(
+          io::poll(fd, process::io::READ).onAny(
               lambda::bind(&internal::read,
                            fd,
                            data,
@@ -3705,7 +3843,7 @@ void write(
       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
         // Restart the write operation.
         Future<short> future =
-          poll(fd, process::io::WRITE).onAny(
+          io::poll(fd, process::io::WRITE).onAny(
               lambda::bind(&internal::write,
                            fd,
                            data,
@@ -3737,37 +3875,7 @@ Future<short> poll(int fd, short events)
 
   // TODO(benh): Check if the file descriptor is non-blocking?
 
-  Promise<short>* promise = new Promise<short>();
-
-  // Get a copy of the future to avoid any races with the event loop.
-  Future<short> future = promise->future();
-
-  // Make sure we stop polling if a discard occurs on our future.
-  // TODO(benh): This is actually insuffient in as much as we need to
-  // interrupt the libev event loop and stop and remove the
-  // watcher. This has been left as a TODO since (a) it's a
-  // non-trivial change (i.e., updating 'handle_async' to also remove
-  // watchers) and (b) it's most likely that the file descriptor being
-  // polled will be closed after the promise is discarded which will
-  // invoke 'polled' which will then cause the watcher to be stopped
-  // and deleted. Note that we needed to make Promise<T>::discard a
-  // 'friend' which should be removed once we clean this up.
-  future.onDiscard(lambda::bind(&process::internal::discarded<short>, future));
-
-  ev_io* watcher = new ev_io();
-  watcher->data = promise;
-
-  ev_io_init(watcher, polled, fd, events);
-
-  // Enqueue the watcher.
-  synchronized (watchers) {
-    watchers->push(watcher);
-  }
-
-  // Interrupt the loop.
-  ev_async_send(loop, &async_watcher);
-
-  return future;
+  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
 }