Posted to commits@qpid.apache.org by tr...@apache.org on 2019/10/31 18:24:00 UTC

[qpid-dispatch] branch master updated (6d8eeb6 -> 7dc31d2)

This is an automated email from the ASF dual-hosted git repository.

tross pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from 6d8eeb6  DISPATCH-1428 - Replace xrange with range since it is gone in Python 3.x
     new beb89a3  DISPATCH-1409 - Exposed available-credit and time-of-zero-credit as link attributes.  Added logging for links that have been stuck with no credit for more than ten seconds.
     new abda6a8  DISPATCH-1409 - Reduce the batch size for batched management queries to account for the increased number of attributes in the link entity type.
     new 8fe83b7  DISPATCH-1409 - Added global metric/gauge for links-blocked.  Updated qdstat man page.
     new 1837ece  DISPATCH-1409 - Added test case, fixed accounting bug found by the test case.
     new 7dc31d2  DISPATCH-1409 - Added a test for sender-link credit.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/man/qdstat.8.adoc                             |   9 +
 include/qpid/dispatch/router_core.h                |   3 +
 python/qpid_dispatch/management/client.py          |   4 +-
 python/qpid_dispatch/management/qdrouter.json      |  14 ++
 src/http-libwebsockets.c                           |   4 +-
 src/router_core/agent_link.c                       |  15 ++
 src/router_core/agent_link.h                       |   2 +-
 src/router_core/agent_router.c                     |   6 +
 src/router_core/agent_router.h                     |   2 +-
 src/router_core/connections.c                      |  43 +++-
 src/router_core/forwarder.c                        |   2 +
 .../stuck_delivery_detection/delivery_tracker.c    |  11 +
 src/router_core/router_core.c                      |   1 +
 src/router_core/router_core_private.h              |  19 +-
 src/router_core/transfer.c                         |   1 +
 src/router_node.c                                  |  13 ++
 tests/system_tests_stuck_deliveries.py             | 252 +++++++++++++++++++++
 tools/qdstat.in                                    |  19 +-
 18 files changed, 408 insertions(+), 12 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 03/05: DISPATCH-1409 - Added global metric/gauge for links-blocked. Updated qdstat man page.

Posted by tr...@apache.org.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 8fe83b73005c5c1b6322a6b0dad7356672b17f5f
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Wed Oct 30 16:58:08 2019 -0400

    DISPATCH-1409 - Added global metric/gauge for links-blocked.  Updated qdstat man page.
---
 docs/man/qdstat.8.adoc                                           | 9 +++++++++
 include/qpid/dispatch/router_core.h                              | 1 +
 python/qpid_dispatch/management/qdrouter.json                    | 5 +++++
 src/http-libwebsockets.c                                         | 4 +++-
 src/router_core/agent_router.c                                   | 6 ++++++
 src/router_core/agent_router.h                                   | 2 +-
 src/router_core/connections.c                                    | 8 ++++++--
 .../modules/stuck_delivery_detection/delivery_tracker.c          | 1 +
 src/router_core/router_core.c                                    | 1 +
 src/router_core/router_core_private.h                            | 1 +
 tools/qdstat.in                                                  | 4 ++++
 11 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/docs/man/qdstat.8.adoc b/docs/man/qdstat.8.adoc
index aee0b38..caaed43 100644
--- a/docs/man/qdstat.8.adoc
+++ b/docs/man/qdstat.8.adoc
@@ -152,6 +152,15 @@ The number of settled deliveries on this link that were unsettled for more than
 rate::
 The average rate (over a period of five seconds) at which deliveries have been settled on this link.
 
+stuck::
+The number of deliveries on this link that are flagged as "stuck".  A delivery is considered stuck if it has been either undelivered or unsettled for more than 10 seconds.
+
+cred::
+The number of flow credits currently available on this link.
+
+blkd::
+The time elapsed since the available credit for this link became zero and has remained zero.
+
 admin::
 The administrative status of the link:
   - 'enabled' - The link is enabled for normal operation.
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index fab7e50..d8c7078 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -857,6 +857,7 @@ typedef struct {
     size_t deliveries_delayed_1sec;
     size_t deliveries_delayed_10sec;
     size_t deliveries_stuck;
+    size_t links_blocked;
     size_t deliveries_redirected_to_fallback;
 }  qdr_global_stats_t;
 ALLOC_DECLARE(qdr_global_stats_t);
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e7a90e5..ab241b7 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -510,6 +510,11 @@
 	                "description":"Number of links attached to the router node.",
 	                "graph": true
 	            },
+                "blockedLinkCount": {
+                    "type": "integer",
+                    "description": "The number of links that are flagged as blocked.  A blocked link is one in which the available credit has remained zero for more than 10 seconds.",
+                    "graph": true
+                },
 	            "nodeCount": {
 	                "type": "integer",
 	                "description":"Number of known peer router nodes.",
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index dda8b9d..c4dd366 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -472,6 +472,7 @@ static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats
 static int stats_get_deliveries_delayed_1sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_1sec; }
 static int stats_get_deliveries_delayed_10sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_10sec; }
 static int stats_get_deliveries_stuck(qdr_global_stats_t *stats) { return stats->deliveries_stuck; }
+static int stats_get_links_blocked(qdr_global_stats_t *stats) { return stats->links_blocked; }
 static int stats_get_deliveries_redirected_to_fallback(qdr_global_stats_t *stats) { return stats->deliveries_redirected_to_fallback; }
 
 static struct metric_definition metrics[] = {
@@ -494,7 +495,8 @@ static struct metric_definition metrics[] = {
     {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container},
     {"deliveries_delayed_1sec", "counter", stats_get_deliveries_delayed_1sec},
     {"deliveries_delayed_10sec", "counter", stats_get_deliveries_delayed_10sec},
-    {"deliveries_stuck", "counter", stats_get_deliveries_stuck},
+    {"deliveries_stuck", "gauge", stats_get_deliveries_stuck},
+    {"links_blocked", "gauge", stats_get_links_blocked},
     {"deliveries_redirected_to_fallback", "counter", stats_get_deliveries_redirected_to_fallback}
 };
 static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]);
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index 7bdc934..b726686 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -53,6 +53,7 @@
 #define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER  26
 #define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER   27
 #define QDR_ROUTER_DELIVERIES_REDIRECTED               28
+#define QDR_ROUTER_LINKS_BLOCKED                       29
 
 
 const char *qdr_router_columns[] =
@@ -85,6 +86,7 @@ const char *qdr_router_columns[] =
      "deliveriesIngressRouteContainer",
      "deliveriesEgressRouteContainer",
      "deliveriesRedirectedToFallback",
+     "linksBlocked",
      0};
 
 
@@ -227,6 +229,10 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co
         qd_compose_insert_ulong(body, core->deliveries_redirected);
         break;
 
+    case QDR_ROUTER_LINKS_BLOCKED:
+        qd_compose_insert_uint(body, core->links_blocked);
+        break;
+
     default:
         qd_compose_insert_null(body);
         break;
diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h
index 7f0e271..97452af 100644
--- a/src/router_core/agent_router.h
+++ b/src/router_core/agent_router.h
@@ -21,7 +21,7 @@
 
 #include "router_core_private.h"
 
-#define QDR_ROUTER_COLUMN_COUNT  29
+#define QDR_ROUTER_COLUMN_COUNT  30
 
 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1];
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4fb5af9..a9229d1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -237,6 +237,7 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link)
         //
         link->reported_as_blocked = false;
         link->zero_credit_time = core->uptime_ticks;
+        core->links_blocked--;
     } else if (link->credit_reported == 0 && pn_credit > 0)
         //
         // The link has transitioned from zero credit to positive credit.
@@ -995,11 +996,14 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     // Log the link closure
     //
     qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] %s: del=%"PRIu64" presett=%"PRIu64" psdrop=%"PRIu64
-           " acc=%"PRIu64" rej=%"PRIu64" rel=%"PRIu64" mod=%"PRIu64" delay1=%"PRIu64" delay10=%"PRIu64,
+           " acc=%"PRIu64" rej=%"PRIu64" rel=%"PRIu64" mod=%"PRIu64" delay1=%"PRIu64" delay10=%"PRIu64" blocked=%s",
            conn->identity, link->identity, log_text, link->total_deliveries, link->presettled_deliveries,
            link->dropped_presettled_deliveries, link->accepted_deliveries, link->rejected_deliveries,
            link->released_deliveries, link->modified_deliveries, link->deliveries_delayed_1sec,
-           link->deliveries_delayed_10sec);
+           link->deliveries_delayed_10sec, link->reported_as_blocked ? "yes" : "no");
+
+    if (link->reported_as_blocked)
+        core->links_blocked--;
 
     free_qdr_link_t(link);
 }
diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
index 2494b5d..0c86803 100644
--- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -75,6 +75,7 @@ static void process_link_CT(qdr_core_t *core, qdr_link_t *link)
     if (!link->reported_as_blocked && link->zero_credit_time > 0 &&
         (core->uptime_ticks - link->zero_credit_time > stuck_age)) {
         link->reported_as_blocked = true;
+        core->links_blocked++;
         qd_log(core->log, QD_LOG_INFO,
                "[C%"PRIu64"][L%"PRIu64"] "
                "Link blocked with zero credit for %d seconds",
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index e5994a6..3ecea9c 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -859,6 +859,7 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action,
         stats->deliveries_delayed_1sec = core->deliveries_delayed_1sec;
         stats->deliveries_delayed_10sec = core->deliveries_delayed_10sec;
         stats->deliveries_stuck = core->deliveries_stuck;
+        stats->links_blocked = core->links_blocked;
         stats->deliveries_redirected_to_fallback = core->deliveries_redirected;
     }
     qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 683dde7..17256a5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -871,6 +871,7 @@ struct qdr_core_t {
     uint64_t deliveries_delayed_10sec;
     uint64_t deliveries_stuck;
     uint64_t deliveries_redirected;
+    uint32_t links_blocked;
 
     qdr_edge_conn_addr_t          edge_conn_addr;
     void                         *edge_context;
diff --git a/tools/qdstat.in b/tools/qdstat.in
index 6b8f4b1..267237f 100755
--- a/tools/qdstat.in
+++ b/tools/qdstat.in
@@ -314,6 +314,10 @@ class BusManager(Node):
                 rows.append(('Deliveries Delayed > 10sec', router.deliveriesDelayed10Sec))
                 rows.append(('Deliveries Stuck > 10sec',   router.deliveriesStuck))
                 rows.append(('Deliveries to Fallback',     router.deliveriesRedirectedToFallback))
+                try:
+                    rows.append(('Links Blocked', router.linksBlocked))
+                except:
+                    pass
             except:
                 pass
             rows.append(('Ingress Count', router.deliveriesIngress))
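
The metric table change above retypes deliveries_stuck as a "gauge" and adds links_blocked as a second gauge. Both values can fall as well as rise, which a monotonic "counter" cannot express. The following is a minimal sketch of how such a (name, type) table could be rendered, assuming the endpoint emits Prometheus text exposition format. `render_metrics`, `METRICS`, and the `stats` dictionary are hypothetical names for illustration and are not the router's code.

```python
# Hypothetical sketch, not the router's implementation: render a small
# metric table as Prometheus-style text (an assumed output format).
METRICS = [
    ("deliveries_delayed_10sec", "counter"),  # only ever increases
    ("deliveries_stuck", "gauge"),            # may rise and fall
    ("links_blocked", "gauge"),               # may rise and fall
]


def render_metrics(stats, metrics=METRICS):
    """Return one '# TYPE' line and one sample line per metric."""
    lines = []
    for name, kind in metrics:
        lines.append("# TYPE %s %s" % (name, kind))
        lines.append("%s %d" % (name, stats[name]))
    return "\n".join(lines) + "\n"
```

Declaring the type correctly matters to consumers: a scraper that believes a value is a counter may treat any decrease as a process restart.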




[qpid-dispatch] 05/05: DISPATCH-1409 - Added a test for sender-link credit.

Posted by tr...@apache.org.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 7dc31d20eb917678c8fedbd5cd487c5c4f5f6c50
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Oct 31 14:03:02 2019 -0400

    DISPATCH-1409 - Added a test for sender-link credit.
---
 tests/system_tests_stuck_deliveries.py | 116 +++++++++++++++++++++++++++++++++
 1 file changed, 116 insertions(+)

diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py
index c81c001..7758126 100644
--- a/tests/system_tests_stuck_deliveries.py
+++ b/tests/system_tests_stuck_deliveries.py
@@ -158,6 +158,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_10_sender_link_credit_test(self):
+        test = TxLinkCreditTest(self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -406,5 +411,116 @@ class RxLinkCreditTest(MessagingHandler):
         Container(self).run()
 
 
+class TxLinkCreditTest(MessagingHandler):
+    def __init__(self, host):
+        super(TxLinkCreditTest, self).__init__()
+        self.host = host
+
+        self.sender_conn   = None
+        self.query_conn    = None
+        self.addr          = "rx/link/credit/test"
+        self.credit_issued = 0
+        self.error         = None
+
+        self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '250Credits']
+        self.stage  = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired - stage: %s" % self.stages[self.stage]
+        self.sender_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+
+    def fail(self, error):
+        self.error = error
+        self.sender_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(30.0, Timeout(self))
+        self.poll_timer     = None
+        self.sender_conn    = event.container.connect(self.host)
+        self.query_conn     = event.container.connect(self.host)
+        self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True)
+        self.query_sender   = event.container.create_sender(self.query_conn, "$management")
+        self.sender         = None
+        self.receiver       = None
+
+    def on_link_opened(self, event):
+        if event.receiver == self.reply_receiver:
+            self.reply_addr = event.receiver.remote_source.address
+            self.proxy      = MgmtMsgProxy(self.reply_addr)
+            self.sender     = event.container.create_sender(self.sender_conn, self.addr)
+        elif event.sender == self.sender:
+            self.stage = 1
+            self.process()
+
+    def process(self):
+        if self.stage == 1:
+            #
+            # LinkBlocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 2:
+            #
+            # LinkUnblocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 3:
+            #
+            # 250Credits
+            #
+            msg = self.proxy.query_links()
+            self.query_sender.send(msg)            
+
+    def on_message(self, event):
+        if event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            if self.stage == 1:
+                #
+                # LinkBlocked
+                #
+                if response.results[0].linksBlocked == 1:
+                    self.receiver = event.container.create_receiver(self.sender_conn, self.addr);
+                    self.stage = 2
+                    self.process()
+                    return
+
+            elif self.stage == 2:
+                #
+                # LinkUnblocked
+                #
+                if response.results[0].linksBlocked == 0:
+                    self.stage = 3
+                    self.process()
+                    return
+
+            elif self.stage == 3:
+                #
+                # 250Credits
+                #
+                for link in response.results:
+                    if 'M0' + self.addr == link.owningAddr:
+                        if link.creditAvailable == 250:
+                            self.fail(None)
+                            return
+
+            self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
+
+    def poll_timeout(self):
+        self.process()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__== '__main__':
     unittest.main(main_module())




[qpid-dispatch] 02/05: DISPATCH-1409 - Reduce the batch size for batched management queries to account for the increased number of attributes in the link entity type.

Posted by tr...@apache.org.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit abda6a8de0d4838cb022e686c35af8471a194f3b
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Wed Oct 30 16:25:30 2019 -0400

    DISPATCH-1409 - Reduce the batch size for batched management queries to account for the increased number of attributes in the link entity type.
---
 python/qpid_dispatch/management/client.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/qpid_dispatch/management/client.py b/python/qpid_dispatch/management/client.py
index b2a16f2..3fc8364 100644
--- a/python/qpid_dispatch/management/client.py
+++ b/python/qpid_dispatch/management/client.py
@@ -236,8 +236,8 @@ class Node(object):
         # too many rows. So, as a safety we are going to ask only for
         # MAX_ALLOWED_COUNT_PER_REQUEST. Since this is used by both qdstat
         # and qdmanage, we have determined that the optimal value for
-        # MAX_ALLOWED_COUNT_PER_REQUEST is 700
-        MAX_ALLOWED_COUNT_PER_REQUEST = 700
+        # MAX_ALLOWED_COUNT_PER_REQUEST is 500
+        MAX_ALLOWED_COUNT_PER_REQUEST = 500
 
         response_results = []
         response_attr_names = []
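
The batch-size change above caps how many rows a single management query requests. The following is a minimal sketch of the paging pattern such a cap implies, assuming a caller-supplied `fetch_page(offset, count)` function that returns a list of rows. Both `fetch_all` and `fetch_page` are hypothetical names for illustration; this is not the code in client.py.

```python
# Hypothetical sketch of offset/count paging under a per-request cap.
MAX_ALLOWED_COUNT_PER_REQUEST = 500  # value set by the commit above


def fetch_all(fetch_page, batch_size=MAX_ALLOWED_COUNT_PER_REQUEST):
    """Collect every row by requesting at most batch_size rows at a time.

    fetch_page(offset, count) is an assumed callable that returns a list
    of up to `count` rows starting at `offset`.
    """
    results = []
    offset = 0
    while True:
        page = fetch_page(offset, batch_size)
        results.extend(page)
        # A short (or empty) page means the server has no more rows.
        if len(page) < batch_size:
            return results
        offset += batch_size
```

A smaller batch means more round trips for large tables, in exchange for smaller individual responses now that each link row carries more attributes.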




[qpid-dispatch] 04/05: DISPATCH-1409 - Added test case, fixed accounting bug found by the test case.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 1837ece08add3cfd1af377be4dfeb3203dc32339
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Oct 31 12:17:21 2019 -0400

    DISPATCH-1409 - Added test case, fixed accounting bug found by the test case.
---
 src/router_core/connections.c          |  10 ++-
 tests/system_tests_stuck_deliveries.py | 136 +++++++++++++++++++++++++++++++++
 2 files changed, 142 insertions(+), 4 deletions(-)

diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9229d1..984a7c2 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -233,17 +233,19 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link)
     if (link->credit_reported > 0 && pn_credit == 0) {
         //
         // The link has transitioned from positive credit to zero credit.
-        // Mark it as eligible for logging and record the time.
         //
-        link->reported_as_blocked = false;
         link->zero_credit_time = core->uptime_ticks;
-        core->links_blocked--;
-    } else if (link->credit_reported == 0 && pn_credit > 0)
+    } else if (link->credit_reported == 0 && pn_credit > 0) {
         //
         // The link has transitioned from zero credit to positive credit.
         // Clear the recorded time.
         //
         link->zero_credit_time = 0;
+        if (link->reported_as_blocked) {
+            link->reported_as_blocked = false;
+            core->links_blocked--;
+        }
+    }
 
     link->credit_reported = pn_credit;
 }
diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py
index c4f8b21..c81c001 100644
--- a/tests/system_tests_stuck_deliveries.py
+++ b/tests/system_tests_stuck_deliveries.py
@@ -153,6 +153,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_09_receiver_link_credit_test(self):
+        test = RxLinkCreditTest(self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -270,5 +275,136 @@ class DelayedSettlementTest(MessagingHandler):
         Container(self).run()
 
 
+class RxLinkCreditTest(MessagingHandler):
+    def __init__(self, host):
+        super(RxLinkCreditTest, self).__init__(prefetch = 0)
+        self.host = host
+
+        self.receiver_conn = None
+        self.query_conn    = None
+        self.addr          = "rx/link/credit/test"
+        self.credit_issued = 0
+        self.error         = None
+
+        self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '10Credits', '20Credits']
+        self.stage  = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired - stage: %s" % self.stages[self.stage]
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+
+    def fail(self, error):
+        self.error = error
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(30.0, Timeout(self))
+        self.poll_timer     = None
+        self.receiver_conn  = event.container.connect(self.host)
+        self.query_conn     = event.container.connect(self.host)
+        self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True)
+        self.query_sender   = event.container.create_sender(self.query_conn, "$management")
+        self.receiver       = None
+
+    def on_link_opened(self, event):
+        if event.receiver == self.reply_receiver:
+            self.reply_addr = event.receiver.remote_source.address
+            self.proxy      = MgmtMsgProxy(self.reply_addr)
+            self.receiver   = event.container.create_receiver(self.receiver_conn, self.addr)
+            self.reply_receiver.flow(1)
+        elif event.receiver == self.receiver:
+            self.stage = 1
+            self.process()
+
+    def process(self):
+        if self.stage == 1:
+            #
+            # LinkBlocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 2:
+            #
+            # LinkUnblocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 3:
+            #
+            # 10Credits
+            #
+            msg = self.proxy.query_links()
+            self.query_sender.send(msg)            
+
+        elif self.stage == 4:
+            #
+            # 20Credits
+            #
+            msg = self.proxy.query_links()
+            self.query_sender.send(msg)            
+
+    def on_message(self, event):
+        if event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            self.reply_receiver.flow(1)
+            if self.stage == 1:
+                #
+                # LinkBlocked
+                #
+                if response.results[0].linksBlocked == 1:
+                    self.receiver.flow(10)
+                    self.stage = 2
+                    self.process()
+                    return
+
+            elif self.stage == 2:
+                #
+                # LinkUnblocked
+                #
+                if response.results[0].linksBlocked == 0:
+                    self.stage = 3
+                    self.process()
+                    return
+
+            elif self.stage == 3:
+                #
+                # 10Credits
+                #
+                for link in response.results:
+                    if 'M0' + self.addr == link.owningAddr:
+                        if link.creditAvailable == 10:
+                            self.receiver.flow(10)
+                            self.stage = 4
+                            self.process()
+                            return
+
+            elif self.stage == 4:
+                #
+                # 20Credits
+                #
+                for link in response.results:
+                    if 'M0' + self.addr == link.owningAddr:
+                        if link.creditAvailable == 20:
+                            self.fail(None)
+                            return
+            
+            self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
+
+    def poll_timeout(self):
+        self.process()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__== '__main__':
     unittest.main(main_module())
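
The accounting fix in connections.c above is easier to follow as a small model. In the 03/05 version, links_blocked was decremented on every positive-to-zero credit transition, whether or not the link had been counted as blocked. After this commit, the decrement happens only when a link that was reported as blocked regains credit or is cleaned up. The following is a minimal Python sketch of that corrected bookkeeping. The `Core` and `Link` classes, the function names, and the `STUCK_AGE` constant are simplified stand-ins for the C structures and are assumptions made for illustration.

```python
# Simplified model of the corrected blocked-link accounting.
# All names here are illustrative stand-ins for the C code.
STUCK_AGE = 10  # seconds; the threshold named in the commit messages


class Core(object):
    def __init__(self):
        self.uptime_ticks = 0
        self.links_blocked = 0


class Link(object):
    def __init__(self):
        self.credit_reported = 0
        self.zero_credit_time = 0
        self.reported_as_blocked = False


def record_link_credit(core, link, pn_credit):
    """Mirror of the fixed transition logic in qdr_record_link_credit."""
    if link.credit_reported > 0 and pn_credit == 0:
        # Positive -> zero: only remember when credit ran out.
        link.zero_credit_time = core.uptime_ticks
    elif link.credit_reported == 0 and pn_credit > 0:
        # Zero -> positive: clear the time and undo the count if needed.
        link.zero_credit_time = 0
        if link.reported_as_blocked:
            link.reported_as_blocked = False
            core.links_blocked -= 1
    link.credit_reported = pn_credit


def process_link(core, link):
    """Mirror of the tracker check: count a link once when it is blocked."""
    if (not link.reported_as_blocked and link.zero_credit_time > 0
            and core.uptime_ticks - link.zero_credit_time > STUCK_AGE):
        link.reported_as_blocked = True
        core.links_blocked += 1


def cleanup_link(core, link):
    """Mirror of the link-cleanup decrement for a still-blocked link."""
    if link.reported_as_blocked:
        core.links_blocked -= 1
```

Because every decrement is guarded by reported_as_blocked, each increment is matched by at most one decrement, so the counter cannot drop below zero in this model.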




[qpid-dispatch] 01/05: DISPATCH-1409 - Exposed available-credit and time-of-zero-credit as link attributes. Added logging for links that have been stuck with no credit for more than ten seconds.

Posted by tr...@apache.org.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit beb89a3d4a355af277f37248dac6bf3ae9733e96
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Wed Oct 30 11:55:55 2019 -0400

    DISPATCH-1409 - Exposed available-credit and time-of-zero-credit as link attributes.  Added logging for links that have been stuck with no credit for more than ten seconds.
---
 include/qpid/dispatch/router_core.h                |  2 ++
 python/qpid_dispatch/management/qdrouter.json      |  9 ++++++
 src/router_core/agent_link.c                       | 15 ++++++++++
 src/router_core/agent_link.h                       |  2 +-
 src/router_core/connections.c                      | 33 +++++++++++++++++++++-
 src/router_core/forwarder.c                        |  2 ++
 .../stuck_delivery_detection/delivery_tracker.c    | 10 +++++++
 src/router_core/router_core_private.h              | 18 ++++++++++--
 src/router_core/transfer.c                         |  1 +
 src/router_node.c                                  | 13 +++++++++
 tools/qdstat.in                                    | 15 +++++++++-
 11 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 5520888..fab7e50 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -685,6 +685,7 @@ typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
 typedef void (*qdr_link_drain_t)         (void *context, qdr_link_t *link, bool mode);
 typedef int  (*qdr_link_push_t)          (void *context, qdr_link_t *link, int limit);
 typedef uint64_t (*qdr_link_deliver_t)   (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
+typedef int (*qdr_link_get_credit_t)     (void *context, qdr_link_t *link);
 typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
 typedef void (*qdr_connection_close_t)   (void *context, qdr_connection_t *conn, qdr_error_t *error);
 
@@ -700,6 +701,7 @@ void qdr_connection_handlers(qdr_core_t             *core,
                              qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
+                             qdr_link_get_credit_t      get_credit,
                              qdr_delivery_update_t      delivery_update,
                              qdr_connection_close_t     conn_close);
 
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 7e54bc0..e7a90e5 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1534,6 +1534,15 @@
                     "graph": true,
                     "description": "The current number of deliveries that are unsettled and have been held in the router for more than 10 seconds."
                 },
+                "creditAvailable": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of credits available on this link."
+                },
+                "zeroCreditSeconds": {
+                    "type": "integer",
+                    "description": "The number of seconds that the link's available credit has remained zero."
+                },
                 "settleRate": {
                     "type": "integer",
                     "graph": true,
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index cb6e016..24452f4 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -48,6 +48,8 @@
 #define QDR_LINK_INGRESS_HISTOGRAM        24
 #define QDR_LINK_PRIORITY                 25
 #define QDR_LINK_SETTLE_RATE              26
+#define QDR_LINK_CREDIT_AVAILABLE         27
+#define QDR_LINK_ZERO_CREDIT_SECONDS      28
 
 const char *qdr_link_columns[] =
     {"name",
@@ -77,6 +79,8 @@ const char *qdr_link_columns[] =
      "ingressHistogram",
      "priority",
      "settleRate",
+     "creditAvailable",
+     "zeroCreditSeconds",
      0};
 
 static const char *qd_link_type_name(qd_link_type_t lt)
@@ -269,6 +273,17 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
     }
         break;
 
+    case QDR_LINK_CREDIT_AVAILABLE:
+        qd_compose_insert_uint(body, link->credit_reported);
+        break;
+
+    case QDR_LINK_ZERO_CREDIT_SECONDS:
+        if (link->zero_credit_time == 0)
+            qd_compose_insert_uint(body, 0);
+        else
+            qd_compose_insert_uint(body, core->uptime_ticks - link->zero_credit_time);
+        break;
+
     default:
         qd_compose_insert_null(body);
         break;
diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h
index 20b6c65..7c50312 100644
--- a/src/router_core/agent_link.h
+++ b/src/router_core/agent_link.h
@@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t          *core,
                          qdr_query_t         *query,
                          qd_parsed_field_t   *in_body);
 
-#define QDR_LINK_COLUMN_COUNT  27
+#define QDR_LINK_COLUMN_COUNT  29
 
 const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1];
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index cc50ba2..4fb5af9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -223,6 +223,32 @@ const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *l
 }
 
 
+void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link)
+{
+    //
+    // Get Proton's view of this link's available credit.
+    //
+    int pn_credit = core->get_credit_handler(core->user_context, link);
+
+    if (link->credit_reported > 0 && pn_credit == 0) {
+        //
+        // The link has transitioned from positive credit to zero credit.
+        // Mark it as eligible for logging and record the time.
+        //
+        link->reported_as_blocked = false;
+        link->zero_credit_time = core->uptime_ticks;
+    } else if (link->credit_reported == 0 && pn_credit > 0)
+        //
+        // The link has transitioned from zero credit to positive credit.
+        // Clear the recorded time.
+        //
+        link->zero_credit_time = 0;
+
+    link->credit_reported = pn_credit;
+}
+
+
+
 int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
@@ -365,7 +391,8 @@ int qdr_connection_process(qdr_connection_t *conn)
             if (detach_sent) {
                 // let the core thread know so it can clean up
                 qdr_link_detach_sent(link);
-            }
+            } else
+                qdr_record_link_credit(core, link);
 
             ref = DEQ_NEXT(ref);
         }
@@ -506,6 +533,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;
     link->core_ticks     = conn->core->uptime_ticks;
+    link->zero_credit_time = conn->core->uptime_ticks;
     link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus);
 
     link->strip_annotations_in  = conn->strip_annotations_in;
@@ -591,6 +619,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
+                             qdr_link_get_credit_t      get_credit,
                              qdr_delivery_update_t      delivery_update,
                              qdr_connection_close_t     conn_close)
 {
@@ -604,6 +633,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
     core->drain_handler           = drain;
     core->push_handler            = push;
     core->deliver_handler         = deliver;
+    core->get_credit_handler      = get_credit;
     core->delivery_update_handler = delivery_update;
     core->conn_close_handler      = conn_close;
 }
@@ -1022,6 +1052,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->strip_prefix   = 0;
     link->attach_count   = 1;
     link->core_ticks     = core->uptime_ticks;
+    link->zero_credit_time = core->uptime_ticks;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 0a679aa..b33772a 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -888,6 +888,8 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
     out_link->admin_enabled  = true;
     out_link->attach_count   = 1;
+    out_link->core_ticks     = conn->core->uptime_ticks;
+    out_link->zero_credit_time = core->uptime_ticks;
 
     if (strip) {
         out_link->strip_prefix = strip;
diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
index eec3905..2494b5d 100644
--- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -71,6 +71,16 @@ static void process_link_CT(qdr_core_t *core, qdr_link_t *link)
         check_delivery_CT(core, link, dlv);
         dlv = DEQ_NEXT(dlv);
     }
+
+    if (!link->reported_as_blocked && link->zero_credit_time > 0 &&
+        (core->uptime_ticks - link->zero_credit_time > stuck_age)) {
+        link->reported_as_blocked = true;
+        qd_log(core->log, QD_LOG_INFO,
+               "[C%"PRIu64"][L%"PRIu64"] "
+               "Link blocked with zero credit for %d seconds",
+               link->conn ? link->conn->identity : 0, link->identity,
+               core->uptime_ticks - link->zero_credit_time);
+    }
 }
 
 
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index fa2591b..683dde7 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -431,9 +431,12 @@ struct qdr_link_t {
     qdr_delivery_ref_list_t  updated_deliveries; ///< References to deliveries (in the unsettled list) with updates.
     qdr_link_oper_status_t   oper_status;
     int                      capacity;
-    int                      credit_to_core; ///< Number of the available credits incrementally given to the core
-    int                      credit_pending; ///< Number of credits to be issued once consumers are available
-    int                      credit_stored;  ///< Number of credits given to the link before it was ready to process them.
+    int                      credit_to_core;    ///< Number of the available credits incrementally given to the core
+    int                      credit_pending;    ///< Number of credits to be issued once consumers are available
+    int                      credit_stored;     ///< Number of credits given to the link before it was ready to process them.
+    int                      credit_reported;   ///< Number of credits to expose to management
+    uint32_t                 zero_credit_time;  ///< Number of core ticks when credit last went to zero
+    bool                     reported_as_blocked; ///< The fact that this link has been blocked with zero credit has been logged
     bool                     admin_enabled;
     bool                     strip_annotations_in;
     bool                     strip_annotations_out;
@@ -806,6 +809,7 @@ struct qdr_core_t {
     qdr_link_drain_t          drain_handler;
     qdr_link_push_t           push_handler;
     qdr_link_deliver_t        deliver_handler;
+    qdr_link_get_credit_t     get_credit_handler;
     qdr_delivery_update_t     delivery_update_handler;
     qdr_connection_close_t    conn_close_handler;
 
@@ -997,4 +1001,12 @@ void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer);
  */
 void qdr_reset_sheaf(qdr_core_t *core, uint8_t n);
 
+/**
+ * Run in an IO thread.
+ *
+ * Records Proton's view of the link's available credit and tracks it for management and
+ * logging.
+ */
+void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link);
+
 #endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 1c0a4b8..175eba2 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -262,6 +262,7 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
     action->args.connection.drain  = drain_mode;
 
     qdr_action_enqueue(core, action);
+    qdr_record_link_credit(core, link);
 }
 
 
diff --git a/src/router_node.c b/src/router_node.c
index 9f9d753..91e3e10 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1669,6 +1669,18 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
 }
 
 
+static int CORE_link_get_credit(void *context, qdr_link_t *link)
+{
+    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t *plink = !!qlink ? qd_link_pn(qlink) : 0;
+
+    if (!plink)
+        return 0;
+
+    return pn_link_remote_credit(plink);
+}
+
+
 static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
 {
     qd_router_t   *router = (qd_router_t*) context;
@@ -1780,6 +1792,7 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             CORE_link_drain,
                             CORE_link_push,
                             CORE_link_deliver,
+                            CORE_link_get_credit,
                             CORE_delivery_update,
                             CORE_close_connection);
 
diff --git a/tools/qdstat.in b/tools/qdstat.in
index 59f0191..6b8f4b1 100755
--- a/tools/qdstat.in
+++ b/tools/qdstat.in
@@ -34,7 +34,7 @@ from time import ctime, strftime, gmtime
 import qpid_dispatch_site
 from qpid_dispatch.management.client import Url, Node, Entity
 from qpid_dispatch_internal.management.qdrouter import QdSchema
-from qpid_dispatch_internal.tools import Display, Header, Sorter, YN, Commas, TimeLong
+from qpid_dispatch_internal.tools import Display, Header, Sorter, YN, Commas, TimeLong, TimeShort
 from qpid_dispatch_internal.tools.command import (parse_args_qdstat, main,
                                                   opts_ssl_domain, opts_sasl,
                                                   opts_url)
@@ -347,6 +347,7 @@ class BusManager(Node):
                 'capacity', 'undeliveredCount', 'unsettledCount', 'deliveryCount',
                 'presettledCount', 'droppedPresettledCount', 'acceptedCount', 'rejectedCount', 'releasedCount',
                 'modifiedCount', 'deliveriesDelayed1Sec', 'deliveriesDelayed10Sec', 'deliveriesStuck',
+                'creditAvailable', 'zeroCreditSeconds',
                 'adminStatus', 'operStatus', 'linkName', 'priority', 'settleRate')
 
         objects = self.query('org.apache.qpid.dispatch.router.link', cols, limit=self.opts.limit)
@@ -355,6 +356,7 @@ class BusManager(Node):
         has_priority = False
         has_delayed  = False
         has_stuck    = False
+        has_credit   = False
 
         if show_date_id:
             self.display_datetime_router_id()
@@ -370,6 +372,8 @@ class BusManager(Node):
                     has_delayed = True
                 if hasattr(first_row, 'deliveriesStuck'):
                     has_stuck = True
+                if hasattr(first_row, 'creditAvailable'):
+                    has_credit = True
 
         if has_priority:
             heads.append(Header("pri"))
@@ -390,6 +394,9 @@ class BusManager(Node):
             heads.append(Header("rate"))
         if has_stuck:
             heads.append(Header("stuck"))
+        if has_credit:
+            heads.append(Header("cred"))
+            heads.append(Header("blkd"))
         if self.opts.verbose:
             heads.append(Header("admin"))
             heads.append(Header("oper"))
@@ -423,6 +430,12 @@ class BusManager(Node):
                 row.append(link.settleRate)
             if has_stuck:
                 row.append(link.deliveriesStuck)
+            if has_credit:
+                row.append(link.creditAvailable)
+                if link.zeroCreditSeconds > 0:
+                    row.append(TimeShort(link.zeroCreditSeconds * 1000000000))
+                else:
+                    row.append('-')
             if self.opts.verbose:
                 row.append(link.adminStatus)
                 row.append(link.operStatus)
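
For readers following the credit tracking added in connections.c, agent_link.c and delivery_tracker.c, here is a minimal Python model of the same state transitions. It is an illustrative sketch, not router code: the class LinkCredit, the helper names, and the STUCK_AGE value of 10 are assumptions made for this example (the commit message mentions "more than ten seconds"; the real threshold lives in the stuck_age variable in delivery_tracker.c).

```python
# Hypothetical model of the zero-credit tracking in this change set.
# Names mirror the C fields, but nothing here is part of qpid-dispatch.

STUCK_AGE = 10  # assumed threshold, in core ticks (seconds)


class LinkCredit:
    def __init__(self, now):
        self.credit_reported = 0          # last credit value seen from Proton
        self.zero_credit_time = now       # tick when credit last hit zero; 0 = not at zero
        self.reported_as_blocked = False  # True once the blocked condition was logged


def record_link_credit(link, pn_credit, now):
    """Mirror of qdr_record_link_credit: track transitions to and from zero."""
    if link.credit_reported > 0 and pn_credit == 0:
        # Positive -> zero: re-arm logging and stamp the time.
        link.reported_as_blocked = False
        link.zero_credit_time = now
    elif link.credit_reported == 0 and pn_credit > 0:
        # Zero -> positive: clear the stamp.
        link.zero_credit_time = 0
    link.credit_reported = pn_credit


def zero_credit_seconds(link, now):
    """Mirror of the zeroCreditSeconds column in agent_link.c."""
    if link.zero_credit_time == 0:
        return 0
    return now - link.zero_credit_time


def should_log_blocked(link, now):
    """Mirror of the check in process_link_CT: report each blocked episode once."""
    if (not link.reported_as_blocked
            and link.zero_credit_time > 0
            and now - link.zero_credit_time > STUCK_AGE):
        link.reported_as_blocked = True
        return True
    return False
```

In this model a new link is stamped at creation time, so a link that never receives credit starts counting toward the blocked threshold immediately, matching the zero_credit_time initialization in qdr_link_first_attach and qdr_create_link_CT. One consequence visible in the model: a link created at tick 0 would be indistinguishable from one that currently has credit, because 0 doubles as the "not at zero credit" marker.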

