You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/10/31 18:24:05 UTC

[qpid-dispatch] 05/05: DISPATCH-1409 - Added a test for sender-link credit.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 7dc31d20eb917678c8fedbd5cd487c5c4f5f6c50
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Oct 31 14:03:02 2019 -0400

    DISPATCH-1409 - Added a test for sender-link credit.
---
 tests/system_tests_stuck_deliveries.py | 116 +++++++++++++++++++++++++++++++++
 1 file changed, 116 insertions(+)

diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py
index c81c001..7758126 100644
--- a/tests/system_tests_stuck_deliveries.py
+++ b/tests/system_tests_stuck_deliveries.py
@@ -158,6 +158,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_10_sender_link_credit_test(self):
+        test = TxLinkCreditTest(self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -406,5 +411,116 @@ class RxLinkCreditTest(MessagingHandler):
         Container(self).run()
 
 
+class TxLinkCreditTest(MessagingHandler):
+    def __init__(self, host):
+        super(TxLinkCreditTest, self).__init__()
+        self.host = host
+
+        self.sender_conn   = None
+        self.query_conn    = None
+        self.addr          = "rx/link/credit/test"
+        self.credit_issued = 0
+        self.error         = None
+
+        self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '250Credits']
+        self.stage  = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired - stage: %s" % self.stages[self.stage]
+        self.sender_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+
+    def fail(self, error):
+        self.error = error
+        self.sender_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(30.0, Timeout(self))
+        self.poll_timer     = None
+        self.sender_conn    = event.container.connect(self.host)
+        self.query_conn     = event.container.connect(self.host)
+        self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True)
+        self.query_sender   = event.container.create_sender(self.query_conn, "$management")
+        self.sender         = None
+        self.receiver       = None
+
+    def on_link_opened(self, event):
+        if event.receiver == self.reply_receiver:
+            self.reply_addr = event.receiver.remote_source.address
+            self.proxy      = MgmtMsgProxy(self.reply_addr)
+            self.sender     = event.container.create_sender(self.sender_conn, self.addr)
+        elif event.sender == self.sender:
+            self.stage = 1
+            self.process()
+
+    def process(self):
+        if self.stage == 1:
+            #
+            # LinkBlocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 2:
+            #
+            # LinkUnblocked
+            #
+            msg = self.proxy.query_router()
+            self.query_sender.send(msg)
+
+        elif self.stage == 3:
+            #
+            # 250Credits
+            #
+            msg = self.proxy.query_links()
+            self.query_sender.send(msg)            
+
+    def on_message(self, event):
+        if event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            if self.stage == 1:
+                #
+                # LinkBlocked
+                #
+                if response.results[0].linksBlocked == 1:
+                    self.receiver = event.container.create_receiver(self.sender_conn, self.addr);
+                    self.stage = 2
+                    self.process()
+                    return
+
+            elif self.stage == 2:
+                #
+                # LinkUnblocked
+                #
+                if response.results[0].linksBlocked == 0:
+                    self.stage = 3
+                    self.process()
+                    return
+
+            elif self.stage == 3:
+                #
+                # 250Credits
+                #
+                for link in response.results:
+                    if 'M0' + self.addr == link.owningAddr:
+                        if link.creditAvailable == 250:
+                            self.fail(None)
+                            return
+
+            self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
+
+    def poll_timeout(self):
+        self.process()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__== '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org