You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/03/18 21:01:00 UTC
[qpid-dispatch] branch master updated: DISPATCH-1290: add simple
http healthcheck and allow websockets to be disabled on http listener This
closes #468
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 1e22255 DISPATCH-1290: add simple http healthcheck and allow websockets to be disabled on http listener This closes #468
1e22255 is described below
commit 1e22255eede6f82828a433839cb25087c2c7b300
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Fri Mar 15 23:24:34 2019 +0000
DISPATCH-1290: add simple http healthcheck and allow websockets to be disabled on http listener
This closes #468
---
include/qpid/dispatch/server.h | 10 ++++
python/qpid_dispatch/management/qdrouter.json | 14 +++++-
src/connection_manager.c | 5 +-
src/http-libwebsockets.c | 70 ++++++++++++++++++++++++++-
src/router_core/router_core.c | 37 +++++++-------
tests/system_tests_http.py | 33 +++++++++++++
6 files changed, 145 insertions(+), 24 deletions(-)
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 94f14cc..5a4c115 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -133,11 +133,21 @@ typedef struct qd_server_config_t {
char *protocol_family;
/**
+ * Expose simple liveness check.
+ */
+ bool healthz;
+
+ /**
* Export metrics.
*/
bool metrics;
/**
+ * Websockets enabled.
+ */
+ bool websockets;
+
+ /**
* Accept HTTP connections, allow WebSocket "amqp" protocol upgrades.
*/
bool http;
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index b1b253d..1d12338 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -838,10 +838,22 @@
"deprecationName": "failoverList",
"description": "A comma-separated list of failover urls to be supplied to connected clients. Form: [(amqp|amqps|ws|wss)://]host_or_ip[:port]"
},
+ "healthz": {
+ "type": "boolean",
+ "default": true,
+ "description": "Provide a simple HTTP based liveness test (using path /healthz). Assumes listener is enabled for http.",
+ "create": true
+ },
"metrics": {
"type": "boolean",
"default": true,
- "description": "Export metrics in prometheus text format for the router (using path /metrics).",
+ "description": "Export metrics in prometheus text format for the router (using path /metrics). Assumes listener is enabled for http.",
+ "create": true
+ },
+ "websockets": {
+ "type": "boolean",
+ "default": true,
+ "description": "For an http enabled listener, determines whether websockets access is enabled (true by default).",
"create": true
},
"http": {
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 8f8cbe7..6549e56 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -312,7 +312,9 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
config->role = qd_entity_get_string(entity, "role"); CHECK();
config->inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK();
config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK();
+ config->healthz = qd_entity_opt_bool(entity, "healthz", true); CHECK();
config->metrics = qd_entity_opt_bool(entity, "metrics", true); CHECK();
+ config->websockets = qd_entity_opt_bool(entity, "websockets", true); CHECK();
config->http = qd_entity_opt_bool(entity, "http", false); CHECK();
config->http_root_dir = qd_entity_opt_string(entity, "httpRootDir", false); CHECK();
@@ -324,9 +326,6 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
if (config->http && ! config->http_root_dir) {
qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "HTTP service is requested but no httpRootDir specified. The router will serve AMQP-over-websockets but no static content.");
}
- if (config->metrics && !config->http) {
- qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Metrics can only be exported on listener with http enabled.");
- }
config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK();
config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK();
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 0690c0c..850c646 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -117,6 +117,8 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len);
static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len);
+static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
static struct lws_protocols protocols[] = {
/* HTTP only protocol comes first */
@@ -144,6 +146,11 @@ static struct lws_protocols protocols[] = {
callback_metrics,
sizeof(stats_t),
},
+ {
+ "healthz",
+ callback_healthz,
+ sizeof(stats_t),
+ },
{ NULL, NULL, 0, 0 } /* terminator */
};
@@ -248,6 +255,7 @@ struct qd_http_listener_t {
struct lws_vhost *vhost;
struct lws_http_mount mount;
struct lws_http_mount metrics;
+ struct lws_http_mount healthz;
};
void qd_http_listener_free(qd_http_listener_t *hl) {
@@ -301,14 +309,24 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
m->def = "index.html"; /* Default file name */
m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
m->extra_mimetypes = mime_types;
+ struct lws_http_mount *tail = m;
if (config->metrics) {
struct lws_http_mount *metrics = &hl->metrics;
- m->mount_next = metrics;
+ tail->mount_next = metrics;
+ tail = metrics;
metrics->mountpoint = "/metrics";
metrics->mountpoint_len = strlen(metrics->mountpoint);
metrics->origin_protocol = LWSMPRO_CALLBACK;
metrics->protocol = "http";
}
+ if (config->healthz) {
+ struct lws_http_mount *healthz = &hl->healthz;
+ tail->mount_next = healthz;
+ healthz->mountpoint = "/healthz";
+ healthz->mountpoint_len = strlen(healthz->mountpoint);
+ healthz->origin_protocol = LWSMPRO_CALLBACK;
+ healthz->protocol = "healthz";
+ }
struct lws_context_creation_info info = {0};
info.mounts = m;
@@ -524,6 +542,54 @@ static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
}
}
+static int callback_healthz(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len)
+{ // handler for the "healthz" protocol: replies to an HTTP request with status 200 and the body "OK\n"
+ qd_http_server_t *hs = wsi_server(wsi);
+ stats_t *stats = (stats_t*) user;
+ uint8_t buffer[LWS_PRE + 2048];
+ uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1];
+ // start/position/end delimit the region of buffer that headers and body are written into
+ switch (reason) {
+ // request received: nothing is written yet, only the round trip through the core is started
+ case LWS_CALLBACK_HTTP: {
+ stats->wsi = wsi;
+ stats->server = hs;
+ //make dummy request for stats (pass in null ptr); this still exercises the
+ //path through core thread and back through callback on io thread which is
+ //a reasonable initial liveness check
+ qdr_request_global_stats(hs->core, 0, handle_stats_results, (void*) stats);
+ return 0;
+ }
+ // presumably reached after handle_stats_results requests a writable callback -- TODO confirm
+ case LWS_CALLBACK_HTTP_WRITEABLE: {
+ //no stats are encoded here: add the response headers (first time only), then the fixed body
+ if (!stats->headers_sent) {
+ if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end)
+ || add_header_by_name(wsi, "content-type:", "text/plain", &position, end)
+ || lws_add_http_header_content_length(wsi, 3, &position, end)) // 3 == length of the "OK\n" body below
+ return 1;
+ if (lws_finalize_http_header(wsi, &position, end))
+ return 1;
+ stats->headers_sent = true;
+ }
+ position += lws_snprintf((char*) position, end - position, "OK\n"); // the entire response body
+
+ int n = LWS_WRITE_HTTP_FINAL;
+ //write everything accumulated between start and position in a single call
+ size_t available = position - start;
+ if (lws_write(wsi, (unsigned char*) start, available, n) != available)
+ return 1; // short or failed write
+ else if (lws_http_transaction_completed(wsi))
+ return -1;
+ else return 0;
+ }
+ // every other callback reason is ignored
+ default:
+ return 0;
+ }
+}
+
/* Callbacks for promoted AMQP over WS connections. */
static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
@@ -538,7 +604,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
memset(c, 0, sizeof(*c));
c->wsi = wsi;
qd_http_listener_t *hl = wsi_listener(wsi);
- if (hl == NULL) {
+ if (hl == NULL || !hl->listener->config.websockets) {
return unexpected_close(c->wsi, "cannot-upgrade");
}
c->qd_conn = qd_server_connection(hs->server, &hl->listener->config);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index a66a703..281b260 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -758,24 +758,25 @@ static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t
static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
qdr_global_stats_t *stats = action->args.stats_request.stats;
- stats->addrs = DEQ_SIZE(core->addrs);
- stats->links = DEQ_SIZE(core->open_links);
- stats->routers = DEQ_SIZE(core->routers);
- stats->connections = DEQ_SIZE(core->open_connections);
- stats->link_routes = DEQ_SIZE(core->link_routes);
- stats->auto_links = DEQ_SIZE(core->auto_links);
- stats->presettled_deliveries = core->presettled_deliveries;
- stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
- stats->accepted_deliveries = core->accepted_deliveries;
- stats->rejected_deliveries = core->rejected_deliveries;
- stats->released_deliveries = core->released_deliveries;
- stats->modified_deliveries = core->modified_deliveries;
- stats->deliveries_ingress = core->deliveries_ingress;
- stats->deliveries_egress = core->deliveries_egress;
- stats->deliveries_transit = core->deliveries_transit;
- stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
- stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
-
+ if (stats) {
+ stats->addrs = DEQ_SIZE(core->addrs);
+ stats->links = DEQ_SIZE(core->open_links);
+ stats->routers = DEQ_SIZE(core->routers);
+ stats->connections = DEQ_SIZE(core->open_connections);
+ stats->link_routes = DEQ_SIZE(core->link_routes);
+ stats->auto_links = DEQ_SIZE(core->auto_links);
+ stats->presettled_deliveries = core->presettled_deliveries;
+ stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
+ stats->accepted_deliveries = core->accepted_deliveries;
+ stats->rejected_deliveries = core->rejected_deliveries;
+ stats->released_deliveries = core->released_deliveries;
+ stats->modified_deliveries = core->modified_deliveries;
+ stats->deliveries_ingress = core->deliveries_ingress;
+ stats->deliveries_egress = core->deliveries_egress;
+ stats->deliveries_transit = core->deliveries_transit;
+ stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
+ stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
+ }
qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response);
work->stats_handler = action->args.stats_request.handler;
work->context = action->args.stats_request.context;
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index f7e901c..db8a65e 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -181,6 +181,39 @@ class RouterTestHttp(TestCase):
for t in threads:
if t.ex: raise t.ex
+ def test_http_healthz(self):
+ # Checks that GET /healthz answers 200 on each port in r.ports, first sequentially, then from concurrent threads.
+ if not sys.version_info >= (2, 7):
+ return  # silently does nothing on Python older than 2.7
+ # Router with two listeners, each on its own freshly allocated port.
+ config = Qdrouterd.Config([
+ ('router', {'id': 'QDR.HEALTHZ'}),
+ ('listener', {'port': self.get_port(), 'http': 'yes'}),
+ ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),  # NOTE(review): sets httpRootDir but not 'http' -- confirm /healthz is expected to be served here
+ ])
+ r = self.qdrouterd('metrics-test-router', config)  # NOTE(review): name says "metrics" in a healthz test -- presumably copied; confirm
+ # Single probe: fetch /healthz on the given port and require HTTP 200.
+ def test(port):
+ result = urlopen("http://localhost:%d/healthz" % port, cafile=self.ssl_file('ca-certificate.pem'))  # NOTE(review): cafile passed with a plain http URL -- confirm it is needed
+ self.assertEqual(200, result.getcode())
+ # The probes below reuse test() for every port.
+ # Sequential calls on multiple ports
+ for port in r.ports: test(port)
+ # The thread class records a failure instead of raising, so it can be re-raised after join().
+ # Concurrent calls on multiple ports
+ class TestThread(threading.Thread):
+ def __init__(self, port):
+ threading.Thread.__init__(self)
+ self.port, self.ex = port, None  # ex holds the exception caught in run(), if any
+ self.start()  # the thread starts running as soon as it is constructed
+ def run(self):
+ try: test(self.port)
+ except Exception as e: self.ex = e
+ threads = [TestThread(p) for p in r.ports + r.ports]  # two threads per port
+ for t in threads: t.join()
+ for t in threads:
+ if t.ex: raise t.ex  # re-raise the first captured failure in the calling thread
+
def test_https_get(self):
def listener(**kwargs):
args = dict(kwargs)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org