You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/03/27 17:30:07 UTC

qpid-dispatch git commit: DISPATCH-947 - Modified 3 additional tests to not use messenger

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master e9d09419c -> 165907723


DISPATCH-947 - Modified 3 additional tests to not use messenger


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/16590772
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/16590772
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/16590772

Branch: refs/heads/master
Commit: 1659077239a727e4a5c88be13f7973319ece2175
Parents: e9d0941
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Mar 27 13:29:53 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Mar 27 13:29:53 2018 -0400

----------------------------------------------------------------------
 tests/system_tests_two_routers.py | 523 +++++++++++++++++----------------
 1 file changed, 267 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16590772/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 436dd76..0d64c8c 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -17,6 +17,7 @@
 # under the License.
 #
 
+from time import sleep
 import unittest2 as unittest
 import logging
 from proton import Message, PENDING, ACCEPTED, REJECTED, Timeout
@@ -25,6 +26,7 @@ from proton.handlers import MessagingHandler
 from proton.reactor import Container
 from qpid_dispatch.management.client import Node
 
+
 # PROTON-828:
 try:
     from proton import MODIFIED
@@ -46,17 +48,15 @@ class TwoRouterTest(TestCase):
             config = [
                 ('router', {'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}),
 
-                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'linkCapacity': 500}),
 
-                # The following listeners were exclusively added to test the stripAnnotations attribute in qdrouterd.conf file
-                # Different listeners will be used to test all allowed values of stripAnnotations ('no', 'both', 'out', 'in')
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}),
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
                 ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
 
                 ('address', {'prefix': 'closest', 'distribution': 'closest'}),
-                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
                 ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
 
                 # for testing pattern matching
@@ -151,261 +151,21 @@ class TwoRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
-    def test_05_semantics_multicast(self):
-        addr = "amqp:/multicast.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("multicast.1", 1, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(100):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        for i in range(100):
-            try:
-                M2.recv(1)
-                M2.get(rm)
-                self.assertEqual(i, rm.body['number'])
-            except:
-                print "M2 at", i
-
-            try:
-                M3.recv(1)
-                M3.get(rm)
-                self.assertEqual(i, rm.body['number'])
-            except:
-                print "M3 at", i
-
-            try:
-                M4.recv(1)
-                M4.get(rm)
-                self.assertEqual(i, rm.body['number'])
-            except:
-                print "M4 at", i
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
     def test_06_semantics_closest_is_local(self):
-        addr = "amqp:/closest.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M2.timeout = 0.1
-        M3.timeout = 0.1
-        M4.timeout = 0.1
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        self.routers[0].wait_address("closest.1", 0, 1)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("closest.1", 1, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(30):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        for i in range(30):
-            M3.recv(1)
-            M3.get(rm)
-            rx_set.append(rm.body['number'])
-
-        try:
-            M2.recv(1)
-            self.assertEqual(0, "Unexpected messages arrived on M2")
-        except AssertionError:
-            raise
-        except Exception:
-            pass
-
-        try:
-            M4.recv(1)
-            self.assertEqual(0, "Unexpected messages arrived on M4")
-        except AssertionError:
-            raise
-        except Exception:
-            pass
-
-        self.assertEqual(30, len(rx_set))
-        rx_set.sort()
-        for i in range(30):
-            self.assertEqual(i, rx_set[i])
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
+        test = SemanticsClosestIsLocal(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
 
     def test_07_semantics_closest_is_remote(self):
-        addr = "amqp:/closest.2"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("closest.2", 0, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(30):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        for i in range(15):
-            M2.recv(1)
-            M2.get(rm)
-            rx_set.append(rm.body['number'])
-
-            M4.recv(1)
-            M4.get(rm)
-            rx_set.append(rm.body['number'])
-
-        self.assertEqual(30, len(rx_set))
-        rx_set.sort()
-        for i in range(30):
-            self.assertEqual(i, rx_set[i])
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
-    def test_08_semantics_spread(self):
-        addr = "amqp:/spread.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M2.timeout = 0.1
-        M3.timeout = 0.1
-        M4.timeout = 0.1
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-        M2.subscribe(addr)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("spread.1", 1, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(50):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        ca = 0
-        cb = 0
-        cc = 0
-
-        while len(rx_set) < 50:
-            try:
-                M2.recv(1)
-                M2.get(rm)
-                rx_set.append(rm.body['number'])
-                ca += 1
-            except:
-                pass
-
-            try:
-                M3.recv(1)
-                M3.get(rm)
-                rx_set.append(rm.body['number'])
-                cb += 1
-            except:
-                pass
-
-            try:
-                M4.recv(1)
-                M4.get(rm)
-                rx_set.append(rm.body['number'])
-                cc += 1
-            except:
-                pass
-
-        self.assertEqual(50, len(rx_set))
-        rx_set.sort()
-        for i in range(50):
-            self.assertEqual(i, rx_set[i])
-        self.assertTrue(ca > 0)
-        self.assertTrue(cb > 0)
-        self.assertTrue(cc > 0)
+        test = SemanticsClosestIsRemote(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
 
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
+    def test_08_semantics_balanced(self):
+        test = SemanticsBalanced(self.routers[0].addresses[0], self.routers[0].addresses[1],
+                                 self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
 
     def test_9_to_override(self):
         addr = "amqp:/toov/1"
@@ -680,6 +440,7 @@ class Timeout(object):
     def on_timer_task(self, event):
         self.parent.timeout()
 
+
 class LargeMessageStreamTest(MessagingHandler):
     def __init__(self, address1, address2):
         super(LargeMessageStreamTest, self).__init__()
@@ -799,6 +560,8 @@ class AttachOnInterRouterTest(MessagingHandler):
         self.dest = "AOIRtest"
         self.error = None
         self.sender = None
+        self.timer = None
+        self.conn = None
 
     def timeout(self):
         self.error = "Timeout Expired"
@@ -1280,7 +1043,7 @@ class MulticastUnsettled(MessagingHandler):
 
     def on_sendable(self, event):
         if self.n_sent == 0:
-            msg = Message(body="Appearance-Test")
+            msg = Message(body="MulticastUnsettled-Test")
             self.sender.send(msg)
             self.n_sent += 1
 
@@ -1299,5 +1062,253 @@ class MulticastUnsettled(MessagingHandler):
         Container(self).run()
 
 
+class SemanticsClosestIsLocal(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(SemanticsClosestIsLocal, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "closest.1"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.receiver_a = None
+        self.receiver_b = None
+        self.receiver_c = None
+        self.num_messages = 100
+        self.n_received_a = 0
+        self.n_received_b = 0
+        self.n_received_c = 0
+        self.error = None
+        self.n_sent = 0
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        # Receiver on same router as the sender must receive all the messages. The other two
+        # receivers are on the other router
+        self.receiver_a = event.container.create_receiver(self.conn1, self.dest, name="A")
+        self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
+        self.receiver_c = event.container.create_receiver(self.conn2, self.dest, name="C")
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
+        self.conn1.close()
+        self.conn2.close()
+
+    def check_if_done(self):
+        if self.n_received_a == 100 and self.n_received_b + self.n_received_c == 0:
+            self.timer.cancel()
+            self.conn1.close()
+            self.conn2.close()
+
+    def on_sendable(self, event):
+        if self.n_sent < self.num_messages:
+            msg = Message(body="SemanticsClosestIsLocal-Test")
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver_a:
+            self.n_received_a += 1
+        if event.receiver == self.receiver_b:
+            self.n_received_b += 1
+        if event.receiver == self.receiver_c:
+            self.n_received_c += 1
+
+    def on_accepted(self, event):
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
+class SemanticsClosestIsRemote(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(SemanticsClosestIsRemote, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "closest.1"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.receiver_a = None
+        self.receiver_b = None
+        self.receiver_c = None
+        self.num_messages = 100
+        self.n_received_a = 0
+        self.n_received_b = 0
+        self.error = None
+        self.n_sent = 0
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        # Receiver on same router as the sender must receive all the messages. The other two
+        # receivers are on the other router
+        self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
+        self.receiver_b = event.container.create_receiver(self.conn2, self.dest, name="B")
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d/%d" % \
+                     (self.n_sent, self.n_received_a, self.n_received_b)
+        self.conn1.close()
+        self.conn2.close()
+
+    def check_if_done(self):
+        if self.n_received_a + self.n_received_b == 100 and self.n_received_a > 0 and self.n_received_b > 0:
+            self.timer.cancel()
+            self.conn1.close()
+            self.conn2.close()
+
+    def on_sendable(self, event):
+        if self.n_sent < self.num_messages:
+            msg = Message(body="SemanticsClosestIsRemote-Test")
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver_a:
+            self.n_received_a += 1
+        if event.receiver == self.receiver_b:
+            self.n_received_b += 1
+
+    def on_accepted(self, event):
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
+class CustomTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def addr_text(self, addr):
+        if not addr:
+            return ""
+        if addr[0] == 'M':
+            return addr[2:]
+        else:
+            return addr[1:]
+
+    def on_timer_task(self, event):
+        local_node = Node.connect(self.parent.address1, timeout=TIMEOUT)
+
+        res = local_node.query('org.apache.qpid.dispatch.router.address')
+        name = res.attribute_names.index('name')
+        found = False
+        for results in res.results:
+            if "balanced.1" == self.addr_text(results[name]):
+                found = True
+                break
+
+        if found:
+            self.parent.cancel_custom()
+            self.parent.create_sender(event)
+
+        else:
+            event.reactor.schedule(2, self)
+
+
+class SemanticsBalanced(MessagingHandler):
+    def __init__(self, address1, address2, address3):
+        super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0)
+        self.address1 = address1
+        self.address2 = address2
+        self.address3 = address3
+        self.dest = "balanced.1"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.conn3 = None
+        self.sender = None
+        self.receiver_a = None
+        self.receiver_b = None
+        self.receiver_c = None
+        self.num_messages = 400
+        self.n_received_a = 0
+        self.n_received_b = 0
+        self.n_received_c = 0
+        self.error = None
+        self.n_sent = 0
+        self.rx_set = []
+        self.custom_timer = None
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.custom_timer = event.reactor.schedule(2, CustomTimeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.conn3 = event.container.connect(self.address3)
+
+        # This receiver is on the same router as the sender
+        self.receiver_a = event.container.create_receiver(self.conn2, self.dest, name="A")
+
+        # These two receivers are connected to a different router than the sender
+        self.receiver_b = event.container.create_receiver(self.conn3, self.dest, name="B")
+        self.receiver_c = event.container.create_receiver(self.conn3, self.dest, name="C")
+
+        self.receiver_a.flow(300)
+        self.receiver_b.flow(300)
+        self.receiver_c.flow(300)
+
+    def cancel_custom(self):
+        self.custom_timer.cancel()
+
+    def create_sender(self, event):
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
+        self.conn1.close()
+        self.conn2.close()
+        self.conn3.close()
+
+    def check_if_done(self):
+        if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \
+                self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0:
+            self.rx_set.sort()
+            all_messages_received = True
+            for i in range(self.num_messages):
+                if not i == self.rx_set[i]:
+                    all_messages_received = False
+
+            if all_messages_received:
+                self.timer.cancel()
+                self.conn1.close()
+                self.conn2.close()
+                self.conn3.close()
+
+    def on_sendable(self, event):
+        if self.n_sent < self.num_messages:
+            msg = Message(body={'number': self.n_sent})
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver ==    self.receiver_a:
+            self.n_received_a += 1
+            self.rx_set.append(event.message.body['number'])
+        elif event.receiver == self.receiver_b:
+            self.n_received_b += 1
+            self.rx_set.append(event.message.body['number'])
+        elif event.receiver == self.receiver_c:
+            self.n_received_c += 1
+            self.rx_set.append(event.message.body['number'])
+
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org