You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/03/18 21:01:00 UTC

[qpid-dispatch] branch master updated: DISPATCH-1290: add simple http healthcheck and allow websockets to be disabled on http listener This closes #468

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e22255  DISPATCH-1290: add simple http healthcheck and allow websockets to be disabled on http listener This closes #468
1e22255 is described below

commit 1e22255eede6f82828a433839cb25087c2c7b300
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Fri Mar 15 23:24:34 2019 +0000

    DISPATCH-1290: add simple http healthcheck and allow websockets to be disabled on http listener
    This closes #468
---
 include/qpid/dispatch/server.h                | 10 ++++
 python/qpid_dispatch/management/qdrouter.json | 14 +++++-
 src/connection_manager.c                      |  5 +-
 src/http-libwebsockets.c                      | 70 ++++++++++++++++++++++++++-
 src/router_core/router_core.c                 | 37 +++++++-------
 tests/system_tests_http.py                    | 33 +++++++++++++
 6 files changed, 145 insertions(+), 24 deletions(-)

diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 94f14cc..5a4c115 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -133,11 +133,21 @@ typedef struct qd_server_config_t {
     char *protocol_family;
 
     /**
+     * Expose simple liveness check.
+     */
+    bool healthz;
+
+    /**
      * Export metrics.
      */
     bool metrics;
 
     /**
+     * Websockets enabled.
+     */
+    bool websockets;
+
+    /**
      * Accept HTTP connections, allow WebSocket "amqp" protocol upgrades.
      */
     bool http;
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index b1b253d..1d12338 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -838,10 +838,22 @@
                     "deprecationName": "failoverList",
                     "description": "A comma-separated list of failover urls to be supplied to connected clients.  Form: [(amqp|amqps|ws|wss)://]host_or_ip[:port]"
                 },
+                "healthz": {
+                    "type": "boolean",
+                    "default": true,
+                    "description": "Provide a simple HTTP based liveness test (using path /healthz). Assumes listener is enabled for http.",
+                    "create": true
+                },
                 "metrics": {
                     "type": "boolean",
                     "default": true,
-                    "description": "Export metrics in prometheus text format for the router (using path /metrics).",
+                    "description": "Export metrics in prometheus text format for the router (using path /metrics). Assumes listener is enabled for http.",
+                    "create": true
+                },
+                "websockets": {
+                    "type": "boolean",
+                    "default": true,
+                    "description": "For an http enabled listener, determines whether websockets access is enabled (true by default).",
                     "create": true
                 },
                 "http": {
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 8f8cbe7..6549e56 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -312,7 +312,9 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     config->role                 = qd_entity_get_string(entity, "role");              CHECK();
     config->inter_router_cost    = qd_entity_opt_long(entity, "cost", 1);             CHECK();
     config->protocol_family      = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK();
+    config->healthz              = qd_entity_opt_bool(entity, "healthz", true);       CHECK();
     config->metrics              = qd_entity_opt_bool(entity, "metrics", true);       CHECK();
+    config->websockets           = qd_entity_opt_bool(entity, "websockets", true);    CHECK();
     config->http                 = qd_entity_opt_bool(entity, "http", false);         CHECK();
     config->http_root_dir        = qd_entity_opt_string(entity, "httpRootDir", false);   CHECK();
 
@@ -324,9 +326,6 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     if (config->http && ! config->http_root_dir) {
         qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "HTTP service is requested but no httpRootDir specified. The router will serve AMQP-over-websockets but no static content.");
     }
-    if (config->metrics && !config->http) {
-        qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Metrics can only be exported on listener with http enabled.");
-    }
 
     config->max_frame_size       = qd_entity_get_long(entity, "maxFrameSize");        CHECK();
     config->max_sessions         = qd_entity_get_long(entity, "maxSessions");         CHECK();
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 0690c0c..850c646 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -117,6 +117,8 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len);
 static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
                                void *user, void *in, size_t len);
+static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason,
+                               void *user, void *in, size_t len);
 
 static struct lws_protocols protocols[] = {
     /* HTTP only protocol comes first */
@@ -144,6 +146,11 @@ static struct lws_protocols protocols[] = {
         callback_metrics,
         sizeof(stats_t),
     },
+    {
+        "healthz",
+        callback_healthz,
+        sizeof(stats_t),
+    },
     { NULL, NULL, 0, 0 } /* terminator */
 };
 
@@ -248,6 +255,7 @@ struct qd_http_listener_t {
     struct lws_vhost *vhost;
     struct lws_http_mount mount;
     struct lws_http_mount metrics;
+    struct lws_http_mount healthz;
 };
 
 void qd_http_listener_free(qd_http_listener_t *hl) {
@@ -301,14 +309,24 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
     m->def = "index.html";  /* Default file name */
     m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
     m->extra_mimetypes = mime_types;
+    struct lws_http_mount *tail = m;
     if (config->metrics) {
         struct lws_http_mount *metrics = &hl->metrics;
-        m->mount_next = metrics;
+        tail->mount_next = metrics;
+        tail = metrics;
         metrics->mountpoint = "/metrics";
         metrics->mountpoint_len = strlen(metrics->mountpoint);
         metrics->origin_protocol = LWSMPRO_CALLBACK;
         metrics->protocol = "http";
     }
+    if (config->healthz) {
+        struct lws_http_mount *healthz = &hl->healthz;
+        tail->mount_next = healthz;
+        healthz->mountpoint = "/healthz";
+        healthz->mountpoint_len = strlen(healthz->mountpoint);
+        healthz->origin_protocol = LWSMPRO_CALLBACK;
+        healthz->protocol = "healthz";
+    }
 
     struct lws_context_creation_info info = {0};
     info.mounts = m;
@@ -524,6 +542,54 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
     }
 }
 
+static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason,
+                               void *user, void *in, size_t len)
+{
+    qd_http_server_t *hs = wsi_server(wsi);
+    stats_t *stats = (stats_t*) user;
+    uint8_t buffer[LWS_PRE + 2048];
+    uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1];
+
+    switch (reason) {
+
+    case LWS_CALLBACK_HTTP: {
+        stats->wsi = wsi;
+        stats->server = hs;
+        //Liveness probe: make a dummy request for stats (pass in null ptr). This still
+        //exercises the path through the core thread and back through the callback on the
+        //io thread, which is a reasonable initial liveness check.
+        qdr_request_global_stats(hs->core, 0, handle_stats_results, (void*) stats); // NOTE(review): presumably handle_stats_results schedules the writeable callback - not visible here
+        return 0;
+    }
+
+    case LWS_CALLBACK_HTTP_WRITEABLE: {
+        //on the first pass build the 200 status line and headers; content-length 3 matches the "OK\n" body
+        if (!stats->headers_sent) {
+            if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end)
+                || add_header_by_name(wsi, "content-type:", "text/plain", &position, end)
+                || lws_add_http_header_content_length(wsi, 3, &position, end))
+                return 1; // a header could not be added
+            if (lws_finalize_http_header(wsi, &position, end))
+                return 1;
+            stats->headers_sent = true;
+        }
+        position += lws_snprintf((char*) position, end - position, "OK\n");
+
+        int n = LWS_WRITE_HTTP_FINAL;
+        //write everything between start and position (any headers built above plus the body) in one final write
+        size_t available = position - start;
+	if (lws_write(wsi, (unsigned char*) start, available, n) != available) // NOTE(review): if lws_write returns a signed int this compares signed with size_t - confirm
+            return 1;
+        else if (lws_http_transaction_completed(wsi))
+            return -1;
+        else return 0;
+    }
+
+    default:
+        return 0; // every other callback reason is ignored
+    }
+}
+
 /* Callbacks for promoted AMQP over WS connections. */
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
@@ -538,7 +604,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
         memset(c, 0, sizeof(*c));
         c->wsi = wsi;
         qd_http_listener_t *hl = wsi_listener(wsi);
-        if (hl == NULL) {
+        if (hl == NULL || !hl->listener->config.websockets) {
             return unexpected_close(c->wsi, "cannot-upgrade");
         }
         c->qd_conn = qd_server_connection(hs->server, &hl->listener->config);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index a66a703..281b260 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -758,24 +758,25 @@ static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t
 static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     qdr_global_stats_t *stats = action->args.stats_request.stats;
-    stats->addrs = DEQ_SIZE(core->addrs);
-    stats->links = DEQ_SIZE(core->open_links);
-    stats->routers = DEQ_SIZE(core->routers);
-    stats->connections = DEQ_SIZE(core->open_connections);
-    stats->link_routes = DEQ_SIZE(core->link_routes);
-    stats->auto_links = DEQ_SIZE(core->auto_links);
-    stats->presettled_deliveries = core->presettled_deliveries;
-    stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
-    stats->accepted_deliveries = core->accepted_deliveries;
-    stats->rejected_deliveries = core->rejected_deliveries;
-    stats->released_deliveries = core->released_deliveries;
-    stats->modified_deliveries = core->modified_deliveries;
-    stats->deliveries_ingress = core->deliveries_ingress;
-    stats->deliveries_egress = core->deliveries_egress;
-    stats->deliveries_transit = core->deliveries_transit;
-    stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
-    stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
-
+    if (stats) {
+        stats->addrs = DEQ_SIZE(core->addrs);
+        stats->links = DEQ_SIZE(core->open_links);
+        stats->routers = DEQ_SIZE(core->routers);
+        stats->connections = DEQ_SIZE(core->open_connections);
+        stats->link_routes = DEQ_SIZE(core->link_routes);
+        stats->auto_links = DEQ_SIZE(core->auto_links);
+        stats->presettled_deliveries = core->presettled_deliveries;
+        stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
+        stats->accepted_deliveries = core->accepted_deliveries;
+        stats->rejected_deliveries = core->rejected_deliveries;
+        stats->released_deliveries = core->released_deliveries;
+        stats->modified_deliveries = core->modified_deliveries;
+        stats->deliveries_ingress = core->deliveries_ingress;
+        stats->deliveries_egress = core->deliveries_egress;
+        stats->deliveries_transit = core->deliveries_transit;
+        stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
+        stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
+    }
     qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response);
     work->stats_handler = action->args.stats_request.handler;
     work->context = action->args.stats_request.context;
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index f7e901c..db8a65e 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -181,6 +181,39 @@ class RouterTestHttp(TestCase):
         for t in threads:
             if t.ex: raise t.ex
 
+    def test_http_healthz(self):
+        # Check that GET /healthz answers with HTTP 200 on every router port, first sequentially, then concurrently.
+        if not sys.version_info >= (2, 7):  # silently skipped (returns without asserting) on Python older than 2.7
+            return
+
+        config = Qdrouterd.Config([
+            ('router', {'id': 'QDR.HEALTHZ'}),
+            ('listener', {'port': self.get_port(), 'http': 'yes'}),
+            ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),  # NOTE(review): no explicit 'http' here - presumably httpRootDir implies it; confirm
+        ])
+        r = self.qdrouterd('metrics-test-router', config)  # NOTE(review): router name looks copied from the metrics test - confirm
+
+        def test(port):
+            result = urlopen("http://localhost:%d/healthz" % port, cafile=self.ssl_file('ca-certificate.pem'))  # NOTE(review): cafile is passed although the URL is plain http - presumably unused; confirm
+            self.assertEqual(200, result.getcode())
+
+        # Sequential calls, one per port in r.ports
+        for port in r.ports: test(port)
+
+        # Concurrent calls: one thread per entry of r.ports + r.ports, i.e. two threads per port
+        class TestThread(threading.Thread):
+            def __init__(self, port):
+                threading.Thread.__init__(self)
+                self.port, self.ex = port, None
+                self.start()  # the thread starts itself on construction
+            def run(self):
+                try: test(self.port)
+                except Exception as e: self.ex = e  # captured here so the main thread can re-raise it after join
+        threads = [TestThread(p) for p in r.ports + r.ports]
+        for t in threads: t.join()
+        for t in threads:
+            if t.ex: raise t.ex
+
+
     def test_https_get(self):
         def listener(**kwargs):
             args = dict(kwargs)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org