You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:06 UTC

[09/30] mesos git commit: Used io::poll instead of libev for ignore_data.

Used io::poll instead of libev for ignore_data.

Review: https://reviews.apache.org/r/27508


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f94d9a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f94d9a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f94d9a8

Branch: refs/heads/master
Commit: 9f94d9a8a42352345bffb0f1be0253952731b316
Parents: 4751167
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:43:29 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 54 ++++++++++++++------------------
 1 file changed, 23 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f94d9a8/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 528cb88..bc2884b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -644,12 +644,8 @@ void recv_data(DataDecoder* decoder, int s)
 // SocketManager::send(Message) where we don't care about the data
 // received we mostly just want to know when the socket has been
 // closed.
-void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void ignore_data(Socket* socket, int s)
 {
-  Socket* socket = (Socket*) watcher->data;
-
-  int s = watcher->fd;
-
   while (true) {
     const ssize_t size = 80 * 1024;
     ssize_t length = 0;
@@ -663,6 +659,8 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
       continue;
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
+      io::poll(s, io::READ)
+        .onAny(lambda::bind(&ignore_data, socket, s));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -673,9 +671,7 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
         VLOG(2) << "Socket closed while receiving";
       }
       socket_manager->close(s);
-      ev_io_stop(loop, watcher);
       delete socket;
-      delete watcher;
       break;
     } else {
       VLOG(2) << "Ignoring " << length << " bytes of data received "
@@ -861,9 +857,11 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
     delete watcher;
   } else {
     // We're connected! Now let's do some receiving.
+    Socket* socket = (Socket*) watcher->data;
     ev_io_stop(loop, watcher);
-    ev_io_init(watcher, ignore_data, s, EV_READ);
-    ev_io_start(loop, watcher);
+    delete watcher;
+    io::poll(s, io::READ)
+      .onAny(lambda::bind(&ignore_data, socket, s));
   }
 }
 
@@ -1591,17 +1589,18 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
         // Wait for socket to be connected.
         ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-      } else {
-        ev_io_init(watcher, ignore_data, s, EV_READ);
-      }
 
-      // Enqueue the watcher.
-      synchronized (watchers) {
-        watchers->push(watcher);
-      }
+        // Enqueue the watcher.
+        synchronized (watchers) {
+          watchers->push(watcher);
+        }
 
-      // Interrupt the loop.
-      ev_async_send(loop, &async_watcher);
+        // Interrupt the loop.
+        ev_async_send(loop, &async_watcher);
+      } else {
+        io::poll(s, io::READ)
+          .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
+      }
     }
 
     links[to].insert(process);
@@ -1741,21 +1740,14 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      // Allocate and initialize a watcher for reading data from this
-      // socket. Note that we don't expect to receive anything other
-      // than HTTP '202 Accepted' responses which we anyway ignore.
-      ev_io* watcher = new ev_io();
-      watcher->data = new Socket(sockets[s]);
-
-      ev_io_init(watcher, ignore_data, s, EV_READ);
-
-      // Enqueue the watcher.
-      synchronized (watchers) {
-        watchers->push(watcher);
-      }
+      // Read and ignore data from this socket. Note that we don't
+      // expect to receive anything other than HTTP '202 Accepted'
+      // responses which we just ignore.
+      io::poll(s, io::READ)
+        .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
 
       // Allocate and initialize a watcher for sending the message.
-      watcher = new ev_io();
+      ev_io* watcher = new ev_io();
       watcher->data = new MessageEncoder(sockets[s], message);
 
       // Try and connect to the node using this socket.