You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/12/01 19:15:16 UTC

[2/2] qpid-dispatch git commit: DISPATCH-103: Fix Websocket Listeners

DISPATCH-103: Fix Websocket Listeners

Fix shutdown problems; websocket listeners are now functioning properly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/69f52f28
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/69f52f28
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/69f52f28

Branch: refs/heads/master
Commit: 69f52f2836fd0324a4f49585d32e38dab72f9afc
Parents: 978e809
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Dec 1 13:34:44 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Dec 1 14:14:40 2016 -0500

----------------------------------------------------------------------
 src/http-libwebsockets.c | 82 ++++++++++++++++++-------------------------
 1 file changed, 34 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/69f52f28/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index c7c0043..9d2b3e5 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -59,8 +59,9 @@ typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t;
 
 /* Extra buffering per connection, stored in the lws_wsi_user() space. */
 typedef struct buffers_t {
-    buffer_t wtmp;    /* Temp buffer with pre-data header space required by LWS */
-    buffer_t over;    /* Can't control LWS read size, buffer the overflow */
+    buffer_t wtmp;  /* Temp buffer with pre-data header space required by LWS */
+    buffer_t over;  /* Can't control LWS read size, buffer the overflow */
+    char name[256]; /* Copy of connector name for use after connector detached */
 } buffers_t;
 
 static void resize(buffer_t *b, size_t size) {
@@ -98,27 +99,33 @@ static qd_http_t *qd_http_from_wsi(struct lws *wsi) {
     return (qd_http_t *)lws_context_user(lws_get_context(wsi));
 }
 
+static int do_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) {
+    if (c) qdpn_connector_mark_closed(c);
+    lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
+    return -1;
+}
+
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
 {
     buffers_t *b = (buffers_t*)user;
     qd_http_t *h = qd_http_from_wsi(wsi);
     qdpn_connector_t *c = per_thread.connector;
-    pn_transport_t *t = qdpn_connector_transport(c);
-    const char *name = qdpn_connector_name(c);
+    pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
 
     switch (reason) {
 
     case LWS_CALLBACK_ESTABLISHED: {
-        qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", name);
         memset(b, 0, sizeof(*b));
+        strncpy(b->name, qdpn_connector_name(c), sizeof(b->name));
+        qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", b->name);
         break;
     }
 
     case LWS_CALLBACK_SERVER_WRITEABLE: {
-        ssize_t size = pn_transport_pending(t);
-        if (size < 0) {
-            return -1;
+        ssize_t size;
+        if (!t || (size = pn_transport_pending(t)) < 0) {
+            return do_close(wsi, c, "write-closed");
         }
         if (size > 0) {
             pn_bytes_t wbuf = { size, pn_transport_head(t) };
@@ -129,7 +136,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
             ssize_t wrote = lws_write(wsi, start, wbuf.size, LWS_WRITE_BINARY);
             if (wrote < 0) {
                 pn_transport_close_head(t);
-                return -1;
+                return do_close(wsi, c, "write-error");
             } else {
                 pn_transport_pop(t, (size_t)wrote);
             }
@@ -138,26 +145,25 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
     }
 
     case LWS_CALLBACK_RECEIVE: {
-        if (pn_transport_capacity(t) < 0) {
-            return -1;
+        if (!t || pn_transport_capacity(t) < 0) {
+            do_close(wsi, c, "read-closed");
         }
         assert(b->over.size == 0);
         transport_push_max(t, pn_bytes(len, in), &b->over);
         if (b->over.size > 0) {
-            qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", name);
+            qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", b->name);
         }
         break;
     }
 
     case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
-        qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", name);
-        pn_transport_close_tail(t);
-        break;
+        qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", b->name);
+        if (t) pn_transport_close_tail(t);
+        return do_close(wsi, c, "peer-close");
     }
 
     case LWS_CALLBACK_CLOSED: {
-        qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", name);
-        qdpn_connector_mark_closed(c);
+        qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", b->name);
         break;
     }
 
@@ -170,38 +176,21 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
 static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
                          void *in, size_t len)
 {
-    qdpn_connector_t *c = per_thread.connector;
-    buffers_t *b = (buffers_t*)user;
-
-    switch (reason) {
-    case LWS_CALLBACK_ESTABLISHED: {
-        memset(b, 0, sizeof(*b));
-        break;
-    }
-    case LWS_CALLBACK_CLOSED: {
-        qdpn_connector_mark_closed(c);
-    }
-    default:
-        break;
-    }
     return 0;
 }
 
-static void set_timer_lh(qd_http_t *h);
-
-static void fire_timer(void *void_http) {
+static void check_timer(void *void_http) {
+    per_thread.connector = NULL; /* Not servicing any connector */
     qd_http_t *h = (qd_http_t*)void_http;
     sys_mutex_lock(h->lock);
+    /* Run LWS global timer checks. */
     lws_service_fd(h->context, NULL);
-    set_timer_lh(h);
-    sys_mutex_unlock(h->lock);
-}
-
-static void set_timer_lh(qd_http_t *h) {
     if (!h->timer) {
-        h->timer = qd_timer(h->dispatch, fire_timer, h);
+        h->timer = qd_timer(h->dispatch, check_timer, h);
     }
+    qd_timer_cancel(h->timer);
     qd_timer_schedule(h->timer, 1000);
+    sys_mutex_unlock(h->lock);
 }
 
 void qd_http_connector_process(qdpn_connector_t *c) {
@@ -218,28 +207,25 @@ void qd_http_connector_process(qdpn_connector_t *c) {
 
     if (b && b->over.size) {         /* Consume last over-buffered read */
         transport_push_max(t, pn_bytes(b->over.size, b->over.start), &b->over);
-        if (b->over.size) {         /* Don't let LIBWS read if we still are over */
-            flags &= ~POLLIN;
+        if (b->over.size) {
+            flags &= ~POLLIN;   /* Don't let LIBWS read if we still are over */
         }
     }
-
     sys_mutex_lock(h->lock);
     struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
     lws_service_fd(h->context, &pfd);
-    set_timer_lh(h);
     sys_mutex_unlock(h->lock);
 
     if (pn_transport_capacity(t) > 0)
         qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
     if (pn_transport_pending(t) > 0)
         qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-
     pn_timestamp_t now = qdpn_now(NULL);
     pn_timestamp_t next = pn_transport_tick(t, now);
     /* If we have overflow, re-process immediately after dispatch, otherwise at
-     * next proton tick.
-     */
+     * next proton tick. */
     qdpn_connector_wakeup(c, (b && b->over.size) ? now : next);
+    check_timer(h);
 }
 
 qd_http_connector_t *qd_http_connector(qd_http_t *h, qdpn_connector_t *c) {
@@ -268,7 +254,6 @@ static struct lws_protocols protocols[] = {
         callback_amqpws,
         sizeof(buffers_t),
     },
-
 };
 
 qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) {
@@ -277,6 +262,7 @@ qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) {
     h->dispatch = d;
     h->log = log;
     lws_set_log_level(0, NULL);
+
     struct lws_context_creation_info info = {0};
     info.port = CONTEXT_PORT_NO_LISTEN;
     info.protocols = protocols;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org