You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/13 09:30:28 UTC
git commit: Fixed bug in io::poll.
Repository: mesos
Updated Branches:
refs/heads/master 4aa3ec22c -> 849fc4d36
Fixed bug in io::poll.
When a future returned from process::io::poll is discarded we need to
fully remove the ev_io watchers from libev, otherwise the reuse of
those file descriptors can cause unintended side effects because they're
still being used by the kernel/libev.
Review: https://reviews.apache.org/r/24194
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/849fc4d3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/849fc4d3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/849fc4d3
Branch: refs/heads/master
Commit: 849fc4d361e40062073324153ba97e98e294fdf2
Parents: 4aa3ec2
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 1 13:01:15 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Aug 13 00:30:08 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 186 ++++++++++++++++++++++++-------
1 file changed, 147 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/849fc4d3/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 14cf317..c2bee98 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -560,10 +560,18 @@ static ev_timer timeouts_watcher;
// Server watcher for accepting connections.
static ev_io server_watcher;
-// Queue of I/O watchers.
+// Queue of I/O watchers to be asynchronously added to the event loop
+// (protected by 'watchers' below).
+// TODO(benh): Replace this queue with functions that we put in
+// 'functions' below that perform the ev_io_start themselves.
static queue<ev_io*>* watchers = new queue<ev_io*>();
static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
+// Queue of functions to be invoked asynchronously within the event
+// loop (protected by 'watchers' above).
+static queue<lambda::function<void(void)> >* functions =
+ new queue<lambda::function<void(void)> >();
+
// We store the timers in a map of lists indexed by the timeout of the
// timer so that we can have two timers that have the same timeout. We
// exploit that the map is SORTED!
@@ -876,6 +884,40 @@ static Message* parse(Request* request)
return message;
}
+// Wrapper around function we want to run in the event loop.
+template <typename T>
+void _run_in_event_loop(
+ const lambda::function<Future<T>(void)>& f,
+ const Owned<Promise<T> >& promise)
+{
+ // Don't bother running the function if the future has been discarded.
+ if (promise->future().hasDiscard()) {
+ promise->discard();
+ } else {
+ promise->set(f());
+ }
+}
+
+
+// Helper for running a function in the event loop.
+template <typename T>
+Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
+{
+ Owned<Promise<T> > promise(new Promise<T>());
+
+ Future<T> future = promise->future();
+
+ // Enqueue the function.
+ synchronized (watchers) {
+ functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+ }
+
+ // Interrupt the loop.
+ ev_async_send(loop, &async_watcher);
+
+ return future;
+}
+
void handle_async(struct ev_loop* loop, ev_async* _, int revents)
{
@@ -886,6 +928,11 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
watchers->pop();
ev_io_start(loop, watcher);
}
+
+ while (!functions->empty()) {
+ (functions->front())();
+ functions->pop();
+ }
}
synchronized (timeouts) {
@@ -1344,14 +1391,66 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
}
+// Data necessary for polling so we can discard polling and actually
+// stop it in the event loop.
+struct Poll
+{
+ Poll()
+ {
+ // Need to explicitly instantiate the watchers.
+ watcher.io.reset(new ev_io());
+ watcher.async.reset(new ev_async());
+ }
+
+ // An I/O watcher for checking for readability or writeability and
+ // an async watcher for being able to discard the polling.
+ struct {
+ memory::shared_ptr<ev_io> io;
+ memory::shared_ptr<ev_async> async;
+ } watcher;
+
+ Promise<short> promise;
+};
+
+
+// Event loop callback when I/O is ready on polling file descriptor.
void polled(struct ev_loop* loop, ev_io* watcher, int revents)
{
- Promise<short>* promise = (Promise<short>*) watcher->data;
- promise->set(revents);
- delete promise;
+ Poll* poll = (Poll*) watcher->data;
+
+ ev_io_stop(loop, poll->watcher.io.get());
+
+ // Stop the async watcher (also clears if pending so 'discard_poll'
+ // will not get invoked and we can delete 'poll' here).
+ ev_async_stop(loop, poll->watcher.async.get());
+
+ poll->promise.set(revents);
+
+ delete poll;
+}
+
+
+// Event loop callback when future associated with polling file
+// descriptor has been discarded.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+ Poll* poll = (Poll*) watcher->data;
+
+ // Check and see if we have a pending 'polled' callback and if so
+ // let it "win".
+ if (ev_is_pending(poll->watcher.io.get())) {
+ return;
+ }
- ev_io_stop(loop, watcher);
- delete watcher;
+ ev_async_stop(loop, poll->watcher.async.get());
+
+ // Stop the I/O watcher (but note we check if pending above) so it
+ // won't get invoked and we can delete 'poll' here.
+ ev_io_stop(loop, poll->watcher.io.get());
+
+ poll->promise.discard();
+
+ delete poll;
}
@@ -3586,6 +3685,44 @@ namespace io {
namespace internal {
+// Helper/continuation of 'poll' on future discard.
+void _poll(const memory::shared_ptr<ev_async>& async)
+{
+ ev_async_send(loop, async.get());
+}
+
+
+Future<short> poll(int fd, short events)
+{
+ Poll* poll = new Poll();
+
+ // Have the watchers data point back to the struct.
+ poll->watcher.async->data = poll;
+ poll->watcher.io->data = poll;
+
+ // Get a copy of the future to avoid any races with the event loop.
+ Future<short> future = poll->promise.future();
+
+ // Initialize and start the async watcher.
+ ev_async_init(poll->watcher.async.get(), discard_poll);
+ ev_async_start(loop, poll->watcher.async.get());
+
+ // Make sure we stop polling if a discard occurs on our future.
+ // Note that it's possible that we'll invoke '_poll' when someone
+ // does a discard even after the polling has already completed. In
+ // that case we will still interrupt the event loop, but since the
+ // async watcher has already been stopped we won't cause
+ // 'discard_poll' to get invoked.
+ future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+ // Initialize and start the I/O watcher.
+ ev_io_init(poll->watcher.io.get(), polled, fd, events);
+ ev_io_start(loop, poll->watcher.io.get());
+
+ return future;
+}
+
+
void read(
int fd,
void* data,
@@ -3595,6 +3732,7 @@ void read(
{
// Ignore this function if the read operation has been discarded.
if (promise->future().hasDiscard()) {
+ CHECK(!future.isPending());
promise->discard();
return;
}
@@ -3614,7 +3752,7 @@ void read(
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// Restart the read operation.
Future<short> future =
- poll(fd, process::io::READ).onAny(
+ io::poll(fd, process::io::READ).onAny(
lambda::bind(&internal::read,
fd,
data,
@@ -3705,7 +3843,7 @@ void write(
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// Restart the write operation.
Future<short> future =
- poll(fd, process::io::WRITE).onAny(
+ io::poll(fd, process::io::WRITE).onAny(
lambda::bind(&internal::write,
fd,
data,
@@ -3737,37 +3875,7 @@ Future<short> poll(int fd, short events)
// TODO(benh): Check if the file descriptor is non-blocking?
- Promise<short>* promise = new Promise<short>();
-
- // Get a copy of the future to avoid any races with the event loop.
- Future<short> future = promise->future();
-
- // Make sure we stop polling if a discard occurs on our future.
- // TODO(benh): This is actually insuffient in as much as we need to
- // interrupt the libev event loop and stop and remove the
- // watcher. This has been left as a TODO since (a) it's a
- // non-trivial change (i.e., updating 'handle_async' to also remove
- // watchers) and (b) it's most likely that the file descriptor being
- // polled will be closed after the promise is discarded which will
- // invoke 'polled' which will then cause the watcher to be stopped
- // and deleted. Note that we needed to make Promise<T>::discard a
- // 'friend' which should be removed once we clean this up.
- future.onDiscard(lambda::bind(&process::internal::discarded<short>, future));
-
- ev_io* watcher = new ev_io();
- watcher->data = promise;
-
- ev_io_init(watcher, polled, fd, events);
-
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- // Interrupt the loop.
- ev_async_send(loop, &async_watcher);
-
- return future;
+ return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
}