You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/04/27 17:38:34 UTC

[09/10] qpid-dispatch git commit: DISPATCH-390: Restore HTTP using libwebsockets

DISPATCH-390: Restore HTTP using libwebsockets

- single HTTP thread uses libwebsockets standard polling features
- works with released libwebsockets, no patches required


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/16980f67
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/16980f67
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/16980f67

Branch: refs/heads/master
Commit: 16980f6703f0b2b03a320c89cad60cdf669770d7
Parents: 20b06ac
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Apr 12 19:29:14 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 27 13:31:36 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                   |   5 +-
 cmake/FindLibWebSockets.cmake    |  20 +-
 include/qpid/dispatch/server.h   |  11 +
 src/connection_manager.c         |   3 +-
 src/container.c                  |   2 +-
 src/http-libwebsockets.c         | 796 ++++++++++++++++++++--------------
 src/http-none.c                  |  22 +-
 src/http.h                       |  19 +-
 src/policy.c                     |   8 +-
 src/server.c                     | 103 +++--
 src/server_private.h             |  14 +-
 tests/system_tests_http.py       |   8 +
 tests/system_tests_one_router.py |   8 +
 13 files changed, 584 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a06c67f..46f651e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -103,11 +103,8 @@ find_library(rt_lib rt)
 find_package(Proton 0.15 REQUIRED)
 
 ## Optional dependencies
-
 include(FindLibWebSockets)
-# FIXME aconway 2017-01-19: websockets disbled for temporary proactor work.
-# option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND})
-set(USE_LIBWEBSOCKETS OFF)
+option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND})
 
 ##
 ## Find Valgrind

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/cmake/FindLibWebSockets.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindLibWebSockets.cmake b/cmake/FindLibWebSockets.cmake
index 18ef18a..a15d0d2 100644
--- a/cmake/FindLibWebSockets.cmake
+++ b/cmake/FindLibWebSockets.cmake
@@ -44,22 +44,10 @@ find_path(LIBWEBSOCKETS_INCLUDE_DIRS
 
 include(FindPackageHandleStandardArgs)
 
-find_package_handle_standard_args(LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
-
-if(LIBWEBSOCKETS_FOUND)
-  # For the moment we need a patched version of LibWebSockets:
-  # https://github.com/alanconway/libwebsockets/tree/v2.1-stable-aconway-adopt-ssl
-  # This function check verifies we have it.
-  set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS})
-  set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES})
-  check_function_exists(lws_adopt_socket_vhost LWS_ADOPT_SOCKET_VHOST_FOUND)
-  if (NOT LWS_ADOPT_SOCKET_VHOST_FOUND)
-    message("Cannot use LibWebSockets, no function lws_adopt_socket_vhost")
-    unset(LIBWEBSOCKETS_FOUND)
-  endif()
-endif()
+find_package_handle_standard_args(
+  LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
 
 if(NOT LIBWEBSOCKETS_FOUND)
-  set(LIBWEBSOCKETS_LIBRARIES "")
-  set(LIBWEBSOCKETS_INCLUDE_DIRS "")
+  unset(LIBWEBSOCKETS_LIBRARIES)
+  unset(LIBWEBSOCKETS_INCLUDE_DIRS)
 endif()

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index ec885ae..eb5214b 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -519,6 +519,17 @@ bool qd_connector_connect(qd_connector_t *ct);
 qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *display_name_service);
 
 /**
+ * Get the name of the connection, based on its IP address.
+ */
+const char* qd_connection_name(const qd_connection_t *c);
+
+
+/**
+ * Get the remote host IP address of the connection.
+ */
+const char* qd_connection_remote_ip(const qd_connection_t *c);
+
+/**
  * @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 4bc18ce..fefc961 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -607,8 +607,7 @@ void qd_connection_manager_start(qd_dispatch_t *qd)
 
     while (li) {
         if (!li->pn_listener) {
-            qd_listener_listen(li);
-            if (!li->pn_listener && first_start) {
+            if (!qd_listener_listen(li) && first_start) {
                 qd_log(qd->connection_manager->log_source, QD_LOG_CRITICAL,
                        "Listen on %s failed during initial config", li->config.host_port);
                 exit(1);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index ddc0418..639ffa2 100644
--- a/src/container.c
+++ b/src/container.c
@@ -375,7 +375,7 @@ static void add_link_to_free_list(qd_pn_free_link_session_list_t  *free_link_ses
 
 
 /*
- * FIXME aconway 2017-04-12: IMO this should not be necessary, we should
+ * TODO aconway 2017-04-12: IMO this should not be necessary, we should
  * be able to pn_*_free links and sessions directly the handler function.
  * They will not actually be freed from memory till the event, connection,
  * proactor etc. have all released their references.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 745b090..9f6fbe8 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,10 +19,11 @@
 
 #include <qpid/dispatch/atomic.h>
 #include <qpid/dispatch/amqp.h>
-#include <qpid/dispatch/driver.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/timer.h>
 
+#include <proton/connection_driver.h>
+
 #include <libwebsockets.h>
 
 #include <assert.h>
@@ -33,432 +34,555 @@
 #include "server_private.h"
 #include "config.h"
 
+static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */
+
+/* Log for LWS messages. For dispatch server messages use qd_http_server_t::log */
 static qd_log_source_t* http_log;
 
-static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH";
+/* Translate an LWS log level bit (LLL_*) into the dispatch log level used to report it. */
+static qd_log_level_t qd_level(int lll) {
+    if (lll == LLL_ERR) {
+        return QD_LOG_ERROR;
+    }
+    if (lll == LLL_WARN) {
+        return QD_LOG_WARNING;
+    }
+    if (lll == LLL_NOTICE) {
+        /* LWS is noisy compared to dispatch on the informative levels, so demote */
+        return QD_LOG_DEBUG;
+    }
+    /* Every other LWS level is reported as trace */
+    return QD_LOG_TRACE;
+}
+
+/*
+ * LWS log emit function: forward one libwebsockets log line to the HTTP log source.
+ *
+ * lll:  LWS log level bit (LLL_*), mapped to a dispatch level by qd_level().
+ * line: NUL-terminated message text. Trailing whitespace (e.g. the final
+ *       newline) is not logged, but at least one character is always kept.
+ */
+static void logger(int lll, const char *line)  {
+    size_t len = strlen(line);
+    /* isspace() is only defined for unsigned char values (or EOF) */
+    while (len > 1 && isspace((unsigned char)line[len-1])) {
+        --len;
+    }
+    /* The precision consumed by "%.*s" must be an int, not a size_t */
+    qd_log(http_log, qd_level(lll), "%.*s", (int)len, line);
+}
+
+/*
+ * Look up the "HTTP" log source and tell LWS which of its log levels to emit:
+ * an LWS level is enabled only if the dispatch level it maps to (see qd_level())
+ * is enabled on that source. LWS output is routed through logger().
+ */
+static void log_init() {
+    http_log = qd_log_source("HTTP");
+    int mask = 0;
+    for (int bit = 0; bit < LLL_COUNT; ++bit) {
+        const int lws_level = 1 << bit;
+        if (qd_log_enabled(http_log, qd_level(lws_level))) {
+            mask |= lws_level;
+        }
+    }
+    lws_set_log_level(mask, logger);
+}
+
+/* Intermediate write buffer: LWS needs extra header space on write.  */
+typedef struct buffer_t {
+    char *start;
+    size_t size, cap;
+} buffer_t;
+
+/*
+ * Ensure the buffer holds size bytes, growing the allocation when needed.
+ * Capacity grows to at least twice the previous capacity.
+ * If allocation fails the old storage is released and the buffer is left
+ * empty: start == NULL, size == cap == 0.
+ */
+static void buffer_set_size(buffer_t *buf, size_t size) {
+    if (size > buf->cap) {
+        size_t cap = (size > buf->cap * 2) ? size : buf->cap * 2;
+        char *start = realloc(buf->start, cap);
+        if (start) {
+            buf->cap = cap;
+        } else {
+            /* realloc() leaves the old block allocated on failure: free it
+             * here, otherwise overwriting buf->start would leak it. */
+            free(buf->start);
+        }
+        buf->start = start;
+    }
+    if (buf->start) {
+        buf->size = size;
+    } else {
+        buf->size = buf->cap = 0;
+    }
+}
 
-/* Associate file-descriptors, LWS instances and qdpn_connectors */
-typedef struct fd_data_t {
-    qdpn_connector_t *connector;
+/* AMQPWS connection: set as lws user data and qd_conn->context */
+typedef struct connection_t {
+    pn_connection_driver_t driver;
+    qd_connection_t* qd_conn;
+    buffer_t wbuf;   /* LWS requires allocated header space at start of buffer */
     struct lws *wsi;
-} fd_data_t;
+} connection_t;
 
-/* HTTP server state shared by all listeners  */
+/* Navigating from WSI pointer to qd objects */
+static qd_http_server_t *wsi_server(struct lws *wsi);
+static qd_http_listener_t *wsi_listener(struct lws *wsi);
+static qd_log_source_t *wsi_log(struct lws *wsi);
+
+
+/* Declare LWS callbacks and protocol list */
+static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
+                         void *user, void *in, size_t len);
+static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
+                           void *user, void *in, size_t len);
+
+static struct lws_protocols protocols[] = {
+    /* HTTP only protocol comes first */
+    {
+        "http-only",
+        callback_http,
+        0,
+    },
+    /* "amqp" is the official oasis AMQP over WebSocket protocol name */
+    {
+        "amqp",
+        callback_amqpws,
+        sizeof(connection_t),
+    },
+    /* "binary" is an alias for "amqp", for compatibility with clients designed
+     * to work with a WebSocket proxy
+     */
+    {
+        "binary",
+        callback_amqpws,
+        sizeof(connection_t),
+    },
+    { NULL, NULL, 0, 0 } /* terminator */
+};
+
+
+/*
+ * Set an "unexpected condition" close reason on wsi, log msg as an error
+ * together with the peer address, and return -1 so the caller can return
+ * the result directly from an LWS callback.
+ */
+static inline int unexpected_close(struct lws *wsi, const char *msg) {
+    lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION,
+                     (unsigned char*)msg, strlen(msg));
+    /* Start with an empty string: the lookup result is not checked, so the
+     * buffer must be a valid string for "%s" even if nothing is written. */
+    char peer[64] = "";
+    lws_get_peer_simple(wsi, peer, sizeof(peer));
+    qd_log(wsi_log(wsi), QD_LOG_ERROR, "Error on HTTP connection from %s: %s", peer, msg);
+    return -1;
+}
+
+/*
+ * Dispatch all pending proton events for an AMQPWS connection.
+ *
+ * Returns 0 if the connection stays open. Returns -1, after setting an LWS
+ * close reason, if the connection has no qd_connection_t or the proton
+ * connection driver reports that it is finished.
+ */
+static int handle_events(connection_t* c) {
+    if (!c->qd_conn) {
+        return unexpected_close(c->wsi, "not-established");
+    }
+    pn_event_t *e;
+    /* Drain the driver's event queue into the dispatch connection handler */
+    while ((e = pn_connection_driver_next_event(&c->driver))) {
+        qd_connection_handle(c->qd_conn, e);
+    }
+    /* The driver has bytes to send: ask LWS for a writable callback */
+    if (pn_connection_driver_write_buffer(&c->driver).size) {
+        lws_callback_on_writable(c->wsi);
+    }
+    if (pn_connection_driver_finished(&c->driver)) {
+        lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
+        return -1;
+    }
+    return 0;
+}
+
+/* The server has a bounded, thread-safe queue for external work */
+typedef struct work_t {
+    enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+    void *value;
+} work_t;
+
+#define WORK_MAX 8              /* Just decouple threads, not a big buffer */
+
+typedef struct work_queue_t {
+    sys_mutex_t *lock;
+    sys_cond_t *cond;
+    work_t work[WORK_MAX];
+    size_t head, len;          /* Ring buffer */
+} work_queue_t;
+
+/* HTTP Server runs in a single thread, communication from other threads via work_queue */
 struct qd_http_server_t {
-    qd_dispatch_t *dispatch;
+    qd_server_t *server;
+    sys_thread_t *thread;
+    work_queue_t work;
     qd_log_source_t *log;
-    sys_mutex_t *lock;         /* For now use LWS as a thread-unsafe library. */
     struct lws_context *context;
-    qd_timer_t *timer;
-    int vhost_id;               /* unique identifier for vhost name */
-    fd_data_t *fd;              /* indexed by file descriptor */
-    size_t fd_len;
+    pn_timestamp_t now;         /* Cache current time in thread_run */
+    pn_timestamp_t next_tick;   /* Next requested tick service */
 };
 
-/* Per-HTTP-listener */
+/* Release the queue's mutex and condition; either may be NULL if never created */
+static void work_queue_destroy(work_queue_t *wq) {
+    if (wq->lock != NULL) {
+        sys_mutex_free(wq->lock);
+    }
+    if (wq->cond != NULL) {
+        sys_cond_free(wq->cond);
+    }
+}
+
+/*
+ * Create the queue's mutex and condition.
+ * Does not touch head/len: NOTE(review) presumably relies on the caller
+ * having zeroed the queue first - confirm against the allocation site.
+ */
+static void work_queue_init(work_queue_t *wq) {
+    wq->lock = sys_mutex();
+    wq->cond = sys_cond();
+}
+
+/*
+ * Append a work item to the server's work queue.
+ * Blocks while the queue already holds WORK_MAX items, waiting on the queue
+ * condition that work_pop() signals when it removes an item.
+ */
+static void work_push(qd_http_server_t *hs, work_t w) {
+    work_queue_t *wq = &hs->work;
+    sys_mutex_lock(wq->lock);
+    while (wq->len == WORK_MAX) {
+        lws_cancel_service(hs->context); /* Wake up the run thread to clear space */
+        sys_cond_wait(wq->cond, wq->lock);
+    }
+    /* First free slot of the ring buffer is head + len, wrapped at WORK_MAX */
+    wq->work[(wq->head + wq->len) % WORK_MAX] = w;
+    ++wq->len;
+    sys_mutex_unlock(wq->lock);
+    lws_cancel_service(hs->context); /* Wake up the run thread to handle my work */
+}
+
+/*
+ * Remove and return the oldest work item from the server's work queue.
+ * Non-blocking: returns { W_NONE, NULL } if the queue is empty.
+ */
+static work_t work_pop(qd_http_server_t *hs) {
+    work_t w = { W_NONE, NULL };
+    work_queue_t *wq = &hs->work;
+    sys_mutex_lock(wq->lock);
+    if (wq->len > 0) {
+        w = wq->work[wq->head];
+        /* Advance the ring buffer head, wrapping at WORK_MAX */
+        wq->head = (wq->head + 1) % WORK_MAX;
+        --wq->len;
+        /* A slot was freed: signal the condition work_push() waits on when full */
+        sys_cond_signal(wq->cond);
+    }
+    sys_mutex_unlock(wq->lock);
+    return w;
+}
+
+/* Each qd_http_listener_t is associated with an lws_vhost */
 struct qd_http_listener_t {
+    qd_listener_t *listener;
     qd_http_server_t *server;
     struct lws_vhost *vhost;
     struct lws_http_mount mount;
-    char name[256];             /* vhost name */
 };
 
-/* Get wsi/connector associated with fd or NULL if nothing on record. */
-static inline fd_data_t *fd_data(qd_http_server_t *s, int fd) {
-    fd_data_t *d = (fd < s->fd_len) ? &s->fd[fd] : NULL;
-    return (d && (d->connector || d->wsi)) ? d : NULL;
+/*
+ * Free an HTTP listener. Safe to call with NULL.
+ * If the listener is attached to a qd_listener_t, clear that listener's
+ * http pointer and drop the reference held on it.
+ */
+void qd_http_listener_free(qd_http_listener_t *hl) {
+    if (!hl) return;
+    if (hl->listener) {
+        /* Detach before the decref so the qd_listener_t no longer points at hl */
+        hl->listener->http = NULL;
+        qd_listener_decref(hl->listener);
+    }
+    free(hl);
 }
 
-static inline qd_http_server_t *wsi_http_server(struct lws *wsi) {
-    return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
+/*
+ * Allocate an HTTP listener for server hs and link it with qd_listener_t li
+ * in both directions, taking a reference on li.
+ * Returns NULL, after logging a critical error, if allocation fails.
+ */
+static qd_http_listener_t *qd_http_listener(qd_http_server_t *hs, qd_listener_t *li) {
+    qd_http_listener_t *hl = calloc(1, sizeof(*hl));
+    if (hl) {
+        hl->server = hs;
+        hl->listener = li;
+        li->http = hl;
+        sys_atomic_inc(&li->ref_count); /* Reference is dropped by qd_http_listener_free() */
+    } else {
+        qd_log(hs->log, QD_LOG_CRITICAL, "No memory for HTTP listen on %s",
+               li->config.host_port);
+    }
+    return hl;
 }
 
-static inline qdpn_connector_t *wsi_connector(struct lws *wsi) {
-    fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
-    return d ? d->connector : NULL;
-}
+static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
+    log_init();                 /* Update log flags at each listener */
 
-static inline fd_data_t *set_fd(qd_http_server_t *s, int fd, qdpn_connector_t *c, struct lws *wsi) {
-    if (!s->fd || fd >= s->fd_len) {
-        size_t oldlen = s->fd_len;
-        s->fd_len = fd + 16;    /* Don't double, low-range FDs will be re-used first. */
-        void *newfds = realloc(s->fd, s->fd_len*sizeof(*s->fd));
-        if (!newfds) return NULL;
-        s->fd = newfds;
-        memset(s->fd + oldlen, 0, sizeof(*s->fd)*(s->fd_len - oldlen));
-    }
-    fd_data_t *d = &s->fd[fd];
-    d->connector = c;
-    d->wsi = wsi;
-    return d;
-}
+    qd_server_config_t *config = &hl->listener->config;
 
-/* Push read data into the transport.
- * Return 0 on success, number of bytes un-pushed on failure.
- */
-static int transport_push(pn_transport_t *t, pn_bytes_t buf) {
-    ssize_t cap;
-    while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) {
-        if (buf.size > cap) {
-            pn_transport_push(t, buf.start, cap);
-            buf.start += cap;
-            buf.size -= cap;
-        } else {
-            pn_transport_push(t, buf.start, buf.size);
-            buf.size = 0;
-        }
+    int port = qd_port_int(config->port);
+    if (port < 0) {
+        qd_log(hs->log, QD_LOG_ERROR, "HTTP listener %s has invalid port %s",
+               config->host_port, config->port);
+        goto error;
     }
-    return buf.size;
-}
+    struct lws_http_mount *m = &hl->mount;
+    m->mountpoint = "/";    /* URL mount point */
+    m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
+    m->origin = (config->http_root && *config->http_root) ? /* File system root */
+        config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
+    m->def = "index.html";  /* Default file name */
+    m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
 
-static inline int normal_close(struct lws *wsi, const char *msg) {
-    lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
-    return -1;
+    struct lws_context_creation_info info = {0};
+    info.mounts = m;
+    info.port = port;
+    info.protocols = protocols;
+    info.keepalive_timeout = 1;
+    info.ssl_cipher_list = CIPHER_LIST;
+    info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
+    if (config->ssl_profile) {
+        info.ssl_cert_filepath = config->ssl_certificate_file;
+        info.ssl_private_key_filepath = config->ssl_private_key_file;
+        info.ssl_private_key_password = config->ssl_password;
+        info.ssl_ca_filepath = config->ssl_trusted_certificates;
+        info.options |=
+            LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
+            (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
+            (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
+    }
+    info.vhost_name = hl->listener->config.host_port;
+    hl->vhost = lws_create_vhost(hs->context, &info);
+    if (hl->vhost) {
+        /* Store hl pointer in vhost */
+        void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl));
+        memcpy(vp, &hl, sizeof(hl));
+        qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", config->host_port);
+        return;
+    } else {
+        qd_log(hs->log, QD_LOG_NOTICE, "Error listening for HTTP on %s", config->host_port);
+        goto error;
+    }
+    return;
+
+  error:
+    if (hl->listener->exit_on_error) {
+        qd_log(hs->log, QD_LOG_CRITICAL, "Shutting down, required listener failed %s",
+               config->host_port);
+        exit(1);
+    }
+    qd_http_listener_free(hl);
 }
 
-static inline int unexpected_close(struct lws *wsi, const char *msg) {
-    lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg));
-    return -1;
+/*
+ * Handle a request to close an HTTP listener.
+ * Not implemented: only logs an error, the listener is left as it is.
+ */
+static void listener_close(qd_http_listener_t *hl, qd_http_server_t *hs) {
+    /* TODO aconway 2017-04-13: can't easily stop listeners under libwebsockets */
+    qd_log(hs->log, QD_LOG_ERROR, "Cannot close HTTP listener %s",
+           hl->listener->config.host_port);
 }
 
 /*
- * Callback for un-promoted HTTP connections, and low-level external poll operations.
+ * LWS callback for un-promoted HTTP connections.
  * Note main HTTP file serving is handled by the "mount" struct below.
- * Called with http lock held.
  */
 static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
                          void *user, void *in, size_t len)
 {
     switch (reason) {
 
-    case LWS_CALLBACK_HTTP:     /* Called if file mount can't find the file */
-        lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+    case LWS_CALLBACK_PROTOCOL_DESTROY:
+        qd_http_listener_free(wsi_listener(wsi));
         return -1;
 
-    case LWS_CALLBACK_ADD_POLL_FD: {
-        /* Record WSI against FD here, the connector will be recorded when lws_service returns. */
-        set_fd(wsi_http_server(wsi), lws_get_socket_fd(wsi), 0, wsi);
-        break;
-    }
-    case LWS_CALLBACK_DEL_POLL_FD: {
-        fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
-        if (d) {
-            /* Tell dispatch to forget this FD, but let LWS do the actual close() */
-            if (d->connector) qdpn_connector_mark_closed(d->connector);
-            memset(d, 0, sizeof(*d));
-        }
-        break;
-    }
-    case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
-        struct lws_pollargs *p = (struct lws_pollargs*)in;
-        qdpn_connector_t *c = wsi_connector(wsi);
-        if (c) {
-            if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
-            if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-        }
-        break;
+    case LWS_CALLBACK_HTTP: {
+        /* Called if file mount can't find the file */
+        lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+        return -1;
     }
 
-    /* NOTE: Not using LWS_CALLBACK_LOCK/UNLOCK_POLL as we are serializing all HTTP work for now. */
-
     default:
-        break;
+        return 0;
     }
-
-    return 0;
 }
 
-/* Buffer to allocate extra header space required by LWS.  */
-typedef struct buffer_t { char *start; size_t size; size_t cap; } buffer_t;
+/*
+ * Wake up a connection managed by the http server thread.
+ * Queues a W_WAKE work item for the connection on its listener's HTTP server.
+ * Does nothing if the connection has no context or its listener has no
+ * HTTP listener attached.
+ */
+static void connection_wake(qd_connection_t *qd_conn)
+{
+    connection_t *c = qd_conn->context;
+    if (c && qd_conn->listener->http) {
+        qd_http_server_t *hs = qd_conn->listener->http->server;
+        work_t w = { W_WAKE, c };
+        /* work_push() blocks while the work queue is full */
+        work_push(hs, w);
+    }
+}
 
-/* Callbacks for promoted AMQP over WS connections.
- * Called with http lock held.
- */
+/* Callbacks for promoted AMQP over WS connections. */
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
 {
-    qdpn_connector_t *c = wsi_connector(wsi);
-    pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
+    qd_http_server_t *hs = wsi_server(wsi);
+    connection_t *c = (connection_t*)user;
 
     switch (reason) {
 
     case LWS_CALLBACK_ESTABLISHED: {
-        qd_log(wsi_http_server(wsi)->log, QD_LOG_DEBUG,
-               "Upgraded incoming HTTP connection from  %s[%"PRIu64"] to AMQP over WebSocket",
-               qdpn_connector_name(c),
-               qd_connection_connection_id((qd_connection_t*)qdpn_connector_context(c)));
-        memset(user, 0, sizeof(buffer_t));
-        break;
+        /* Upgrade accepted HTTP connection to AMQPWS */
+        memset(c, 0, sizeof(*c));
+        c->wsi = wsi;
+        qd_http_listener_t *hl = wsi_listener(wsi);
+        if (hl == NULL) {
+            return unexpected_close(c->wsi, "cannot-upgrade");
+        }
+        c->qd_conn = qd_server_connection(hs->server, &hl->listener->config);
+        if (c->qd_conn == NULL) {
+            return unexpected_close(c->wsi, "out-of-memory");
+        }
+        c->qd_conn->context = c;
+        c->qd_conn->wake = connection_wake;
+        c->qd_conn->listener = hl->listener;
+        lws_get_peer_simple(wsi, c->qd_conn->rhost, sizeof(c->qd_conn->rhost));
+        int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL);
+        if (err) {
+            return unexpected_close(c->wsi, pn_code(err));
+        }
+        strncpy(c->qd_conn->rhost_port, c->qd_conn->rhost, sizeof(c->qd_conn->rhost_port));
+        qd_log(hs->log, QD_LOG_DEBUG,
+               "[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
+               qd_connection_connection_id(c->qd_conn), qd_connection_name(c->qd_conn));
+        return handle_events(c);
     }
 
     case LWS_CALLBACK_SERVER_WRITEABLE: {
-        ssize_t size;
-        if (!t || (size = pn_transport_pending(t)) < 0) {
-            return normal_close(wsi, "write-closed");
-        }
-        if (size > 0) {
-            const void *start = pn_transport_head(t);
-            /* lws_write() demands LWS_PRE bytes of free space before the data */
-            size_t tmpsize = size + LWS_PRE;
-            buffer_t *wtmp = (buffer_t*)user;
-            if (wtmp->start == NULL || wtmp->cap < tmpsize) {
-                wtmp->start = realloc(wtmp->start, tmpsize);
-                wtmp->size = wtmp->cap = tmpsize;
-            }
-            if (wtmp->start == NULL) {
-                return unexpected_close(wsi, "out-of-memory");
+        if (handle_events(c)) return -1;
+        pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
+        if (dbuf.size) {
+            /* lws_write() demands LWS_PRE bytes of free space before the data,
+             * so we must copy from the driver's buffer to larger temporary wbuf
+             */
+            buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
+            if (c->wbuf.start == NULL) {
+                return unexpected_close(c->wsi, "out-of-memory");
             }
-            void *tmpstart = wtmp->start + LWS_PRE;
-            memcpy(tmpstart, start, size);
-            ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY);
+            unsigned char* buf = (unsigned char*)c->wbuf.start + LWS_PRE;
+            memcpy(buf, dbuf.start, dbuf.size);
+            ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
             if (wrote < 0) {
-                pn_transport_close_head(t);
-                return normal_close(wsi, "write-error");
+                pn_connection_driver_write_close(&c->driver);
+                return unexpected_close(c->wsi, "write-error");
             } else {
-                pn_transport_pop(t, (size_t)wrote);
+                pn_connection_driver_write_done(&c->driver, wrote);
             }
         }
-        break;
+        return handle_events(c);
     }
 
     case LWS_CALLBACK_RECEIVE: {
-        if (!t || pn_transport_capacity(t) < 0) {
-            return normal_close(wsi, "read-closed");
-        }
-        if (transport_push(t, pn_bytes(len, in))) {
-            return unexpected_close(wsi, "read-overflow");
+        while (len > 0) {
+            if (handle_events(c)) return -1;
+            pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
+            if (dbuf.size == 0) {
+                return unexpected_close(c->wsi, "unexpected-data");
+            }
+            size_t copy = (len < dbuf.size) ? len : dbuf.size;
+            memcpy(dbuf.start, in, copy);
+            pn_connection_driver_read_done(&c->driver, copy);
+            len -= copy;
+            in = (char*)in + copy;
         }
-        break;
+        return handle_events(c);
     }
 
-    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
-        if (t) {
-            pn_transport_close_tail(t);
+    case LWS_CALLBACK_USER: {
+        pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+        if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) {
+            hs->next_tick = next_tick;
         }
+        return handle_events(c);
+    }
 
-    case LWS_CALLBACK_CLOSED:
-        break;
-
-    default:
-        break;
+    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
+        pn_connection_driver_read_close(&c->driver);
+        return handle_events(c);
     }
-    return 0;
-}
 
-static void check_timer(void *void_http_server) {
-    qd_http_server_t *s = (qd_http_server_t*)void_http_server;
-    /* Run LWS global timer and forced-service checks. */
-    sys_mutex_lock(s->lock);
-    lws_service_fd(s->context, NULL);
-    while (!lws_service_adjust_timeout(s->context, 1, 0)) {
-        /* -1 timeout means just do forced service */
-        lws_plat_service_tsi(s->context, -1, 0);
-    }
-    if (!s->timer) {
-        s->timer = qd_timer(s->dispatch, check_timer, s);
-    }
-    sys_mutex_unlock(s->lock);
-    /* Timer is locked using server lock. */
-    qd_timer_cancel(s->timer);
-    qd_timer_schedule(s->timer, 1000); /* LWS wants per-second wakeups */
-}
+    case LWS_CALLBACK_CLOSED: {
+        if (c->driver.transport) {
+            pn_connection_driver_close(&c->driver);
+            handle_events(c);
+        }
+        pn_connection_driver_destroy(&c->driver);
+        free(c->wbuf.start);
+        return -1;
+    }
 
-static qd_http_listener_t * qdpn_connector_http_listener(qdpn_connector_t* c) {
-    qd_listener_t* ql = (qd_listener_t*)qdpn_listener_context(qdpn_connector_listener(c));
-    return qd_listener_http(ql);
+    default:
+        return 0;
+    }
 }
 
-static void http_connector_process(qdpn_connector_t *c) {
-    qd_http_listener_t *hl = qdpn_connector_http_listener(c);
-    qd_http_server_t *s = hl->server;
-    sys_mutex_lock(s->lock);
-    int fd = qdpn_connector_get_fd(c);
-    fd_data_t *d = fd_data(s, fd);
-    /* Make sure we are still tracking this fd, could have been closed by timer */
-    if (d) {
-        pn_transport_t *t = qdpn_connector_transport(c);
-        int flags =
-            (qdpn_connector_hangup(c) ? POLLHUP : 0) |
-            (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) |
-            (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0);
-        struct lws_pollfd pfd = { fd, flags, flags };
-        if (pn_transport_pending(t) > 0) {
-            lws_callback_on_writable(d->wsi);
+#define DEFAULT_TICK 1000
+
+static void* http_thread_run(void* v) {
+    qd_http_server_t *hs = v;
+    qd_log(hs->log, QD_LOG_INFO, "HTTP server thread running");
+    int result = 0;
+    while(result >= 0) {
+        /* Send a USER event to run transport ticks, may decrease hs->next_tick. */
+        hs->now = qd_timer_now();
+        hs->next_tick = hs->now + DEFAULT_TICK;
+        lws_callback_all_protocol(hs->context, &protocols[1], LWS_CALLBACK_USER);
+        lws_callback_all_protocol(hs->context, &protocols[2], LWS_CALLBACK_USER);
+        pn_millis_t timeout = (hs->next_tick > hs->now) ? hs->next_tick - hs->now : 1;
+        result = lws_service(hs->context, timeout);
+
+        /* Process any work items on the queue */
+        for (work_t w = work_pop(hs); w.type != W_NONE; w = work_pop(hs)) {
+            switch (w.type) {
+            case W_NONE:
+                break;
+            case W_STOP:
+                result = -1;
+                break;
+            case W_LISTEN:
+                listener_start((qd_http_listener_t*)w.value, hs);
+                break;
+            case W_CLOSE:
+                listener_close((qd_http_listener_t*)w.value, hs);
+                break;
+            case W_WAKE: {
+                connection_t *c = w.value;
+                pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+                                 PN_CONNECTION_WAKE);
+                handle_events(c);
+                break;
+            }
+            }
         }
-        lws_service_fd(s->context, &pfd);
-        d = fd_data(s, fd);    /* We may have stopped tracking during service */
-        if (pn_transport_capacity(t) > 0)
-            qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
-        if (pn_transport_pending(t) > 0 || (d && lws_partial_buffered(d->wsi)))
-            qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-        pn_timestamp_t wake = pn_transport_tick(t, qdpn_now(NULL));
-        if (wake) qdpn_connector_wakeup(c, wake);
-    }
-    sys_mutex_unlock(s->lock);
-    check_timer(s);             /* Make sure the timer is running */
+    }
+    qd_log(hs->log, QD_LOG_INFO, "HTTP server thread exit");
+    return NULL;
 }
 
-/* Dispatch closes a connector because it is HUP, socket_error or transport_closed()  */
-static void http_connector_close(qdpn_connector_t *c) {
-    int fd = qdpn_connector_get_fd(c);
-    qd_http_server_t *s = qdpn_connector_http_listener(c)->server;
-    sys_mutex_lock(s->lock);
-    fd_data_t *d = fd_data(s, fd);
-    if (d) {                    /* Only if we are still tracking fd */
-        /* Shutdown but let LWS do the close(),  possibly in later timer */
-        shutdown(qdpn_connector_get_fd(c), SHUT_RDWR);
-        short flags = POLLIN|POLLOUT|POLLHUP;
-        struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
-        lws_service_fd(s->context, &pfd);
-        qdpn_connector_mark_closed(c);
-        memset(d, 0 , sizeof(*d));
-    }
-    sys_mutex_unlock(s->lock);
+void qd_http_server_free(qd_http_server_t *hs) {
+    if (!hs) return;
+    if (hs->thread) {
+        /* Thread safe, stop via work queue then clean up */
+        work_t work = { W_STOP, NULL };
+        work_push(hs, work);
+        sys_thread_join(hs->thread);
+        sys_thread_free(hs->thread);
+        hs->thread = NULL;
+    }
+    work_queue_destroy(&hs->work);
+    if (hs->context) lws_context_destroy(hs->context);
+    free(hs);
 }
 
-static struct qdpn_connector_methods_t http_methods = {
-    http_connector_process,
-    http_connector_close
-};
-
-void qd_http_listener_accept(qd_http_listener_t *hl, qdpn_connector_t *c) {
-    qd_http_server_t *s = hl->server;
-    sys_mutex_lock(s->lock);
-    int fd = qdpn_connector_get_fd(c);
-    struct lws *wsi = lws_adopt_socket_vhost(hl->vhost, fd);
-    fd_data_t *d = fd_data(s, fd);
-    if (d) {          /* FD was adopted by LWS, so dispatch must not close it */
-        qdpn_connector_set_methods(c, &http_methods);
-        if (wsi) d->connector = c;
-    }
-    sys_mutex_unlock(s->lock);
-    if (!wsi) {       /* accept failed, dispatch should forget the FD. */
-        qdpn_connector_mark_closed(c);
+qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) {
+    log_init();
+    qd_http_server_t *hs = calloc(1, sizeof(*hs));
+    if (hs) {
+        work_queue_init(&hs->work);
+        struct lws_context_creation_info info = {0};
+        info.gid = info.uid = -1;
+        info.user = hs;
+        info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
+        info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
+            LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
+            LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+        info.max_http_header_pool = 32;
+        info.timeout_secs = 1;
+
+        hs->context = lws_create_context(&info);
+        hs->server = s;
+        hs->log = log;              /* For messages from this file */
+        if (!hs->context) {
+            qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
+            qd_http_server_free(hs);
+            hs = NULL;
+        }
     }
+    return hs;
 }
 
-static struct lws_protocols protocols[] = {
-    /* HTTP only protocol comes first */
-    {
-        "http-only",
-        callback_http,
-        0,
-    },
-    /* "amqp" is the official oasis AMQP over WebSocket protocol name */
-    {
-        "amqp",
-        callback_amqpws,
-        sizeof(buffer_t),
-    },
-    /* "binary" is an alias for "amqp", for compatibility with clients designed
-     * to work with a WebSocket proxy
-     */
-    {
-        "binary",
-        callback_amqpws,
-        sizeof(buffer_t),
-    },
-    { NULL, NULL, 0, 0 } /* terminator */
-};
+/* Thread safe calls that put items on work queue */
 
-static qd_log_level_t qd_level(int lll) {
-    switch (lll) {
-    case LLL_ERR: return QD_LOG_ERROR;
-    case LLL_WARN: return QD_LOG_WARNING;
-    case LLL_NOTICE: return QD_LOG_INFO;
-    case LLL_INFO:return QD_LOG_DEBUG;
-    case LLL_DEBUG: return QD_LOG_TRACE;
-    default: return QD_LOG_NONE;
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li)
+{
+    sys_mutex_lock(hs->work.lock);
+    if (!hs->thread) {
+        hs->thread = sys_thread(http_thread_run, hs);
     }
+    bool ok = hs->thread;
+    sys_mutex_unlock(hs->work.lock);
+    if (!ok) return NULL;
+
+    qd_http_listener_t *hl = qd_http_listener(hs, li);
+    if (hl) {
+        work_t w = { W_LISTEN, hl };
+        work_push(hs, w);
+    }
+    return hl;
 }
 
-static void emit_lws_log(int lll, const char *line)  {
-    size_t  len = strlen(line);
-    while (len > 1 && isspace(line[len-1]))
-        --len;
-    qd_log(http_log, qd_level(lll), "%.*s", len, line);
-}
-
-qd_http_server_t *qd_http_server(qd_dispatch_t *d, qd_log_source_t *log) {
-    if (!http_log) http_log = qd_log_source("HTTP");
-    qd_http_server_t *s = calloc(1, sizeof(*s));
-    if (!s) return NULL;
-    s->log = log;
-    s->lock = sys_mutex();
-    s->dispatch = d;
-    int levels =
-        (qd_log_enabled(log, QD_LOG_ERROR) ? LLL_ERR : 0) |
-        (qd_log_enabled(log, QD_LOG_WARNING) ? LLL_WARN : 0) |
-        (qd_log_enabled(log, QD_LOG_INFO) ? LLL_NOTICE : 0) |
-        (qd_log_enabled(log, QD_LOG_DEBUG) ? LLL_INFO : 0) |
-        (qd_log_enabled(log, QD_LOG_TRACE) ? LLL_DEBUG : 0);
-    lws_set_log_level(levels, emit_lws_log);
-
-    struct lws_context_creation_info info = {0};
-    info.gid = info.uid = -1;
-    info.user = s;
-    info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
-    info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
-        LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
-        LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
-    info.max_http_header_pool = 32;
-    info.timeout_secs = 1;
-    s->context = lws_create_context(&info);
-    if (!s->context) {
-        free(s);
-        return NULL;
-    }
-    return s;
+void qd_http_listener_close(qd_http_listener_t *hl)
+{
+    work_t w = { W_CLOSE, hl };
+    work_push(hl->server, w);
 }
 
-void qd_http_server_free(qd_http_server_t *s) {
-    sys_mutex_free(s->lock);
-    lws_context_destroy(s->context);
-    if (s->timer) qd_timer_free(s->timer);
-    if (s->fd) free(s->fd);
-    free(s);
+static qd_http_server_t *wsi_server(struct lws *wsi) {
+    return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
 }
 
-qd_http_listener_t *qd_http_listener(qd_http_server_t *s, const qd_server_config_t *config) {
-    qd_http_listener_t *hl = calloc(1, sizeof(*hl));
-    if (!hl) return NULL;
-    hl->server = s;
-
-    struct lws_context_creation_info info = {0};
-
-    struct lws_http_mount *m = &hl->mount;
-    m->mountpoint = "/";    /* URL mount point */
-    m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
-    m->origin = (config->http_root && *config->http_root) ? /* File system root */
-        config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
-    m->def = "index.html";  /* Default file name */
-    m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
-    info.mounts = m;
-    info.port = CONTEXT_PORT_NO_LISTEN_SERVER; /* Don't use LWS listener */
-    info.protocols = protocols;
-    info.keepalive_timeout = 1;
-    info.ssl_cipher_list = CIPHER_LIST;
-    info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
-    if (config->ssl_profile) {
-        info.ssl_cert_filepath = config->ssl_certificate_file;
-        info.ssl_private_key_filepath = config->ssl_private_key_file;
-        info.ssl_private_key_password = config->ssl_password;
-        info.ssl_ca_filepath = config->ssl_trusted_certificates;
-        info.options |=
-            LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
-            (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
-            (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
-    }
-    snprintf(hl->name, sizeof(hl->name), "vhost%x", s->vhost_id++);
-    info.vhost_name = hl->name;
-    hl->vhost = lws_create_vhost(s->context, &info);
-    if (!hl->vhost) {
-        free(hl);
-        return NULL;
+static qd_http_listener_t *wsi_listener(struct lws *wsi) {
+    qd_http_listener_t *hl = NULL;
+    struct lws_vhost *vhost = lws_get_vhost(wsi);
+    if (vhost) {                /* Get qd_http_listener from vhost data */
+        void *vp = lws_protocol_vh_priv_get(vhost, &protocols[0]);
+        memcpy(&hl, vp, sizeof(hl));
     }
     return hl;
 }
 
-void qd_http_listener_free(qd_http_listener_t *hl) {
-    free(hl);
+static qd_log_source_t *wsi_log(struct lws *wsi) {
+    return wsi_server(wsi)->log;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http-none.c
----------------------------------------------------------------------
diff --git a/src/http-none.c b/src/http-none.c
index a8953e5..57869d5 100644
--- a/src/http-none.c
+++ b/src/http-none.c
@@ -20,29 +20,21 @@
 #include <qpid/dispatch/log.h>
 #include "http.h"
 
+struct qd_dispatch_t;
+
 /* No HTTP implementation available. */
 
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *d, qd_log_source_t *log)
+qd_http_server_t *qd_http_server(struct qd_server_t *s, qd_log_source_t *log)
 {
     qd_log(log, QD_LOG_WARNING, "HTTP support is not available");
     return 0;
 }
 
-void qd_http_server_free(qd_http_server_t *h)
-{
-}
+void qd_http_server_free(qd_http_server_t *h) {}
 
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
-                                     const struct qd_server_config_t *config)
-{
-    return 0;
-}
+void* qd_http_server_run(void* qd_http_server) { return 0; }
+
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li) { return 0; }
 
-void qd_http_listener_free(qd_http_listener_t *hl)
-{
-}
 
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c)
-{
-}
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http.h
----------------------------------------------------------------------
diff --git a/src/http.h b/src/http.h
index fae3f1d..a169998 100644
--- a/src/http.h
+++ b/src/http.h
@@ -23,17 +23,18 @@
 typedef struct qd_http_listener_t qd_http_listener_t;
 typedef struct qd_http_server_t qd_http_server_t;
 
-struct qd_dispatch_t;
-struct qd_log_source_t;
+struct qd_server_t;
 struct qd_server_config_t;
-struct qdpn_connector_t;
+struct qd_listener_t;
+struct qd_log_source_t;
 
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *dispatch, struct qd_log_source_t *log);
+/* Create a HTTP server */
+qd_http_server_t *qd_http_server(struct qd_server_t *server, struct qd_log_source_t *log);
+
+/* Free the HTTP server */
 void qd_http_server_free(qd_http_server_t*);
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
-                                     const struct qd_server_config_t *config);
-void qd_http_listener_free(qd_http_listener_t *hl);
-/* On error, qdpn_connector_closed(c) is true. */
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c);
+
+/* Listening for HTTP, thread safe. */
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li);
 
 #endif // QD_HTTP_H

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index a55f245..2ddbf94 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -396,7 +396,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
     pn_connection_t *conn = qd_connection_pn(qd_conn);
     qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server);
     qd_policy_t *policy = qd->policy;
-    const char *hostip = qd_connection_hostip(qd_conn);
+    const char *hostip = qd_connection_remote_ip(qd_conn);
     const char *vhost = pn_connection_remote_hostname(conn);
     if (result) {
         qd_log(policy->log_source,
@@ -567,7 +567,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con
 
 bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
 {
-    const char *hostip = qd_connection_hostip(qd_conn);
+    const char *hostip = qd_connection_remote_ip(qd_conn);
     const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn));
 
     if (qd_conn->policy_settings->maxSenders) {
@@ -618,7 +618,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
 
 bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
 {
-    const char *hostip = qd_connection_hostip(qd_conn);
+    const char *hostip = qd_connection_remote_ip(qd_conn);
     const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn));
 
     if (qd_conn->policy_settings->maxReceivers) {
@@ -683,7 +683,7 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) {
     if (policy->enableVhostPolicy) {
         // Open connection or not based on policy.
         pn_transport_t *pn_trans = pn_connection_transport(conn);
-        const char *hostip = qd_connection_hostip(qd_conn);
+        const char *hostip = qd_connection_remote_ip(qd_conn);
         const char *pcrh = pn_connection_remote_hostname(conn);
         const char *vhost = (pcrh ? pcrh : "");
         const char *conn_name = qd_connection_name(qd_conn);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index e7fa34d..066c862 100644
--- a/src/server.c
+++ b/src/server.c
@@ -475,9 +475,14 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c
     pn_data_exit(pn_connection_properties(conn));
 }
 
+/* Wake function for proactor-managed connections */
+static void connection_wake(qd_connection_t *ctx) {
+    if (ctx->pn_conn) pn_connection_wake(ctx->pn_conn);
+}
 
-/* Construct a new qd_connection. */
-static qd_connection_t *qd_connection(qd_server_t *server, qd_server_config_t *config) {
+/* Construct a new qd_connection. Thread safe. */
+qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config)
+{
     qd_connection_t *ctx = new_qd_connection_t();
     if (!ctx) return NULL;
     ZERO(ctx);
@@ -491,6 +496,7 @@ static qd_connection_t *qd_connection(qd_server_t *server, qd_server_config_t *c
         return NULL;
     }
     ctx->server = server;
+    ctx->wake = connection_wake; /* Default, over-ridden for HTTP connections */
     pn_connection_set_context(ctx->pn_conn, ctx);
     DEQ_ITEM_INIT(ctx);
     DEQ_INIT(ctx->deferred_calls);
@@ -508,7 +514,7 @@ static void on_accept(pn_event_t *e)
     assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
     pn_listener_t *pn_listener = pn_event_listener(e);
     qd_listener_t *listener = pn_listener_get_context(pn_listener);
-    qd_connection_t *ctx = qd_connection(listener->server, &listener->config);
+    qd_connection_t *ctx = qd_server_connection(listener->server, &listener->config);
     if (!ctx) {
         qd_log(listener->server->log_source, QD_LOG_CRITICAL,
                "Allocation failure during accept to %s", listener->config.host_port);
@@ -549,25 +555,19 @@ void connect_fail(qd_connection_t *ctx, const char *name, const char *descriptio
 
 
 /* Get the host IP address for the remote end */
-static int set_remote_host_port(qd_connection_t *ctx) {
+static void set_rhost_port(qd_connection_t *ctx) {
     pn_transport_t *tport  = pn_connection_transport(ctx->pn_conn);
-    const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport));
-    int err = 0;
-    if (!addr) {
-        err = -1;
-        qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s");
-    } else {
+    const struct sockaddr_storage* addr =
+        pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport));
+    if (addr) {
         char rport[NI_MAXSERV] = "";
         int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr),
                               ctx->rhost, sizeof(ctx->rhost), rport, sizeof(rport),
                               NI_NUMERICHOST | NI_NUMERICSERV);
         if (!err) {
             snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s", ctx->rhost, rport);
-        } else {
-            qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s");
         }
     }
-    return err;
 }
 
 
@@ -581,7 +581,6 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
     //
     // Proton pushes out its trace to transport_tracer() which in turn writes a trace
     // message to the qdrouter log If trace level logging is enabled on the router set
-    // PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
     //
     if (qd_log_enabled(ctx->server->log_source, QD_LOG_TRACE)) {
         pn_transport_trace(tport, PN_TRACE_FRM);
@@ -593,9 +592,9 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
         config = &ctx->listener->config;
         const char *name = config->host_port;
         pn_transport_set_server(tport);
+        set_rhost_port(ctx);
 
-        if (set_remote_host_port(ctx) == 0 &&
-            qd_policy_socket_accept(server->qd->policy, ctx->rhost))
+        if (qd_policy_socket_accept(server->qd->policy, ctx->rhost))
         {
             ctx->policy_counted = true;
         } else {
@@ -667,10 +666,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event);
 
 static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
     qd_log_source_t *log = qd_server->log_source;
-
-    /* FIXME aconway 2017-02-20: HTTP support */
     qd_listener_t *li = (qd_listener_t*) pn_listener_get_context(pn_event_listener(e));
     const char *host_port = li->config.host_port;
+
     switch (pn_event_type(e)) {
 
     case PN_LISTENER_OPEN:
@@ -705,7 +703,7 @@ static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
 }
 
 
-static void qd_connection_free(qd_connection_t *ctx)
+void qd_connection_free(qd_connection_t *ctx)
 {
     qd_server_t *qd_server = ctx->server;
 
@@ -751,7 +749,7 @@ static void qd_connection_free(qd_connection_t *ctx)
 /* Events involving a connection or listener are serialized by the proactor so
  * only one event per connection / listener will be processed at a time.
  */
-static bool handle(pn_event_t *e, qd_server_t *qd_server) {
+static bool handle(qd_server_t *qd_server, pn_event_t *e) {
     pn_connection_t *pn_conn = pn_event_connection(e);
     qd_connection_t *ctx  = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL;
 
@@ -803,7 +801,7 @@ static bool handle(pn_event_t *e, qd_server_t *qd_server) {
     /* TODO aconway 2017-04-18: fold the container handler into the server */
     qd_container_handle_event(qd_server->container, e);
 
-    /* Free the connection after all other processing */
+    /* Free the connection after all other processing is complete */
     if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) {
         pn_connection_set_context(pn_conn, NULL);
         qd_connection_free(ctx);
@@ -819,7 +817,7 @@ static void *thread_run(void *arg)
         pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor);
         pn_event_t * e;
         while (running && (e = pn_event_batch_next(events))) {
-            running = handle(e, qd_server);
+            running = handle(qd_server, e);
         }
         pn_proactor_done(qd_server->proactor, events);
     }
@@ -836,7 +834,7 @@ static void try_open_lh(qd_connector_t *ct)
         return;
     }
 
-    qd_connection_t *ctx = qd_connection(ct->server, &ct->config);
+    qd_connection_t *ctx = qd_server_connection(ct->server, &ct->config);
     if (!ctx) {                 /* Try again later */
         qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure connecting to %s",
                ct->config.host_port);
@@ -987,7 +985,7 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
     qd_server->next_connection_id     = 1;
     qd_server->py_displayname_obj     = 0;
 
-    /* FIXME aconway 2017-01-20: restore HTTP support */
+    qd_server->http = qd_http_server(qd_server, qd_server->log_source);
 
     qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);
 
@@ -1052,11 +1050,10 @@ void qd_server_stop(qd_dispatch_t *qd)
 
 void qd_server_activate(qd_connection_t *ctx)
 {
-    if (!ctx || !ctx->pn_conn)
-        return;
-    pn_connection_wake(ctx->pn_conn);
+    if (ctx) ctx->wake(ctx);
 }
 
+
 void qd_connection_set_context(qd_connection_t *conn, void *context)
 {
     conn->user_context = context;
@@ -1133,28 +1130,44 @@ qd_listener_t *qd_server_listener(qd_server_t *server)
     qd_listener_t *li = new_qd_listener_t();
     if (!li) return 0;
     ZERO(li);
-
     sys_atomic_init(&li->ref_count, 1);
     li->server      = server;
     li->http = NULL;
     return li;
 }
 
-
-bool qd_listener_listen(qd_listener_t *li) {
-    if (!li->pn_listener) {      /* Not already listening */
-        li->pn_listener = pn_listener();
-        if (!li->pn_listener) {
-            qd_log(li->server->log_source, QD_LOG_ERROR, "No memory listening on %s",
-                   li->config.host_port);
-            return false;
-        }
+static bool qd_listener_listen_pn(qd_listener_t *li) {
+   li->pn_listener = pn_listener();
+    if (li->pn_listener) {
         pn_listener_set_context(li->pn_listener, li);
-        /* Listen is asynchronous, log listening on PN_LISTENER_OPEN */
-        sys_atomic_inc(&li->ref_count);
-        pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, BACKLOG);
+        pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port,
+                           BACKLOG);
+        sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
+        /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
+    } else {
+        qd_log(li->server->log_source, QD_LOG_CRITICAL, "No memory listening on %s",
+               li->config.host_port);
+     }
+    return li->pn_listener;
+}
+
+static bool qd_listener_listen_http(qd_listener_t *li) {
+    if (li->server->http) {
+        /* qd_http_listener holds a reference to li, will decref when closed */
+        qd_http_server_listen(li->server->http, li);
+        return li->http;
+    } else {
+        qd_log(li->server->log_source, QD_LOG_ERROR, "No HTTP support to listen on %s",
+               li->config.host_port);
+        return false;
     }
-    return true;
+}
+
+
+bool qd_listener_listen(qd_listener_t *li) {
+    if (li->pn_listener || li->http) /* Already listening */
+        return true;
+    return li->config.http ? qd_listener_listen_http(li) : qd_listener_listen_pn(li);
 }
 
 
@@ -1162,7 +1175,6 @@ void qd_listener_decref(qd_listener_t* li)
 {
     if (li && sys_atomic_dec(&li->ref_count) == 1) {
         qd_server_config_free(&li->config);
-        if (li->http) qd_http_listener_free(li->http);
         free_qd_listener_t(li);
     }
 }
@@ -1238,6 +1250,11 @@ qd_http_listener_t *qd_listener_http(qd_listener_t *li) {
     return li->http;
 }
 
-const char* qd_connection_hostip(const qd_connection_t *c) {
+const char* qd_connection_remote_ip(const qd_connection_t *c) {
     return c->rhost;
 }
+
+/* Expose event handling for HTTP connections */
+void qd_connection_handle(qd_connection_t *c, pn_event_t *e) {
+    handle(c->server, e);
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index a6543fa..6e6f1c3 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -39,16 +39,19 @@
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
 void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
 
-const char* qd_connection_name(const qd_connection_t *c);
+qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config);
+void qd_connection_free(qd_connection_t* conn);
+
 qd_connector_t* qd_connection_connector(const qd_connection_t *c);
-const char* qd_connection_hostip(const qd_connection_t *c);
 
-const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
+void qd_connection_handle(qd_connection_t *c, pn_event_t *e);
 
-qd_http_listener_t *qd_listener_http(qd_listener_t *l);
+
+const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
 
 qd_listener_t *qd_server_listener(qd_server_t *server);
 qd_connector_t *qd_server_connector(qd_server_t *server);
+
 void qd_connector_decref(qd_connector_t* ct);
 void qd_listener_decref(qd_listener_t* ct);
 void qd_server_config_free(qd_server_config_t *cf);
@@ -136,7 +139,7 @@ struct qd_connection_t {
     pn_ssl_t                 *ssl;
     qd_listener_t            *listener;
     qd_connector_t           *connector;
-    void                     *context; // Copy of context from listener or connector
+    void                     *context; // context from listener or connector
     void                     *user_context;
     void                     *link_context; // Context shared by this connection's links
     uint64_t                  connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky.
@@ -152,6 +155,7 @@ struct qd_connection_t {
     bool                      policy_counted;
     char                     *role;  //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
     qd_pn_free_link_session_list_t  free_link_session_list;
+    void (*wake)(qd_connection_t*); /* Wake method, different for HTTP vs. proactor */
     char rhost[NI_MAXHOST];     /* Remote host numeric IP for incoming connections */
     char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */
 };

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/tests/system_tests_http.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index 5627277..3d13c32 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -50,6 +50,14 @@ class RouterTestHttp(TestCase):
     def assert_get_cert(self, url):
         self.assertEqual("HTTP test\n", self.get_cert("%s/system_tests_http.txt" % url))
 
+    def test_listen_error(self):
+        """Make sure a router exits if an initial HTTP listener fails, doesn't hang"""
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'bad'}),
+            ('listener', {'port': 80, 'http':True})])
+        r = Qdrouterd(name="expect_fail", config=config, wait=False);
+        self.assertEqual(1, r.wait())
+
     def test_http_get(self):
         config = Qdrouterd.Config([
             ('router', {'id': 'QDR.HTTP'}),

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 0a982c7..483d5e2 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -57,6 +57,14 @@ class RouterTest(TestCase):
         cls.router.wait_ready()
         cls.address = cls.router.addresses[0]
 
+    def test_listen_error(self):
+        """Make sure a router exits if an initial listener fails, doesn't hang"""
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'bad'}),
+            ('listener', {'port': 80})])
+        r = Qdrouterd(name="expect_fail", config=config, wait=False);
+        self.assertEqual(1, r.wait())
+
     def test_01_pre_settled(self):
         addr = self.address+"/pre_settled/1"
         M1 = self.messenger()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org