You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/08/06 13:57:53 UTC
[qpid-dispatch] branch main updated: DISPATCH-2164: Removed timers
and subsciber count checks in favor of sending test messages. This closes
#1332.
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 9275c1e DISPATCH-2164: Removed timers and subsciber count checks in favor of sending test messages. This closes #1332.
9275c1e is described below
commit 9275c1e72251340953747ae80b7d50e752377312
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Aug 4 12:45:23 2021 -0400
DISPATCH-2164: Removed timers and subsciber count checks in favor of sending test messages. This closes #1332.
---
tests/system_tests_edge_router.py | 321 ++++++++++++++------------------------
1 file changed, 115 insertions(+), 206 deletions(-)
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 84ce7b4..628f72b 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -619,9 +619,7 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[2].addresses[0],
self.routers[2].addresses[0],
- "multicast.24",
- self.routers[2].addresses[0],
- subscriber_count=3)
+ "multicast.24")
test.run()
self.assertIsNone(test.error)
@@ -635,9 +633,7 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[3].addresses[0],
self.routers[3].addresses[0],
- "multicast.25",
- self.routers[0].addresses[0],
- subscriber_count=2)
+ "multicast.25")
test.run()
self.assertIsNone(test.error)
@@ -651,9 +647,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[0].addresses[0],
self.routers[2].addresses[0],
- "multicast.26",
- self.routers[0].addresses[0],
- subscriber_count=3)
+ "multicast.26")
test.run()
self.assertIsNone(test.error)
@@ -666,9 +660,7 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[3].addresses[0],
self.routers[0].addresses[0],
- "multicast.27",
- self.routers[0].addresses[0],
- subscriber_count=2)
+ "multicast.27")
test.run()
self.assertIsNone(test.error)
@@ -682,9 +674,7 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[3].addresses[0],
self.routers[1].addresses[0],
- "multicast.28",
- self.routers[0].addresses[0],
- subscriber_count=2)
+ "multicast.28")
test.run()
self.assertIsNone(test.error)
@@ -697,9 +687,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[4].addresses[0],
self.routers[0].addresses[0],
- "multicast.29",
- self.routers[0].addresses[0],
- subscriber_count=3)
+ "multicast.29")
test.run()
self.assertIsNone(test.error)
@@ -711,9 +699,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[4].addresses[0],
self.routers[5].addresses[0],
- "multicast.30",
- self.routers[0].addresses[0],
- subscriber_count=3)
+ "multicast.30")
test.run()
self.assertIsNone(test.error)
@@ -730,9 +716,7 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[2].addresses[0],
"multicast.31",
- self.routers[2].addresses[0],
- large_msg=True,
- subscriber_count=3)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -747,9 +731,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[3].addresses[0],
"multicast.32",
- self.routers[0].addresses[0],
- large_msg=True,
- subscriber_count=2)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -764,9 +746,7 @@ class RouterTest(TestCase):
self.routers[0].addresses[0],
self.routers[2].addresses[0],
"multicast.33",
- self.routers[0].addresses[0],
- large_msg=True,
- subscriber_count=3)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -780,9 +760,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[0].addresses[0],
"multicast.34",
- self.routers[0].addresses[0],
- large_msg=True,
- subscriber_count=2)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -797,9 +775,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[1].addresses[0],
"multicast.35",
- self.routers[0].addresses[0],
- large_msg=True,
- subscriber_count=2)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -813,9 +789,7 @@ class RouterTest(TestCase):
self.routers[4].addresses[0],
self.routers[0].addresses[0],
"multicast.36",
- self.routers[0].addresses[0],
- large_msg=True,
- subscriber_count=3)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -828,9 +802,7 @@ class RouterTest(TestCase):
self.routers[4].addresses[0],
self.routers[5].addresses[0],
"multicast.37",
- self.routers[0].addresses[0],
- large_msg=True,
- subscriber_count=3)
+ large_msg=True)
test.run()
self.assertIsNone(test.error)
@@ -1001,9 +973,7 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[2].addresses[0],
"multicast.52",
- self.routers[2].addresses[0],
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1018,9 +988,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[3].addresses[0],
"multicast.53",
- self.routers[0].addresses[0],
- anon_sender=True,
- subscriber_count=2)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1035,9 +1003,7 @@ class RouterTest(TestCase):
self.routers[0].addresses[0],
self.routers[2].addresses[0],
"multicast.54",
- self.routers[0].addresses[0],
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1051,9 +1017,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[0].addresses[0],
"multicast.55",
- self.routers[0].addresses[0],
- anon_sender=True,
- subscriber_count=2)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1068,9 +1032,7 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[1].addresses[0],
"multicast.56",
- self.routers[0].addresses[0],
- anon_sender=True,
- subscriber_count=2)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1084,9 +1046,7 @@ class RouterTest(TestCase):
self.routers[4].addresses[0],
self.routers[0].addresses[0],
"multicast.57",
- self.routers[0].addresses[0],
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1099,9 +1059,7 @@ class RouterTest(TestCase):
self.routers[4].addresses[0],
self.routers[5].addresses[0],
"multicast.58",
- self.routers[0].addresses[0],
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1118,10 +1076,8 @@ class RouterTest(TestCase):
self.routers[2].addresses[0],
self.routers[2].addresses[0],
"multicast.59",
- self.routers[2].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1136,10 +1092,8 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[3].addresses[0],
"multicast.60",
- self.routers[0].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=2)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1154,10 +1108,8 @@ class RouterTest(TestCase):
self.routers[0].addresses[0],
self.routers[2].addresses[0],
"multicast.61",
- self.routers[0].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1171,10 +1123,8 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[0].addresses[0],
"multicast.62",
- self.routers[0].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=2)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1189,10 +1139,8 @@ class RouterTest(TestCase):
self.routers[3].addresses[0],
self.routers[1].addresses[0],
"multicast.63",
- self.routers[0].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=2)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1206,10 +1154,8 @@ class RouterTest(TestCase):
self.routers[4].addresses[0],
self.routers[0].addresses[0],
"multicast.64",
- self.routers[0].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -1222,10 +1168,8 @@ class RouterTest(TestCase):
self.routers[4].addresses[0],
self.routers[5].addresses[0],
"multicast.65",
- self.routers[0].addresses[0],
large_msg=True,
- anon_sender=True,
- subscriber_count=3)
+ anon_sender=True)
test.run()
self.assertIsNone(test.error)
@@ -2499,8 +2443,8 @@ class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
class MobileAddressMulticastTest(MessagingHandler):
def __init__(self, receiver1_host, receiver2_host, receiver3_host,
- sender_host, address, check_addr_host=None, large_msg=False,
- anon_sender=False, subscriber_count=0):
+ sender_host, address, large_msg=False,
+ anon_sender=False):
super(MobileAddressMulticastTest, self).__init__()
self.receiver1_host = receiver1_host
self.receiver2_host = receiver2_host
@@ -2508,7 +2452,6 @@ class MobileAddressMulticastTest(MessagingHandler):
self.sender_host = sender_host
self.address = address
self.anon_sender = anon_sender
- self.subscriber_count = subscriber_count
# One sender connection and two receiver connections
self.receiver1_conn = None
@@ -2547,11 +2490,12 @@ class MobileAddressMulticastTest(MessagingHandler):
# address has propagated.
self.max_attempts = 5
self.num_attempts = 0
- self.num_attempts = 0
self.container = None
- self.check_addr_host = check_addr_host
- if not self.check_addr_host:
- self.check_addr_host = self.sender_host
+ self.test_msg_received_r1 = False
+ self.test_msg_received_r2 = False
+ self.test_msg_received_r3 = False
+ self.initial_msg_sent = False
+ self.n_accepted = 0
if self.large_msg:
self.body = "0123456789101112131415" * 5000
@@ -2576,105 +2520,95 @@ class MobileAddressMulticastTest(MessagingHandler):
if self.sender_conn:
self.sender_conn.close()
- def create_sndr(self):
- self.sender_conn = self.container.connect(self.sender_host)
- if self.anon_sender:
- self.sender = self.container.create_sender(self.sender_conn)
- else:
- self.sender = self.container.create_sender(self.sender_conn,
- self.address)
-
- def check_address(self):
- local_node = Node.connect(self.check_addr_host, timeout=TIMEOUT)
- outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
- found = False
-
- subscriber_count_index = outs.attribute_names.index("subscriberCount")
- remote_count_index = outs.attribute_names.index("remoteCount")
-
- self.num_attempts += 1
- for result in outs.results:
- if self.address in result[0]:
- # We are good if the sum of subscriberCount and remoteCount
- # equals the total subscriber_count
- if self.subscriber_count == 0 or (result[subscriber_count_index] + result[remote_count_index] == self.subscriber_count):
- # The address is in the address table and the subscriber count is as expected.
- # subscriberCount match means that both edge routers have
- # told the interior router about the existence of the address
- # If this has not happened yet, we will try again.
- found = True
- self.create_sndr()
- local_node.close()
- self.addr_timer.cancel()
- break
-
- if not found:
- if self.num_attempts < self.max_attempts:
- self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
- else:
- self.error = "Unable to create sender because of " \
- "absence of address in the address table"
- self.timeout()
- local_node.close()
-
def on_start(self, event):
self.reactor = event.reactor
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
- # Create two receivers
self.receiver1_conn = event.container.connect(self.receiver1_host)
self.receiver2_conn = event.container.connect(self.receiver2_host)
self.receiver3_conn = event.container.connect(self.receiver3_host)
+
+ # Create receivers and sender all in one shot, no need to check for any address table
+ # before creating sender
self.receiver1 = event.container.create_receiver(self.receiver1_conn,
self.address)
self.receiver2 = event.container.create_receiver(self.receiver2_conn,
self.address)
self.receiver3 = event.container.create_receiver(self.receiver3_conn,
self.address)
- self.container = event.container
+ self.sender_conn = event.container.connect(self.sender_host)
+ if self.anon_sender:
+ self.sender = event.container.create_sender(self.sender_conn)
+ else:
+ self.sender = event.container.create_sender(self.sender_conn,
+ self.address)
- def on_link_opened(self, event):
- if event.receiver == self.receiver1 or \
- event.receiver == self.receiver2 or \
- event.receiver == self.receiver3:
- self.r_attaches += 1
- if self.r_attaches == 3:
- self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
+ def send_test_message(self):
+ msg = Message(body="Test Message")
+ if self.anon_sender:
+ msg.address = self.address
+ self.sender.send(msg)
+
+ def send(self):
+ if self.large_msg:
+ msg = Message(body=self.body, properties=self.properties)
+ else:
+ msg = Message(body="Message %d" % self.n_sent)
+ if self.anon_sender:
+ msg.address = self.address
+ msg.correlation_id = self.n_sent
+ self.sender.send(msg)
+
+ def on_accepted(self, event):
+ if self.test_msg_received_r1 and self.test_msg_received_r2 and self.test_msg_received_r3:
+ # All receivers have received the test message.
+ # Now fire off 100 messages to see if the message was multicasted to all
+ # receivers.
+ self.n_accepted += 1
+ while self.n_sent < self.count:
+ self.send()
+ self.n_sent += 1
+ else:
+ self.send_test_message()
def on_sendable(self, event):
- while self.n_sent < self.count:
- msg = None
- if self.large_msg:
- msg = Message(body=self.body, properties=self.properties)
- else:
- msg = Message(body="Message %d" % self.n_sent)
- if self.anon_sender:
- msg.address = self.address
- msg.correlation_id = self.n_sent
- self.sender.send(msg)
- self.n_sent += 1
+ if not self.initial_msg_sent:
+ # First send a single test message. This message
+ # could be accepted or released based on if
+ # some receiver is already online to receive the message
+ self.send_test_message()
+ self.initial_msg_sent = True
def on_message(self, event):
if event.receiver == self.receiver1:
- if self.recvd1_msgs.get(event.message.correlation_id):
- self.dup_msg = event.message.correlation_id
- self.receiver_name = "Receiver 1"
- self.timeout()
- self.n_rcvd1 += 1
- self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id
+ if event.message.body == "Test Message":
+ self.test_msg_received_r1 = True
+ else:
+ if self.recvd1_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 1"
+ self.timeout()
+ self.n_rcvd1 += 1
+ self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id
if event.receiver == self.receiver2:
- if self.recvd2_msgs.get(event.message.correlation_id):
- self.dup_msg = event.message.correlation_id
- self.receiver_name = "Receiver 2"
- self.timeout()
- self.n_rcvd2 += 1
- self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id
+ if event.message.body == "Test Message":
+ self.test_msg_received_r2 = True
+ else:
+ if self.recvd2_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 2"
+ self.timeout()
+ self.n_rcvd2 += 1
+ self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id
if event.receiver == self.receiver3:
- if self.recvd3_msgs.get(event.message.correlation_id):
- self.dup_msg = event.message.correlation_id
- self.receiver_name = "Receiver 3"
- self.timeout()
- self.n_rcvd3 += 1
- self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id
+ if event.message.body == "Test Message":
+ self.test_msg_received_r3 = True
+ else:
+ if self.recvd3_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 3"
+ self.timeout()
+ self.n_rcvd3 += 1
+ self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id
if self.n_rcvd1 == self.count and self.n_rcvd2 == self.count and \
self.n_rcvd3 == self.count:
@@ -2684,6 +2618,9 @@ class MobileAddressMulticastTest(MessagingHandler):
self.receiver3_conn.close()
self.sender_conn.close()
+ def on_released(self, event):
+ self.send_test_message()
+
def run(self):
Container(self).run()
@@ -2692,15 +2629,15 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
# failure scenario - cause some receiving clients to close while a large
# message is in transit
def __init__(self, receiver1_host, receiver2_host, receiver3_host,
- sender_host, address, check_addr_host=None, large_msg=True):
+ sender_host, address, large_msg=True, anon_sender=False):
super(MobileAddrMcastDroppedRxTest, self).__init__(receiver1_host,
receiver2_host,
receiver3_host,
sender_host,
address,
- check_addr_host=check_addr_host,
- large_msg=large_msg)
- self.n_accepted = 0
+ large_msg=large_msg,
+ anon_sender=anon_sender)
+
self.n_released = 0
self.recv1_closed = False
self.recv2_closed = False
@@ -2725,59 +2662,31 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
self.receiver2_conn.close()
def on_accepted(self, event):
- self.n_accepted += 1
+ super(MobileAddrMcastDroppedRxTest, self).on_accepted(event)
self._check_done()
def on_released(self, event):
+ super(MobileAddrMcastDroppedRxTest, self).on_released(event)
self.n_released += 1
self._check_done()
-class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddressMulticastTest):
+class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddrMcastDroppedRxTest):
# failure scenario - cause some receiving clients to close while a large
# message is in transit
def __init__(self, receiver1_host, receiver2_host, receiver3_host,
- sender_host, address, check_addr_host=None, large_msg=True, anon_sender=True):
+ sender_host, address, large_msg=True, anon_sender=True):
super(MobileAddrMcastAnonSenderDroppedRxTest, self).__init__(receiver1_host,
receiver2_host,
receiver3_host,
sender_host,
address,
- check_addr_host=check_addr_host,
large_msg=large_msg,
anon_sender=anon_sender)
- self.n_accepted = 0
self.n_released = 0
self.recv1_closed = False
self.recv2_closed = False
- def _check_done(self):
- if self.n_accepted + self.n_released == self.count:
- self.receiver3_conn.close()
- self.sender_conn.close()
- self.timer.cancel()
-
- def on_message(self, event):
- super(MobileAddrMcastAnonSenderDroppedRxTest, self).on_message(event)
-
- # start closing receivers
- if self.n_rcvd1 == 50:
- if not self.recv1_closed:
- self.receiver1_conn.close()
- self.recv1_closed = True
- if self.n_rcvd2 == 75:
- if not self.recv2_closed:
- self.recv2_closed = True
- self.receiver2_conn.close()
-
- def on_accepted(self, event):
- self.n_accepted += 1
- self._check_done()
-
- def on_released(self, event):
- self.n_released += 1
- self._check_done()
-
class MobileAddressEventTest(MessagingHandler):
def __init__(self, receiver1_host, receiver2_host, receiver3_host,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org