You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:01 UTC
[04/30] mesos git commit: Used io::poll instead of libev for
receiving_connect.
Used io::poll instead of libev for receiving_connect.
Review: https://reviews.apache.org/r/27509
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bc23da1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bc23da1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bc23da1b
Branch: refs/heads/master
Commit: bc23da1b04aeda9d35ee2f9952fc82248b1ff313
Parents: 9f94d9a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:49:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 36 ++++++++------------------------
1 file changed, 9 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bc23da1b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc2884b..a33a201 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -839,10 +839,8 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
}
-void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void receiving_connect(Socket* socket, int s)
{
- int s = watcher->fd;
-
// Now check that a successful connection was made.
int opt;
socklen_t optlen = sizeof(opt);
@@ -851,15 +849,9 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
// Connect failure.
VLOG(1) << "Socket error while connecting";
socket_manager->close(s);
- Socket* socket = (Socket*) watcher->data;
delete socket;
- ev_io_stop(loop, watcher);
- delete watcher;
} else {
// We're connected! Now let's do some receiving.
- Socket* socket = (Socket*) watcher->data;
- ev_io_stop(loop, watcher);
- delete watcher;
io::poll(s, io::READ)
.onAny(lambda::bind(&ignore_data, socket, s));
}
@@ -1567,15 +1559,12 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
persists[to.node] = s;
- // Allocate and initialize a watcher for reading data from this
- // socket. Note that we don't expect to receive anything other
- // than HTTP '202 Accepted' responses which we anyway ignore.
- // We do, however, want to react when it gets closed so we can
- // generate appropriate lost events (since this is a 'link').
- ev_io* watcher = new ev_io();
- watcher->data = new Socket(sockets[s]);
-
- // Try and connect to the node using this socket.
+ // Try and connect to the node using this socket in order to
+ // start reading data. Note that we don't expect to receive
+ // anything other than HTTP '202 Accepted' responses which we
+ // anyway ignore. We do, however, want to react when it gets
+ // closed so we can generate appropriate lost events (since this
+ // is a 'link').
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
@@ -1588,15 +1577,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
}
// Wait for socket to be connected.
- ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- // Interrupt the loop.
- ev_async_send(loop, &async_watcher);
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
} else {
io::poll(s, io::READ)
.onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));