You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:39:06 UTC
[09/30] mesos git commit: Used io::poll instead of libev for
ignore_data.
Used io::poll instead of libev for ignore_data.
Review: https://reviews.apache.org/r/27508
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f94d9a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f94d9a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f94d9a8
Branch: refs/heads/master
Commit: 9f94d9a8a42352345bffb0f1be0253952731b316
Parents: 4751167
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:43:29 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 54 ++++++++++++++------------------
1 file changed, 23 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f94d9a8/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 528cb88..bc2884b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -644,12 +644,8 @@ void recv_data(DataDecoder* decoder, int s)
// SocketManager::send(Message) where we don't care about the data
// received we mostly just want to know when the socket has been
// closed.
-void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void ignore_data(Socket* socket, int s)
{
- Socket* socket = (Socket*) watcher->data;
-
- int s = watcher->fd;
-
while (true) {
const ssize_t size = 80 * 1024;
ssize_t length = 0;
@@ -663,6 +659,8 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
continue;
} else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Might block, try again later.
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, socket, s));
break;
} else if (length <= 0) {
// Socket error or closed.
@@ -673,9 +671,7 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
VLOG(2) << "Socket closed while receiving";
}
socket_manager->close(s);
- ev_io_stop(loop, watcher);
delete socket;
- delete watcher;
break;
} else {
VLOG(2) << "Ignoring " << length << " bytes of data received "
@@ -861,9 +857,11 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
delete watcher;
} else {
// We're connected! Now let's do some receiving.
+ Socket* socket = (Socket*) watcher->data;
ev_io_stop(loop, watcher);
- ev_io_init(watcher, ignore_data, s, EV_READ);
- ev_io_start(loop, watcher);
+ delete watcher;
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, socket, s));
}
}
@@ -1591,17 +1589,18 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
// Wait for socket to be connected.
ev_io_init(watcher, receiving_connect, s, EV_WRITE);
- } else {
- ev_io_init(watcher, ignore_data, s, EV_READ);
- }
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
+ // Enqueue the watcher.
+ synchronized (watchers) {
+ watchers->push(watcher);
+ }
- // Interrupt the loop.
- ev_async_send(loop, &async_watcher);
+ // Interrupt the loop.
+ ev_async_send(loop, &async_watcher);
+ } else {
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
+ }
}
links[to].insert(process);
@@ -1741,21 +1740,14 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
- // Allocate and initialize a watcher for reading data from this
- // socket. Note that we don't expect to receive anything other
- // than HTTP '202 Accepted' responses which we anyway ignore.
- ev_io* watcher = new ev_io();
- watcher->data = new Socket(sockets[s]);
-
- ev_io_init(watcher, ignore_data, s, EV_READ);
-
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
+ // Read and ignore data from this socket. Note that we don't
+ // expect to receive anything other than HTTP '202 Accepted'
+ // responses which we just ignore.
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
// Allocate and initialize a watcher for sending the message.
- watcher = new ev_io();
+ ev_io* watcher = new ev_io();
watcher->data = new MessageEncoder(sockets[s], message);
// Try and connect to the node using this socket.