You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:01 UTC

[04/30] mesos git commit: Used io::poll instead of libev for receiving_connect.

Used io::poll instead of libev for receiving_connect.

Review: https://reviews.apache.org/r/27509


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bc23da1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bc23da1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bc23da1b

Branch: refs/heads/master
Commit: bc23da1b04aeda9d35ee2f9952fc82248b1ff313
Parents: 9f94d9a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:49:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 36 ++++++++------------------------
 1 file changed, 9 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bc23da1b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc2884b..a33a201 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -839,10 +839,8 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void receiving_connect(Socket* socket, int s)
 {
-  int s = watcher->fd;
-
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
@@ -851,15 +849,9 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
     socket_manager->close(s);
-    Socket* socket = (Socket*) watcher->data;
     delete socket;
-    ev_io_stop(loop, watcher);
-    delete watcher;
   } else {
     // We're connected! Now let's do some receiving.
-    Socket* socket = (Socket*) watcher->data;
-    ev_io_stop(loop, watcher);
-    delete watcher;
     io::poll(s, io::READ)
       .onAny(lambda::bind(&ignore_data, socket, s));
   }
@@ -1567,15 +1559,12 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       persists[to.node] = s;
 
-      // Allocate and initialize a watcher for reading data from this
-      // socket. Note that we don't expect to receive anything other
-      // than HTTP '202 Accepted' responses which we anyway ignore.
-      // We do, however, want to react when it gets closed so we can
-      // generate appropriate lost events (since this is a 'link').
-      ev_io* watcher = new ev_io();
-      watcher->data = new Socket(sockets[s]);
-
-      // Try and connect to the node using this socket.
+      // Try to connect to the node using this socket in order to
+      // start reading data. Note that we don't expect to receive
+      // anything other than HTTP '202 Accepted' responses, which we
+      // ignore anyway. We do, however, want to react when the socket
+      // gets closed so we can generate appropriate lost events (since
+      // this is a 'link').
       sockaddr_in addr;
       memset(&addr, 0, sizeof(addr));
       addr.sin_family = PF_INET;
@@ -1588,15 +1577,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
         }
 
         // Wait for socket to be connected.
-        ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-
-        // Enqueue the watcher.
-        synchronized (watchers) {
-          watchers->push(watcher);
-        }
-
-        // Interrupt the loop.
-        ev_async_send(loop, &async_watcher);
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
       } else {
         io::poll(s, io::READ)
           .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));