You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/03/27 17:30:07 UTC
qpid-dispatch git commit: DISPATCH-947 - Modified 3 additional tests
to not use messenger
Repository: qpid-dispatch
Updated Branches:
refs/heads/master e9d09419c -> 165907723
DISPATCH-947 - Modified 3 additional tests to not use messenger
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/16590772
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/16590772
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/16590772
Branch: refs/heads/master
Commit: 1659077239a727e4a5c88be13f7973319ece2175
Parents: e9d0941
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Mar 27 13:29:53 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Mar 27 13:29:53 2018 -0400
----------------------------------------------------------------------
tests/system_tests_two_routers.py | 523 +++++++++++++++++----------------
1 file changed, 267 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16590772/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 436dd76..0d64c8c 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -17,6 +17,7 @@
# under the License.
#
+from time import sleep
import unittest2 as unittest
import logging
from proton import Message, PENDING, ACCEPTED, REJECTED, Timeout
@@ -25,6 +26,7 @@ from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch.management.client import Node
+
# PROTON-828:
try:
from proton import MODIFIED
@@ -46,17 +48,15 @@ class TwoRouterTest(TestCase):
config = [
('router', {'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}),
- ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'linkCapacity': 500}),
- # The following listeners were exclusively added to test the stripAnnotations attribute in qdrouterd.conf file
- # Different listeners will be used to test all allowed values of stripAnnotations ('no', 'both', 'out', 'in')
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
- ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
# for testing pattern matching
@@ -151,261 +151,21 @@ class TwoRouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
- def test_05_semantics_multicast(self):
- addr = "amqp:/multicast.1"
- M1 = self.messenger()
- M2 = self.messenger()
- M3 = self.messenger()
- M4 = self.messenger()
-
- M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
- M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
- M1.start()
- M2.start()
- M3.start()
- M4.start()
-
- M2.subscribe(addr)
- M3.subscribe(addr)
- M4.subscribe(addr)
- self.routers[0].wait_address("multicast.1", 1, 1)
-
- tm = Message()
- rm = Message()
-
- tm.address = addr
- for i in range(100):
- tm.body = {'number': i}
- M1.put(tm)
- M1.send()
-
- for i in range(100):
- try:
- M2.recv(1)
- M2.get(rm)
- self.assertEqual(i, rm.body['number'])
- except:
- print "M2 at", i
-
- try:
- M3.recv(1)
- M3.get(rm)
- self.assertEqual(i, rm.body['number'])
- except:
- print "M3 at", i
-
- try:
- M4.recv(1)
- M4.get(rm)
- self.assertEqual(i, rm.body['number'])
- except:
- print "M4 at", i
-
- M1.stop()
- M2.stop()
- M3.stop()
- M4.stop()
-
def test_06_semantics_closest_is_local(self):
- addr = "amqp:/closest.1"
- M1 = self.messenger()
- M2 = self.messenger()
- M3 = self.messenger()
- M4 = self.messenger()
-
- M2.timeout = 0.1
- M3.timeout = 0.1
- M4.timeout = 0.1
-
- M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
- M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
- M1.start()
- M2.start()
- M3.start()
- M4.start()
-
- M2.subscribe(addr)
- self.routers[0].wait_address("closest.1", 0, 1)
- M3.subscribe(addr)
- M4.subscribe(addr)
- self.routers[0].wait_address("closest.1", 1, 1)
-
- tm = Message()
- rm = Message()
-
- tm.address = addr
- for i in range(30):
- tm.body = {'number': i}
- M1.put(tm)
- M1.send()
-
- i = 0
- rx_set = []
- for i in range(30):
- M3.recv(1)
- M3.get(rm)
- rx_set.append(rm.body['number'])
-
- try:
- M2.recv(1)
- self.assertEqual(0, "Unexpected messages arrived on M2")
- except AssertionError:
- raise
- except Exception:
- pass
-
- try:
- M4.recv(1)
- self.assertEqual(0, "Unexpected messages arrived on M4")
- except AssertionError:
- raise
- except Exception:
- pass
-
- self.assertEqual(30, len(rx_set))
- rx_set.sort()
- for i in range(30):
- self.assertEqual(i, rx_set[i])
-
- M1.stop()
- M2.stop()
- M3.stop()
- M4.stop()
+ test = SemanticsClosestIsLocal(self.routers[0].addresses[0], self.routers[1].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
def test_07_semantics_closest_is_remote(self):
- addr = "amqp:/closest.2"
- M1 = self.messenger()
- M2 = self.messenger()
- M3 = self.messenger()
- M4 = self.messenger()
-
- M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
- M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
- M1.start()
- M2.start()
- M3.start()
- M4.start()
-
- M2.subscribe(addr)
- M4.subscribe(addr)
- self.routers[0].wait_address("closest.2", 0, 1)
-
- tm = Message()
- rm = Message()
-
- tm.address = addr
- for i in range(30):
- tm.body = {'number': i}
- M1.put(tm)
- M1.send()
-
- i = 0
- rx_set = []
- for i in range(15):
- M2.recv(1)
- M2.get(rm)
- rx_set.append(rm.body['number'])
-
- M4.recv(1)
- M4.get(rm)
- rx_set.append(rm.body['number'])
-
- self.assertEqual(30, len(rx_set))
- rx_set.sort()
- for i in range(30):
- self.assertEqual(i, rx_set[i])
-
- M1.stop()
- M2.stop()
- M3.stop()
- M4.stop()
-
- def test_08_semantics_spread(self):
- addr = "amqp:/spread.1"
- M1 = self.messenger()
- M2 = self.messenger()
- M3 = self.messenger()
- M4 = self.messenger()
-
- M2.timeout = 0.1
- M3.timeout = 0.1
- M4.timeout = 0.1
-
- M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
- M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
- M1.start()
- M2.start()
- M3.start()
- M4.start()
- M2.subscribe(addr)
- M3.subscribe(addr)
- M4.subscribe(addr)
- self.routers[0].wait_address("spread.1", 1, 1)
-
- tm = Message()
- rm = Message()
-
- tm.address = addr
- for i in range(50):
- tm.body = {'number': i}
- M1.put(tm)
- M1.send()
-
- i = 0
- rx_set = []
- ca = 0
- cb = 0
- cc = 0
-
- while len(rx_set) < 50:
- try:
- M2.recv(1)
- M2.get(rm)
- rx_set.append(rm.body['number'])
- ca += 1
- except:
- pass
-
- try:
- M3.recv(1)
- M3.get(rm)
- rx_set.append(rm.body['number'])
- cb += 1
- except:
- pass
-
- try:
- M4.recv(1)
- M4.get(rm)
- rx_set.append(rm.body['number'])
- cc += 1
- except:
- pass
-
- self.assertEqual(50, len(rx_set))
- rx_set.sort()
- for i in range(50):
- self.assertEqual(i, rx_set[i])
- self.assertTrue(ca > 0)
- self.assertTrue(cb > 0)
- self.assertTrue(cc > 0)
+ test = SemanticsClosestIsRemote(self.routers[0].addresses[0], self.routers[1].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
- M1.stop()
- M2.stop()
- M3.stop()
- M4.stop()
+ def test_08_semantics_balanced(self):
+ test = SemanticsBalanced(self.routers[0].addresses[0], self.routers[0].addresses[1],
+ self.routers[1].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
def test_9_to_override(self):
addr = "amqp:/toov/1"
@@ -680,6 +440,7 @@ class Timeout(object):
def on_timer_task(self, event):
self.parent.timeout()
+
class LargeMessageStreamTest(MessagingHandler):
def __init__(self, address1, address2):
super(LargeMessageStreamTest, self).__init__()
@@ -799,6 +560,8 @@ class AttachOnInterRouterTest(MessagingHandler):
self.dest = "AOIRtest"
self.error = None
self.sender = None
+ self.timer = None
+ self.conn = None
def timeout(self):
self.error = "Timeout Expired"
@@ -1280,7 +1043,7 @@ class MulticastUnsettled(MessagingHandler):
def on_sendable(self, event):
if self.n_sent == 0:
- msg = Message(body="Appearance-Test")
+ msg = Message(body="MulticastUnsettled-Test")
self.sender.send(msg)
self.n_sent += 1
@@ -1299,5 +1062,253 @@ class MulticastUnsettled(MessagingHandler):
Container(self).run()
+class SemanticsClosestIsLocal(MessagingHandler):
+ def __init__(self, address1, address2):
+ super(SemanticsClosestIsLocal, self).__init__()
+ self.address1 = address1
+ self.address2 = address2
+ self.dest = "closest.1"
+ self.timer = None
+ self.conn1 = None
+ self.conn2 = None
+ self.sender = None
+ self.receiver_a = None
+ self.receiver_b = None
+ self.receiver_c = None
+ self.num_messages = 100
+ self.n_received_a = 0
+ self.n_received_b = 0
+ self.n_received_c = 0
+ self.error = None
+ self.n_sent = 0
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.conn1 = event.container.connect(self.address1)
+ self.conn2 = event.container.connect(self.address2)
+ self.sender = event.container.create_sender(self.conn1, self.dest)
+ # Receiver on same router as the sender must receive all the messages. The other two
+ # receivers are on the other router
+ self.receiver_a = event.container.create_receiver(self.conn1, self.dest, name="A")
+ self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
+ self.receiver_c = event.container.create_receiver(self.conn2, self.dest, name="C")
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+ (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
+ self.conn1.close()
+ self.conn2.close()
+
+ def check_if_done(self):
+ if self.n_received_a == 100 and self.n_received_b + self.n_received_c == 0:
+ self.timer.cancel()
+ self.conn1.close()
+ self.conn2.close()
+
+ def on_sendable(self, event):
+ if self.n_sent < self.num_messages:
+ msg = Message(body="SemanticsClosestIsLocal-Test")
+ self.sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver_a:
+ self.n_received_a += 1
+ if event.receiver == self.receiver_b:
+ self.n_received_b += 1
+ if event.receiver == self.receiver_c:
+ self.n_received_c += 1
+
+ def on_accepted(self, event):
+ self.check_if_done()
+
+ def run(self):
+ Container(self).run()
+
+
+class SemanticsClosestIsRemote(MessagingHandler):
+ def __init__(self, address1, address2):
+ super(SemanticsClosestIsRemote, self).__init__()
+ self.address1 = address1
+ self.address2 = address2
+ self.dest = "closest.1"
+ self.timer = None
+ self.conn1 = None
+ self.conn2 = None
+ self.sender = None
+ self.receiver_a = None
+ self.receiver_b = None
+ self.receiver_c = None
+ self.num_messages = 100
+ self.n_received_a = 0
+ self.n_received_b = 0
+ self.error = None
+ self.n_sent = 0
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.conn1 = event.container.connect(self.address1)
+ self.conn2 = event.container.connect(self.address2)
+ self.sender = event.container.create_sender(self.conn1, self.dest)
+ # Receiver on same router as the sender must receive all the messages. The other two
+ # receivers are on the other router
+ self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
+ self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d rcvd=%d/%d" % \
+ (self.n_sent, self.n_received_a, self.n_received_b)
+ self.conn1.close()
+ self.conn2.close()
+
+ def check_if_done(self):
+ if self.n_received_a + self.n_received_b == 100 and self.n_received_a > 0 and self.n_received_b > 0:
+ self.timer.cancel()
+ self.conn1.close()
+ self.conn2.close()
+
+ def on_sendable(self, event):
+ if self.n_sent < self.num_messages:
+ msg = Message(body="SemanticsClosestIsRemote-Test")
+ self.sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver_a:
+ self.n_received_a += 1
+ if event.receiver == self.receiver_b:
+ self.n_received_b += 1
+
+ def on_accepted(self, event):
+ self.check_if_done()
+
+ def run(self):
+ Container(self).run()
+
+
+class CustomTimeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def addr_text(self, addr):
+ if not addr:
+ return ""
+ if addr[0] == 'M':
+ return addr[2:]
+ else:
+ return addr[1:]
+
+ def on_timer_task(self, event):
+ local_node = Node.connect(self.parent.address1, timeout=TIMEOUT)
+
+ res = local_node.query('org.apache.qpid.dispatch.router.address')
+ name = res.attribute_names.index('name')
+ found = False
+ for results in res.results:
+ if "balanced.1" == self.addr_text(results[name]):
+ found = True
+ break
+
+ if found:
+ self.parent.cancel_custom()
+ self.parent.create_sender(event)
+
+ else:
+ event.reactor.schedule(2, self)
+
+
+class SemanticsBalanced(MessagingHandler):
+ def __init__(self, address1, address2, address3):
+ super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0)
+ self.address1 = address1
+ self.address2 = address2
+ self.address3 = address3
+ self.dest = "balanced.1"
+ self.timer = None
+ self.conn1 = None
+ self.conn2 = None
+ self.conn3 = None
+ self.sender = None
+ self.receiver_a = None
+ self.receiver_b = None
+ self.receiver_c = None
+ self.num_messages = 400
+ self.n_received_a = 0
+ self.n_received_b = 0
+ self.n_received_c = 0
+ self.error = None
+ self.n_sent = 0
+ self.rx_set = []
+ self.custom_timer = None
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.custom_timer = event.reactor.schedule(2, CustomTimeout(self))
+ self.conn1 = event.container.connect(self.address1)
+ self.conn2 = event.container.connect(self.address2)
+ self.conn3 = event.container.connect(self.address3)
+
+ # This receiver is on the same router as the sender
+ self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
+
+ # These two receivers are connected to a different router than the sender
+ self.receiver_b = event.container.create_receiver(self.conn3, self.dest, name="B")
+ self.receiver_c = event.container.create_receiver(self.conn3, self.dest, name="C")
+
+ self.receiver_a.flow(300)
+ self.receiver_b.flow(300)
+ self.receiver_c.flow(300)
+
+ def cancel_custom(self):
+ self.custom_timer.cancel()
+
+ def create_sender(self, event):
+ self.sender = event.container.create_sender(self.conn1, self.dest)
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+ (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
+ self.conn1.close()
+ self.conn2.close()
+ self.conn3.close()
+
+ def check_if_done(self):
+ if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \
+ self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0:
+ self.rx_set.sort()
+ all_messages_received = True
+ for i in range(self.num_messages):
+ if not i == self.rx_set[i]:
+ all_messages_received = False
+
+ if all_messages_received:
+ self.timer.cancel()
+ self.conn1.close()
+ self.conn2.close()
+ self.conn3.close()
+
+ def on_sendable(self, event):
+ if self.n_sent < self.num_messages:
+ msg = Message(body={'number': self.n_sent})
+ self.sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver_a:
+ self.n_received_a += 1
+ self.rx_set.append(event.message.body['number'])
+ elif event.receiver == self.receiver_b:
+ self.n_received_b += 1
+ self.rx_set.append(event.message.body['number'])
+ elif event.receiver == self.receiver_c:
+ self.n_received_c += 1
+ self.rx_set.append(event.message.body['number'])
+
+ self.check_if_done()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org