You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:10 UTC

[13/30] mesos git commit: Moved event loop specific polling into poll.cpp.

Moved event loop specific polling into poll.cpp.

Review: https://reviews.apache.org/r/27505


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/413ce94f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/413ce94f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/413ce94f

Branch: refs/heads/master
Commit: 413ce94f8d74b8c29657eef7dbc2f6ade4143bc7
Parents: f78ae66
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:22:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |   1 +
 3rdparty/libprocess/src/poll.cpp    | 129 +++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 112 ---------------------------
 3 files changed, 130 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 2de9989..41c3bd1 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES =		\
   src/libev.cpp			\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
+  src/poll.cpp			\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll.cpp b/3rdparty/libprocess/src/poll.cpp
new file mode 100644
index 0000000..324e4dd
--- /dev/null
+++ b/3rdparty/libprocess/src/poll.cpp
@@ -0,0 +1,129 @@
+#include <ev.h>
+
+#include <process/future.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Data necessary for polling so we can discard polling and actually
+// stop it in the event loop.
+struct Poll
+{
+  Poll()
+  {
+    // Need to explicitly instantiate the watchers.
+    watcher.io.reset(new ev_io());
+    watcher.async.reset(new ev_async());
+  }
+
+  // An I/O watcher for checking for readability or writability and
+  // an async watcher for being able to discard the polling.
+  struct {
+    memory::shared_ptr<ev_io> io;
+    memory::shared_ptr<ev_async> async;
+  } watcher;
+
+  Promise<short> promise;
+};
+
+
+// Event loop callback when I/O is ready on polling file descriptor.
+void polled(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  // Stop the async watcher (also clears if pending so 'discard_poll'
+  // will not get invoked and we can delete 'poll' here).
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  poll->promise.set(revents);
+
+  delete poll;
+}
+
+
+// Event loop callback when future associated with polling file
+// descriptor has been discarded.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  // Check and see if we have a pending 'polled' callback and if so
+  // let it "win".
+  if (ev_is_pending(poll->watcher.io.get())) {
+    return;
+  }
+
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  // Stop the I/O watcher (but note we check if pending above) so it
+  // won't get invoked and we can delete 'poll' here.
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  poll->promise.discard();
+
+  delete poll;
+}
+
+
+namespace io {
+namespace internal {
+
+// Helper/continuation of 'poll' on future discard.
+void _poll(const memory::shared_ptr<ev_async>& async)
+{
+  ev_async_send(loop, async.get());
+}
+
+
+Future<short> poll(int fd, short events)
+{
+  Poll* poll = new Poll();
+
+  // Have the watchers' data point back to the struct.
+  poll->watcher.async->data = poll;
+  poll->watcher.io->data = poll;
+
+  // Get a copy of the future to avoid any races with the event loop.
+  Future<short> future = poll->promise.future();
+
+  // Initialize and start the async watcher.
+  ev_async_init(poll->watcher.async.get(), discard_poll);
+  ev_async_start(loop, poll->watcher.async.get());
+
+  // Make sure we stop polling if a discard occurs on our future.
+  // Note that it's possible that we'll invoke '_poll' when someone
+  // does a discard even after the polling has already completed. In
+  // that case we will still interrupt the event loop, but since the
+  // async watcher has already been stopped, we won't cause
+  // 'discard_poll' to get invoked.
+  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+  // Initialize and start the I/O watcher.
+  ev_io_init(poll->watcher.io.get(), polled, fd, events);
+  ev_io_start(loop, poll->watcher.io.get());
+
+  return future;
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int fd, short events)
+{
+  process::initialize();
+
+  // TODO(benh): Check if the file descriptor is non-blocking?
+
+  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
+}
+
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d96d881..1fc8874 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -927,69 +927,6 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-// Data necessary for polling so we can discard polling and actually
-// stop it in the event loop.
-struct Poll
-{
-  Poll()
-  {
-    // Need to explicitly instantiate the watchers.
-    watcher.io.reset(new ev_io());
-    watcher.async.reset(new ev_async());
-  }
-
-  // An I/O watcher for checking for readability or writeability and
-  // an async watcher for being able to discard the polling.
-  struct {
-    memory::shared_ptr<ev_io> io;
-    memory::shared_ptr<ev_async> async;
-  } watcher;
-
-  Promise<short> promise;
-};
-
-
-// Event loop callback when I/O is ready on polling file descriptor.
-void polled(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  Poll* poll = (Poll*) watcher->data;
-
-  ev_io_stop(loop, poll->watcher.io.get());
-
-  // Stop the async watcher (also clears if pending so 'discard_poll'
-  // will not get invoked and we can delete 'poll' here).
-  ev_async_stop(loop, poll->watcher.async.get());
-
-  poll->promise.set(revents);
-
-  delete poll;
-}
-
-
-// Event loop callback when future associated with polling file
-// descriptor has been discarded.
-void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
-{
-  Poll* poll = (Poll*) watcher->data;
-
-  // Check and see if we have a pending 'polled' callback and if so
-  // let it "win".
-  if (ev_is_pending(poll->watcher.io.get())) {
-    return;
-  }
-
-  ev_async_stop(loop, poll->watcher.async.get());
-
-  // Stop the I/O watcher (but note we check if pending above) so it
-  // won't get invoked and we can delete 'poll' here.
-  ev_io_stop(loop, poll->watcher.io.get());
-
-  poll->promise.discard();
-
-  delete poll;
-}
-
-
 void* serve(void* arg)
 {
   ev_loop(((struct ev_loop*) arg), 0);
@@ -3179,47 +3116,8 @@ void post(const UPID& from,
 
 
 namespace io {
-
 namespace internal {
 
-// Helper/continuation of 'poll' on future discard.
-void _poll(const memory::shared_ptr<ev_async>& async)
-{
-  ev_async_send(loop, async.get());
-}
-
-
-Future<short> poll(int fd, short events)
-{
-  Poll* poll = new Poll();
-
-  // Have the watchers data point back to the struct.
-  poll->watcher.async->data = poll;
-  poll->watcher.io->data = poll;
-
-  // Get a copy of the future to avoid any races with the event loop.
-  Future<short> future = poll->promise.future();
-
-  // Initialize and start the async watcher.
-  ev_async_init(poll->watcher.async.get(), discard_poll);
-  ev_async_start(loop, poll->watcher.async.get());
-
-  // Make sure we stop polling if a discard occurs on our future.
-  // Note that it's possible that we'll invoke '_poll' when someone
-  // does a discard even after the polling has already completed, but
-  // in this case while we will interrupt the event loop since the
-  // async watcher has already been stopped we won't cause
-  // 'discard_poll' to get invoked.
-  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
-
-  // Initialize and start the I/O watcher.
-  ev_io_init(poll->watcher.io.get(), polled, fd, events);
-  ev_io_start(loop, poll->watcher.io.get());
-
-  return future;
-}
-
-
 void read(
     int fd,
     void* data,
@@ -3366,16 +3264,6 @@ void write(
 } // namespace internal {
 
 
-Future<short> poll(int fd, short events)
-{
-  process::initialize();
-
-  // TODO(benh): Check if the file descriptor is non-blocking?
-
-  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
-}
-
-
 Future<size_t> read(int fd, void* data, size_t size)
 {
   process::initialize();