You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2019/03/06 23:09:53 UTC

[qpid-dispatch] branch DISPATCH-1278 updated (e8b44b9 -> ecd5cb5)

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a change to branch DISPATCH-1278
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


 discard e8b44b9  DISPATCH-1278: initial support for prometheus metrics export
     new ecd5cb5  DISPATCH-1278: initial support for prometheus metrics export

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e8b44b9)
            \
             N -- N -- N   refs/heads/DISPATCH-1278 (ecd5cb5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/qpid/dispatch/router_core.h   |  2 +-
 src/http-libwebsockets.c              | 27 ++++++++++++++++++---------
 src/router_core/router_core.c         | 13 ++++++++++---
 src/router_core/router_core_private.h |  2 ++
 4 files changed, 31 insertions(+), 13 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/01: DISPATCH-1278: initial support for prometheus metrics export

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch DISPATCH-1278
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit ecd5cb559aee818fecf436409ab3b7e478d58260
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Mar 6 10:30:50 2019 +0000

    DISPATCH-1278: initial support for prometheus metrics export
---
 include/qpid/dispatch/router_core.h           |  24 ++++
 include/qpid/dispatch/server.h                |   5 +
 python/qpid_dispatch/management/qdrouter.json |   6 +
 src/connection_manager.c                      |   4 +
 src/dispatch.c                                |   4 +
 src/dispatch_private.h                        |   2 +
 src/http-libwebsockets.c                      | 170 +++++++++++++++++++++++++-
 src/router_core/router_core.c                 |  42 +++++++
 src/router_core/router_core_private.h         |  12 +-
 tests/system_tests_http.py                    |  36 ++++++
 10 files changed, 303 insertions(+), 2 deletions(-)

diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index f7d6c51..2f7da25 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -837,4 +837,28 @@ qdr_connection_info_t *qdr_connection_info(bool             is_encrypted,
                                            int              ssl_ssf,
                                            bool             ssl);
 
+
+typedef struct {
+    size_t connections;
+    size_t links;
+    size_t addrs;
+    size_t routers;
+    size_t link_routes;
+    size_t auto_links;
+    size_t presettled_deliveries;
+    size_t dropped_presettled_deliveries;
+    size_t accepted_deliveries;
+    size_t rejected_deliveries;
+    size_t released_deliveries;
+    size_t modified_deliveries;
+    size_t deliveries_ingress;
+    size_t deliveries_egress;
+    size_t deliveries_transit;
+    size_t deliveries_ingress_route_container;
+    size_t deliveries_egress_route_container;
+}  qdr_global_stats_t;
+ALLOC_DECLARE(qdr_global_stats_t);
+typedef void (*qdr_global_stats_handler_t) (void *context);
+void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context);
+
 #endif
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index e56519f..94f14cc 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -133,6 +133,11 @@ typedef struct qd_server_config_t {
     char *protocol_family;
 
     /**
+     * Export metrics.
+     */
+    bool metrics;
+
+    /**
      * Accept HTTP connections, allow WebSocket "amqp" protocol upgrades.
      */
     bool http;
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 861d620..e69ab17 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -828,6 +828,12 @@
                     "deprecationName": "failoverList",
                     "description": "A comma-separated list of failover urls to be supplied to connected clients.  Form: [(amqp|amqps|ws|wss)://]host_or_ip[:port]"
                 },
+                "metrics": {
+                    "type": "boolean",
+                    "default": true,
+                    "description": "Export metrics in prometheus text format for the router (using path /metrics).",
+                    "create": true
+                },
                 "http": {
                     "type": "boolean",
                     "default": false,
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 98eadd4..8f8cbe7 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -312,6 +312,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     config->role                 = qd_entity_get_string(entity, "role");              CHECK();
     config->inter_router_cost    = qd_entity_opt_long(entity, "cost", 1);             CHECK();
     config->protocol_family      = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK();
+    config->metrics              = qd_entity_opt_bool(entity, "metrics", true);       CHECK();
     config->http                 = qd_entity_opt_bool(entity, "http", false);         CHECK();
     config->http_root_dir        = qd_entity_opt_string(entity, "httpRootDir", false);   CHECK();
 
@@ -323,6 +324,9 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     if (config->http && ! config->http_root_dir) {
         qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "HTTP service is requested but no httpRootDir specified. The router will serve AMQP-over-websockets but no static content.");
     }
+    if (config->metrics && !config->http) {
+        qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Metrics can only be exported on listener with http enabled.");
+    }
 
     config->max_frame_size       = qd_entity_get_long(entity, "maxFrameSize");        CHECK();
     config->max_sessions         = qd_entity_get_long(entity, "maxSessions");         CHECK();
diff --git a/src/dispatch.c b/src/dispatch.c
index 4906b39..eb2d195 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -367,3 +367,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
 
 void qd_dispatch_router_lock(qd_dispatch_t *qd) { sys_mutex_lock(qd->router->lock); }
 void qd_dispatch_router_unlock(qd_dispatch_t *qd) { sys_mutex_unlock(qd->router->lock); }
+
+qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd) {
+    return qd->router->router_core;
+}
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index f5a089c..a01db9b 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -124,4 +124,6 @@ void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void *impl);
 /** Set the agent */
 void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent);
 
+qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd);
+
 #endif
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index b2f0d9c..0690c0c 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,6 +19,7 @@
 
 #include <qpid/dispatch/atomic.h>
 #include <qpid/dispatch/amqp.h>
+#include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/timer.h>
 
@@ -95,6 +96,14 @@ typedef struct connection_t {
     struct lws *wsi;
 } connection_t;
 
+typedef struct stats_t {
+    size_t current;
+    bool headers_sent;
+    qdr_global_stats_t stats;
+    qd_http_server_t *server;
+    struct lws *wsi;
+} stats_t;
+
 /* Navigating from WSI pointer to qd objects */
 static qd_http_server_t *wsi_server(struct lws *wsi);
 static qd_http_listener_t *wsi_listener(struct lws *wsi);
@@ -106,6 +115,8 @@ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
                          void *user, void *in, size_t len);
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len);
+static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
+                               void *user, void *in, size_t len);
 
 static struct lws_protocols protocols[] = {
     /* HTTP only protocol comes first */
@@ -128,6 +139,11 @@ static struct lws_protocols protocols[] = {
         callback_amqpws,
         sizeof(connection_t),
     },
+    {
+        "http",
+        callback_metrics,
+        sizeof(stats_t),
+    },
     { NULL, NULL, 0, 0 } /* terminator */
 };
 
@@ -161,7 +177,7 @@ static int handle_events(connection_t* c) {
 
 /* The server has a bounded, thread-safe queue for external work */
 typedef struct work_t {
-    enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+    enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP, W_HANDLE_STATS } type;
     void *value;
 } work_t;
 
@@ -177,6 +193,7 @@ typedef struct work_queue_t {
 /* HTTP Server runs in a single thread, communication from other threads via work_queue */
 struct qd_http_server_t {
     qd_server_t *server;
+    qdr_core_t *core;
     sys_thread_t *thread;
     work_queue_t work;
     qd_log_source_t *log;
@@ -230,6 +247,7 @@ struct qd_http_listener_t {
     qd_http_server_t *server;
     struct lws_vhost *vhost;
     struct lws_http_mount mount;
+    struct lws_http_mount metrics;
 };
 
 void qd_http_listener_free(qd_http_listener_t *hl) {
@@ -283,6 +301,14 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
     m->def = "index.html";  /* Default file name */
     m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
     m->extra_mimetypes = mime_types;
+    if (config->metrics) {
+        struct lws_http_mount *metrics = &hl->metrics;
+        m->mount_next = metrics;
+        metrics->mountpoint = "/metrics";
+        metrics->mountpoint_len = strlen(metrics->mountpoint);
+        metrics->origin_protocol = LWSMPRO_CALLBACK;
+        metrics->protocol = "http";
+    }
 
     struct lws_context_creation_info info = {0};
     info.mounts = m;
@@ -361,6 +387,143 @@ static void connection_wake(qd_connection_t *qd_conn)
     }
 }
 
+static void handle_stats_results(void *context)
+{
+    stats_t* stats = (stats_t*) context;
+    qd_http_server_t *hs = stats->server;
+    if (hs) {
+        work_t w = { W_HANDLE_STATS, stats->wsi };
+        work_push(hs, w);
+    }
+}
+
+typedef int (*int_metric) (qdr_global_stats_t *stats);
+typedef struct metric_definition {
+    const char* name;
+    const char* type;
+    int_metric value;
+} metric_definition;
+
+static int stats_get_connections(qdr_global_stats_t *stats) { return stats->connections; }
+static int stats_get_links(qdr_global_stats_t *stats) { return stats->links; }
+static int stats_get_addrs(qdr_global_stats_t *stats) { return stats->addrs; }
+static int stats_get_routers(qdr_global_stats_t *stats) { return stats->routers; }
+static int stats_get_link_routes(qdr_global_stats_t *stats) { return stats->link_routes; }
+static int stats_get_auto_links(qdr_global_stats_t *stats) { return stats->auto_links; }
+static int stats_get_presettled_deliveries(qdr_global_stats_t *stats) { return stats->presettled_deliveries; }
+static int stats_get_dropped_presettled_deliveries(qdr_global_stats_t *stats) { return stats->dropped_presettled_deliveries; }
+static int stats_get_accepted_deliveries(qdr_global_stats_t *stats) { return stats->accepted_deliveries; }
+static int stats_get_released_deliveries(qdr_global_stats_t *stats) { return stats->released_deliveries; }
+static int stats_get_rejected_deliveries(qdr_global_stats_t *stats) { return stats->rejected_deliveries; }
+static int stats_get_modified_deliveries(qdr_global_stats_t *stats) { return stats->modified_deliveries; }
+static int stats_get_deliveries_ingress(qdr_global_stats_t *stats) { return stats->deliveries_ingress; }
+static int stats_get_deliveries_egress(qdr_global_stats_t *stats) { return stats->deliveries_egress; }
+static int stats_get_deliveries_transit(qdr_global_stats_t *stats) { return stats->deliveries_transit; }
+static int stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_ingress_route_container; }
+static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; }
+
+static struct metric_definition metrics[] = {
+    {"connections", "gauge", stats_get_connections},
+    {"links", "gauge", stats_get_links},
+    {"addresses", "gauge", stats_get_addrs},
+    {"routers", "gauge", stats_get_routers},
+    {"link_routes", "gauge", stats_get_link_routes},
+    {"auto_links", "gauge", stats_get_auto_links},
+    {"presettled_deliveries", "counter", stats_get_presettled_deliveries},
+    {"dropped_presettled_deliveries", "counter", stats_get_dropped_presettled_deliveries},
+    {"accepted_deliveries", "counter", stats_get_accepted_deliveries},
+    {"released_deliveries", "counter", stats_get_released_deliveries},
+    {"rejected_deliveries", "counter", stats_get_rejected_deliveries},
+    {"modified_deliveries", "counter", stats_get_modified_deliveries},
+    {"deliveries_ingress", "counter", stats_get_deliveries_ingress},
+    {"deliveries_egress", "counter", stats_get_deliveries_egress},
+    {"deliveries_transit", "counter", stats_get_deliveries_transit},
+    {"deliveries_ingress_route_container", "counter", stats_get_deliveries_ingress_route_container},
+    {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container}
+};
+static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]);
+
+static bool write_stats(uint8_t **position, const uint8_t * const end, const char* name, const char* type, int value)
+{
+    //11 chars + type + 2*name + 20 chars for int
+    size_t length = 11 + strlen(type) + strlen(name)*2 + 20;
+    if (end - *position >= length) {
+        *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s %s\n", name, type);
+        *position += lws_snprintf((char*) *position, end - *position, "%s %i\n", name, value);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+static bool write_metric(uint8_t **position, const uint8_t * const end, metric_definition* definition, qdr_global_stats_t* stats)
+{
+    return write_stats(position, end, definition->name, definition->type, definition->value(stats));
+}
+
+static int add_header_by_name(struct lws *wsi, const char* name, const char* value, uint8_t** position, uint8_t* end)
+{
+    return lws_add_http_header_by_name(wsi, (unsigned char*) name, (unsigned char*) value, strlen(value), position, end);
+}
+
+static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
+                               void *user, void *in, size_t len)
+{
+    qd_http_server_t *hs = wsi_server(wsi);
+    stats_t *stats = (stats_t*) user;
+    uint8_t buffer[LWS_PRE + 2048];
+    uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1];
+
+    switch (reason) {
+
+    case LWS_CALLBACK_HTTP: {
+        stats->wsi = wsi;
+        stats->server = hs;
+        //request stats from core thread
+        qdr_request_global_stats(hs->core, &stats->stats, handle_stats_results, (void*) stats);
+        return 0;
+    }
+
+    case LWS_CALLBACK_HTTP_WRITEABLE: {
+        //encode stats into buffer
+        if (!stats->headers_sent) {
+            if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end)
+                || add_header_by_name(wsi, "content-type:", "text/plain", &position, end)
+                || add_header_by_name(wsi, "connection:", "close", &position, end))
+                return 1;
+            if (lws_finalize_http_header(wsi, &position, end))
+                return 1;
+            stats->headers_sent = true;
+        }
+
+        while (stats->current < metrics_length) {
+            if (write_metric(&position, end, &metrics[stats->current], &stats->stats)) {
+                stats->current++;
+                qd_log(hs->log, QD_LOG_DEBUG, "wrote metric %i of %i", stats->current, metrics_length);
+            } else {
+                qd_log(hs->log, QD_LOG_DEBUG, "insufficient space in buffer");
+                break;
+            }
+        }
+        int n = stats->current < metrics_length ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
+
+        //write buffer
+        size_t available = position - start;
+	if (lws_write(wsi, (unsigned char*) start, available, n) != available)
+            return 1;
+        if (n == LWS_WRITE_HTTP_FINAL) {
+            if (lws_http_transaction_completed(wsi)) return -1;
+        } else {
+            lws_callback_on_writable(wsi);
+        }
+        return 0;
+    }
+
+    default:
+        return 0;
+    }
+}
+
 /* Callbacks for promoted AMQP over WS connections. */
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
@@ -494,6 +657,9 @@ static void* http_thread_run(void* v) {
             case W_CLOSE:
                 listener_close((qd_http_listener_t*)w.value, hs);
                 break;
+            case W_HANDLE_STATS:
+                lws_callback_on_writable((struct lws*) w.value);
+                break;
             case W_WAKE: {
                 connection_t *c = w.value;
                 pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
@@ -546,6 +712,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) {
         hs->context = lws_create_context(&info);
         hs->server = s;
         hs->log = log;              /* For messages from this file */
+        hs->core = 0; // not yet available
         if (!hs->context) {
             qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
             qd_http_server_free(hs);
@@ -559,6 +726,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) {
 
 qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li)
 {
+    hs->core = qd_dispatch_router_core(qd_server_dispatch(hs->server));
     sys_mutex_lock(hs->work.lock);
     if (!hs->thread) {
         hs->thread = sys_thread(http_thread_run, hs);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c2dbdaa..05fe970 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -748,3 +748,45 @@ void qdr_connection_work_free_CT(qdr_connection_work_t *work)
     qdr_terminus_free(work->target);
     free_qdr_connection_work_t(work);
 }
+
+static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t *work)
+{
+    work->stats_handler(work->context);
+}
+
+static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_global_stats_t *stats = action->args.stats_request.stats;
+    stats->addrs = DEQ_SIZE(core->addrs);
+    stats->links = DEQ_SIZE(core->open_links);
+    stats->routers = DEQ_SIZE(core->routers);
+    stats->connections = DEQ_SIZE(core->open_connections);
+    stats->link_routes = DEQ_SIZE(core->link_routes);
+    stats->auto_links = DEQ_SIZE(core->auto_links);
+    stats->presettled_deliveries = core->presettled_deliveries;
+    stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
+    stats->accepted_deliveries = core->accepted_deliveries;
+    stats->rejected_deliveries = core->rejected_deliveries;
+    stats->released_deliveries = core->released_deliveries;
+    stats->modified_deliveries = core->modified_deliveries;
+    stats->deliveries_ingress = core->deliveries_ingress;
+    stats->deliveries_egress = core->deliveries_egress;
+    stats->deliveries_transit = core->deliveries_transit;
+    stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
+    stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
+
+    qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response);
+    work->stats_handler = action->args.stats_request.handler;
+    work->context = action->args.stats_request.context;
+    qdr_post_general_work_CT(core, work);
+}
+
+void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context)
+{
+    qdr_action_t *action = qdr_action(qdr_global_stats_request_CT, "global_stats_request");
+    action->args.stats_request.stats = stats;
+    action->args.stats_request.handler = callback;
+    action->args.stats_request.context = context;
+    qdr_action_enqueue(core, action);
+}
+
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0865c78..efba16f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -166,6 +166,15 @@ struct qdr_action_t {
         } agent;
 
         //
+        // Arguments for stats request actions
+        //
+        struct {
+            qdr_global_stats_t             *stats;
+            qdr_global_stats_handler_t     handler;
+            void                           *context;
+        } stats_request;
+
+        //
         // Arguments for general use
         //
         struct {
@@ -203,6 +212,8 @@ struct qdr_general_work_t {
     qd_message_t               *msg;
     uint64_t                    in_conn_id;
     int                         treatment;
+    qdr_global_stats_handler_t  stats_handler;
+    void                       *context;
 };
 
 ALLOC_DECLARE(qdr_general_work_t);
@@ -301,7 +312,6 @@ struct qdr_query_t {
 
 DEQ_DECLARE(qdr_query_t, qdr_query_list_t); 
 
-
 struct qdr_node_t {
     DEQ_LINKS(qdr_node_t);
     qdr_address_t    *owning_addr;
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index 70c87c4..7e7ba8e 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -149,6 +149,42 @@ class RouterTestHttp(TestCase):
         # https not configured
         self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0])
 
+    def test_http_metrics(self):
+
+        if not sys.version_info >= (2, 7):
+            return
+
+        config = Qdrouterd.Config([
+            ('router', {'id': 'QDR.METRICS'}),
+            ('listener', {'port': self.get_port(), 'http': 'yes'}),
+            ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
+        ])
+        r = self.qdrouterd('metrics-test-router', config)
+
+        def test(port):
+            result = urlopen("http://localhost:%d/metrics" % port, cafile=self.ssl_file('ca-certificate.pem'))
+            self.assertEqual(200, result.getcode())
+            data = result.read().decode('utf-8')
+            assert('connections' in data)
+            assert('deliveries_ingress' in data)
+
+        # Sequential calls on multiple ports
+        for port in r.ports: test(port)
+
+        # Concurrent calls on multiple ports
+        class TestThread(threading.Thread):
+            def __init__(self, port):
+                threading.Thread.__init__(self)
+                self.port, self.ex = port, None
+                self.start()
+            def run(self):
+                try: test(self.port)
+                except Exception as e: self.ex = e
+        threads = [TestThread(p) for p in r.ports + r.ports]
+        for t in threads: t.join()
+        for t in threads:
+            if t.ex: raise t.ex
+
     def test_https_get(self):
         if not sys.version_info >= (2, 9):
             return


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org