You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/12/01 19:15:16 UTC
[2/2] qpid-dispatch git commit: DISPATCH-103: Fix Websocket Listeners
DISPATCH-103: Fix Websocket Listeners
Fix shutdown problems, websocket liseteners are now functioning properly.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/69f52f28
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/69f52f28
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/69f52f28
Branch: refs/heads/master
Commit: 69f52f2836fd0324a4f49585d32e38dab72f9afc
Parents: 978e809
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Dec 1 13:34:44 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Dec 1 14:14:40 2016 -0500
----------------------------------------------------------------------
src/http-libwebsockets.c | 82 ++++++++++++++++++-------------------------
1 file changed, 34 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/69f52f28/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index c7c0043..9d2b3e5 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -59,8 +59,9 @@ typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t;
/* Extra buffering per connection, stored in the lws_wsi_user() space. */
typedef struct buffers_t {
- buffer_t wtmp; /* Temp buffer with pre-data header space required by LWS */
- buffer_t over; /* Can't control LWS read size, buffer the overflow */
+ buffer_t wtmp; /* Temp buffer with pre-data header space required by LWS */
+ buffer_t over; /* Can't control LWS read size, buffer the overflow */
+ char name[256]; /* Copy of connector name for use after connector detached */
} buffers_t;
static void resize(buffer_t *b, size_t size) {
@@ -98,27 +99,33 @@ static qd_http_t *qd_http_from_wsi(struct lws *wsi) {
return (qd_http_t *)lws_context_user(lws_get_context(wsi));
}
+static int do_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) {
+ if (c) qdpn_connector_mark_closed(c);
+ lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
+ return -1;
+}
+
static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
buffers_t *b = (buffers_t*)user;
qd_http_t *h = qd_http_from_wsi(wsi);
qdpn_connector_t *c = per_thread.connector;
- pn_transport_t *t = qdpn_connector_transport(c);
- const char *name = qdpn_connector_name(c);
+ pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
switch (reason) {
case LWS_CALLBACK_ESTABLISHED: {
- qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", name);
memset(b, 0, sizeof(*b));
+ strncpy(b->name, qdpn_connector_name(c), sizeof(b->name));
+ qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", b->name);
break;
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
- ssize_t size = pn_transport_pending(t);
- if (size < 0) {
- return -1;
+ ssize_t size;
+ if (!t || (size = pn_transport_pending(t)) < 0) {
+ return do_close(wsi, c, "write-closed");
}
if (size > 0) {
pn_bytes_t wbuf = { size, pn_transport_head(t) };
@@ -129,7 +136,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
ssize_t wrote = lws_write(wsi, start, wbuf.size, LWS_WRITE_BINARY);
if (wrote < 0) {
pn_transport_close_head(t);
- return -1;
+ return do_close(wsi, c, "write-error");
} else {
pn_transport_pop(t, (size_t)wrote);
}
@@ -138,26 +145,25 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
}
case LWS_CALLBACK_RECEIVE: {
- if (pn_transport_capacity(t) < 0) {
- return -1;
+ if (!t || pn_transport_capacity(t) < 0) {
+ do_close(wsi, c, "read-closed");
}
assert(b->over.size == 0);
transport_push_max(t, pn_bytes(len, in), &b->over);
if (b->over.size > 0) {
- qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", name);
+ qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", b->name);
}
break;
}
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
- qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", name);
- pn_transport_close_tail(t);
- break;
+ qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", b->name);
+ if (t) pn_transport_close_tail(t);
+ return do_close(wsi, c, "peer-close");
}
case LWS_CALLBACK_CLOSED: {
- qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", name);
- qdpn_connector_mark_closed(c);
+ qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", b->name);
break;
}
@@ -170,38 +176,21 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
{
- qdpn_connector_t *c = per_thread.connector;
- buffers_t *b = (buffers_t*)user;
-
- switch (reason) {
- case LWS_CALLBACK_ESTABLISHED: {
- memset(b, 0, sizeof(*b));
- break;
- }
- case LWS_CALLBACK_CLOSED: {
- qdpn_connector_mark_closed(c);
- }
- default:
- break;
- }
return 0;
}
-static void set_timer_lh(qd_http_t *h);
-
-static void fire_timer(void *void_http) {
+static void check_timer(void *void_http) {
+ per_thread.connector = NULL; /* Not servicing any connector */
qd_http_t *h = (qd_http_t*)void_http;
sys_mutex_lock(h->lock);
+ /* Run LWS global timer checks. */
lws_service_fd(h->context, NULL);
- set_timer_lh(h);
- sys_mutex_unlock(h->lock);
-}
-
-static void set_timer_lh(qd_http_t *h) {
if (!h->timer) {
- h->timer = qd_timer(h->dispatch, fire_timer, h);
+ h->timer = qd_timer(h->dispatch, check_timer, h);
}
+ qd_timer_cancel(h->timer);
qd_timer_schedule(h->timer, 1000);
+ sys_mutex_unlock(h->lock);
}
void qd_http_connector_process(qdpn_connector_t *c) {
@@ -218,28 +207,25 @@ void qd_http_connector_process(qdpn_connector_t *c) {
if (b && b->over.size) { /* Consume last over-buffered read */
transport_push_max(t, pn_bytes(b->over.size, b->over.start), &b->over);
- if (b->over.size) { /* Don't let LIBWS read if we still are over */
- flags &= ~POLLIN;
+ if (b->over.size) {
+ flags &= ~POLLIN; /* Don't let LIBWS read if we still are over */
}
}
-
sys_mutex_lock(h->lock);
struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
lws_service_fd(h->context, &pfd);
- set_timer_lh(h);
sys_mutex_unlock(h->lock);
if (pn_transport_capacity(t) > 0)
qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
if (pn_transport_pending(t) > 0)
qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-
pn_timestamp_t now = qdpn_now(NULL);
pn_timestamp_t next = pn_transport_tick(t, now);
/* If we have overflow, re-process immediately after dispatch, otherwise at
- * next proton tick.
- */
+ * next proton tick. */
qdpn_connector_wakeup(c, (b && b->over.size) ? now : next);
+ check_timer(h);
}
qd_http_connector_t *qd_http_connector(qd_http_t *h, qdpn_connector_t *c) {
@@ -268,7 +254,6 @@ static struct lws_protocols protocols[] = {
callback_amqpws,
sizeof(buffers_t),
},
-
};
qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) {
@@ -277,6 +262,7 @@ qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) {
h->dispatch = d;
h->log = log;
lws_set_log_level(0, NULL);
+
struct lws_context_creation_info info = {0};
info.port = CONTEXT_PORT_NO_LISTEN;
info.protocols = protocols;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org