You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/05/16 18:54:37 UTC
[2/2] qpid-dispatch git commit: DISPATCH-774: backport http changes
to 0.8.x, use stock libwebsockets
DISPATCH-774: backport http changes to 0.8.x, use stock libwebsockets
The 0.8.x http-libwebsockets implementation now works with the standard
libwebsockets 2.1 release; it no longer requires a special patched version.
The changes are based on master, with some differences because of other changes
to the IO code on master.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b1e27480
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b1e27480
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b1e27480
Branch: refs/heads/0.8.x
Commit: b1e27480135ab4ff536a3b8d31a7c8ffb7753677
Parents: ced41ae
Author: Alan Conway <ac...@redhat.com>
Authored: Tue May 16 13:43:21 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue May 16 14:43:44 2017 -0400
----------------------------------------------------------------------
CMakeLists.txt | 2 +-
cmake/FindLibWebSockets.cmake | 31 +-
src/CMakeLists.txt | 9 +-
src/http-libwebsockets.c | 838 ++++++++++++++++++++++---------------
src/http-none.c | 27 +-
src/http.h | 20 +-
src/server.c | 128 +++---
src/server_private.h | 31 +-
8 files changed, 639 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2187774..2bd4b24 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -103,7 +103,7 @@ find_library(rt_lib rt)
find_package(Proton 0.15 REQUIRED)
## Optional dependencies
-include(FindLibWebSockets)
+find_package(LibWebSockets 2)
option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND})
##
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/cmake/FindLibWebSockets.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindLibWebSockets.cmake b/cmake/FindLibWebSockets.cmake
index 18ef18a..3723f5c 100644
--- a/cmake/FindLibWebSockets.cmake
+++ b/cmake/FindLibWebSockets.cmake
@@ -42,24 +42,19 @@ find_path(LIBWEBSOCKETS_INCLUDE_DIRS
PATHS /usr/include
)
-include(FindPackageHandleStandardArgs)
-
-find_package_handle_standard_args(LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
-
-if(LIBWEBSOCKETS_FOUND)
- # For the moment we need a patched version of LibWebSockets:
- # https://github.com/alanconway/libwebsockets/tree/v2.1-stable-aconway-adopt-ssl
- # This function check verifies we have it.
- set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS})
- set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES})
- check_function_exists(lws_adopt_socket_vhost LWS_ADOPT_SOCKET_VHOST_FOUND)
- if (NOT LWS_ADOPT_SOCKET_VHOST_FOUND)
- message("Cannot use LibWebSockets, no function lws_adopt_socket_vhost")
- unset(LIBWEBSOCKETS_FOUND)
+# We need vhost support which appeared in v2.0 of libwebsockets
+set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS})
+set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES})
+set(MSG DEFAULT_MSG)
+if (LIBWEBSOCKETS_LIBRARIES AND LIBWEBSOCKETS_INCLUDE_DIRS)
+ check_function_exists(lws_create_vhost LIBWEBSOCKETS_OK)
+ if (NOT LIBWEBSOCKETS_OK)
+ set(MSG "Cannot use LibWebSockets version < 2 in ${LIBWEBSOCKETS_LIBRARIES}")
+ set(LIBWEBSOCKETS_LIBRARIES "NOTFOUND")
+ set(LIBWEBSOCKETS_INCLUDE_DIRS "NOTFOUND")
endif()
endif()
-if(NOT LIBWEBSOCKETS_FOUND)
- set(LIBWEBSOCKETS_LIBRARIES "")
- set(LIBWEBSOCKETS_INCLUDE_DIRS "")
-endif()
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(LIBWEBSOCKETS ${MSG} LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4c00206..5edea1d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,8 +39,10 @@ add_custom_command (
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
- ${LIBWEBSOCKETS_INCLUDE_DIRS}
)
+if (USE_LIBWEBSOCKETS)
+ include_directories(${LIBWEBSOCKETS_INCLUDE_DIRS})
+endif()
# Build the qpid-dispatch library.
set(qpid_dispatch_SOURCES
@@ -112,7 +114,10 @@ if (CMAKE_C_COMPILER_ID STREQUAL "GNU")
endif (CMAKE_C_COMPILER_ID STREQUAL "GNU")
add_library(qpid-dispatch SHARED ${qpid_dispatch_SOURCES})
-target_link_libraries(qpid-dispatch ${Proton_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES} ${LIBWEBSOCKETS_LIBRARIES})
+target_link_libraries(qpid-dispatch ${Proton_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES})
+if (USE_LIBWEBSOCKETS)
+ target_link_libraries(qpid-dispatch ${LIBWEBSOCKETS_LIBRARIES})
+endif()
set_target_properties(qpid-dispatch PROPERTIES
LINK_FLAGS "${CATCH_UNDEFINED}"
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 745b090..dc8ff58 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,446 +19,620 @@
#include <qpid/dispatch/atomic.h>
#include <qpid/dispatch/amqp.h>
-#include <qpid/dispatch/driver.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
+#include <proton/connection_driver.h>
+
#include <libwebsockets.h>
#include <assert.h>
+#include <ctype.h>
#include <errno.h>
#include <inttypes.h>
+#include <time.h>
#include "http.h"
#include "server_private.h"
#include "config.h"
+static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */
+
+/* Log for LWS messages. For dispatch server messages use qd_http_server_t::log */
static qd_log_source_t* http_log;
-static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH";
+static qd_log_level_t qd_level(int lll) {
+ switch (lll) {
+ case LLL_ERR: return QD_LOG_ERROR;
+ case LLL_WARN: return QD_LOG_WARNING;
+ /* LWS is noisy compared to dispatch on the informative levels, downgrade */
+ case LLL_NOTICE: return QD_LOG_DEBUG;
+ default: return QD_LOG_TRACE; /* Everything else to trace */
+ }
+}
+
+static void logger(int lll, const char *line) {
+ size_t len = strlen(line);
+ while (len > 1 && isspace(line[len-1])) { /* Strip trailing newline */
+ --len;
+ }
+ qd_log(http_log, qd_level(lll), "%.*s", len, line);
+}
+
+static void log_init() {
+ http_log = qd_log_source("HTTP");
+ int levels = 0;
+ for (int i = 0; i < LLL_COUNT; ++i) {
+ int lll = 1<<i;
+ levels |= qd_log_enabled(http_log, qd_level(lll)) ? lll : 0;
+ }
+ lws_set_log_level(levels, logger);
+}
+
+/* Intermediate write buffer: LWS needs extra header space on write. */
+typedef struct buffer_t {
+ char *start;
+ size_t size, cap;
+} buffer_t;
+
+/* Ensure size bytes in buffer, make buf empty if alloc fails */
+static void buffer_set_size(buffer_t *buf, size_t size) {
+ if (size > buf->cap) {
+ buf->cap = (size > buf->cap * 2) ? size : buf->cap * 2;
+ buf->start = realloc(buf->start, buf->cap);
+ }
+ if (buf->start) {
+ buf->size = size;
+ } else {
+ buf->size = buf->cap = 0;
+ }
+}
-/* Associate file-descriptors, LWS instances and qdpn_connectors */
-typedef struct fd_data_t {
- qdpn_connector_t *connector;
+/* AMQPWS connection: set as lws user data and qd_conn->context */
+struct qd_http_connection_t {
+ pn_connection_driver_t driver;
+ qd_connection_t* qd_conn;
+ buffer_t wbuf; /* LWS requires allocated header space at start of buffer */
struct lws *wsi;
-} fd_data_t;
+ char name[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port */
+ char hostip[NI_MAXHOST]; /* Remote host IP address */
+ bool closed;
+};
+
+/* Navigating from WSI pointer to qd objects */
+static qd_http_server_t *wsi_server(struct lws *wsi);
+static qd_http_listener_t *wsi_listener(struct lws *wsi);
+static qd_log_source_t *wsi_log(struct lws *wsi);
-/* HTTP server state shared by all listeners */
+
+/* Declare LWS callbacks and protocol list */
+static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
+static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
+
+static struct lws_protocols protocols[] = {
+ /* HTTP only protocol comes first */
+ {
+ "http-only",
+ callback_http,
+ 0,
+ },
+ /* "amqp" is the official oasis AMQP over WebSocket protocol name */
+ {
+ "amqp",
+ callback_amqpws,
+ sizeof(qd_http_connection_t),
+ },
+ /* "binary" is an alias for "amqp", for compatibility with clients designed
+ * to work with a WebSocket proxy
+ */
+ {
+ "binary",
+ callback_amqpws,
+ sizeof(qd_http_connection_t),
+ },
+ { NULL, NULL, 0, 0 } /* terminator */
+};
+
+
+static inline int unexpected_close(struct lws *wsi, const char *msg) {
+ lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION,
+ (unsigned char*)msg, strlen(msg));
+ char peer[64] = "";
+ lws_get_peer_simple(wsi, peer, sizeof(peer));
+ qd_log(wsi_log(wsi), QD_LOG_ERROR, "Error on HTTP connection from %s: %s", peer, msg);
+ return -1;
+}
+
+static int handle_events(qd_http_connection_t* c) {
+ if (!c->qd_conn) {
+ return unexpected_close(c->wsi, "not-established");
+ }
+ qd_connection_process(c->qd_conn);
+ if (pn_connection_driver_write_buffer(&c->driver).size) {
+ lws_callback_on_writable(c->wsi);
+ }
+ if (pn_connection_driver_finished(&c->driver)) {
+ lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
+ c->closed = true;
+ qd_connection_process(c->qd_conn);
+ return -1;
+ }
+ return 0;
+}
+
+/* The server has a bounded, thread-safe queue for external work */
+typedef struct work_t {
+ enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+ void *value;
+} work_t;
+
+#define WORK_MAX 8 /* Just decouple threads, not a big buffer */
+
+typedef struct work_queue_t {
+ sys_mutex_t *lock;
+ sys_cond_t *cond;
+ work_t work[WORK_MAX];
+ size_t head, len; /* Ring buffer */
+} work_queue_t;
+
+/* HTTP Server runs in a single thread, communication from other threads via work_queue */
struct qd_http_server_t {
- qd_dispatch_t *dispatch;
+ qd_server_t *server;
+ sys_thread_t *thread;
+ work_queue_t work;
qd_log_source_t *log;
- sys_mutex_t *lock; /* For now use LWS as a thread-unsafe library. */
struct lws_context *context;
- qd_timer_t *timer;
- int vhost_id; /* unique identifier for vhost name */
- fd_data_t *fd; /* indexed by file descriptor */
- size_t fd_len;
+ pn_timestamp_t now; /* Cache current time in thread_run */
+ pn_timestamp_t next_tick; /* Next requested tick service */
};
-/* Per-HTTP-listener */
+static void work_queue_destroy(work_queue_t *wq) {
+ if (wq->lock) sys_mutex_free(wq->lock);
+ if (wq->cond) sys_cond_free(wq->cond);
+}
+
+static void work_queue_init(work_queue_t *wq) {
+ wq->lock = sys_mutex();
+ wq->cond = sys_cond();
+}
+
+ /* Block till there is space */
+static void work_push(qd_http_server_t *hs, work_t w) {
+ work_queue_t *wq = &hs->work;
+ sys_mutex_lock(wq->lock);
+ while (wq->len == WORK_MAX) {
+ lws_cancel_service(hs->context); /* Wake up the run thread to clear space */
+ sys_cond_wait(wq->cond, wq->lock);
+ }
+ wq->work[(wq->head + wq->len) % WORK_MAX] = w;
+ ++wq->len;
+ sys_mutex_unlock(wq->lock);
+ lws_cancel_service(hs->context); /* Wake up the run thread to handle my work */
+}
+
+/* Non-blocking, return { W_NONE, NULL } if empty */
+static work_t work_pop(qd_http_server_t *hs) {
+ work_t w = { W_NONE, NULL };
+ work_queue_t *wq = &hs->work;
+ sys_mutex_lock(wq->lock);
+ if (wq->len > 0) {
+ w = wq->work[wq->head];
+ wq->head = (wq->head + 1) % WORK_MAX;
+ --wq->len;
+ sys_cond_signal(wq->cond);
+ }
+ sys_mutex_unlock(wq->lock);
+ return w;
+}
+
+/* Each qd_http_listener_t is associated with an lws_vhost */
struct qd_http_listener_t {
+ qd_listener_t *listener;
qd_http_server_t *server;
struct lws_vhost *vhost;
+ char host_port[NI_MAXHOST + NI_MAXSERV];
struct lws_http_mount mount;
- char name[256]; /* vhost name */
};
-/* Get wsi/connector associated with fd or NULL if nothing on record. */
-static inline fd_data_t *fd_data(qd_http_server_t *s, int fd) {
- fd_data_t *d = (fd < s->fd_len) ? &s->fd[fd] : NULL;
- return (d && (d->connector || d->wsi)) ? d : NULL;
+void qd_http_listener_free(qd_http_listener_t *hl) {
+ if (!hl) return;
+ if (hl->listener) {
+ hl->listener->http = NULL;
+ }
+ free(hl);
}
-static inline qd_http_server_t *wsi_http_server(struct lws *wsi) {
- return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
+static qd_http_listener_t *qd_http_listener(qd_http_server_t *hs, qd_listener_t *li) {
+ qd_http_listener_t *hl = calloc(1, sizeof(*hl));
+ if (hl) {
+ hl->server = hs;
+ hl->listener = li;
+ li->http = hl;
+ } else {
+ qd_log(hs->log, QD_LOG_CRITICAL, "No memory for HTTP listen on %s", hl->host_port);
+ }
+ return hl;
}
-static inline qdpn_connector_t *wsi_connector(struct lws *wsi) {
- fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
- return d ? d->connector : NULL;
+static int qd_port_int(const char* port_str) {
+ if (!strcmp(port_str, "amqp")) return 5672;
+ if (!strcmp(port_str, "amqps")) return 5671;
+ errno = 0;
+ unsigned long n = strtoul(port_str, NULL, 10);
+ if (errno || n > 0xFFFF) return -1;
+ return n;
}
-static inline fd_data_t *set_fd(qd_http_server_t *s, int fd, qdpn_connector_t *c, struct lws *wsi) {
- if (!s->fd || fd >= s->fd_len) {
- size_t oldlen = s->fd_len;
- s->fd_len = fd + 16; /* Don't double, low-range FDs will be re-used first. */
- void *newfds = realloc(s->fd, s->fd_len*sizeof(*s->fd));
- if (!newfds) return NULL;
- s->fd = newfds;
- memset(s->fd + oldlen, 0, sizeof(*s->fd)*(s->fd_len - oldlen));
- }
- fd_data_t *d = &s->fd[fd];
- d->connector = c;
- d->wsi = wsi;
- return d;
-}
+static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
+ log_init(); /* Update log flags at each listener */
-/* Push read data into the transport.
- * Return 0 on success, number of bytes un-pushed on failure.
- */
-static int transport_push(pn_transport_t *t, pn_bytes_t buf) {
- ssize_t cap;
- while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) {
- if (buf.size > cap) {
- pn_transport_push(t, buf.start, cap);
- buf.start += cap;
- buf.size -= cap;
- } else {
- pn_transport_push(t, buf.start, buf.size);
- buf.size = 0;
- }
+ const qd_server_config_t *config = hl->listener->config;
+
+ int port = qd_port_int(config->port);
+ snprintf(hl->host_port, sizeof(hl->host_port), "%s:%s", config->host, config->port);
+ if (port <= 0) {
+ qd_log(hs->log, QD_LOG_ERROR, "HTTP listener %s invalid port", hl->host_port);
+ goto error;
}
- return buf.size;
-}
+ struct lws_http_mount *m = &hl->mount;
+ m->mountpoint = "/"; /* URL mount point */
+ m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
+ m->origin = (config->http_root && *config->http_root) ? /* File system root */
+ config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
+ m->def = "index.html"; /* Default file name */
+ m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
-static inline int normal_close(struct lws *wsi, const char *msg) {
- lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
- return -1;
+ struct lws_context_creation_info info = {0};
+ info.mounts = m;
+ info.port = port;
+ info.protocols = protocols;
+ info.keepalive_timeout = 1;
+ info.ssl_cipher_list = CIPHER_LIST;
+ info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
+ if (config->ssl_profile) {
+ info.ssl_cert_filepath = config->ssl_certificate_file;
+ info.ssl_private_key_filepath = config->ssl_private_key_file;
+ info.ssl_private_key_password = config->ssl_password;
+ info.ssl_ca_filepath = config->ssl_trusted_certificates;
+ info.options |=
+ LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
+ (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
+ (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
+ }
+ info.vhost_name = hl->host_port;
+ hl->vhost = lws_create_vhost(hs->context, &info);
+ if (hl->vhost) {
+ /* Store hl pointer in vhost */
+ void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl));
+ memcpy(vp, &hl, sizeof(hl));
+ qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", hl->host_port);
+ return;
+ } else {
+ qd_log(hs->log, QD_LOG_NOTICE, "Error listening for HTTP on %s", hl->host_port);
+ goto error;
+ }
+ return;
+
+ error:
+ qd_http_listener_free(hl);
}
-static inline int unexpected_close(struct lws *wsi, const char *msg) {
- lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg));
- return -1;
+static void listener_close(qd_http_listener_t *hl, qd_http_server_t *hs) {
+ /* TODO aconway 2017-04-13: can't easily stop listeners under libwebsockets */
+ qd_log(hs->log, QD_LOG_ERROR, "Cannot close HTTP listener %s", hl->host_port);
}
/*
- * Callback for un-promoted HTTP connections, and low-level external poll operations.
+ * LWS callback for un-promoted HTTP connections.
* Note main HTTP file serving is handled by the "mount" struct below.
- * Called with http lock held.
*/
static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
switch (reason) {
- case LWS_CALLBACK_HTTP: /* Called if file mount can't find the file */
- lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: {
+ /* TODO aconway 2017-05-15: policy support */
+ char peer[64];
+ lws_get_peer_simple(wsi, peer, sizeof(peer));
+ qd_log(wsi_log(wsi), QD_LOG_DEBUG, "Incoming HTTP connection to %s from %s",
+ wsi_listener(wsi)->host_port, peer);
+ return 0;
+ }
+ case LWS_CALLBACK_PROTOCOL_DESTROY:
+ qd_http_listener_free(wsi_listener(wsi));
return -1;
- case LWS_CALLBACK_ADD_POLL_FD: {
- /* Record WSI against FD here, the connector will be recorded when lws_service returns. */
- set_fd(wsi_http_server(wsi), lws_get_socket_fd(wsi), 0, wsi);
- break;
- }
- case LWS_CALLBACK_DEL_POLL_FD: {
- fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
- if (d) {
- /* Tell dispatch to forget this FD, but let LWS do the actual close() */
- if (d->connector) qdpn_connector_mark_closed(d->connector);
- memset(d, 0, sizeof(*d));
- }
- break;
- }
- case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
- struct lws_pollargs *p = (struct lws_pollargs*)in;
- qdpn_connector_t *c = wsi_connector(wsi);
- if (c) {
- if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
- if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
- }
- break;
+ case LWS_CALLBACK_HTTP: {
+ /* Called if file mount can't find the file */
+ lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+ return -1;
}
- /* NOTE: Not using LWS_CALLBACK_LOCK/UNLOCK_POLL as we are serializing all HTTP work for now. */
-
default:
- break;
+ return 0;
}
-
- return 0;
}
-/* Buffer to allocate extra header space required by LWS. */
-typedef struct buffer_t { char *start; size_t size; size_t cap; } buffer_t;
+const char *qd_http_connection_name(qd_http_connection_t* hc) { return hc->name; }
+const char *qd_http_connection_hostip(qd_http_connection_t* hc) { return hc->hostip; }
+bool qd_http_connection_closed(qd_http_connection_t* hc) { return hc->closed; }
-/* Callbacks for promoted AMQP over WS connections.
- * Called with http lock held.
- */
+/* Wake up a connection managed by the http server thread */
+void qd_http_connection_wake(qd_http_connection_t *c)
+{
+ if (c && c->qd_conn->listener) {
+ qd_http_server_t *hs = wsi_server(c->wsi);
+ work_t w = { W_WAKE, c };
+ work_push(hs, w);
+ }
+}
+
+/* Callbacks for promoted AMQP over WS connections. */
static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
- qdpn_connector_t *c = wsi_connector(wsi);
- pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
+ qd_http_server_t *hs = wsi_server(wsi);
+ qd_http_connection_t *c = (qd_http_connection_t*)user;
switch (reason) {
case LWS_CALLBACK_ESTABLISHED: {
- qd_log(wsi_http_server(wsi)->log, QD_LOG_DEBUG,
- "Upgraded incoming HTTP connection from %s[%"PRIu64"] to AMQP over WebSocket",
- qdpn_connector_name(c),
- qd_connection_connection_id((qd_connection_t*)qdpn_connector_context(c)));
- memset(user, 0, sizeof(buffer_t));
- break;
+ /* Upgrade accepted HTTP connection to AMQPWS */
+ memset(c, 0, sizeof(*c));
+ c->wsi = wsi;
+ qd_http_listener_t *hl = wsi_listener(wsi);
+ if (hl == NULL) {
+ return unexpected_close(c->wsi, "cannot-upgrade");
+ }
+ qd_connection_t *ctx = c->qd_conn = qd_server_connection_allocate();
+ if (c->qd_conn == NULL) {
+ return unexpected_close(c->wsi, "out-of-memory");
+ }
+ c->qd_conn->context = c;
+ c->qd_conn->listener = hl->listener;
+ lws_get_peer_simple(wsi, c->hostip, sizeof(c->hostip));
+ strncpy(c->name, c->hostip, sizeof(c->name));
+ int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL);
+ if (err) {
+ return unexpected_close(c->wsi, pn_code(err));
+ }
+ c->qd_conn->http = c;
+ c->qd_conn->server = hs->server;
+ c->qd_conn->connection_id = qd_server_connection_id(c->qd_conn->server);
+ /* TODO aconway 2017-05-15: no policy checks for HTTP connections */
+ c->qd_conn->policy_counted = false;
+ const qd_server_config_t *config = hl->listener->config;
+ c->qd_conn->role = strdup(config->role);
+ c->qd_conn->pn_conn = c->driver.connection;
+ pn_connection_set_context(c->qd_conn->pn_conn, ctx);
+ c->qd_conn->collector = c->driver.collector;
+ qd_server_decorate_connection(c->qd_conn->server, c->qd_conn->pn_conn, config);
+
+ qd_log(hs->log, QD_LOG_DEBUG,
+ "[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
+ qd_connection_connection_id(c->qd_conn), c->hostip);
+ pn_connection_driver_bind(&c->driver);
+ return handle_events(c);
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
- ssize_t size;
- if (!t || (size = pn_transport_pending(t)) < 0) {
- return normal_close(wsi, "write-closed");
- }
- if (size > 0) {
- const void *start = pn_transport_head(t);
- /* lws_write() demands LWS_PRE bytes of free space before the data */
- size_t tmpsize = size + LWS_PRE;
- buffer_t *wtmp = (buffer_t*)user;
- if (wtmp->start == NULL || wtmp->cap < tmpsize) {
- wtmp->start = realloc(wtmp->start, tmpsize);
- wtmp->size = wtmp->cap = tmpsize;
+ if (handle_events(c)) return -1;
+ pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
+ if (dbuf.size) {
+ /* lws_write() demands LWS_PRE bytes of free space before the data,
+ * so we must copy from the driver's buffer to larger temporary wbuf
+ */
+ buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
+ if (c->wbuf.start == NULL) {
+ return unexpected_close(c->wsi, "out-of-memory");
}
- if (wtmp->start == NULL) {
- return unexpected_close(wsi, "out-of-memory");
- }
- void *tmpstart = wtmp->start + LWS_PRE;
- memcpy(tmpstart, start, size);
- ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY);
+ unsigned char* buf = (unsigned char*)c->wbuf.start + LWS_PRE;
+ memcpy(buf, dbuf.start, dbuf.size);
+ ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
if (wrote < 0) {
- pn_transport_close_head(t);
- return normal_close(wsi, "write-error");
+ pn_connection_driver_write_close(&c->driver);
+ return unexpected_close(c->wsi, "write-error");
} else {
- pn_transport_pop(t, (size_t)wrote);
+ pn_connection_driver_write_done(&c->driver, wrote);
}
}
- break;
+ return handle_events(c);
}
case LWS_CALLBACK_RECEIVE: {
- if (!t || pn_transport_capacity(t) < 0) {
- return normal_close(wsi, "read-closed");
- }
- if (transport_push(t, pn_bytes(len, in))) {
- return unexpected_close(wsi, "read-overflow");
+ while (len > 0) {
+ if (handle_events(c)) return -1;
+ pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
+ if (dbuf.size == 0) {
+ return unexpected_close(c->wsi, "unexpected-data");
+ }
+ size_t copy = (len < dbuf.size) ? len : dbuf.size;
+ memcpy(dbuf.start, in, copy);
+ pn_connection_driver_read_done(&c->driver, copy);
+ len -= copy;
+ in = (char*)in + copy;
}
- break;
+ return handle_events(c);
}
- case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
- if (t) {
- pn_transport_close_tail(t);
+ case LWS_CALLBACK_USER: {
+ pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+ if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) {
+ hs->next_tick = next_tick;
}
+ return handle_events(c);
+ }
- case LWS_CALLBACK_CLOSED:
- break;
+ case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
+ pn_connection_driver_read_close(&c->driver);
+ return handle_events(c);
+ }
+
+ case LWS_CALLBACK_CLOSED: {
+ qd_log(wsi_log(wsi), QD_LOG_DEBUG, "HTTP connection closed to %s from %s",
+ wsi_listener(wsi)->host_port, c->name);
+ if (c->driver.transport) {
+ pn_connection_driver_close(&c->driver);
+ handle_events(c);
+ }
+ pn_connection_driver_destroy(&c->driver);
+ c->qd_conn->pn_conn = NULL;
+ c->qd_conn->collector = NULL;
+ qd_connection_free(c->qd_conn);
+ free(c->wbuf.start);
+ return -1;
+ }
default:
- break;
+ return 0;
}
- return 0;
}
-static void check_timer(void *void_http_server) {
- qd_http_server_t *s = (qd_http_server_t*)void_http_server;
- /* Run LWS global timer and forced-service checks. */
- sys_mutex_lock(s->lock);
- lws_service_fd(s->context, NULL);
- while (!lws_service_adjust_timeout(s->context, 1, 0)) {
- /* -1 timeout means just do forced service */
- lws_plat_service_tsi(s->context, -1, 0);
- }
- if (!s->timer) {
- s->timer = qd_timer(s->dispatch, check_timer, s);
- }
- sys_mutex_unlock(s->lock);
- /* Timer is locked using server lock. */
- qd_timer_cancel(s->timer);
- qd_timer_schedule(s->timer, 1000); /* LWS wants per-second wakeups */
-}
+#define DEFAULT_TICK 1000
-static qd_http_listener_t * qdpn_connector_http_listener(qdpn_connector_t* c) {
- qd_listener_t* ql = (qd_listener_t*)qdpn_listener_context(qdpn_connector_listener(c));
- return qd_listener_http(ql);
+static pn_timestamp_t now(void)
+{
+ struct timespec now;
+#ifdef CLOCK_MONOTONIC_COARSE
+ int cid = CLOCK_MONOTONIC_COARSE;
+#else
+ int cid = CLOCK_MONOTONIC;
+#endif
+ if (clock_gettime(cid, &now)) {
+ qd_error_errno(errno, "clock_gettime");
+ exit(1);
+ }
+ return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
}
-static void http_connector_process(qdpn_connector_t *c) {
- qd_http_listener_t *hl = qdpn_connector_http_listener(c);
- qd_http_server_t *s = hl->server;
- sys_mutex_lock(s->lock);
- int fd = qdpn_connector_get_fd(c);
- fd_data_t *d = fd_data(s, fd);
- /* Make sure we are still tracking this fd, could have been closed by timer */
- if (d) {
- pn_transport_t *t = qdpn_connector_transport(c);
- int flags =
- (qdpn_connector_hangup(c) ? POLLHUP : 0) |
- (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) |
- (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0);
- struct lws_pollfd pfd = { fd, flags, flags };
- if (pn_transport_pending(t) > 0) {
- lws_callback_on_writable(d->wsi);
+static void* http_thread_run(void* v) {
+ qd_http_server_t *hs = v;
+ qd_log(hs->log, QD_LOG_INFO, "HTTP server thread running");
+ int result = 0;
+ while(result >= 0) {
+ /* Send a USER event to run transport ticks, may decrease hs->next_tick. */
+ hs->now = now();
+ hs->next_tick = hs->now + DEFAULT_TICK;
+ lws_callback_all_protocol(hs->context, &protocols[1], LWS_CALLBACK_USER);
+ lws_callback_all_protocol(hs->context, &protocols[2], LWS_CALLBACK_USER);
+ pn_millis_t timeout = (hs->next_tick > hs->now) ? hs->next_tick - hs->now : 1;
+
+ /* Run LWS event loop*/
+ result = lws_service(hs->context, timeout);
+
+ /* Process any work items on the queue */
+ for (work_t w = work_pop(hs); w.type != W_NONE; w = work_pop(hs)) {
+ switch (w.type) {
+ case W_NONE:
+ break;
+ case W_STOP:
+ result = -1;
+ break;
+ case W_LISTEN:
+ listener_start((qd_http_listener_t*)w.value, hs);
+ break;
+ case W_CLOSE:
+ listener_close((qd_http_listener_t*)w.value, hs);
+ break;
+ case W_WAKE: {
+ qd_http_connection_t *c = w.value;
+ pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+ PN_CONNECTION_WAKE);
+ handle_events(c);
+ break;
+ }
+ }
}
- lws_service_fd(s->context, &pfd);
- d = fd_data(s, fd); /* We may have stopped tracking during service */
- if (pn_transport_capacity(t) > 0)
- qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
- if (pn_transport_pending(t) > 0 || (d && lws_partial_buffered(d->wsi)))
- qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
- pn_timestamp_t wake = pn_transport_tick(t, qdpn_now(NULL));
- if (wake) qdpn_connector_wakeup(c, wake);
- }
- sys_mutex_unlock(s->lock);
- check_timer(s); /* Make sure the timer is running */
+ }
+ qd_log(hs->log, QD_LOG_INFO, "HTTP server thread exit");
+ return NULL;
}
-/* Dispatch closes a connector because it is HUP, socket_error or transport_closed() */
-static void http_connector_close(qdpn_connector_t *c) {
- int fd = qdpn_connector_get_fd(c);
- qd_http_server_t *s = qdpn_connector_http_listener(c)->server;
- sys_mutex_lock(s->lock);
- fd_data_t *d = fd_data(s, fd);
- if (d) { /* Only if we are still tracking fd */
- /* Shutdown but let LWS do the close(), possibly in later timer */
- shutdown(qdpn_connector_get_fd(c), SHUT_RDWR);
- short flags = POLLIN|POLLOUT|POLLHUP;
- struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
- lws_service_fd(s->context, &pfd);
- qdpn_connector_mark_closed(c);
- memset(d, 0 , sizeof(*d));
- }
- sys_mutex_unlock(s->lock);
+void qd_http_server_free(qd_http_server_t *hs) {
+ if (!hs) return;
+ if (hs->thread) {
+ /* Thread safe, stop via work queue then clean up */
+ work_t work = { W_STOP, NULL };
+ work_push(hs, work);
+ sys_thread_join(hs->thread);
+ sys_thread_free(hs->thread);
+ hs->thread = NULL;
+ }
+ work_queue_destroy(&hs->work);
+ if (hs->context) lws_context_destroy(hs->context);
+ free(hs);
}
-static struct qdpn_connector_methods_t http_methods = {
- http_connector_process,
- http_connector_close
-};
-
-void qd_http_listener_accept(qd_http_listener_t *hl, qdpn_connector_t *c) {
- qd_http_server_t *s = hl->server;
- sys_mutex_lock(s->lock);
- int fd = qdpn_connector_get_fd(c);
- struct lws *wsi = lws_adopt_socket_vhost(hl->vhost, fd);
- fd_data_t *d = fd_data(s, fd);
- if (d) { /* FD was adopted by LWS, so dispatch must not close it */
- qdpn_connector_set_methods(c, &http_methods);
- if (wsi) d->connector = c;
- }
- sys_mutex_unlock(s->lock);
- if (!wsi) { /* accept failed, dispatch should forget the FD. */
- qdpn_connector_mark_closed(c);
+qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) {
+ log_init();
+ qd_http_server_t *hs = calloc(1, sizeof(*hs));
+ if (hs) {
+ work_queue_init(&hs->work);
+ struct lws_context_creation_info info = {0};
+ info.gid = info.uid = -1;
+ info.user = hs;
+ info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
+ info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
+ LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
+ LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ info.max_http_header_pool = 32;
+ info.timeout_secs = 1;
+
+ hs->context = lws_create_context(&info);
+ hs->server = s;
+ hs->log = log; /* For messages from this file */
+ if (!hs->context) {
+ qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
+ qd_http_server_free(hs);
+ hs = NULL;
+ }
}
+ return hs;
}
-static struct lws_protocols protocols[] = {
- /* HTTP only protocol comes first */
- {
- "http-only",
- callback_http,
- 0,
- },
- /* "amqp" is the official oasis AMQP over WebSocket protocol name */
- {
- "amqp",
- callback_amqpws,
- sizeof(buffer_t),
- },
- /* "binary" is an alias for "amqp", for compatibility with clients designed
- * to work with a WebSocket proxy
- */
- {
- "binary",
- callback_amqpws,
- sizeof(buffer_t),
- },
- { NULL, NULL, 0, 0 } /* terminator */
-};
+/* Thread safe calls that put items on work queue */
-static qd_log_level_t qd_level(int lll) {
- switch (lll) {
- case LLL_ERR: return QD_LOG_ERROR;
- case LLL_WARN: return QD_LOG_WARNING;
- case LLL_NOTICE: return QD_LOG_INFO;
- case LLL_INFO:return QD_LOG_DEBUG;
- case LLL_DEBUG: return QD_LOG_TRACE;
- default: return QD_LOG_NONE;
+/* Create an HTTP listener for li: start the HTTP thread if it is not yet running,
+ * then queue a W_LISTEN work item for the new listener.
+ * Returns NULL if hs is NULL, the thread could not be started, or
+ * qd_http_listener() fails. */
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li)
+{
+ if (!hs) return NULL; /* qd_http_server() returns NULL when it fails to start */
+ sys_mutex_lock(hs->work.lock);
+ if (!hs->thread) {
+ hs->thread = sys_thread(http_thread_run, hs);
}
+ bool ok = hs->thread;
+ sys_mutex_unlock(hs->work.lock);
+ if (!ok) return NULL;
+
+ qd_http_listener_t *hl = qd_http_listener(hs, li);
+ if (hl) {
+ work_t w = { W_LISTEN, hl };
+ work_push(hs, w);
+ }
+ return hl;
}
-static void emit_lws_log(int lll, const char *line) {
- size_t len = strlen(line);
- while (len > 1 && isspace(line[len-1]))
- --len;
- qd_log(http_log, qd_level(lll), "%.*s", len, line);
-}
-
-qd_http_server_t *qd_http_server(qd_dispatch_t *d, qd_log_source_t *log) {
- if (!http_log) http_log = qd_log_source("HTTP");
- qd_http_server_t *s = calloc(1, sizeof(*s));
- if (!s) return NULL;
- s->log = log;
- s->lock = sys_mutex();
- s->dispatch = d;
- int levels =
- (qd_log_enabled(log, QD_LOG_ERROR) ? LLL_ERR : 0) |
- (qd_log_enabled(log, QD_LOG_WARNING) ? LLL_WARN : 0) |
- (qd_log_enabled(log, QD_LOG_INFO) ? LLL_NOTICE : 0) |
- (qd_log_enabled(log, QD_LOG_DEBUG) ? LLL_INFO : 0) |
- (qd_log_enabled(log, QD_LOG_TRACE) ? LLL_DEBUG : 0);
- lws_set_log_level(levels, emit_lws_log);
-
- struct lws_context_creation_info info = {0};
- info.gid = info.uid = -1;
- info.user = s;
- info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
- info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
- LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
- LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- info.max_http_header_pool = 32;
- info.timeout_secs = 1;
- s->context = lws_create_context(&info);
- if (!s->context) {
- free(s);
- return NULL;
- }
- return s;
+void qd_http_listener_close(qd_http_listener_t *hl)
+{
+ work_t w = { W_CLOSE, hl };
+ work_push(hl->server, w);
}
-void qd_http_server_free(qd_http_server_t *s) {
- sys_mutex_free(s->lock);
- lws_context_destroy(s->context);
- if (s->timer) qd_timer_free(s->timer);
- if (s->fd) free(s->fd);
- free(s);
+static qd_http_server_t *wsi_server(struct lws *wsi) {
+ return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
}
-qd_http_listener_t *qd_http_listener(qd_http_server_t *s, const qd_server_config_t *config) {
- qd_http_listener_t *hl = calloc(1, sizeof(*hl));
- if (!hl) return NULL;
- hl->server = s;
-
- struct lws_context_creation_info info = {0};
-
- struct lws_http_mount *m = &hl->mount;
- m->mountpoint = "/"; /* URL mount point */
- m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
- m->origin = (config->http_root && *config->http_root) ? /* File system root */
- config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
- m->def = "index.html"; /* Default file name */
- m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
- info.mounts = m;
- info.port = CONTEXT_PORT_NO_LISTEN_SERVER; /* Don't use LWS listener */
- info.protocols = protocols;
- info.keepalive_timeout = 1;
- info.ssl_cipher_list = CIPHER_LIST;
- info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
- if (config->ssl_profile) {
- info.ssl_cert_filepath = config->ssl_certificate_file;
- info.ssl_private_key_filepath = config->ssl_private_key_file;
- info.ssl_private_key_password = config->ssl_password;
- info.ssl_ca_filepath = config->ssl_trusted_certificates;
- info.options |=
- LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
- (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
- (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
- }
- snprintf(hl->name, sizeof(hl->name), "vhost%x", s->vhost_id++);
- info.vhost_name = hl->name;
- hl->vhost = lws_create_vhost(s->context, &info);
- if (!hl->vhost) {
- free(hl);
- return NULL;
+/* Return the qd_http_listener_t whose pointer is stored in the vhost's private
+ * data for protocols[0], or NULL if the wsi has no vhost or no such data. */
+static qd_http_listener_t *wsi_listener(struct lws *wsi) {
+ qd_http_listener_t *hl = NULL;
+ struct lws_vhost *vhost = lws_get_vhost(wsi);
+ if (vhost) { /* Get qd_http_listener from vhost data */
+ void *vp = lws_protocol_vh_priv_get(vhost, &protocols[0]);
+ if (vp) memcpy(&hl, vp, sizeof(hl)); /* leave hl NULL when there is no private data */
}
return hl;
}
-void qd_http_listener_free(qd_http_listener_t *hl) {
- free(hl);
+static qd_log_source_t *wsi_log(struct lws *wsi) {
+ return wsi_server(wsi)->log;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/http-none.c
----------------------------------------------------------------------
diff --git a/src/http-none.c b/src/http-none.c
index b9af9e1..bdc32b0 100644
--- a/src/http-none.c
+++ b/src/http-none.c
@@ -18,32 +18,27 @@
*/
#include <qpid/dispatch/log.h>
-#include <qpid/dispatch/driver.h>
#include "http.h"
+struct qd_dispatch_t;
+
/* No HTTP implementation available. */
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *d, qd_log_source_t *log)
+qd_http_server_t *qd_http_server(struct qd_server_t *s, qd_log_source_t *log)
{
qd_log(log, QD_LOG_WARNING, "HTTP support is not available");
return 0;
}
-void qd_http_server_free(qd_http_server_t *h)
-{
-}
+void qd_http_server_free(qd_http_server_t *h) {}
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
- const struct qd_server_config_t *config)
-{
- return 0;
-}
+void* qd_http_server_run(void* qd_http_server) { return 0; }
-void qd_http_listener_free(qd_http_listener_t *hl)
-{
-}
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li) { return 0; }
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c)
-{
-}
+struct qd_http_connection_t { int dummy; }; /* Stub body for the type declared in http.h */
+const char *qd_http_connection_name(qd_http_connection_t* hc) { return ""; }
+const char *qd_http_connection_hostip(qd_http_connection_t* hc) { return ""; }
+bool qd_http_connection_closed(qd_http_connection_t* hc) { return false; }
+void qd_http_connection_wake(qd_http_connection_t *qd_conn) {}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/http.h
----------------------------------------------------------------------
diff --git a/src/http.h b/src/http.h
index fae3f1d..8c064d2 100644
--- a/src/http.h
+++ b/src/http.h
@@ -23,17 +23,21 @@
typedef struct qd_http_listener_t qd_http_listener_t;
typedef struct qd_http_server_t qd_http_server_t;
-struct qd_dispatch_t;
+struct qd_listener_t;
struct qd_log_source_t;
struct qd_server_config_t;
-struct qdpn_connector_t;
+struct qd_server_t;
+
+qd_http_server_t *qd_http_server(struct qd_server_t *server, struct qd_log_source_t *log);
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *dispatch, struct qd_log_source_t *log);
void qd_http_server_free(qd_http_server_t*);
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
- const struct qd_server_config_t *config);
-void qd_http_listener_free(qd_http_listener_t *hl);
-/* On error, qdpn_connector_closed(c) is true. */
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c);
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li);
+
+typedef struct qd_http_connection_t qd_http_connection_t;
+
+const char *qd_http_connection_name(qd_http_connection_t* hc);
+const char *qd_http_connection_hostip(qd_http_connection_t* hc);
+bool qd_http_connection_closed(qd_http_connection_t* hc);
+void qd_http_connection_wake(qd_http_connection_t* hc);
#endif // QD_HTTP_H
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 42fed58..8d069e2 100644
--- a/src/server.c
+++ b/src/server.c
@@ -92,18 +92,6 @@ struct qd_server_t {
};
/**
- * Listener objects represent the desire to accept incoming transport connections.
- */
-struct qd_listener_t {
- qd_server_t *server;
- const qd_server_config_t *config;
- void *context;
- qdpn_listener_t *pn_listener;
- qd_http_listener_t *http;
-};
-
-
-/**
* Connector objects represent the desire to create and maintain an outgoing transport connection.
*/
struct qd_connector_t {
@@ -160,7 +148,9 @@ static qd_thread_t *thread(qd_server_t *qd_server, int id)
return thread;
}
-static void free_qd_connection(qd_connection_t *ctx)
+static void invoke_deferred_calls(qd_connection_t *conn, bool discard);
+
+void qd_connection_free(qd_connection_t *ctx)
{
if (ctx->policy_settings) {
if (ctx->policy_settings->sources)
@@ -185,6 +175,10 @@ static void free_qd_connection(qd_connection_t *ctx)
free(ctx->role);
+ if (ctx->deferred_call_lock) {
+ invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
+ sys_mutex_free(ctx->deferred_call_lock);
+ }
free_qd_connection_t(ctx);
}
@@ -423,7 +417,7 @@ static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tpo
* Allocate a new qd_connection
* with DEQ items initialized, call lock allocated, and all other fields cleared.
*/
-static qd_connection_t *connection_allocate()
+qd_connection_t *qd_server_connection_allocate()
{
qd_connection_t *ctx = new_qd_connection_t();
ZERO(ctx);
@@ -434,6 +428,14 @@ static qd_connection_t *connection_allocate()
return ctx;
}
+/* Get the next connection ID, thread safe */
+uint64_t qd_server_connection_id(qd_server_t *server)
+{
+ sys_mutex_lock(server->lock);
+ uint64_t id = server->next_connection_id++;
+ sys_mutex_unlock(server->lock);
+ return id;
+}
void qd_connection_set_user(qd_connection_t *conn)
{
@@ -525,7 +527,7 @@ static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr)
}
-static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config)
+void qd_server_decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config)
{
size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
@@ -621,7 +623,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
char logbuf[qd_log_max_len()];
- ctx = connection_allocate();
+ ctx = qd_server_connection_allocate();
ctx->server = qd_server;
ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
ctx->pn_cxtr = cxtr;
@@ -638,7 +640,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
pn_connection_t *conn = pn_connection();
ctx->collector = pn_collector();
pn_connection_collect(conn, ctx->collector);
- decorate_connection(qd_server, conn, ctx->listener->config);
+ qd_server_decorate_connection(qd_server, conn, ctx->listener->config);
qdpn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
@@ -672,13 +674,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
pn_transport_set_tracer(tport, transport_tracer);
}
- if (li->http) {
- // Set up HTTP if configured, HTTP provides its own SSL.
- qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring HTTP%s on %s",
- config->ssl_profile ? "S" : "",
- log_incoming(logbuf, sizeof(logbuf), cxtr));
- qd_http_listener_accept(li->http, cxtr);
- } else if (config->ssl_profile) {
+ if (!li->http && config->ssl_profile) {
// Set up SSL if configured and HTTP is not providing SSL.
qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s",
log_incoming(logbuf, sizeof(logbuf), cxtr));
@@ -766,31 +762,32 @@ static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
}
-static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
+int qd_connection_process(qd_connection_t *ctx)
{
- qd_connection_t *ctx = qdpn_connector_context(cxtr);
int events = 0;
int passes = 0;
+ qdpn_connector_t *cxtr = ctx->pn_cxtr;
+ qd_http_connection_t *hc = ctx->http;
if (ctx->closed)
return 0;
-
+ qd_server_t *qd_server = ctx->server;
do {
passes++;
//
// Step the engine for pre-handler processing
//
- qdpn_connector_process(cxtr);
-
+ if (cxtr) {
+ qdpn_connector_process(cxtr);
+ }
//
// If the connector has closed, notify the client via callback.
//
- if (qdpn_connector_closed(cxtr)) {
+ if ((cxtr && qdpn_connector_closed(cxtr)) || (hc && qd_http_connection_closed(hc))) {
if (ctx->opened)
qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
- QD_CONN_EVENT_CLOSE,
- (qd_connection_t*) qdpn_connector_context(cxtr));
+ QD_CONN_EVENT_CLOSE, ctx);
ctx->closed = true;
events = 0;
break;
@@ -798,7 +795,7 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
invoke_deferred_calls(ctx, false);
- qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr);
+ qd_connection_t *qd_conn = ctx;
pn_collector_t *collector = qd_connection_collector(qd_conn);
pn_event_t *event;
@@ -1046,7 +1043,7 @@ static void *thread_run(void *arg)
// Even if the connector has failed there are still events that
// must be processed so that associated links will be cleaned up.
//
- work_done = process_connector(qd_server, cxtr);
+ work_done = qd_connection_process(ctx);
//
// Check to see if the connector was closed during processing
@@ -1070,9 +1067,7 @@ static void *thread_run(void *arg)
}
qdpn_connector_free(cxtr);
- invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
- sys_mutex_free(ctx->deferred_call_lock);
- free_qd_connection(ctx);
+ qd_connection_free(ctx);
qd_server->threads_active--;
sys_mutex_unlock(qd_server->lock);
} else {
@@ -1145,7 +1140,7 @@ static void cxtr_try_open(void *context)
if (ct->state != CXTR_STATE_CONNECTING)
return;
- qd_connection_t *ctx = connection_allocate();
+ qd_connection_t *ctx = qd_server_connection_allocate();
ctx->server = ct->server;
ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
ctx->pn_conn = pn_connection();
@@ -1161,7 +1156,7 @@ static void cxtr_try_open(void *context)
qd_log(ct->server->log_source, QD_LOG_INFO, "Connecting to %s:%s", ct->config->host, ct->config->port);
pn_connection_collect(ctx->pn_conn, ctx->collector);
- decorate_connection(ctx->server, ctx->pn_conn, ct->config);
+ qd_server_decorate_connection(ctx->server, ctx->pn_conn, ct->config);
//
// qdpn_connector is not thread safe
@@ -1175,8 +1170,7 @@ static void cxtr_try_open(void *context)
const qd_server_config_t *config = ct->config;
if (ctx->pn_cxtr == 0) {
- sys_mutex_free(ctx->deferred_call_lock);
- free_qd_connection(ctx);
+ qd_connection_free(ctx);
ct->delay = 10000;
qd_timer_schedule(ct->timer, ct->delay);
return;
@@ -1354,7 +1348,7 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
qd_server->heartbeat_timer = 0;
qd_server->next_connection_id = 1;
qd_server->py_displayname_obj = 0;
- qd_server->http = qd_http_server(qd, qd_server->log_source);
+ qd_server->http = qd_http_server(qd_server, qd_server->log_source);
qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);
return qd_server;
@@ -1534,17 +1528,16 @@ void qd_server_resume(qd_dispatch_t *qd)
void qd_server_activate(qd_connection_t *ctx, bool awaken)
{
- if (!ctx)
- return;
-
qdpn_connector_t *ctor = ctx->pn_cxtr;
- if (!ctor)
- return;
-
- if (!qdpn_connector_closed(ctor)) {
- qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
- if (awaken)
- qdpn_driver_wakeup(ctx->server->driver);
+ qd_http_connection_t *hc = ctx->http;
+ if (ctor) {
+ if (!qdpn_connector_closed(ctor)) {
+ qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
+ if (awaken)
+ qdpn_driver_wakeup(ctx->server->driver);
+ }
+ } else if (hc) {
+ qd_http_connection_wake(hc);
}
}
@@ -1636,6 +1629,7 @@ qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *con
{
qd_server_t *qd_server = qd->server;
qd_listener_t *li = new_qd_listener_t();
if (!li)
return 0;
+ ZERO(li); /* Clear the listener only after the allocation has been checked */
@@ -1646,23 +1640,23 @@ qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *con
li->http = NULL;
if (config->http) {
- li->http = qd_http_listener(qd_server->http, config);
+ li->http = qd_http_server_listen(qd_server->http, li);
if (!li->http) {
free_qd_listener_t(li);
qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start HTTP listener on %s:%s",
config->host, config->port);
return NULL;
}
- }
+ } else {
+ li->pn_listener = qdpn_listener(
+ qd_server->driver, config->host, config->port, config->protocol_family, li);
- li->pn_listener = qdpn_listener(
- qd_server->driver, config->host, config->port, config->protocol_family, li);
-
- if (!li->pn_listener) {
- free_qd_listener_t(li);
- qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s",
- config->host, config->port);
- return NULL;
+ if (!li->pn_listener) {
+ free_qd_listener_t(li);
+ qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s",
+ config->host, config->port);
+ return NULL;
+ }
}
qd_log(qd_server->log_source, QD_LOG_TRACE, "Listening on %s:%s%s", config->host, config->port,
config->http ? (config->ssl_profile ? "(HTTPS)":"(HTTP)") : "");
@@ -1675,15 +1669,15 @@ void qd_server_listener_free(qd_listener_t* li)
{
if (!li)
return;
- if (li->http) qd_http_listener_free(li->http);
- qdpn_listener_free(li->pn_listener);
+ /* The http listener is freed by the http server */
+ if (li->pn_listener) qdpn_listener_free(li->pn_listener);
free_qd_listener_t(li);
}
void qd_server_listener_close(qd_listener_t* li)
{
- if (li)
+ if (li && li->pn_listener)
qdpn_listener_close(li->pn_listener);
}
@@ -1741,11 +1735,11 @@ void qd_server_timer_cancel_LH(qd_timer_t *timer)
qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
const char* qd_connection_name(const qd_connection_t *c) {
- return qdpn_connector_name(c->pn_cxtr);
+ return c->pn_cxtr ? qdpn_connector_name(c->pn_cxtr) : qd_http_connection_name(c->http);
}
const char* qd_connection_hostip(const qd_connection_t *c) {
- return qdpn_connector_hostip(c->pn_cxtr);
+ return c->pn_cxtr ? qdpn_connector_hostip(c->pn_cxtr) : qd_http_connection_hostip(c->http);
}
qd_connector_t* qd_connection_connector(const qd_connection_t *c) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 576b5ec..b613133 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -35,17 +35,22 @@
void qd_server_timer_pending_LH(qd_timer_t *timer);
void qd_server_timer_cancel_LH(qd_timer_t *timer);
-/* FIXME aconway 2017-01-19: to include/server.h? */
-
struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
const char* qd_connection_name(const qd_connection_t *c);
const char* qd_connection_hostip(const qd_connection_t *c);
qd_connector_t* qd_connection_connector(const qd_connection_t *c);
+void qd_connection_free(qd_connection_t *c);
+int qd_connection_process(qd_connection_t *ctx);
const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
-qd_http_listener_t *qd_listener_http(qd_listener_t *l);
+uint64_t qd_server_connection_id(qd_server_t *server);
+qd_connection_t *qd_server_connection_allocate();
+
+void qd_server_decorate_connection(
+ qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config);
+
#define CONTEXT_NO_OWNER -1
#define CONTEXT_UNSPECIFIED_OWNER -2
@@ -81,6 +86,25 @@ typedef struct qd_pn_free_link_session_t {
DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
+/**
+ * Listener objects represent the desire to accept incoming transport connections.
+ */
+struct qd_listener_t {
+ qd_server_t *server;
+ const qd_server_config_t *config;
+ void *context;
+ qdpn_listener_t *pn_listener;
+ qd_http_listener_t *http;
+};
+
/**
* Connection objects wrap Proton connection objects.
*/
@@ -92,6 +116,7 @@ struct qd_connection_t {
int owner_thread;
int enqueued;
qdpn_connector_t *pn_cxtr;
+ qd_http_connection_t *http;
pn_connection_t *pn_conn;
pn_collector_t *collector;
pn_ssl_t *ssl;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org