You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/04/27 17:38:34 UTC
[09/10] qpid-dispatch git commit: DISPATCH-390: Restore HTTP using
libwebsockets
DISPATCH-390: Restore HTTP using libwebsockets
- single HTTP thread uses libwebsockets standard polling features
- works with released libwebsockets, no patches required
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/16980f67
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/16980f67
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/16980f67
Branch: refs/heads/master
Commit: 16980f6703f0b2b03a320c89cad60cdf669770d7
Parents: 20b06ac
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Apr 12 19:29:14 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 27 13:31:36 2017 -0400
----------------------------------------------------------------------
CMakeLists.txt | 5 +-
cmake/FindLibWebSockets.cmake | 20 +-
include/qpid/dispatch/server.h | 11 +
src/connection_manager.c | 3 +-
src/container.c | 2 +-
src/http-libwebsockets.c | 796 ++++++++++++++++++++--------------
src/http-none.c | 22 +-
src/http.h | 19 +-
src/policy.c | 8 +-
src/server.c | 103 +++--
src/server_private.h | 14 +-
tests/system_tests_http.py | 8 +
tests/system_tests_one_router.py | 8 +
13 files changed, 584 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a06c67f..46f651e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -103,11 +103,8 @@ find_library(rt_lib rt)
find_package(Proton 0.15 REQUIRED)
## Optional dependencies
-
include(FindLibWebSockets)
-# FIXME aconway 2017-01-19: websockets disbled for temporary proactor work.
-# option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND})
-set(USE_LIBWEBSOCKETS OFF)
+option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND})
##
## Find Valgrind
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/cmake/FindLibWebSockets.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindLibWebSockets.cmake b/cmake/FindLibWebSockets.cmake
index 18ef18a..a15d0d2 100644
--- a/cmake/FindLibWebSockets.cmake
+++ b/cmake/FindLibWebSockets.cmake
@@ -44,22 +44,10 @@ find_path(LIBWEBSOCKETS_INCLUDE_DIRS
include(FindPackageHandleStandardArgs)
-find_package_handle_standard_args(LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
-
-if(LIBWEBSOCKETS_FOUND)
- # For the moment we need a patched version of LibWebSockets:
- # https://github.com/alanconway/libwebsockets/tree/v2.1-stable-aconway-adopt-ssl
- # This function check verifies we have it.
- set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS})
- set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES})
- check_function_exists(lws_adopt_socket_vhost LWS_ADOPT_SOCKET_VHOST_FOUND)
- if (NOT LWS_ADOPT_SOCKET_VHOST_FOUND)
- message("Cannot use LibWebSockets, no function lws_adopt_socket_vhost")
- unset(LIBWEBSOCKETS_FOUND)
- endif()
-endif()
+find_package_handle_standard_args(
+ LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
if(NOT LIBWEBSOCKETS_FOUND)
- set(LIBWEBSOCKETS_LIBRARIES "")
- set(LIBWEBSOCKETS_INCLUDE_DIRS "")
+ unset(LIBWEBSOCKETS_LIBRARIES)
+ unset(LIBWEBSOCKETS_INCLUDE_DIRS)
endif()
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index ec885ae..eb5214b 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -519,6 +519,17 @@ bool qd_connector_connect(qd_connector_t *ct);
qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *display_name_service);
/**
+ * Get the name of the connection, based on its IP address.
+ */
+const char* qd_connection_name(const qd_connection_t *c);
+
+
+/**
+ * Get the remote host IP address of the connection.
+ */
+const char* qd_connection_remote_ip(const qd_connection_t *c);
+
+/**
* @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 4bc18ce..fefc961 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -607,8 +607,7 @@ void qd_connection_manager_start(qd_dispatch_t *qd)
while (li) {
if (!li->pn_listener) {
- qd_listener_listen(li);
- if (!li->pn_listener && first_start) {
+ if (!qd_listener_listen(li) && first_start) {
qd_log(qd->connection_manager->log_source, QD_LOG_CRITICAL,
"Listen on %s failed during initial config", li->config.host_port);
exit(1);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index ddc0418..639ffa2 100644
--- a/src/container.c
+++ b/src/container.c
@@ -375,7 +375,7 @@ static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_ses
/*
- * FIXME aconway 2017-04-12: IMO this should not be necessary, we should
+ * TODO aconway 2017-04-12: IMO this should not be necessary, we should
* be able to pn_*_free links and sessions directly the handler function.
* They will not actually be freed from memory till the event, connection,
* proactor etc. have all released their references.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 745b090..9f6fbe8 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,10 +19,11 @@
#include <qpid/dispatch/atomic.h>
#include <qpid/dispatch/amqp.h>
-#include <qpid/dispatch/driver.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
+#include <proton/connection_driver.h>
+
#include <libwebsockets.h>
#include <assert.h>
@@ -33,432 +34,555 @@
#include "server_private.h"
#include "config.h"
+static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */
+
+/* Log for LWS messages. For dispatch server messages use qd_http_server_t::log */
static qd_log_source_t* http_log;
-static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH";
+static qd_log_level_t qd_level(int lll) {
+ switch (lll) {
+ case LLL_ERR: return QD_LOG_ERROR;
+ case LLL_WARN: return QD_LOG_WARNING;
+ /* LWS is noisy compared to dispatch on the informative levels, downgrade */
+ case LLL_NOTICE: return QD_LOG_DEBUG;
+ default: return QD_LOG_TRACE; /* Everything else to trace */
+ }
+}
+
+static void logger(int lll, const char *line) {
+ size_t len = strlen(line);
+ while (len > 1 && isspace(line[len-1])) { /* Strip trailing newline */
+ --len;
+ }
+ qd_log(http_log, qd_level(lll), "%.*s", len, line);
+}
+
+static void log_init() {
+ http_log = qd_log_source("HTTP");
+ int levels = 0;
+ for (int i = 0; i < LLL_COUNT; ++i) {
+ int lll = 1<<i;
+ levels |= qd_log_enabled(http_log, qd_level(lll)) ? lll : 0;
+ }
+ lws_set_log_level(levels, logger);
+}
+
+/* Intermediate write buffer: LWS needs extra header space on write. */
+typedef struct buffer_t {
+ char *start;
+ size_t size, cap;
+} buffer_t;
+
+/* Ensure size bytes in buffer, make buf empty if alloc fails */
+static void buffer_set_size(buffer_t *buf, size_t size) {
+ if (size > buf->cap) {
+ buf->cap = (size > buf->cap * 2) ? size : buf->cap * 2;
+ buf->start = realloc(buf->start, buf->cap);
+ }
+ if (buf->start) {
+ buf->size = size;
+ } else {
+ buf->size = buf->cap = 0;
+ }
+}
-/* Associate file-descriptors, LWS instances and qdpn_connectors */
-typedef struct fd_data_t {
- qdpn_connector_t *connector;
+/* AMQPWS connection: set as lws user data and qd_conn->context */
+typedef struct connection_t {
+ pn_connection_driver_t driver;
+ qd_connection_t* qd_conn;
+ buffer_t wbuf; /* LWS requires allocated header space at start of buffer */
struct lws *wsi;
-} fd_data_t;
+} connection_t;
-/* HTTP server state shared by all listeners */
+/* Navigating from WSI pointer to qd objects */
+static qd_http_server_t *wsi_server(struct lws *wsi);
+static qd_http_listener_t *wsi_listener(struct lws *wsi);
+static qd_log_source_t *wsi_log(struct lws *wsi);
+
+
+/* Declare LWS callbacks and protocol list */
+static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
+static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
+
+static struct lws_protocols protocols[] = {
+ /* HTTP only protocol comes first */
+ {
+ "http-only",
+ callback_http,
+ 0,
+ },
+ /* "amqp" is the official oasis AMQP over WebSocket protocol name */
+ {
+ "amqp",
+ callback_amqpws,
+ sizeof(connection_t),
+ },
+ /* "binary" is an alias for "amqp", for compatibility with clients designed
+ * to work with a WebSocket proxy
+ */
+ {
+ "binary",
+ callback_amqpws,
+ sizeof(connection_t),
+ },
+ { NULL, NULL, 0, 0 } /* terminator */
+};
+
+
+static inline int unexpected_close(struct lws *wsi, const char *msg) {
+ lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION,
+ (unsigned char*)msg, strlen(msg));
+ char peer[64];
+ lws_get_peer_simple(wsi, peer, sizeof(peer));
+ qd_log(wsi_log(wsi), QD_LOG_ERROR, "Error on HTTP connection from %s: %s", peer, msg);
+ return -1;
+}
+
+static int handle_events(connection_t* c) {
+ if (!c->qd_conn) {
+ return unexpected_close(c->wsi, "not-established");
+ }
+ pn_event_t *e;
+ while ((e = pn_connection_driver_next_event(&c->driver))) {
+ qd_connection_handle(c->qd_conn, e);
+ }
+ if (pn_connection_driver_write_buffer(&c->driver).size) {
+ lws_callback_on_writable(c->wsi);
+ }
+ if (pn_connection_driver_finished(&c->driver)) {
+ lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
+ return -1;
+ }
+ return 0;
+}
+
+/* The server has a bounded, thread-safe queue for external work */
+typedef struct work_t {
+ enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+ void *value;
+} work_t;
+
+#define WORK_MAX 8 /* Just decouple threads, not a big buffer */
+
+typedef struct work_queue_t {
+ sys_mutex_t *lock;
+ sys_cond_t *cond;
+ work_t work[WORK_MAX];
+ size_t head, len; /* Ring buffer */
+} work_queue_t;
+
+/* HTTP Server runs in a single thread, communication from other threads via work_queue */
struct qd_http_server_t {
- qd_dispatch_t *dispatch;
+ qd_server_t *server;
+ sys_thread_t *thread;
+ work_queue_t work;
qd_log_source_t *log;
- sys_mutex_t *lock; /* For now use LWS as a thread-unsafe library. */
struct lws_context *context;
- qd_timer_t *timer;
- int vhost_id; /* unique identifier for vhost name */
- fd_data_t *fd; /* indexed by file descriptor */
- size_t fd_len;
+ pn_timestamp_t now; /* Cache current time in thread_run */
+ pn_timestamp_t next_tick; /* Next requested tick service */
};
-/* Per-HTTP-listener */
+static void work_queue_destroy(work_queue_t *wq) {
+ if (wq->lock) sys_mutex_free(wq->lock);
+ if (wq->cond) sys_cond_free(wq->cond);
+}
+
+static void work_queue_init(work_queue_t *wq) {
+ wq->lock = sys_mutex();
+ wq->cond = sys_cond();
+}
+
+ /* Block till there is space */
+static void work_push(qd_http_server_t *hs, work_t w) {
+ work_queue_t *wq = &hs->work;
+ sys_mutex_lock(wq->lock);
+ while (wq->len == WORK_MAX) {
+ lws_cancel_service(hs->context); /* Wake up the run thread to clear space */
+ sys_cond_wait(wq->cond, wq->lock);
+ }
+ wq->work[(wq->head + wq->len) % WORK_MAX] = w;
+ ++wq->len;
+ sys_mutex_unlock(wq->lock);
+ lws_cancel_service(hs->context); /* Wake up the run thread to handle my work */
+}
+
+/* Non-blocking, return { W_NONE, NULL } if empty */
+static work_t work_pop(qd_http_server_t *hs) {
+ work_t w = { W_NONE, NULL };
+ work_queue_t *wq = &hs->work;
+ sys_mutex_lock(wq->lock);
+ if (wq->len > 0) {
+ w = wq->work[wq->head];
+ wq->head = (wq->head + 1) % WORK_MAX;
+ --wq->len;
+ sys_cond_signal(wq->cond);
+ }
+ sys_mutex_unlock(wq->lock);
+ return w;
+}
+
+/* Each qd_http_listener_t is associated with an lws_vhost */
struct qd_http_listener_t {
+ qd_listener_t *listener;
qd_http_server_t *server;
struct lws_vhost *vhost;
struct lws_http_mount mount;
- char name[256]; /* vhost name */
};
-/* Get wsi/connector associated with fd or NULL if nothing on record. */
-static inline fd_data_t *fd_data(qd_http_server_t *s, int fd) {
- fd_data_t *d = (fd < s->fd_len) ? &s->fd[fd] : NULL;
- return (d && (d->connector || d->wsi)) ? d : NULL;
+void qd_http_listener_free(qd_http_listener_t *hl) {
+ if (!hl) return;
+ if (hl->listener) {
+ hl->listener->http = NULL;
+ qd_listener_decref(hl->listener);
+ }
+ free(hl);
}
-static inline qd_http_server_t *wsi_http_server(struct lws *wsi) {
- return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
+static qd_http_listener_t *qd_http_listener(qd_http_server_t *hs, qd_listener_t *li) {
+ qd_http_listener_t *hl = calloc(1, sizeof(*hl));
+ if (hl) {
+ hl->server = hs;
+ hl->listener = li;
+ li->http = hl;
+ sys_atomic_inc(&li->ref_count); /* Keep it around till qd_http_server_free() */
+ } else {
+ qd_log(hs->log, QD_LOG_CRITICAL, "No memory for HTTP listen on %s",
+ li->config.host_port);
+ }
+ return hl;
}
-static inline qdpn_connector_t *wsi_connector(struct lws *wsi) {
- fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
- return d ? d->connector : NULL;
-}
+static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
+ log_init(); /* Update log flags at each listener */
-static inline fd_data_t *set_fd(qd_http_server_t *s, int fd, qdpn_connector_t *c, struct lws *wsi) {
- if (!s->fd || fd >= s->fd_len) {
- size_t oldlen = s->fd_len;
- s->fd_len = fd + 16; /* Don't double, low-range FDs will be re-used first. */
- void *newfds = realloc(s->fd, s->fd_len*sizeof(*s->fd));
- if (!newfds) return NULL;
- s->fd = newfds;
- memset(s->fd + oldlen, 0, sizeof(*s->fd)*(s->fd_len - oldlen));
- }
- fd_data_t *d = &s->fd[fd];
- d->connector = c;
- d->wsi = wsi;
- return d;
-}
+ qd_server_config_t *config = &hl->listener->config;
-/* Push read data into the transport.
- * Return 0 on success, number of bytes un-pushed on failure.
- */
-static int transport_push(pn_transport_t *t, pn_bytes_t buf) {
- ssize_t cap;
- while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) {
- if (buf.size > cap) {
- pn_transport_push(t, buf.start, cap);
- buf.start += cap;
- buf.size -= cap;
- } else {
- pn_transport_push(t, buf.start, buf.size);
- buf.size = 0;
- }
+ int port = qd_port_int(config->port);
+ if (port < 0) {
+ qd_log(hs->log, QD_LOG_ERROR, "HTTP listener %s has invalid port %s",
+ config->host_port, config->port);
+ goto error;
}
- return buf.size;
-}
+ struct lws_http_mount *m = &hl->mount;
+ m->mountpoint = "/"; /* URL mount point */
+ m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
+ m->origin = (config->http_root && *config->http_root) ? /* File system root */
+ config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
+ m->def = "index.html"; /* Default file name */
+ m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
-static inline int normal_close(struct lws *wsi, const char *msg) {
- lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
- return -1;
+ struct lws_context_creation_info info = {0};
+ info.mounts = m;
+ info.port = port;
+ info.protocols = protocols;
+ info.keepalive_timeout = 1;
+ info.ssl_cipher_list = CIPHER_LIST;
+ info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
+ if (config->ssl_profile) {
+ info.ssl_cert_filepath = config->ssl_certificate_file;
+ info.ssl_private_key_filepath = config->ssl_private_key_file;
+ info.ssl_private_key_password = config->ssl_password;
+ info.ssl_ca_filepath = config->ssl_trusted_certificates;
+ info.options |=
+ LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
+ (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
+ (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
+ }
+ info.vhost_name = hl->listener->config.host_port;
+ hl->vhost = lws_create_vhost(hs->context, &info);
+ if (hl->vhost) {
+ /* Store hl pointer in vhost */
+ void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl));
+ memcpy(vp, &hl, sizeof(hl));
+ qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", config->host_port);
+ return;
+ } else {
+ qd_log(hs->log, QD_LOG_NOTICE, "Error listening for HTTP on %s", config->host_port);
+ goto error;
+ }
+ return;
+
+ error:
+ if (hl->listener->exit_on_error) {
+ qd_log(hs->log, QD_LOG_CRITICAL, "Shutting down, required listener failed %s",
+ config->host_port);
+ exit(1);
+ }
+ qd_http_listener_free(hl);
}
-static inline int unexpected_close(struct lws *wsi, const char *msg) {
- lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg));
- return -1;
+static void listener_close(qd_http_listener_t *hl, qd_http_server_t *hs) {
+ /* TODO aconway 2017-04-13: can't easily stop listeners under libwebsockets */
+ qd_log(hs->log, QD_LOG_ERROR, "Cannot close HTTP listener %s",
+ hl->listener->config.host_port);
}
/*
- * Callback for un-promoted HTTP connections, and low-level external poll operations.
+ * LWS callback for un-promoted HTTP connections.
* Note main HTTP file serving is handled by the "mount" struct below.
- * Called with http lock held.
*/
static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
switch (reason) {
- case LWS_CALLBACK_HTTP: /* Called if file mount can't find the file */
- lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+ case LWS_CALLBACK_PROTOCOL_DESTROY:
+ qd_http_listener_free(wsi_listener(wsi));
return -1;
- case LWS_CALLBACK_ADD_POLL_FD: {
- /* Record WSI against FD here, the connector will be recorded when lws_service returns. */
- set_fd(wsi_http_server(wsi), lws_get_socket_fd(wsi), 0, wsi);
- break;
- }
- case LWS_CALLBACK_DEL_POLL_FD: {
- fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
- if (d) {
- /* Tell dispatch to forget this FD, but let LWS do the actual close() */
- if (d->connector) qdpn_connector_mark_closed(d->connector);
- memset(d, 0, sizeof(*d));
- }
- break;
- }
- case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
- struct lws_pollargs *p = (struct lws_pollargs*)in;
- qdpn_connector_t *c = wsi_connector(wsi);
- if (c) {
- if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
- if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
- }
- break;
+ case LWS_CALLBACK_HTTP: {
+ /* Called if file mount can't find the file */
+ lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+ return -1;
}
- /* NOTE: Not using LWS_CALLBACK_LOCK/UNLOCK_POLL as we are serializing all HTTP work for now. */
-
default:
- break;
+ return 0;
}
-
- return 0;
}
-/* Buffer to allocate extra header space required by LWS. */
-typedef struct buffer_t { char *start; size_t size; size_t cap; } buffer_t;
+/* Wake up a connection managed by the http server thread */
+static void connection_wake(qd_connection_t *qd_conn)
+{
+ connection_t *c = qd_conn->context;
+ if (c && qd_conn->listener->http) {
+ qd_http_server_t *hs = qd_conn->listener->http->server;
+ work_t w = { W_WAKE, c };
+ work_push(hs, w);
+ }
+}
-/* Callbacks for promoted AMQP over WS connections.
- * Called with http lock held.
- */
+/* Callbacks for promoted AMQP over WS connections. */
static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
- qdpn_connector_t *c = wsi_connector(wsi);
- pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
+ qd_http_server_t *hs = wsi_server(wsi);
+ connection_t *c = (connection_t*)user;
switch (reason) {
case LWS_CALLBACK_ESTABLISHED: {
- qd_log(wsi_http_server(wsi)->log, QD_LOG_DEBUG,
- "Upgraded incoming HTTP connection from %s[%"PRIu64"] to AMQP over WebSocket",
- qdpn_connector_name(c),
- qd_connection_connection_id((qd_connection_t*)qdpn_connector_context(c)));
- memset(user, 0, sizeof(buffer_t));
- break;
+ /* Upgrade accepted HTTP connection to AMQPWS */
+ memset(c, 0, sizeof(*c));
+ c->wsi = wsi;
+ qd_http_listener_t *hl = wsi_listener(wsi);
+ if (hl == NULL) {
+ return unexpected_close(c->wsi, "cannot-upgrade");
+ }
+ c->qd_conn = qd_server_connection(hs->server, &hl->listener->config);
+ if (c->qd_conn == NULL) {
+ return unexpected_close(c->wsi, "out-of-memory");
+ }
+ c->qd_conn->context = c;
+ c->qd_conn->wake = connection_wake;
+ c->qd_conn->listener = hl->listener;
+ lws_get_peer_simple(wsi, c->qd_conn->rhost, sizeof(c->qd_conn->rhost));
+ int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL);
+ if (err) {
+ return unexpected_close(c->wsi, pn_code(err));
+ }
+ strncpy(c->qd_conn->rhost_port, c->qd_conn->rhost, sizeof(c->qd_conn->rhost_port));
+ qd_log(hs->log, QD_LOG_DEBUG,
+ "[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
+ qd_connection_connection_id(c->qd_conn), qd_connection_name(c->qd_conn));
+ return handle_events(c);
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
- ssize_t size;
- if (!t || (size = pn_transport_pending(t)) < 0) {
- return normal_close(wsi, "write-closed");
- }
- if (size > 0) {
- const void *start = pn_transport_head(t);
- /* lws_write() demands LWS_PRE bytes of free space before the data */
- size_t tmpsize = size + LWS_PRE;
- buffer_t *wtmp = (buffer_t*)user;
- if (wtmp->start == NULL || wtmp->cap < tmpsize) {
- wtmp->start = realloc(wtmp->start, tmpsize);
- wtmp->size = wtmp->cap = tmpsize;
- }
- if (wtmp->start == NULL) {
- return unexpected_close(wsi, "out-of-memory");
+ if (handle_events(c)) return -1;
+ pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
+ if (dbuf.size) {
+ /* lws_write() demands LWS_PRE bytes of free space before the data,
+ * so we must copy from the driver's buffer to larger temporary wbuf
+ */
+ buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
+ if (c->wbuf.start == NULL) {
+ return unexpected_close(c->wsi, "out-of-memory");
}
- void *tmpstart = wtmp->start + LWS_PRE;
- memcpy(tmpstart, start, size);
- ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY);
+ unsigned char* buf = (unsigned char*)c->wbuf.start + LWS_PRE;
+ memcpy(buf, dbuf.start, dbuf.size);
+ ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
if (wrote < 0) {
- pn_transport_close_head(t);
- return normal_close(wsi, "write-error");
+ pn_connection_driver_write_close(&c->driver);
+ return unexpected_close(c->wsi, "write-error");
} else {
- pn_transport_pop(t, (size_t)wrote);
+ pn_connection_driver_write_done(&c->driver, wrote);
}
}
- break;
+ return handle_events(c);
}
case LWS_CALLBACK_RECEIVE: {
- if (!t || pn_transport_capacity(t) < 0) {
- return normal_close(wsi, "read-closed");
- }
- if (transport_push(t, pn_bytes(len, in))) {
- return unexpected_close(wsi, "read-overflow");
+ while (len > 0) {
+ if (handle_events(c)) return -1;
+ pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
+ if (dbuf.size == 0) {
+ return unexpected_close(c->wsi, "unexpected-data");
+ }
+ size_t copy = (len < dbuf.size) ? len : dbuf.size;
+ memcpy(dbuf.start, in, copy);
+ pn_connection_driver_read_done(&c->driver, copy);
+ len -= copy;
+ in = (char*)in + copy;
}
- break;
+ return handle_events(c);
}
- case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
- if (t) {
- pn_transport_close_tail(t);
+ case LWS_CALLBACK_USER: {
+ pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+ if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) {
+ hs->next_tick = next_tick;
}
+ return handle_events(c);
+ }
- case LWS_CALLBACK_CLOSED:
- break;
-
- default:
- break;
+ case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
+ pn_connection_driver_read_close(&c->driver);
+ return handle_events(c);
}
- return 0;
-}
-static void check_timer(void *void_http_server) {
- qd_http_server_t *s = (qd_http_server_t*)void_http_server;
- /* Run LWS global timer and forced-service checks. */
- sys_mutex_lock(s->lock);
- lws_service_fd(s->context, NULL);
- while (!lws_service_adjust_timeout(s->context, 1, 0)) {
- /* -1 timeout means just do forced service */
- lws_plat_service_tsi(s->context, -1, 0);
- }
- if (!s->timer) {
- s->timer = qd_timer(s->dispatch, check_timer, s);
- }
- sys_mutex_unlock(s->lock);
- /* Timer is locked using server lock. */
- qd_timer_cancel(s->timer);
- qd_timer_schedule(s->timer, 1000); /* LWS wants per-second wakeups */
-}
+ case LWS_CALLBACK_CLOSED: {
+ if (c->driver.transport) {
+ pn_connection_driver_close(&c->driver);
+ handle_events(c);
+ }
+ pn_connection_driver_destroy(&c->driver);
+ free(c->wbuf.start);
+ return -1;
+ }
-static qd_http_listener_t * qdpn_connector_http_listener(qdpn_connector_t* c) {
- qd_listener_t* ql = (qd_listener_t*)qdpn_listener_context(qdpn_connector_listener(c));
- return qd_listener_http(ql);
+ default:
+ return 0;
+ }
}
-static void http_connector_process(qdpn_connector_t *c) {
- qd_http_listener_t *hl = qdpn_connector_http_listener(c);
- qd_http_server_t *s = hl->server;
- sys_mutex_lock(s->lock);
- int fd = qdpn_connector_get_fd(c);
- fd_data_t *d = fd_data(s, fd);
- /* Make sure we are still tracking this fd, could have been closed by timer */
- if (d) {
- pn_transport_t *t = qdpn_connector_transport(c);
- int flags =
- (qdpn_connector_hangup(c) ? POLLHUP : 0) |
- (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) |
- (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0);
- struct lws_pollfd pfd = { fd, flags, flags };
- if (pn_transport_pending(t) > 0) {
- lws_callback_on_writable(d->wsi);
+#define DEFAULT_TICK 1000
+
+static void* http_thread_run(void* v) {
+ qd_http_server_t *hs = v;
+ qd_log(hs->log, QD_LOG_INFO, "HTTP server thread running");
+ int result = 0;
+ while(result >= 0) {
+ /* Send a USER event to run transport ticks, may decrease hs->next_tick. */
+ hs->now = qd_timer_now();
+ hs->next_tick = hs->now + DEFAULT_TICK;
+ lws_callback_all_protocol(hs->context, &protocols[1], LWS_CALLBACK_USER);
+ lws_callback_all_protocol(hs->context, &protocols[2], LWS_CALLBACK_USER);
+ pn_millis_t timeout = (hs->next_tick > hs->now) ? hs->next_tick - hs->now : 1;
+ result = lws_service(hs->context, timeout);
+
+ /* Process any work items on the queue */
+ for (work_t w = work_pop(hs); w.type != W_NONE; w = work_pop(hs)) {
+ switch (w.type) {
+ case W_NONE:
+ break;
+ case W_STOP:
+ result = -1;
+ break;
+ case W_LISTEN:
+ listener_start((qd_http_listener_t*)w.value, hs);
+ break;
+ case W_CLOSE:
+ listener_close((qd_http_listener_t*)w.value, hs);
+ break;
+ case W_WAKE: {
+ connection_t *c = w.value;
+ pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+ PN_CONNECTION_WAKE);
+ handle_events(c);
+ break;
+ }
+ }
}
- lws_service_fd(s->context, &pfd);
- d = fd_data(s, fd); /* We may have stopped tracking during service */
- if (pn_transport_capacity(t) > 0)
- qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
- if (pn_transport_pending(t) > 0 || (d && lws_partial_buffered(d->wsi)))
- qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
- pn_timestamp_t wake = pn_transport_tick(t, qdpn_now(NULL));
- if (wake) qdpn_connector_wakeup(c, wake);
- }
- sys_mutex_unlock(s->lock);
- check_timer(s); /* Make sure the timer is running */
+ }
+ qd_log(hs->log, QD_LOG_INFO, "HTTP server thread exit");
+ return NULL;
}
-/* Dispatch closes a connector because it is HUP, socket_error or transport_closed() */
-static void http_connector_close(qdpn_connector_t *c) {
- int fd = qdpn_connector_get_fd(c);
- qd_http_server_t *s = qdpn_connector_http_listener(c)->server;
- sys_mutex_lock(s->lock);
- fd_data_t *d = fd_data(s, fd);
- if (d) { /* Only if we are still tracking fd */
- /* Shutdown but let LWS do the close(), possibly in later timer */
- shutdown(qdpn_connector_get_fd(c), SHUT_RDWR);
- short flags = POLLIN|POLLOUT|POLLHUP;
- struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
- lws_service_fd(s->context, &pfd);
- qdpn_connector_mark_closed(c);
- memset(d, 0 , sizeof(*d));
- }
- sys_mutex_unlock(s->lock);
+void qd_http_server_free(qd_http_server_t *hs) {
+ if (!hs) return;
+ if (hs->thread) {
+ /* Thread safe, stop via work queue then clean up */
+ work_t work = { W_STOP, NULL };
+ work_push(hs, work);
+ sys_thread_join(hs->thread);
+ sys_thread_free(hs->thread);
+ hs->thread = NULL;
+ }
+ work_queue_destroy(&hs->work);
+ if (hs->context) lws_context_destroy(hs->context);
+ free(hs);
}
-static struct qdpn_connector_methods_t http_methods = {
- http_connector_process,
- http_connector_close
-};
-
-void qd_http_listener_accept(qd_http_listener_t *hl, qdpn_connector_t *c) {
- qd_http_server_t *s = hl->server;
- sys_mutex_lock(s->lock);
- int fd = qdpn_connector_get_fd(c);
- struct lws *wsi = lws_adopt_socket_vhost(hl->vhost, fd);
- fd_data_t *d = fd_data(s, fd);
- if (d) { /* FD was adopted by LWS, so dispatch must not close it */
- qdpn_connector_set_methods(c, &http_methods);
- if (wsi) d->connector = c;
- }
- sys_mutex_unlock(s->lock);
- if (!wsi) { /* accept failed, dispatch should forget the FD. */
- qdpn_connector_mark_closed(c);
+qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) {
+ log_init();
+ qd_http_server_t *hs = calloc(1, sizeof(*hs));
+ if (hs) {
+ work_queue_init(&hs->work);
+ struct lws_context_creation_info info = {0};
+ info.gid = info.uid = -1;
+ info.user = hs;
+ info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
+ info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
+ LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
+ LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ info.max_http_header_pool = 32;
+ info.timeout_secs = 1;
+
+ hs->context = lws_create_context(&info);
+ hs->server = s;
+ hs->log = log; /* For messages from this file */
+ if (!hs->context) {
+ qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
+ qd_http_server_free(hs);
+ hs = NULL;
+ }
}
+ return hs;
}
-static struct lws_protocols protocols[] = {
- /* HTTP only protocol comes first */
- {
- "http-only",
- callback_http,
- 0,
- },
- /* "amqp" is the official oasis AMQP over WebSocket protocol name */
- {
- "amqp",
- callback_amqpws,
- sizeof(buffer_t),
- },
- /* "binary" is an alias for "amqp", for compatibility with clients designed
- * to work with a WebSocket proxy
- */
- {
- "binary",
- callback_amqpws,
- sizeof(buffer_t),
- },
- { NULL, NULL, 0, 0 } /* terminator */
-};
+/* Thread safe calls that put items on work queue */
-static qd_log_level_t qd_level(int lll) {
- switch (lll) {
- case LLL_ERR: return QD_LOG_ERROR;
- case LLL_WARN: return QD_LOG_WARNING;
- case LLL_NOTICE: return QD_LOG_INFO;
- case LLL_INFO:return QD_LOG_DEBUG;
- case LLL_DEBUG: return QD_LOG_TRACE;
- default: return QD_LOG_NONE;
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li)
+{
+ sys_mutex_lock(hs->work.lock);
+ if (!hs->thread) {
+ hs->thread = sys_thread(http_thread_run, hs);
}
+ bool ok = hs->thread;
+ sys_mutex_unlock(hs->work.lock);
+ if (!ok) return NULL;
+
+ qd_http_listener_t *hl = qd_http_listener(hs, li);
+ if (hl) {
+ work_t w = { W_LISTEN, hl };
+ work_push(hs, w);
+ }
+ return hl;
}
-static void emit_lws_log(int lll, const char *line) {
- size_t len = strlen(line);
- while (len > 1 && isspace(line[len-1]))
- --len;
- qd_log(http_log, qd_level(lll), "%.*s", len, line);
-}
-
-qd_http_server_t *qd_http_server(qd_dispatch_t *d, qd_log_source_t *log) {
- if (!http_log) http_log = qd_log_source("HTTP");
- qd_http_server_t *s = calloc(1, sizeof(*s));
- if (!s) return NULL;
- s->log = log;
- s->lock = sys_mutex();
- s->dispatch = d;
- int levels =
- (qd_log_enabled(log, QD_LOG_ERROR) ? LLL_ERR : 0) |
- (qd_log_enabled(log, QD_LOG_WARNING) ? LLL_WARN : 0) |
- (qd_log_enabled(log, QD_LOG_INFO) ? LLL_NOTICE : 0) |
- (qd_log_enabled(log, QD_LOG_DEBUG) ? LLL_INFO : 0) |
- (qd_log_enabled(log, QD_LOG_TRACE) ? LLL_DEBUG : 0);
- lws_set_log_level(levels, emit_lws_log);
-
- struct lws_context_creation_info info = {0};
- info.gid = info.uid = -1;
- info.user = s;
- info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
- info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
- LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
- LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- info.max_http_header_pool = 32;
- info.timeout_secs = 1;
- s->context = lws_create_context(&info);
- if (!s->context) {
- free(s);
- return NULL;
- }
- return s;
+void qd_http_listener_close(qd_http_listener_t *hl)
+{
+ work_t w = { W_CLOSE, hl };
+ work_push(hl->server, w);
}
-void qd_http_server_free(qd_http_server_t *s) {
- sys_mutex_free(s->lock);
- lws_context_destroy(s->context);
- if (s->timer) qd_timer_free(s->timer);
- if (s->fd) free(s->fd);
- free(s);
+static qd_http_server_t *wsi_server(struct lws *wsi) {
+ return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
}
-qd_http_listener_t *qd_http_listener(qd_http_server_t *s, const qd_server_config_t *config) {
- qd_http_listener_t *hl = calloc(1, sizeof(*hl));
- if (!hl) return NULL;
- hl->server = s;
-
- struct lws_context_creation_info info = {0};
-
- struct lws_http_mount *m = &hl->mount;
- m->mountpoint = "/"; /* URL mount point */
- m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
- m->origin = (config->http_root && *config->http_root) ? /* File system root */
- config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
- m->def = "index.html"; /* Default file name */
- m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
- info.mounts = m;
- info.port = CONTEXT_PORT_NO_LISTEN_SERVER; /* Don't use LWS listener */
- info.protocols = protocols;
- info.keepalive_timeout = 1;
- info.ssl_cipher_list = CIPHER_LIST;
- info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
- if (config->ssl_profile) {
- info.ssl_cert_filepath = config->ssl_certificate_file;
- info.ssl_private_key_filepath = config->ssl_private_key_file;
- info.ssl_private_key_password = config->ssl_password;
- info.ssl_ca_filepath = config->ssl_trusted_certificates;
- info.options |=
- LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
- (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
- (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
- }
- snprintf(hl->name, sizeof(hl->name), "vhost%x", s->vhost_id++);
- info.vhost_name = hl->name;
- hl->vhost = lws_create_vhost(s->context, &info);
- if (!hl->vhost) {
- free(hl);
- return NULL;
+static qd_http_listener_t *wsi_listener(struct lws *wsi) {
+ qd_http_listener_t *hl = NULL;
+ struct lws_vhost *vhost = lws_get_vhost(wsi);
+ if (vhost) { /* Get qd_http_listener from vhost data */
+ void *vp = lws_protocol_vh_priv_get(vhost, &protocols[0]);
+ memcpy(&hl, vp, sizeof(hl));
}
return hl;
}
-void qd_http_listener_free(qd_http_listener_t *hl) {
- free(hl);
+static qd_log_source_t *wsi_log(struct lws *wsi) {
+ return wsi_server(wsi)->log;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http-none.c
----------------------------------------------------------------------
diff --git a/src/http-none.c b/src/http-none.c
index a8953e5..57869d5 100644
--- a/src/http-none.c
+++ b/src/http-none.c
@@ -20,29 +20,21 @@
#include <qpid/dispatch/log.h>
#include "http.h"
+struct qd_dispatch_t;
+
/* No HTTP implementation available. */
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *d, qd_log_source_t *log)
+qd_http_server_t *qd_http_server(struct qd_server_t *s, qd_log_source_t *log)
{
qd_log(log, QD_LOG_WARNING, "HTTP support is not available");
return 0;
}
-void qd_http_server_free(qd_http_server_t *h)
-{
-}
+void qd_http_server_free(qd_http_server_t *h) {}
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
- const struct qd_server_config_t *config)
-{
- return 0;
-}
+void* qd_http_server_run(void* qd_http_server) { return 0; }
+
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li) { return 0; }
-void qd_http_listener_free(qd_http_listener_t *hl)
-{
-}
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c)
-{
-}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http.h
----------------------------------------------------------------------
diff --git a/src/http.h b/src/http.h
index fae3f1d..a169998 100644
--- a/src/http.h
+++ b/src/http.h
@@ -23,17 +23,18 @@
typedef struct qd_http_listener_t qd_http_listener_t;
typedef struct qd_http_server_t qd_http_server_t;
-struct qd_dispatch_t;
-struct qd_log_source_t;
+struct qd_server_t;
struct qd_server_config_t;
-struct qdpn_connector_t;
+struct qd_listener_t;
+struct qd_log_source_t;
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *dispatch, struct qd_log_source_t *log);
+/* Create a HTTP server */
+qd_http_server_t *qd_http_server(struct qd_server_t *server, struct qd_log_source_t *log);
+
+/* Free the HTTP server */
void qd_http_server_free(qd_http_server_t*);
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
- const struct qd_server_config_t *config);
-void qd_http_listener_free(qd_http_listener_t *hl);
-/* On error, qdpn_connector_closed(c) is true. */
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c);
+
+/* Listening for HTTP, thread safe. */
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li);
#endif // QD_HTTP_H
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index a55f245..2ddbf94 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -396,7 +396,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
pn_connection_t *conn = qd_connection_pn(qd_conn);
qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server);
qd_policy_t *policy = qd->policy;
- const char *hostip = qd_connection_hostip(qd_conn);
+ const char *hostip = qd_connection_remote_ip(qd_conn);
const char *vhost = pn_connection_remote_hostname(conn);
if (result) {
qd_log(policy->log_source,
@@ -567,7 +567,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con
bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
{
- const char *hostip = qd_connection_hostip(qd_conn);
+ const char *hostip = qd_connection_remote_ip(qd_conn);
const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn));
if (qd_conn->policy_settings->maxSenders) {
@@ -618,7 +618,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_
bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
{
- const char *hostip = qd_connection_hostip(qd_conn);
+ const char *hostip = qd_connection_remote_ip(qd_conn);
const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn));
if (qd_conn->policy_settings->maxReceivers) {
@@ -683,7 +683,7 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) {
if (policy->enableVhostPolicy) {
// Open connection or not based on policy.
pn_transport_t *pn_trans = pn_connection_transport(conn);
- const char *hostip = qd_connection_hostip(qd_conn);
+ const char *hostip = qd_connection_remote_ip(qd_conn);
const char *pcrh = pn_connection_remote_hostname(conn);
const char *vhost = (pcrh ? pcrh : "");
const char *conn_name = qd_connection_name(qd_conn);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index e7fa34d..066c862 100644
--- a/src/server.c
+++ b/src/server.c
@@ -475,9 +475,14 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c
pn_data_exit(pn_connection_properties(conn));
}
+/* Wake function for proactor-manaed connections */
+static void connection_wake(qd_connection_t *ctx) {
+ if (ctx->pn_conn) pn_connection_wake(ctx->pn_conn);
+}
-/* Construct a new qd_connection. */
-static qd_connection_t *qd_connection(qd_server_t *server, qd_server_config_t *config) {
+/* Construct a new qd_connection. Thread safe. */
+qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config)
+{
qd_connection_t *ctx = new_qd_connection_t();
if (!ctx) return NULL;
ZERO(ctx);
@@ -491,6 +496,7 @@ static qd_connection_t *qd_connection(qd_server_t *server, qd_server_config_t *c
return NULL;
}
ctx->server = server;
+ ctx->wake = connection_wake; /* Default, over-ridden for HTTP connections */
pn_connection_set_context(ctx->pn_conn, ctx);
DEQ_ITEM_INIT(ctx);
DEQ_INIT(ctx->deferred_calls);
@@ -508,7 +514,7 @@ static void on_accept(pn_event_t *e)
assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
pn_listener_t *pn_listener = pn_event_listener(e);
qd_listener_t *listener = pn_listener_get_context(pn_listener);
- qd_connection_t *ctx = qd_connection(listener->server, &listener->config);
+ qd_connection_t *ctx = qd_server_connection(listener->server, &listener->config);
if (!ctx) {
qd_log(listener->server->log_source, QD_LOG_CRITICAL,
"Allocation failure during accept to %s", listener->config.host_port);
@@ -549,25 +555,19 @@ void connect_fail(qd_connection_t *ctx, const char *name, const char *descriptio
/* Get the host IP address for the remote end */
-static int set_remote_host_port(qd_connection_t *ctx) {
+static void set_rhost_port(qd_connection_t *ctx) {
pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
- const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport));
- int err = 0;
- if (!addr) {
- err = -1;
- qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s");
- } else {
+ const struct sockaddr_storage* addr =
+ pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport));
+ if (addr) {
char rport[NI_MAXSERV] = "";
int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr),
ctx->rhost, sizeof(ctx->rhost), rport, sizeof(rport),
NI_NUMERICHOST | NI_NUMERICSERV);
if (!err) {
snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s", ctx->rhost, rport);
- } else {
- qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s");
}
}
- return err;
}
@@ -581,7 +581,6 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
//
// Proton pushes out its trace to transport_tracer() which in turn writes a trace
// message to the qdrouter log If trace level logging is enabled on the router set
- // PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
//
if (qd_log_enabled(ctx->server->log_source, QD_LOG_TRACE)) {
pn_transport_trace(tport, PN_TRACE_FRM);
@@ -593,9 +592,9 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
config = &ctx->listener->config;
const char *name = config->host_port;
pn_transport_set_server(tport);
+ set_rhost_port(ctx);
- if (set_remote_host_port(ctx) == 0 &&
- qd_policy_socket_accept(server->qd->policy, ctx->rhost))
+ if (qd_policy_socket_accept(server->qd->policy, ctx->rhost))
{
ctx->policy_counted = true;
} else {
@@ -667,10 +666,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event);
static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
qd_log_source_t *log = qd_server->log_source;
-
- /* FIXME aconway 2017-02-20: HTTP support */
qd_listener_t *li = (qd_listener_t*) pn_listener_get_context(pn_event_listener(e));
const char *host_port = li->config.host_port;
+
switch (pn_event_type(e)) {
case PN_LISTENER_OPEN:
@@ -705,7 +703,7 @@ static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
}
-static void qd_connection_free(qd_connection_t *ctx)
+void qd_connection_free(qd_connection_t *ctx)
{
qd_server_t *qd_server = ctx->server;
@@ -751,7 +749,7 @@ static void qd_connection_free(qd_connection_t *ctx)
/* Events involving a connection or listener are serialized by the proactor so
* only one event per connection / listener will be processed at a time.
*/
-static bool handle(pn_event_t *e, qd_server_t *qd_server) {
+static bool handle(qd_server_t *qd_server, pn_event_t *e) {
pn_connection_t *pn_conn = pn_event_connection(e);
qd_connection_t *ctx = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL;
@@ -803,7 +801,7 @@ static bool handle(pn_event_t *e, qd_server_t *qd_server) {
/* TODO aconway 2017-04-18: fold the container handler into the server */
qd_container_handle_event(qd_server->container, e);
- /* Free the connection after all other processing */
+ /* Free the connection after all other processing is complete */
if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) {
pn_connection_set_context(pn_conn, NULL);
qd_connection_free(ctx);
@@ -819,7 +817,7 @@ static void *thread_run(void *arg)
pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor);
pn_event_t * e;
while (running && (e = pn_event_batch_next(events))) {
- running = handle(e, qd_server);
+ running = handle(qd_server, e);
}
pn_proactor_done(qd_server->proactor, events);
}
@@ -836,7 +834,7 @@ static void try_open_lh(qd_connector_t *ct)
return;
}
- qd_connection_t *ctx = qd_connection(ct->server, &ct->config);
+ qd_connection_t *ctx = qd_server_connection(ct->server, &ct->config);
if (!ctx) { /* Try again later */
qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure connecting to %s",
ct->config.host_port);
@@ -987,7 +985,7 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
qd_server->next_connection_id = 1;
qd_server->py_displayname_obj = 0;
- /* FIXME aconway 2017-01-20: restore HTTP support */
+ qd_server->http = qd_http_server(qd_server, qd_server->log_source);
qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);
@@ -1052,11 +1050,10 @@ void qd_server_stop(qd_dispatch_t *qd)
void qd_server_activate(qd_connection_t *ctx)
{
- if (!ctx || !ctx->pn_conn)
- return;
- pn_connection_wake(ctx->pn_conn);
+ if (ctx) ctx->wake(ctx);
}
+
void qd_connection_set_context(qd_connection_t *conn, void *context)
{
conn->user_context = context;
@@ -1133,28 +1130,44 @@ qd_listener_t *qd_server_listener(qd_server_t *server)
qd_listener_t *li = new_qd_listener_t();
if (!li) return 0;
ZERO(li);
-
sys_atomic_init(&li->ref_count, 1);
li->server = server;
li->http = NULL;
return li;
}
-
-bool qd_listener_listen(qd_listener_t *li) {
- if (!li->pn_listener) { /* Not already listening */
- li->pn_listener = pn_listener();
- if (!li->pn_listener) {
- qd_log(li->server->log_source, QD_LOG_ERROR, "No memory listening on %s",
- li->config.host_port);
- return false;
- }
+static bool qd_listener_listen_pn(qd_listener_t *li) {
+ li->pn_listener = pn_listener();
+ if (li->pn_listener) {
pn_listener_set_context(li->pn_listener, li);
- /* Listen is asynchronous, log listening on PN_LISTENER_OPEN */
- sys_atomic_inc(&li->ref_count);
- pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, BACKLOG);
+ pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port,
+ BACKLOG);
+ sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
+ /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */
+ } else {
+ qd_log(li->server->log_source, QD_LOG_CRITICAL, "No memory listening on %s",
+ li->config.host_port);
+ }
+ return li->pn_listener;
+}
+
+static bool qd_listener_listen_http(qd_listener_t *li) {
+ if (li->server->http) {
+ /* qd_http_listener holds a reference to li, will decref when closed */
+ qd_http_server_listen(li->server->http, li);
+ return li->http;
+ } else {
+ qd_log(li->server->log_source, QD_LOG_ERROR, "No HTTP support to listen on %s",
+ li->config.host_port);
+ return false;
}
- return true;
+}
+
+
+bool qd_listener_listen(qd_listener_t *li) {
+ if (li->pn_listener || li->http) /* Already listening */
+ return true;
+ return li->config.http ? qd_listener_listen_http(li) : qd_listener_listen_pn(li);
}
@@ -1162,7 +1175,6 @@ void qd_listener_decref(qd_listener_t* li)
{
if (li && sys_atomic_dec(&li->ref_count) == 1) {
qd_server_config_free(&li->config);
- if (li->http) qd_http_listener_free(li->http);
free_qd_listener_t(li);
}
}
@@ -1238,6 +1250,11 @@ qd_http_listener_t *qd_listener_http(qd_listener_t *li) {
return li->http;
}
-const char* qd_connection_hostip(const qd_connection_t *c) {
+const char* qd_connection_remote_ip(const qd_connection_t *c) {
return c->rhost;
}
+
+/* Expose event handling for HTTP connections */
+void qd_connection_handle(qd_connection_t *c, pn_event_t *e) {
+ handle(c->server, e);
+}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index a6543fa..6e6f1c3 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -39,16 +39,19 @@
qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
-const char* qd_connection_name(const qd_connection_t *c);
+qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config);
+void qd_connection_free(qd_connection_t* conn);
+
qd_connector_t* qd_connection_connector(const qd_connection_t *c);
-const char* qd_connection_hostip(const qd_connection_t *c);
-const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
+void qd_connection_handle(qd_connection_t *c, pn_event_t *e);
-qd_http_listener_t *qd_listener_http(qd_listener_t *l);
+
+const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
qd_listener_t *qd_server_listener(qd_server_t *server);
qd_connector_t *qd_server_connector(qd_server_t *server);
+
void qd_connector_decref(qd_connector_t* ct);
void qd_listener_decref(qd_listener_t* ct);
void qd_server_config_free(qd_server_config_t *cf);
@@ -136,7 +139,7 @@ struct qd_connection_t {
pn_ssl_t *ssl;
qd_listener_t *listener;
qd_connector_t *connector;
- void *context; // Copy of context from listener or connector
+ void *context; // context from listener or connector
void *user_context;
void *link_context; // Context shared by this connection's links
uint64_t connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky.
@@ -152,6 +155,7 @@ struct qd_connection_t {
bool policy_counted;
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
qd_pn_free_link_session_list_t free_link_session_list;
+ void (*wake)(qd_connection_t*); /* Wake method, different for HTTP vs. proactor */
char rhost[NI_MAXHOST]; /* Remote host numeric IP for incoming connections */
char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */
};
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/tests/system_tests_http.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index 5627277..3d13c32 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -50,6 +50,14 @@ class RouterTestHttp(TestCase):
def assert_get_cert(self, url):
self.assertEqual("HTTP test\n", self.get_cert("%s/system_tests_http.txt" % url))
+ def test_listen_error(self):
+ """Make sure a router exits if an initial HTTP listener fails, doesn't hang"""
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'standalone', 'id': 'bad'}),
+ ('listener', {'port': 80, 'http':True})])
+ r = Qdrouterd(name="expect_fail", config=config, wait=False);
+ self.assertEqual(1, r.wait())
+
def test_http_get(self):
config = Qdrouterd.Config([
('router', {'id': 'QDR.HTTP'}),
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 0a982c7..483d5e2 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -57,6 +57,14 @@ class RouterTest(TestCase):
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
+ def test_listen_error(self):
+ """Make sure a router exits if a initial listener fails, doesn't hang"""
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'standalone', 'id': 'bad'}),
+ ('listener', {'port': 80})])
+ r = Qdrouterd(name="expect_fail", config=config, wait=False);
+ self.assertEqual(1, r.wait())
+
def test_01_pre_settled(self):
addr = self.address+"/pre_settled/1"
M1 = self.messenger()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org