You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/30 21:25:02 UTC
[2/2] qpid-dispatch git commit: DISPATCH-577: Server logs show unexpected POLLNVAL errors.
DISPATCH-577: Server logs show unexpected POLLNVAL errors.
Track open/close state directly on pn_transport and remove the input_done/output_done
flags, which were not always consistent.
Mark connectors that report "unexpected poll events" as socket_error.
Don't put socket_error connectors back into the poll set.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6e6929c1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6e6929c1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6e6929c1
Branch: refs/heads/master
Commit: 6e6929c121097329c9d922e3ac6e83c285ef156c
Parents: 3f4bc69
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Nov 29 11:20:29 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 30 16:24:23 2016 -0500
----------------------------------------------------------------------
src/posix/driver.c | 148 +++++++++++++++++++-----------------------------
1 file changed, 58 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6e6929c1/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index cdfdb3b..f1b1f00 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -127,8 +127,6 @@ struct qdpn_connector_t {
bool pending_write;
bool socket_error;
bool closed;
- bool input_done;
- bool output_done;
};
ALLOC_DECLARE(qdpn_listener_t);
@@ -512,8 +510,6 @@ qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context
c->wakeup = 0;
c->connection = NULL;
c->transport = pn_transport();
- c->input_done = false;
- c->output_done = false;
c->context = context;
c->listener = NULL;
qdpn_driver_add_connector(driver, c);
@@ -693,96 +689,67 @@ static pn_timestamp_t qdpn_connector_tick(qdpn_connector_t *ctor, pn_timestamp_t
void qdpn_connector_process(qdpn_connector_t *c)
{
- if (c) {
- if (c->closed) return;
-
- pn_transport_t *transport = c->transport;
-
- ///
- /// Socket read
- ///
- if (!c->input_done) {
- ssize_t capacity = pn_transport_capacity(transport);
- if (capacity > 0) {
- c->status |= PN_SEL_RD;
- if (c->pending_read) {
- c->pending_read = false;
- ssize_t n = recv(c->fd, pn_transport_tail(transport), capacity, 0);
- if (n < 0) {
- if (errno != EAGAIN) {
- qdpn_log_errno(c->driver, "read");
- c->status &= ~PN_SEL_RD;
- c->input_done = true;
- pn_transport_close_tail( transport );
- }
- } else if (n == 0) {
- c->status &= ~PN_SEL_RD;
- c->input_done = true;
- pn_transport_close_tail( transport );
- } else {
- if (pn_transport_process(transport, (size_t) n) < 0) {
- c->status &= ~PN_SEL_RD;
- c->input_done = true;
- }
- }
- }
- }
-
- capacity = pn_transport_capacity(transport);
-
- if (capacity < 0) {
- c->status &= ~PN_SEL_RD;
- c->input_done = true;
- }
- }
+ if (!c || c->closed) return;
+ pn_transport_t *transport = c->transport;
- ///
- /// Event wakeup
- ///
- c->wakeup = qdpn_connector_tick(c, pn_i_now());
-
- ///
- /// Socket write
- ///
- if (!c->output_done) {
- ssize_t pending = pn_transport_pending(transport);
- if (pending > 0) {
- c->status |= PN_SEL_WR;
- if (c->pending_write) {
- c->pending_write = false;
- #ifdef MSG_NOSIGNAL
- ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL);
- #else
- ssize_t n = send(c->fd, pn_transport_head(transport), pending, 0);
- #endif
- if (n < 0) {
- // XXX
- if (errno != EAGAIN) {
- qdpn_log_errno(c->driver, "send");
- c->output_done = true;
- c->status &= ~PN_SEL_WR;
- pn_transport_close_head( transport );
- }
- } else if (n) {
- pn_transport_pop(transport, (size_t) n);
- }
+ ///
+ /// Socket read
+ ///
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ if (c->pending_read) {
+ c->pending_read = false;
+ ssize_t n = recv(c->fd, pn_transport_tail(transport), capacity, 0);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ qdpn_log_errno(c->driver, "read");
+ pn_transport_close_tail( transport );
}
- } else if (pending == 0) {
- c->status &= ~PN_SEL_WR;
+ } else if (n == 0) { /* HUP */
+ pn_transport_close_tail( transport );
} else {
- c->output_done = true;
- c->status &= ~PN_SEL_WR;
+ pn_transport_process(transport, (size_t) n);
}
- } else
- c->status &= ~PN_SEL_WR;
+ }
+ }
- // Closed?
+ ///
+ /// Event wakeup
+ ///
+ c->wakeup = qdpn_connector_tick(c, pn_i_now());
- if (c->input_done && c->output_done) {
- qd_log(c->driver->log, QD_LOG_TRACE, "Closed %s", c->name);
- qdpn_connector_close(c);
+ ///
+ /// Socket write
+ ///
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ if (c->pending_write) {
+ c->pending_write = false;
+#ifdef MSG_NOSIGNAL
+ ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL);
+#else
+ ssize_t n = send(c->fd, pn_transport_head(transport), pending, 0);
+#endif
+ if (n < 0) {
+ // XXX
+ if (errno != EAGAIN) {
+ qdpn_log_errno(c->driver, "send");
+ pn_transport_close_head( transport );
+ }
+ } else if (n) {
+ pn_transport_pop(transport, (size_t) n);
+ }
}
}
+
+ c->status = 0;
+ if (pn_transport_closed(transport)) {
+ qd_log(c->driver->log, QD_LOG_TRACE, "Closed %s", c->name);
+ qdpn_connector_close(c);
+ } else {
+ if (pn_transport_capacity(transport) > 0) c->status |= PN_SEL_RD;
+ if (pn_transport_pending(transport) > 0) c->status |= PN_SEL_WR;
+ }
}
// driver
@@ -875,7 +842,7 @@ static void qdpn_driver_rebuild(qdpn_driver_t *d)
qdpn_connector_t *c = DEQ_HEAD(d->connectors);
while (c) {
- if (!c->closed) {
+ if (!c->closed && !c->socket_error) {
d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
d->fds[d->nfds].fd = c->fd;
d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
@@ -937,9 +904,9 @@ int qdpn_driver_wait_3(qdpn_driver_t *d)
c->pending_read = (idx && d->fds[idx].revents & POLLIN);
c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
c->pending_tick = (c->wakeup && c->wakeup <= now);
- if (idx && d->fds[idx].revents & POLLERR)
+ if (idx && d->fds[idx].revents & POLLERR) {
c->socket_error = true;
- else if (idx && (d->fds[idx].revents & POLLHUP)) {
+ } else if (idx && (d->fds[idx].revents & POLLHUP)) {
qd_log(c->driver->log, QD_LOG_TRACE, "hangup on connector %s", c->name);
/* poll() is signalling POLLHUP. to see what happened we need
* to do an actual recv() to get the error code. But we might
@@ -951,7 +918,8 @@ int qdpn_driver_wait_3(qdpn_driver_t *d)
c->pending_write = true;
} else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) {
qd_log(c->driver->log, QD_LOG_ERROR, "Unexpected poll events: %04x on %s",
- d->fds[idx].revents, c->name);
+ d->fds[idx].revents, c->name);
+ c->socket_error = true;
}
}
c = DEQ_NEXT(c);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org