You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/08/06 13:57:53 UTC

[qpid-dispatch] branch main updated: DISPATCH-2164: Removed timers and subsciber count checks in favor of sending test messages. This closes #1332.

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 9275c1e  DISPATCH-2164: Removed timers and subsciber count checks in favor of sending test messages. This closes #1332.
9275c1e is described below

commit 9275c1e72251340953747ae80b7d50e752377312
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Aug 4 12:45:23 2021 -0400

    DISPATCH-2164: Removed timers and subsciber count checks in favor of sending test messages. This closes #1332.
---
 tests/system_tests_edge_router.py | 321 ++++++++++++++------------------------
 1 file changed, 115 insertions(+), 206 deletions(-)

diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 84ce7b4..628f72b 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -619,9 +619,7 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[2].addresses[0],
                                           self.routers[2].addresses[0],
-                                          "multicast.24",
-                                          self.routers[2].addresses[0],
-                                          subscriber_count=3)
+                                          "multicast.24")
         test.run()
         self.assertIsNone(test.error)
 
@@ -635,9 +633,7 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[3].addresses[0],
                                           self.routers[3].addresses[0],
-                                          "multicast.25",
-                                          self.routers[0].addresses[0],
-                                          subscriber_count=2)
+                                          "multicast.25")
         test.run()
         self.assertIsNone(test.error)
 
@@ -651,9 +647,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[0].addresses[0],
                                           self.routers[2].addresses[0],
-                                          "multicast.26",
-                                          self.routers[0].addresses[0],
-                                          subscriber_count=3)
+                                          "multicast.26")
         test.run()
         self.assertIsNone(test.error)
 
@@ -666,9 +660,7 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[3].addresses[0],
                                           self.routers[0].addresses[0],
-                                          "multicast.27",
-                                          self.routers[0].addresses[0],
-                                          subscriber_count=2)
+                                          "multicast.27")
         test.run()
         self.assertIsNone(test.error)
 
@@ -682,9 +674,7 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[3].addresses[0],
                                           self.routers[1].addresses[0],
-                                          "multicast.28",
-                                          self.routers[0].addresses[0],
-                                          subscriber_count=2)
+                                          "multicast.28")
         test.run()
         self.assertIsNone(test.error)
 
@@ -697,9 +687,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[4].addresses[0],
                                           self.routers[0].addresses[0],
-                                          "multicast.29",
-                                          self.routers[0].addresses[0],
-                                          subscriber_count=3)
+                                          "multicast.29")
         test.run()
         self.assertIsNone(test.error)
 
@@ -711,9 +699,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[4].addresses[0],
                                           self.routers[5].addresses[0],
-                                          "multicast.30",
-                                          self.routers[0].addresses[0],
-                                          subscriber_count=3)
+                                          "multicast.30")
         test.run()
         self.assertIsNone(test.error)
 
@@ -730,9 +716,7 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[2].addresses[0],
                                           "multicast.31",
-                                          self.routers[2].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=3)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -747,9 +731,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[3].addresses[0],
                                           "multicast.32",
-                                          self.routers[0].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=2)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -764,9 +746,7 @@ class RouterTest(TestCase):
                                           self.routers[0].addresses[0],
                                           self.routers[2].addresses[0],
                                           "multicast.33",
-                                          self.routers[0].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=3)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -780,9 +760,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[0].addresses[0],
                                           "multicast.34",
-                                          self.routers[0].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=2)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -797,9 +775,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[1].addresses[0],
                                           "multicast.35",
-                                          self.routers[0].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=2)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -813,9 +789,7 @@ class RouterTest(TestCase):
                                           self.routers[4].addresses[0],
                                           self.routers[0].addresses[0],
                                           "multicast.36",
-                                          self.routers[0].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=3)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -828,9 +802,7 @@ class RouterTest(TestCase):
                                           self.routers[4].addresses[0],
                                           self.routers[5].addresses[0],
                                           "multicast.37",
-                                          self.routers[0].addresses[0],
-                                          large_msg=True,
-                                          subscriber_count=3)
+                                          large_msg=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1001,9 +973,7 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[2].addresses[0],
                                           "multicast.52",
-                                          self.routers[2].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1018,9 +988,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[3].addresses[0],
                                           "multicast.53",
-                                          self.routers[0].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=2)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1035,9 +1003,7 @@ class RouterTest(TestCase):
                                           self.routers[0].addresses[0],
                                           self.routers[2].addresses[0],
                                           "multicast.54",
-                                          self.routers[0].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1051,9 +1017,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[0].addresses[0],
                                           "multicast.55",
-                                          self.routers[0].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=2)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1068,9 +1032,7 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[1].addresses[0],
                                           "multicast.56",
-                                          self.routers[0].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=2)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1084,9 +1046,7 @@ class RouterTest(TestCase):
                                           self.routers[4].addresses[0],
                                           self.routers[0].addresses[0],
                                           "multicast.57",
-                                          self.routers[0].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1099,9 +1059,7 @@ class RouterTest(TestCase):
                                           self.routers[4].addresses[0],
                                           self.routers[5].addresses[0],
                                           "multicast.58",
-                                          self.routers[0].addresses[0],
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1118,10 +1076,8 @@ class RouterTest(TestCase):
                                           self.routers[2].addresses[0],
                                           self.routers[2].addresses[0],
                                           "multicast.59",
-                                          self.routers[2].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1136,10 +1092,8 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[3].addresses[0],
                                           "multicast.60",
-                                          self.routers[0].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=2)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1154,10 +1108,8 @@ class RouterTest(TestCase):
                                           self.routers[0].addresses[0],
                                           self.routers[2].addresses[0],
                                           "multicast.61",
-                                          self.routers[0].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1171,10 +1123,8 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[0].addresses[0],
                                           "multicast.62",
-                                          self.routers[0].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=2)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1189,10 +1139,8 @@ class RouterTest(TestCase):
                                           self.routers[3].addresses[0],
                                           self.routers[1].addresses[0],
                                           "multicast.63",
-                                          self.routers[0].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=2)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1206,10 +1154,8 @@ class RouterTest(TestCase):
                                           self.routers[4].addresses[0],
                                           self.routers[0].addresses[0],
                                           "multicast.64",
-                                          self.routers[0].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1222,10 +1168,8 @@ class RouterTest(TestCase):
                                           self.routers[4].addresses[0],
                                           self.routers[5].addresses[0],
                                           "multicast.65",
-                                          self.routers[0].addresses[0],
                                           large_msg=True,
-                                          anon_sender=True,
-                                          subscriber_count=3)
+                                          anon_sender=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -2499,8 +2443,8 @@ class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
 
 class MobileAddressMulticastTest(MessagingHandler):
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,
-                 sender_host, address, check_addr_host=None, large_msg=False,
-                 anon_sender=False, subscriber_count=0):
+                 sender_host, address, large_msg=False,
+                 anon_sender=False):
         super(MobileAddressMulticastTest, self).__init__()
         self.receiver1_host = receiver1_host
         self.receiver2_host = receiver2_host
@@ -2508,7 +2452,6 @@ class MobileAddressMulticastTest(MessagingHandler):
         self.sender_host = sender_host
         self.address = address
         self.anon_sender = anon_sender
-        self.subscriber_count = subscriber_count
 
         # One sender connection and two receiver connections
         self.receiver1_conn = None
@@ -2547,11 +2490,12 @@ class MobileAddressMulticastTest(MessagingHandler):
         # address  has propagated.
         self.max_attempts = 5
         self.num_attempts = 0
-        self.num_attempts = 0
         self.container = None
-        self.check_addr_host = check_addr_host
-        if not self.check_addr_host:
-            self.check_addr_host = self.sender_host
+        self.test_msg_received_r1 = False
+        self.test_msg_received_r2 = False
+        self.test_msg_received_r3 = False
+        self.initial_msg_sent = False
+        self.n_accepted = 0
 
         if self.large_msg:
             self.body = "0123456789101112131415" * 5000
@@ -2576,105 +2520,95 @@ class MobileAddressMulticastTest(MessagingHandler):
         if self.sender_conn:
             self.sender_conn.close()
 
-    def create_sndr(self):
-        self.sender_conn = self.container.connect(self.sender_host)
-        if self.anon_sender:
-            self.sender = self.container.create_sender(self.sender_conn)
-        else:
-            self.sender = self.container.create_sender(self.sender_conn,
-                                                       self.address)
-
-    def check_address(self):
-        local_node = Node.connect(self.check_addr_host, timeout=TIMEOUT)
-        outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
-        found = False
-
-        subscriber_count_index = outs.attribute_names.index("subscriberCount")
-        remote_count_index = outs.attribute_names.index("remoteCount")
-
-        self.num_attempts += 1
-        for result in outs.results:
-            if self.address in result[0]:
-                # We are good if the sum of subscriberCount and remoteCount
-                # equals the total subscriber_count
-                if self.subscriber_count == 0 or (result[subscriber_count_index] + result[remote_count_index] == self.subscriber_count):
-                    # The address is in the address table and the subscriber count is as expected.
-                    # subscriberCount match means that both edge routers have
-                    # told the interior router about the existence of the address
-                    # If this has not happened yet, we will try again.
-                    found = True
-                    self.create_sndr()
-                    local_node.close()
-                    self.addr_timer.cancel()
-                    break
-
-        if not found:
-            if self.num_attempts < self.max_attempts:
-                self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
-            else:
-                self.error = "Unable to create sender because of " \
-                             "absence of address in the address table"
-                self.timeout()
-                local_node.close()
-
     def on_start(self, event):
         self.reactor = event.reactor
         self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
-        # Create two receivers
         self.receiver1_conn = event.container.connect(self.receiver1_host)
         self.receiver2_conn = event.container.connect(self.receiver2_host)
         self.receiver3_conn = event.container.connect(self.receiver3_host)
+
+        # Create receivers and sender all in one shot, no need to check for any address table
+        # before creating sender
         self.receiver1 = event.container.create_receiver(self.receiver1_conn,
                                                          self.address)
         self.receiver2 = event.container.create_receiver(self.receiver2_conn,
                                                          self.address)
         self.receiver3 = event.container.create_receiver(self.receiver3_conn,
                                                          self.address)
-        self.container = event.container
+        self.sender_conn = event.container.connect(self.sender_host)
+        if self.anon_sender:
+            self.sender = event.container.create_sender(self.sender_conn)
+        else:
+            self.sender = event.container.create_sender(self.sender_conn,
+                                                        self.address)
 
-    def on_link_opened(self, event):
-        if event.receiver == self.receiver1 or \
-                event.receiver == self.receiver2 or \
-                event.receiver == self.receiver3:
-            self.r_attaches += 1
-            if self.r_attaches == 3:
-                self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
+    def send_test_message(self):
+        msg = Message(body="Test Message")
+        if self.anon_sender:
+            msg.address = self.address
+        self.sender.send(msg)
+
+    def send(self):
+        if self.large_msg:
+            msg = Message(body=self.body, properties=self.properties)
+        else:
+            msg = Message(body="Message %d" % self.n_sent)
+        if self.anon_sender:
+            msg.address = self.address
+        msg.correlation_id = self.n_sent
+        self.sender.send(msg)
+
+    def on_accepted(self, event):
+        if self.test_msg_received_r1 and self.test_msg_received_r2 and self.test_msg_received_r3:
+            # All receivers have received the test message.
+            # Now fire off 100 messages to see if the message was multicasted to all
+            # receivers.
+            self.n_accepted += 1
+            while self.n_sent < self.count:
+                self.send()
+                self.n_sent += 1
+        else:
+            self.send_test_message()
 
     def on_sendable(self, event):
-        while self.n_sent < self.count:
-            msg = None
-            if self.large_msg:
-                msg = Message(body=self.body, properties=self.properties)
-            else:
-                msg = Message(body="Message %d" % self.n_sent)
-            if self.anon_sender:
-                msg.address = self.address
-            msg.correlation_id = self.n_sent
-            self.sender.send(msg)
-            self.n_sent += 1
+        if not self.initial_msg_sent:
+            # First send a single test message. This message
+            # could be accepted or released based on if
+            # some receiver is already online to receive the message
+            self.send_test_message()
+            self.initial_msg_sent = True
 
     def on_message(self, event):
         if event.receiver == self.receiver1:
-            if self.recvd1_msgs.get(event.message.correlation_id):
-                self.dup_msg = event.message.correlation_id
-                self.receiver_name = "Receiver 1"
-                self.timeout()
-            self.n_rcvd1 += 1
-            self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id
+            if event.message.body == "Test Message":
+                self.test_msg_received_r1 = True
+            else:
+                if self.recvd1_msgs.get(event.message.correlation_id):
+                    self.dup_msg = event.message.correlation_id
+                    self.receiver_name = "Receiver 1"
+                    self.timeout()
+                self.n_rcvd1 += 1
+                self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id
         if event.receiver == self.receiver2:
-            if self.recvd2_msgs.get(event.message.correlation_id):
-                self.dup_msg = event.message.correlation_id
-                self.receiver_name = "Receiver 2"
-                self.timeout()
-            self.n_rcvd2 += 1
-            self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id
+            if event.message.body == "Test Message":
+                self.test_msg_received_r2 = True
+            else:
+                if self.recvd2_msgs.get(event.message.correlation_id):
+                    self.dup_msg = event.message.correlation_id
+                    self.receiver_name = "Receiver 2"
+                    self.timeout()
+                self.n_rcvd2 += 1
+                self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id
         if event.receiver == self.receiver3:
-            if self.recvd3_msgs.get(event.message.correlation_id):
-                self.dup_msg = event.message.correlation_id
-                self.receiver_name = "Receiver 3"
-                self.timeout()
-            self.n_rcvd3 += 1
-            self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id
+            if event.message.body == "Test Message":
+                self.test_msg_received_r3 = True
+            else:
+                if self.recvd3_msgs.get(event.message.correlation_id):
+                    self.dup_msg = event.message.correlation_id
+                    self.receiver_name = "Receiver 3"
+                    self.timeout()
+                self.n_rcvd3 += 1
+                self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id
 
         if self.n_rcvd1 == self.count and self.n_rcvd2 == self.count and \
                 self.n_rcvd3 == self.count:
@@ -2684,6 +2618,9 @@ class MobileAddressMulticastTest(MessagingHandler):
             self.receiver3_conn.close()
             self.sender_conn.close()
 
+    def on_released(self, event):
+        self.send_test_message()
+
     def run(self):
         Container(self).run()
 
@@ -2692,15 +2629,15 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
     # failure scenario - cause some receiving clients to close while a large
     # message is in transit
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,
-                 sender_host, address, check_addr_host=None, large_msg=True):
+                 sender_host, address, large_msg=True, anon_sender=False):
         super(MobileAddrMcastDroppedRxTest, self).__init__(receiver1_host,
                                                            receiver2_host,
                                                            receiver3_host,
                                                            sender_host,
                                                            address,
-                                                           check_addr_host=check_addr_host,
-                                                           large_msg=large_msg)
-        self.n_accepted = 0
+                                                           large_msg=large_msg,
+                                                           anon_sender=anon_sender)
+
         self.n_released = 0
         self.recv1_closed = False
         self.recv2_closed = False
@@ -2725,59 +2662,31 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
                 self.receiver2_conn.close()
 
     def on_accepted(self, event):
-        self.n_accepted += 1
+        super(MobileAddrMcastDroppedRxTest, self).on_accepted(event)
         self._check_done()
 
     def on_released(self, event):
+        super(MobileAddrMcastDroppedRxTest, self).on_released(event)
         self.n_released += 1
         self._check_done()
 
 
-class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddressMulticastTest):
+class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddrMcastDroppedRxTest):
     # failure scenario - cause some receiving clients to close while a large
     # message is in transit
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,
-                 sender_host, address, check_addr_host=None, large_msg=True, anon_sender=True):
+                 sender_host, address, large_msg=True, anon_sender=True):
         super(MobileAddrMcastAnonSenderDroppedRxTest, self).__init__(receiver1_host,
                                                                      receiver2_host,
                                                                      receiver3_host,
                                                                      sender_host,
                                                                      address,
-                                                                     check_addr_host=check_addr_host,
                                                                      large_msg=large_msg,
                                                                      anon_sender=anon_sender)
-        self.n_accepted = 0
         self.n_released = 0
         self.recv1_closed = False
         self.recv2_closed = False
 
-    def _check_done(self):
-        if self.n_accepted + self.n_released == self.count:
-            self.receiver3_conn.close()
-            self.sender_conn.close()
-            self.timer.cancel()
-
-    def on_message(self, event):
-        super(MobileAddrMcastAnonSenderDroppedRxTest, self).on_message(event)
-
-        # start closing receivers
-        if self.n_rcvd1 == 50:
-            if not self.recv1_closed:
-                self.receiver1_conn.close()
-                self.recv1_closed = True
-        if self.n_rcvd2 == 75:
-            if not self.recv2_closed:
-                self.recv2_closed = True
-                self.receiver2_conn.close()
-
-    def on_accepted(self, event):
-        self.n_accepted += 1
-        self._check_done()
-
-    def on_released(self, event):
-        self.n_released += 1
-        self._check_done()
-
 
 class MobileAddressEventTest(MessagingHandler):
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org