You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/10/31 18:24:04 UTC

[qpid-dispatch] 04/05: DISPATCH-1409 - Added test case, fixed accounting bug found by the test case.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 1837ece08add3cfd1af377be4dfeb3203dc32339
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Oct 31 12:17:21 2019 -0400

    DISPATCH-1409 - Added test case, fixed accounting bug found by the test case.
---
 src/router_core/connections.c          |  10 ++-
 tests/system_tests_stuck_deliveries.py | 136 +++++++++++++++++++++++++++++++++
 2 files changed, 142 insertions(+), 4 deletions(-)

diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9229d1..984a7c2 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -233,17 +233,19 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link)
     if (link->credit_reported > 0 && pn_credit == 0) {
         //
         // The link has transitioned from positive credit to zero credit.
-        // Mark it as eligible for logging and record the time.
         //
-        link->reported_as_blocked = false;
         link->zero_credit_time = core->uptime_ticks;
-        core->links_blocked--;
-    } else if (link->credit_reported == 0 && pn_credit > 0)
+    } else if (link->credit_reported == 0 && pn_credit > 0) {
         //
         // The link has transitioned from zero credit to positive credit.
         // Clear the recorded time.
         //
         link->zero_credit_time = 0;
+        if (link->reported_as_blocked) {
+            link->reported_as_blocked = false;
+            core->links_blocked--;
+        }
+    }
 
     link->credit_reported = pn_credit;
 }
diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py
index c4f8b21..c81c001 100644
--- a/tests/system_tests_stuck_deliveries.py
+++ b/tests/system_tests_stuck_deliveries.py
@@ -153,6 +153,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_09_receiver_link_credit_test(self):
+        test = RxLinkCreditTest(self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -270,5 +275,136 @@ class DelayedSettlementTest(MessagingHandler):
         Container(self).run()
 
 
+class RxLinkCreditTest(MessagingHandler):
+    def __init__(self, host):
+        super(RxLinkCreditTest, self).__init__(prefetch = 0)
+        self.host = host
+
+        self.receiver_conn = None
+        self.query_conn    = None
+        self.addr          = "rx/link/credit/test"
+        self.credit_issued = 0
+        self.error         = None
+
+        self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '10Credits', '20Credits']
+        self.stage  = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired - stage: %s" % self.stages[self.stage]
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+
+    def fail(self, error):
+        self.error = error
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(30.0, Timeout(self))
+        self.poll_timer     = None
+        self.receiver_conn  = event.container.connect(self.host)
+        self.query_conn     = event.container.connect(self.host)
+        self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True)
+        self.query_sender   = event.container.create_sender(self.query_conn, "$management")
+        self.receiver       = None
+
+    def on_link_opened(self, event):
+        if event.receiver == self.reply_receiver:
+            self.reply_addr = event.receiver.remote_source.address
+            self.proxy      = MgmtMsgProxy(self.reply_addr)
+            self.receiver   = event.container.create_receiver(self.receiver_conn, self.addr)
+            self.reply_receiver.flow(1)
+        elif event.receiver == self.receiver:
+            self.stage = 1
+            self.process()
+
+    def process(self):
+        if self.stage == 1:
+            #
+            # LinkBlocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 2:
+            #
+            # LinkUnblocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 3:
+            #
+            # 10Credits
+            #
+            msg = self.proxy.query_links()
+            self.query_sender.send(msg)            
+
+        elif self.stage == 4:
+            #
+            # 20Credits
+            #
+            msg = self.proxy.query_links()
+            self.query_sender.send(msg)            
+
+    def on_message(self, event):
+        if event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            self.reply_receiver.flow(1)
+            if self.stage == 1:
+                #
+                # LinkBlocked
+                #
+                if response.results[0].linksBlocked == 1:
+                    self.receiver.flow(10)
+                    self.stage = 2
+                    self.process()
+                    return
+
+            elif self.stage == 2:
+                #
+                # LinkUnblocked
+                #
+                if response.results[0].linksBlocked == 0:
+                    self.stage = 3
+                    self.process()
+                    return
+
+            elif self.stage == 3:
+                #
+                # 10Credits
+                #
+                for link in response.results:
+                    if 'M0' + self.addr == link.owningAddr:
+                        if link.creditAvailable == 10:
+                            self.receiver.flow(10)
+                            self.stage = 4
+                            self.process()
+                            return
+
+            elif self.stage == 4:
+                #
+                # 20Credits
+                #
+                for link in response.results:
+                    if 'M0' + self.addr == link.owningAddr:
+                        if link.creditAvailable == 20:
+                            self.fail(None)
+                            return
+            
+            self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
+
+    def poll_timeout(self):
+        self.process()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__== '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org