You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:10 UTC
[13/30] mesos git commit: Moved event loop specific polling into
poll.cpp.
Moved event loop specific polling into poll.cpp.
Review: https://reviews.apache.org/r/27505
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/413ce94f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/413ce94f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/413ce94f
Branch: refs/heads/master
Commit: 413ce94f8d74b8c29657eef7dbc2f6ade4143bc7
Parents: f78ae66
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:22:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/poll.cpp | 129 +++++++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 112 ---------------------------
3 files changed, 130 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 2de9989..41c3bd1 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES = \
src/libev.cpp \
src/metrics/metrics.cpp \
src/pid.cpp \
+ src/poll.cpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll.cpp b/3rdparty/libprocess/src/poll.cpp
new file mode 100644
index 0000000..324e4dd
--- /dev/null
+++ b/3rdparty/libprocess/src/poll.cpp
@@ -0,0 +1,129 @@
+#include <ev.h>
+
+#include <process/future.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Per-request state for one poll: the libev watchers plus the promise
+// that 'polled' sets or 'discard_poll' discards before deleting this.
+struct Poll
+{
+ Poll()
+ {
+ // The watcher pointers are not allocated implicitly; create them.
+ watcher.io.reset(new ev_io());
+ watcher.async.reset(new ev_async());
+ }
+
+ // 'io' watches the file descriptor for readability/writeability;
+ // 'async' is signaled (see '_poll') when the future is discarded.
+ struct {
+ memory::shared_ptr<ev_io> io;
+ memory::shared_ptr<ev_async> async;
+ } watcher;
+
+ Promise<short> promise; // Set with 'revents' or discarded.
+};
+
+
+// libev I/O callback: completes the poll's promise with 'revents'.
+void polled(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ Poll* poll = (Poll*) watcher->data; // Set in 'io::internal::poll'.
+
+ ev_io_stop(loop, poll->watcher.io.get());
+
+ // Stop the async watcher too; stopping also clears it if pending,
+ // so 'discard_poll' will not run and deleting 'poll' here is safe.
+ ev_async_stop(loop, poll->watcher.async.get());
+
+ poll->promise.set(revents);
+
+ delete poll; // This callback owns the final cleanup on this path.
+}
+
+
+// libev async callback: invoked after '_poll' signals that the
+// future associated with this poll has been discarded.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+ Poll* poll = (Poll*) watcher->data; // Set in 'io::internal::poll'.
+
+ // If the I/O watcher already has a pending 'polled' callback then
+ // let it "win": it will complete the promise and delete 'poll'.
+ if (ev_is_pending(poll->watcher.io.get())) {
+ return;
+ }
+
+ ev_async_stop(loop, poll->watcher.async.get());
+
+ // Stop the I/O watcher (not pending, per the check above) so
+ // 'polled' will never run and deleting 'poll' here is safe.
+ ev_io_stop(loop, poll->watcher.io.get());
+
+ poll->promise.discard();
+
+ delete poll;
+}
+
+
+namespace io {
+namespace internal {
+
+// Discard continuation for 'poll': signals the poll's async watcher.
+void _poll(const memory::shared_ptr<ev_async>& async)
+{
+ ev_async_send(loop, async.get()); // 'loop': presumably from libev.hpp.
+}
+
+
+Future<short> poll(int fd, short events) // Bound by 'io::poll' below.
+{
+ Poll* poll = new Poll(); // Deleted by 'polled' or 'discard_poll'.
+
+ // Point each watcher's 'data' back at the struct for the callbacks.
+ poll->watcher.async->data = poll;
+ poll->watcher.io->data = poll;
+
+ // Copy the future up front to avoid any races with the event loop.
+ Future<short> future = poll->promise.future();
+
+ // Initialize and start the async watcher ('discard_poll' callback).
+ ev_async_init(poll->watcher.async.get(), discard_poll);
+ ev_async_start(loop, poll->watcher.async.get());
+
+ // Make sure we stop polling if a discard occurs on our future.
+ // A discard may arrive after the polling has already completed; in
+ // that case '_poll' still interrupts the event loop, but because
+ // the async watcher has already been stopped 'discard_poll' is not
+ // invoked. '_poll' is bound to the watcher's shared_ptr rather than
+ // to 'poll', which may have been deleted by then.
+ future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+ // Initialize and start the I/O watcher ('polled' callback).
+ ev_io_init(poll->watcher.io.get(), polled, fd, events);
+ ev_io_start(loop, poll->watcher.io.get());
+
+ return future;
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int fd, short events) // Public entry point.
+{
+ process::initialize(); // NOTE(review): presumably idempotent; confirm.
+
+ // TODO(benh): Check if the file descriptor is non-blocking?
+
+ return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
+}
+
+} // namespace io {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d96d881..1fc8874 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -927,69 +927,6 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
}
-// Data necessary for polling so we can discard polling and actually
-// stop it in the event loop.
-struct Poll
-{
- Poll()
- {
- // Need to explicitly instantiate the watchers.
- watcher.io.reset(new ev_io());
- watcher.async.reset(new ev_async());
- }
-
- // An I/O watcher for checking for readability or writeability and
- // an async watcher for being able to discard the polling.
- struct {
- memory::shared_ptr<ev_io> io;
- memory::shared_ptr<ev_async> async;
- } watcher;
-
- Promise<short> promise;
-};
-
-
-// Event loop callback when I/O is ready on polling file descriptor.
-void polled(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- Poll* poll = (Poll*) watcher->data;
-
- ev_io_stop(loop, poll->watcher.io.get());
-
- // Stop the async watcher (also clears if pending so 'discard_poll'
- // will not get invoked and we can delete 'poll' here).
- ev_async_stop(loop, poll->watcher.async.get());
-
- poll->promise.set(revents);
-
- delete poll;
-}
-
-
-// Event loop callback when future associated with polling file
-// descriptor has been discarded.
-void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
-{
- Poll* poll = (Poll*) watcher->data;
-
- // Check and see if we have a pending 'polled' callback and if so
- // let it "win".
- if (ev_is_pending(poll->watcher.io.get())) {
- return;
- }
-
- ev_async_stop(loop, poll->watcher.async.get());
-
- // Stop the I/O watcher (but note we check if pending above) so it
- // won't get invoked and we can delete 'poll' here.
- ev_io_stop(loop, poll->watcher.io.get());
-
- poll->promise.discard();
-
- delete poll;
-}
-
-
void* serve(void* arg)
{
ev_loop(((struct ev_loop*) arg), 0);
@@ -3179,47 +3116,8 @@ void post(const UPID& from,
namespace io {
-
namespace internal {
-// Helper/continuation of 'poll' on future discard.
-void _poll(const memory::shared_ptr<ev_async>& async)
-{
- ev_async_send(loop, async.get());
-}
-
-
-Future<short> poll(int fd, short events)
-{
- Poll* poll = new Poll();
-
- // Have the watchers data point back to the struct.
- poll->watcher.async->data = poll;
- poll->watcher.io->data = poll;
-
- // Get a copy of the future to avoid any races with the event loop.
- Future<short> future = poll->promise.future();
-
- // Initialize and start the async watcher.
- ev_async_init(poll->watcher.async.get(), discard_poll);
- ev_async_start(loop, poll->watcher.async.get());
-
- // Make sure we stop polling if a discard occurs on our future.
- // Note that it's possible that we'll invoke '_poll' when someone
- // does a discard even after the polling has already completed, but
- // in this case while we will interrupt the event loop since the
- // async watcher has already been stopped we won't cause
- // 'discard_poll' to get invoked.
- future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
-
- // Initialize and start the I/O watcher.
- ev_io_init(poll->watcher.io.get(), polled, fd, events);
- ev_io_start(loop, poll->watcher.io.get());
-
- return future;
-}
-
-
void read(
int fd,
void* data,
@@ -3366,16 +3264,6 @@ void write(
} // namespace internal {
-Future<short> poll(int fd, short events)
-{
- process::initialize();
-
- // TODO(benh): Check if the file descriptor is non-blocking?
-
- return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
-}
-
-
Future<size_t> read(int fd, void* data, size_t size)
{
process::initialize();