You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/12/07 17:16:43 UTC

qpid-dispatch git commit: DISPATCH-103: Router serves standalone console files, stability fixes.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 4ae3c161d -> 4fb102058


DISPATCH-103: Router serves standalone console files, stability fixes.

The router now serves the standalone console files, so you can connect a browser
directly to an HTTP-enabled router port to use the console.

- Requires libwebsockets 2.1 (packaged on Fedora 25; build from source
  elsewhere).
- The router must be installed (`make install`) so that it can find the console files.

Fixed some stability and performance issues; it currently appears to be stable
and reasonably fast.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/4fb10205
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/4fb10205
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/4fb10205

Branch: refs/heads/master
Commit: 4fb102058f0695e357859f281ac9723b519ad1c3
Parents: 4ae3c16
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Dec 5 17:04:38 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Dec 7 12:12:53 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt           |   5 +
 console/CMakeLists.txt   |   5 +-
 src/config.h.in          |   1 +
 src/http-libwebsockets.c | 297 ++++++++++++++++++++++++++----------------
 src/posix/driver.c       |  11 +-
 src/server.c             |   3 +-
 6 files changed, 205 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b0c1f45..51bbc17 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -70,6 +70,11 @@ set(DOC_INSTALL_DIR ${SHARE_INSTALL_DIR}/doc CACHE PATH "Documentation directory
 set(QD_DOC_INSTALL_DIR ${SHARE_INSTALL_DIR}/doc/qpid-dispatch CACHE PATH "Qpid dispatch documentation directory")
 set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory")
 set(QPID_DISPATCH_HOME_INSTALLED ${CMAKE_INSTALL_PREFIX}/${QPID_DISPATCH_HOME})
+
+set(CONSOLE_BASE_INSTALL_DIR "share/qpid-dispatch/console")
+set(CONSOLE_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/${CONSOLE_BASE_INSTALL_DIR}")
+set(CONSOLE_STAND_ALONE_INSTALL_DIR "${CONSOLE_INSTALL_DIR}/stand-alone")
+
 set(RUN ${PYTHON_EXECUTABLE} ${CMAKE_BINARY_DIR}/run.py)
 
 # define the configuration directory based on whether or not the install prefix is defined

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/console/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/console/CMakeLists.txt b/console/CMakeLists.txt
index e64f518..7ff9333 100644
--- a/console/CMakeLists.txt
+++ b/console/CMakeLists.txt
@@ -17,9 +17,7 @@
 ## under the License.
 ##
 
-set(CONSOLE_BASE_INSTALL_DIR "share/qpid-dispatch/console")
 set(CONSOLE_BASE_SOURCE_DIR "${CMAKE_SOURCE_DIR}/console/stand-alone/")
-set(CONSOLE_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/${CONSOLE_BASE_INSTALL_DIR}")
 
 ##
 ## Add option to not install the stand-alone console
@@ -29,7 +27,8 @@ if(CONSOLE_INSTALL)
 
 	# Static console files
 	install(
-		DIRECTORY ${CONSOLE_BASE_SOURCE_DIR} DESTINATION ${CONSOLE_INSTALL_DIR}/stand-alone
+	  DIRECTORY ${CONSOLE_BASE_SOURCE_DIR}
+          DESTINATION ${CONSOLE_STAND_ALONE_INSTALL_DIR}
     )
 
 endif(CONSOLE_INSTALL)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/config.h.in
----------------------------------------------------------------------
diff --git a/src/config.h.in b/src/config.h.in
index 0e0c23c..74a38fb 100644
--- a/src/config.h.in
+++ b/src/config.h.in
@@ -19,5 +19,6 @@
 
 #define QPID_DISPATCH_VERSION "${QPID_DISPATCH_VERSION}"
 #define QPID_DISPATCH_LIB "${QPID_DISPATCH_LIB}"
+#define QPID_CONSOLE_STAND_ALONE_INSTALL_DIR "${CONSOLE_STAND_ALONE_INSTALL_DIR}"
 #cmakedefine01 USE_MEMORY_POOL
 #cmakedefine01 QD_MEMORY_STATS

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index b33a0d5..c456b07 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -30,51 +30,59 @@
 #include <errno.h>
 
 #include "http.h"
+#include "config.h"
 
+/* Shared context for all HTTP connections.  */
 struct qd_http_t {
     sys_mutex_t *lock;
     qd_dispatch_t *dispatch;
     qd_log_source_t *log;
     struct lws_context *context;
     qd_timer_t *timer;
+    qdpn_connector_t **connectors; /* Indexed by file descriptor */
+    size_t connectors_len;
 };
 
-/* TODO aconway 2016-11-29: First cut serializes all access to libwebsockets.
- * LWS does have multi-thread facilities but it segregates file descriptors into
- * "serialization groups" which does not match well with dispatches current
- * and planned future threading strategies. Review when we refactor dispatch
- * to use the pn_proactor. At least 2 possibilities:
- *
- * - treat LWS as single-threaded IO code in the 'leader follower' model,
- *   analogous to how we handle libuv.
- * - work with LWS upstream to abstract out IO code so each LWS WSI can operate
- *   as a thread-independent unit, like the proton connection_driver.
- */
+static inline qdpn_connector_t *fd_connector(qd_http_t *h, int fd) {
+    return (fd < h->connectors_len) ? h->connectors[fd] : NULL;
+}
+
+static inline qd_http_t *wsi_http(struct lws *wsi) {
+    return (qd_http_t *)lws_context_user(lws_get_context(wsi));
+}
 
-static __thread struct {
-    qdpn_connector_t *connector; /* Set before each lws_service call */
-} per_thread = { NULL };
+static inline qdpn_connector_t *wsi_connector(struct lws *wsi) {
+    return fd_connector(wsi_http(wsi), lws_get_socket_fd(wsi));
+}
 
-typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t;
+static inline int set_fd(qd_http_t *h, int fd, qdpn_connector_t *c) { /* Map fd -> c (c may be NULL), growing the table if needed; returns 0, or -1 if allocation fails */
+    if (fd >= h->connectors_len) {
+        size_t len = (fd+1)*2;  /* grow past fd so consecutive new fds do not each force a realloc */
+        qdpn_connector_t **grown = realloc(h->connectors, len*sizeof(*grown));
+        if (!grown) return -1;  /* old table and its length are left intact and usable */
+        memset(grown + h->connectors_len, 0, (len - h->connectors_len)*sizeof(*grown)); /* memset takes bytes, not elements */
+        h->connectors = grown; h->connectors_len = len; /* publish the new table only after it is fully initialised */
+    }
+    h->connectors[fd] = c;
+    return 0;
+}
 
-/* Extra buffering per connection, stored in the lws_wsi_user() space. */
-typedef struct buffers_t {
-    buffer_t wtmp;  /* Temp buffer with pre-data header space required by LWS */
-    buffer_t over;  /* Can't control LWS read size, buffer the overflow */
-    char name[256]; /* Copy of connector name for use after connector detached */
-} buffers_t;
-
-static void resize(buffer_t *b, size_t size) {
-    /* FIXME aconway 2016-11-30: handle alloc failure */
-    if (b->start == NULL || b->cap < size) {
-        b->start = realloc(b->start, size);
-        b->size = b->cap = size;
+/* Mark the qd connector closed, but leave the FD for LWS to clean up */
+int mark_closed(struct lws *wsi) {
+    qd_http_t *h = wsi_http(wsi);
+    int fd = lws_get_socket_fd(wsi);
+    qdpn_connector_t *c = fd_connector(h, fd);
+    if (c) {
+        qdpn_connector_mark_closed(c);
+        return set_fd(h, fd, NULL);
     }
-    b->size = size;
+    return 0;
 }
 
-/* Push as much as possible into the transport, store overflow in over. */
-static void transport_push_max(pn_transport_t *t, pn_bytes_t buf, buffer_t *over) {
+/* Push read data into the transport.
+ * Return 0 on success, number of bytes un-pushed on failure.
+ */
+static int transport_push(pn_transport_t *t, pn_bytes_t buf) {
     ssize_t cap;
     while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) {
         if (buf.size > cap) {
@@ -86,57 +94,109 @@ static void transport_push_max(pn_transport_t *t, pn_bytes_t buf, buffer_t *over
             buf.size = 0;
         }
     }
-    if (buf.size > 0) {
-        if (buf.size > over->cap) {
-            resize(over, buf.size);
-        }
-        memmove(over->start, buf.start, buf.size);
-    }
-    over->size = buf.size;
+    return buf.size;
 }
 
-static qd_http_t *qd_http_from_wsi(struct lws *wsi) {
-    return (qd_http_t *)lws_context_user(lws_get_context(wsi));
+static int normal_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) {
+    lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
+    return -1;
 }
 
-static int do_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) {
-    if (c) qdpn_connector_mark_closed(c);
-    lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
+static int unexpected_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) {
+    lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg));
     return -1;
 }
 
+/*
+ * Callback for un-promoted HTTP connections, and low-level external poll operations.
+ * Note main HTTP file serving is handled by the "mount" struct below.
+ * Called with http lock held.
+ */
+static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
+                         void *user, void *in, size_t len)
+{
+    switch (reason) {
+
+    case LWS_CALLBACK_HTTP: {   /* Called if file mount can't find the file */
+        lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, "file not found");
+        return -1;
+    }
+
+    case LWS_CALLBACK_CLOSED_HTTP:
+        mark_closed(wsi);
+        break;
+
+        /* low-level 'protocol[0]' callbacks for all protocols   */
+    case LWS_CALLBACK_DEL_POLL_FD: {
+        if (mark_closed(wsi)) {
+            lws_return_http_status(wsi, HTTP_STATUS_INTERNAL_SERVER_ERROR, "out of memory");
+            return -1;
+        }
+        break;
+    }
+
+    case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
+        struct lws_pollargs *p = (struct lws_pollargs*)in;
+        qdpn_connector_t *c = wsi_connector(wsi);
+        if (c) {
+            if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
+            if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
+        }
+        break;
+    }
+
+    default:
+        break;
+    }
+
+    return 0;
+}
+
+/* Buffer to allocate extra header space required by LWS.  */
+typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t;
+
+/* Callbacks for promoted AMQP over WS connections.
+ * Called with http lock held.
+ */
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
 {
-    buffers_t *b = (buffers_t*)user;
-    qd_http_t *h = qd_http_from_wsi(wsi);
-    qdpn_connector_t *c = per_thread.connector;
+    qd_http_t *h = wsi_http(wsi);
+    qdpn_connector_t *c = wsi_connector(wsi);
     pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
+    const char *name = c ? qdpn_connector_name(c) : "<unknown>";
 
     switch (reason) {
 
     case LWS_CALLBACK_ESTABLISHED: {
-        memset(b, 0, sizeof(*b));
-        strncpy(b->name, qdpn_connector_name(c), sizeof(b->name));
-        qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", b->name);
+        memset(user, 0, sizeof(buffer_t));
+        qd_log(h->log, QD_LOG_TRACE, "HTTP from %s upgraded to AMQP/WebSocket", name);
         break;
     }
 
     case LWS_CALLBACK_SERVER_WRITEABLE: {
         ssize_t size;
         if (!t || (size = pn_transport_pending(t)) < 0) {
-            return do_close(wsi, c, "write-closed");
+            return normal_close(wsi, c, "write-closed");
         }
         if (size > 0) {
-            pn_bytes_t wbuf = { size, pn_transport_head(t) };
+            const void *start = pn_transport_head(t);
             /* lws_write() demands LWS_PRE bytes of free space before the data */
-            resize(&b->wtmp, wbuf.size + LWS_PRE);
-            unsigned char *start = (unsigned char*)b->wtmp.start + LWS_PRE;
-            memcpy(start, wbuf.start, wbuf.size);
-            ssize_t wrote = lws_write(wsi, start, wbuf.size, LWS_WRITE_BINARY);
+            size_t tmpsize = size + LWS_PRE;
+            buffer_t *wtmp = (buffer_t*)user;
+            if (wtmp->start == NULL || wtmp->cap < tmpsize) {
+                wtmp->start = realloc(wtmp->start, tmpsize);
+                wtmp->size = wtmp->cap = tmpsize;
+            }
+            if (wtmp->start == NULL) {
+                return unexpected_close(wsi, c, "out-of-memory");
+            }
+            void *tmpstart = wtmp->start + LWS_PRE;
+            memcpy(tmpstart, start, size);
+            ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY);
             if (wrote < 0) {
                 pn_transport_close_head(t);
-                return do_close(wsi, c, "write-error");
+                return normal_close(wsi, c, "write-error");
             } else {
                 pn_transport_pop(t, (size_t)wrote);
             }
@@ -146,26 +206,23 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
 
     case LWS_CALLBACK_RECEIVE: {
         if (!t || pn_transport_capacity(t) < 0) {
-            do_close(wsi, c, "read-closed");
+            return normal_close(wsi, c, "read-closed");
         }
-        assert(b->over.size == 0);
-        transport_push_max(t, pn_bytes(len, in), &b->over);
-        if (b->over.size > 0) {
-            qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", b->name);
+        if (transport_push(t, pn_bytes(len, in))) {
+            return unexpected_close(wsi, c, "read-overflow");
         }
         break;
     }
 
-    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
-        qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", b->name);
-        if (t) pn_transport_close_tail(t);
-        return do_close(wsi, c, "peer-close");
-    }
+    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
+        mark_closed(wsi);
+        if (t) {
+            pn_transport_close_tail(t);
+        }
 
-    case LWS_CALLBACK_CLOSED: {
-        qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", b->name);
+    case LWS_CALLBACK_CLOSED:
+        mark_closed(wsi);
         break;
-    }
 
     default:
         break;
@@ -173,78 +230,93 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
     return 0;
 }
 
-static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
-                         void *in, size_t len)
-{
-    return 0;
-}
+/* Mount the console directory into URL space at /  */
+static const struct lws_http_mount console_mount = {
+    NULL,		/* linked-list pointer to next*/
+    "/",		/* mountpoint in URL namespace on this vhost */
+    QPID_CONSOLE_STAND_ALONE_INSTALL_DIR, /* where to go on the filesystem for that */
+    "index.html",        /* default filename if none given */
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    0,
+    0,
+    0,
+    0,
+    0,
+    0,
+    LWSMPRO_FILE,	/* mount type is a directory in a filesystem */
+    1,                  /* strlen("/"), ie length of the mountpoint */
+};
 
 static void check_timer(void *void_http) {
-    per_thread.connector = NULL; /* Not servicing any connector */
     qd_http_t *h = (qd_http_t*)void_http;
     sys_mutex_lock(h->lock);
-    /* Run LWS global timer checks. */
+    /* Run LWS global timer and forced-service checks. */
     lws_service_fd(h->context, NULL);
+    while (!lws_service_adjust_timeout(h->context, 1, 0)) {
+        /* -1 timeout means just do forced service */
+        lws_plat_service_tsi(h->context, -1, 0);
+    }
     if (!h->timer) {
         h->timer = qd_timer(h->dispatch, check_timer, h);
     }
     qd_timer_cancel(h->timer);
-    qd_timer_schedule(h->timer, 1000);
+    qd_timer_schedule(h->timer, 1000); /* LWS wants per-second wakeups */
     sys_mutex_unlock(h->lock);
 }
 
 void qd_http_connector_process(qdpn_connector_t *c) {
-    per_thread.connector = c;  /* Pass to lws via thread-local storage */
-
+    qd_http_t * h = qdpn_listener_http(qdpn_connector_listener(c));
+    sys_mutex_lock(h->lock);
+    int fd = qdpn_connector_get_fd(c);
     struct lws *wsi = (struct lws*)qdpn_connector_http(c);
-    buffers_t *b = (buffers_t*)lws_wsi_user(wsi);
-    qd_http_t * h = qd_http_from_wsi(wsi);
-    pn_transport_t *t = qdpn_connector_transport(c);
-
-    int flags =
-        (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) |
-        (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0);
-
-    if (b && b->over.size) {         /* Consume last over-buffered read */
-        transport_push_max(t, pn_bytes(b->over.size, b->over.start), &b->over);
-        if (b->over.size) {
-            flags &= ~POLLIN;   /* Don't let LIBWS read if we still are over */
+    /* Make sure we are still tracking this fd, could have been closed by timer */
+    if (wsi) {
+        pn_transport_t *t = qdpn_connector_transport(c);
+        int flags =
+            (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) |
+            (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0);
+        struct lws_pollfd pfd = { fd, flags, flags };
+        if (pn_transport_pending(t) > 0) {
+            lws_callback_on_writable(wsi);
+        }
+        lws_service_fd(h->context, &pfd);
+        if (pn_transport_closed(t)) {
+            mark_closed(wsi);   /* Don't let the server close the FD. */
+        } else {
+            if (pn_transport_capacity(t) > 0)
+                qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
+            if (pn_transport_pending(t) > 0 || lws_partial_buffered(wsi))
+                qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
+            qdpn_connector_wakeup(c, pn_transport_tick(t, qdpn_now(NULL)));
         }
     }
-    sys_mutex_lock(h->lock);
-    struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
-    lws_service_fd(h->context, &pfd);
     sys_mutex_unlock(h->lock);
-
-    if (pn_transport_capacity(t) > 0)
-        qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
-    if (pn_transport_pending(t) > 0)
-        qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-    pn_timestamp_t now = qdpn_now(NULL);
-    pn_timestamp_t next = pn_transport_tick(t, now);
-    /* If we have overflow, re-process immediately after dispatch, otherwise at
-     * next proton tick. */
-    qdpn_connector_wakeup(c, (b && b->over.size) ? now : next);
-    check_timer(h);
+    check_timer(h);             /* Make sure the timer is running */
 }
 
 qd_http_connector_t *qd_http_connector(qd_http_t *h, qdpn_connector_t *c) {
+    if (set_fd(h, qdpn_connector_get_fd(c), c)) {
+        return NULL;
+    }
     struct lws* wsi = lws_adopt_socket(h->context, qdpn_connector_get_fd(c));
     return (qd_http_connector_t*)wsi;
 }
 
 static struct lws_protocols protocols[] = {
-    /* first protocol must always be HTTP handler */
+    /* HTTP only protocol comes first */
     {
-        "http-only",		/* name */
-        callback_http,		/* callback */
-        sizeof(buffers_t),                      /* user data size */
+        "http-only",
+        callback_http,
+        0,
     },
      /* "amqp" is the official oasis AMQP over WebSocket protocol name */
     {
         "amqp",
         callback_amqpws,
-        sizeof(buffers_t),
+        sizeof(buffer_t),
     },
     /* "binary" is an alias for "amqp", for compatibility with clients designed
      * to work with a WebSocket proxy
@@ -252,12 +324,13 @@ static struct lws_protocols protocols[] = {
     {
         "binary",
         callback_amqpws,
-        sizeof(buffers_t),
+        sizeof(buffer_t),
     },
 };
 
 qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) {
     qd_http_t *h = calloc(1, sizeof(qd_http_t));
+    if (!h) return NULL;
     h->lock = sys_mutex();
     h->dispatch = d;
     h->log = log;
@@ -268,8 +341,10 @@ qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) {
     info.protocols = protocols;
     info.gid = info.uid = -1;
     info.user = h;
+    info.mounts = &console_mount; /* Serve the console files */
+    info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
     h->context = lws_create_context(&info);
-    h->timer =  NULL;           /* Initialized later. */
+    h->timer = NULL;            /* Can't init timer here, server not initialized. */
     return h;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index 53ea809..741f500 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -134,6 +134,7 @@ struct qdpn_connector_t {
     bool pending_read:1;
     bool pending_write:1;
     bool socket_error:1;
+    bool hangup:1;
     bool closed:1;
 };
 
@@ -528,6 +529,7 @@ qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context
     c->pending_read = false;
     c->pending_write = false;
     c->socket_error = false;
+    c->hangup = false;
     c->name[0] = '\0';
     c->idx = 0;
     c->fd = fd;
@@ -622,7 +624,9 @@ qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor)
     return ctor ? ctor->listener : NULL;
 }
 
-/* FD is already closed, update the connector state */
+/* Mark the connector as closed, but don't close the FD (already closed or
+ * will be closed elsewhere)
+ */
 void qdpn_connector_mark_closed(qdpn_connector_t *ctor)
 {
     if (!ctor) return;
@@ -632,6 +636,7 @@ void qdpn_connector_mark_closed(qdpn_connector_t *ctor)
         qd_log(ctor->driver->log, QD_LOG_TRACE, "closed %s", ctor->name);
         ctor->closed = true;
         ctor->driver->closed_count++;
+        ctor->http = NULL;
     }
     sys_mutex_unlock(ctor->driver->lock);
 }
@@ -880,7 +885,7 @@ static void qdpn_driver_rebuild(qdpn_driver_t *d)
 
     qdpn_connector_t *c = DEQ_HEAD(d->connectors);
     while (c) {
-        if (!c->closed && !c->socket_error) {
+        if (!c->closed && !c->socket_error && !c->hangup) {
             d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
             d->fds[d->nfds].fd = c->fd;
             d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
@@ -946,9 +951,11 @@ int qdpn_driver_wait_3(qdpn_driver_t *d)
             if (revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP)) {
                 qd_log(c->driver->log, QD_LOG_ERROR, "unexpected poll events %04x on %s",
                        revents, c->name);
+                c->socket_error = true;
             }
             if (revents & POLLHUP) {
                 qd_log(c->driver->log, QD_LOG_TRACE, "hangup on %s", c->name);
+                c->hangup = true;
                 /* poll() is signalling POLLHUP. To see what happened we need
                  * to do an actual recv() to get the error code. But we might
                  * be in a state where we're not interested in input, in that

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 40b5699..1e813aa 100644
--- a/src/server.c
+++ b/src/server.c
@@ -573,7 +573,8 @@ static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr)
     const char *cname = qdpn_connector_name(cxtr);
     const char *host = qd_listener->config->host;
     const char *port = qd_listener->config->port;
-    snprintf(buf, size, "incoming connection from %s to %s:%s", cname, host, port);
+    snprintf(buf, size, "incoming %s connection from %s to %s:%s",
+             qdpn_connector_http(cxtr) ? "HTTP" : "AMQP", cname, host, port);
     return buf;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org