You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/30 21:25:02 UTC

[2/2] qpid-dispatch git commit: DISPATCH-577: Server logs show unexpected POLLNVAL errors.

DISPATCH-577: Server logs show unexpected POLLNVAL errors.

Track open/close state directly on pn_transport, remove input_done/output_done
flags which were not always consistent.
Mark "unknown poll events" as socket_error.
Don't put socket_error connectors back into poll.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6e6929c1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6e6929c1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6e6929c1

Branch: refs/heads/master
Commit: 6e6929c121097329c9d922e3ac6e83c285ef156c
Parents: 3f4bc69
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Nov 29 11:20:29 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 30 16:24:23 2016 -0500

----------------------------------------------------------------------
 src/posix/driver.c | 148 +++++++++++++++++++-----------------------------
 1 file changed, 58 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6e6929c1/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index cdfdb3b..f1b1f00 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -127,8 +127,6 @@ struct qdpn_connector_t {
     bool pending_write;
     bool socket_error;
     bool closed;
-    bool input_done;
-    bool output_done;
 };
 
 ALLOC_DECLARE(qdpn_listener_t);
@@ -512,8 +510,6 @@ qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context
     c->wakeup = 0;
     c->connection = NULL;
     c->transport = pn_transport();
-    c->input_done = false;
-    c->output_done = false;
     c->context = context;
     c->listener = NULL;
     qdpn_driver_add_connector(driver, c);
@@ -693,96 +689,67 @@ static pn_timestamp_t qdpn_connector_tick(qdpn_connector_t *ctor, pn_timestamp_t
 
 void qdpn_connector_process(qdpn_connector_t *c)
 {
-    if (c) {
-        if (c->closed) return;
-
-        pn_transport_t *transport = c->transport;
-
-        ///
-        /// Socket read
-        ///
-        if (!c->input_done) {
-            ssize_t capacity = pn_transport_capacity(transport);
-            if (capacity > 0) {
-                c->status |= PN_SEL_RD;
-                if (c->pending_read) {
-                    c->pending_read = false;
-                    ssize_t n =  recv(c->fd, pn_transport_tail(transport), capacity, 0);
-                    if (n < 0) {
-                        if (errno != EAGAIN) {
-                            qdpn_log_errno(c->driver, "read");
-                            c->status &= ~PN_SEL_RD;
-                            c->input_done = true;
-                            pn_transport_close_tail( transport );
-                        }
-                    } else if (n == 0) {
-                        c->status &= ~PN_SEL_RD;
-                        c->input_done = true;
-                        pn_transport_close_tail( transport );
-                    } else {
-                        if (pn_transport_process(transport, (size_t) n) < 0) {
-                            c->status &= ~PN_SEL_RD;
-                            c->input_done = true;
-                        }
-                    }
-                }
-            }
-
-            capacity = pn_transport_capacity(transport);
-
-            if (capacity < 0) {
-                c->status &= ~PN_SEL_RD;
-                c->input_done = true;
-            }
-        }
+    if (!c || c->closed) return;
+    pn_transport_t *transport = c->transport;
 
-        ///
-        /// Event wakeup
-        ///
-        c->wakeup = qdpn_connector_tick(c, pn_i_now());
-
-        ///
-        /// Socket write
-        ///
-        if (!c->output_done) {
-            ssize_t pending = pn_transport_pending(transport);
-            if (pending > 0) {
-                c->status |= PN_SEL_WR;
-                if (c->pending_write) {
-                    c->pending_write = false;
-                    #ifdef MSG_NOSIGNAL
-                    ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL);
-                    #else
-                    ssize_t n = send(c->fd, pn_transport_head(transport), pending, 0);
-                    #endif
-                    if (n < 0) {
-                        // XXX
-                        if (errno != EAGAIN) {
-                            qdpn_log_errno(c->driver, "send");
-                            c->output_done = true;
-                            c->status &= ~PN_SEL_WR;
-                            pn_transport_close_head( transport );
-                        }
-                    } else if (n) {
-                        pn_transport_pop(transport, (size_t) n);
-                    }
+    ///
+    /// Socket read
+    ///
+    ssize_t capacity = pn_transport_capacity(transport);
+    if (capacity > 0) {
+        if (c->pending_read) {
+            c->pending_read = false;
+            ssize_t n =  recv(c->fd, pn_transport_tail(transport), capacity, 0);
+            if (n < 0) {
+                if (errno != EAGAIN) {
+                    qdpn_log_errno(c->driver, "read");
+                    pn_transport_close_tail( transport );
                 }
-            } else if (pending == 0) {
-                c->status &= ~PN_SEL_WR;
+            } else if (n == 0) { /* HUP */
+                pn_transport_close_tail( transport );
             } else {
-                c->output_done = true;
-                c->status &= ~PN_SEL_WR;
+                pn_transport_process(transport, (size_t) n);
             }
-        } else
-            c->status &= ~PN_SEL_WR;
+        }
+    }
 
-        // Closed?
+    ///
+    /// Event wakeup
+    ///
+    c->wakeup = qdpn_connector_tick(c, pn_i_now());
 
-        if (c->input_done && c->output_done) {
-            qd_log(c->driver->log, QD_LOG_TRACE, "Closed %s", c->name);
-            qdpn_connector_close(c);
+    ///
+    /// Socket write
+    ///
+    ssize_t pending = pn_transport_pending(transport);
+    if (pending > 0) {
+        if (c->pending_write) {
+            c->pending_write = false;
+#ifdef MSG_NOSIGNAL
+            ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL);
+#else
+            ssize_t n = send(c->fd, pn_transport_head(transport), pending, 0);
+#endif
+            if (n < 0) {
+                // XXX
+                if (errno != EAGAIN) {
+                    qdpn_log_errno(c->driver, "send");
+                    pn_transport_close_head( transport );
+                }
+            } else if (n) {
+                pn_transport_pop(transport, (size_t) n);
+            }
         }
     }
+
+    c->status = 0;
+    if (pn_transport_closed(transport)) {
+        qd_log(c->driver->log, QD_LOG_TRACE, "Closed %s", c->name);
+        qdpn_connector_close(c);
+    } else {
+        if (pn_transport_capacity(transport) > 0) c->status |= PN_SEL_RD;
+        if (pn_transport_pending(transport) > 0) c->status |= PN_SEL_WR;
+    }
 }
 
 // driver
@@ -875,7 +842,7 @@ static void qdpn_driver_rebuild(qdpn_driver_t *d)
 
     qdpn_connector_t *c = DEQ_HEAD(d->connectors);
     while (c) {
-        if (!c->closed) {
+        if (!c->closed && !c->socket_error) {
             d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
             d->fds[d->nfds].fd = c->fd;
             d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
@@ -937,9 +904,9 @@ int qdpn_driver_wait_3(qdpn_driver_t *d)
             c->pending_read = (idx && d->fds[idx].revents & POLLIN);
             c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
             c->pending_tick = (c->wakeup &&  c->wakeup <= now);
-            if (idx && d->fds[idx].revents & POLLERR)
+            if (idx && d->fds[idx].revents & POLLERR) {
                 c->socket_error = true;
-            else if (idx && (d->fds[idx].revents & POLLHUP)) {
+            } else if (idx && (d->fds[idx].revents & POLLHUP)) {
                 qd_log(c->driver->log, QD_LOG_TRACE, "hangup on connector %s", c->name);
                 /* poll() is signalling POLLHUP. to see what happened we need
                  * to do an actual recv() to get the error code. But we might
@@ -951,7 +918,8 @@ int qdpn_driver_wait_3(qdpn_driver_t *d)
                     c->pending_write = true;
             } else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) {
                 qd_log(c->driver->log, QD_LOG_ERROR, "Unexpected poll events: %04x on %s",
-                          d->fds[idx].revents, c->name);
+                       d->fds[idx].revents, c->name);
+                c->socket_error = true;
             }
         }
         c = DEQ_NEXT(c);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org