You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/03/28 14:36:15 UTC

qpid-dispatch git commit: DISPATCH-947 - test_23_semantics_spread is now called test_23_semantics_balanced and it is de-Messengerfied

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master a5da7488b -> 0dad431bd


DISPATCH-947 - test_23_semantics_spread is now called test_23_semantics_balanced and it is de-Messengerfied


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0dad431b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0dad431b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0dad431b

Branch: refs/heads/master
Commit: 0dad431bd07cd9b8cf437bc80a8772ea8ceec57a
Parents: a5da748
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Wed Mar 28 10:36:02 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed Mar 28 10:36:02 2018 -0400

----------------------------------------------------------------------
 tests/system_tests_one_router.py | 198 ++++++++++++++++++++--------------
 1 file changed, 119 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0dad431b/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index f27a8d6..7131e00 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -73,7 +73,7 @@ class OneRouterTest(TestCase):
             ('listener', {'port': cls.tester.get_port(), 'maxFrameSize': '2048', 'stripAnnotations': 'in'}),
 
             ('address', {'prefix': 'closest', 'distribution': 'closest'}),
-            ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+            ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
             ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
             ('address', {'prefix': 'unavailable', 'distribution': 'unavailable'})
         ])
@@ -670,84 +670,10 @@ class OneRouterTest(TestCase):
         M3.stop()
         M4.stop()
 
-
-    def test_23_semantics_spread(self):
-        addr = self.address+"/spread.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M2.timeout = 0.1
-        M3.timeout = 0.1
-        M4.timeout = 0.1
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(30):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        ca = 0
-        cb = 0
-        cc = 0
-
-        while len(rx_set) < 30:
-            try:
-                M2.recv(1)
-                M2.get(rm)
-                rx_set.append(rm.body['number'])
-                ca += 1
-            except:
-                pass
-
-            try:
-                M3.recv(1)
-                M3.get(rm)
-                rx_set.append(rm.body['number'])
-                cb += 1
-            except:
-                pass
-
-            try:
-                M4.recv(1)
-                M4.get(rm)
-                rx_set.append(rm.body['number'])
-                cc += 1
-            except:
-                pass
-
-        self.assertEqual(30, len(rx_set))
-        self.assertTrue(ca > 0)
-        self.assertTrue(cb > 0)
-        self.assertTrue(cc > 0)
-
-        rx_set.sort()
-        for i in range(30):
-            self.assertEqual(i, rx_set[i])
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
-
-
-
+    def test_23_semantics_balanced(self):
+        test = SemanticsBalanced(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
 
     def test_24_to_override(self):
         addr = self.address+"/toov/1"
@@ -977,6 +903,120 @@ class OneRouterTest(TestCase):
         client.connection.close()
 
 
+class CustomTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def addr_text(self, addr):
+        if not addr:
+            return ""
+        if addr[0] == 'M':
+            return addr[2:]
+        else:
+            return addr[1:]
+
+    def on_timer_task(self, event):
+        local_node = Node.connect(self.parent.address, timeout=TIMEOUT)
+
+        res = local_node.query('org.apache.qpid.dispatch.router.address')
+        name = res.attribute_names.index('name')
+        found = False
+        for results in res.results:
+            if "balanced.1" == self.addr_text(results[name]):
+                found = True
+                break
+
+        if found:
+            self.parent.cancel_custom()
+            self.parent.create_sender(event)
+
+        else:
+            event.reactor.schedule(2, self)
+
+
+class SemanticsBalanced(MessagingHandler):
+    def __init__(self, address):
+        super(SemanticsBalanced, self).__init__(auto_accept=False, prefetch=0)
+        self.address = address
+        self.dest = "balanced.1"
+        self.timer = None
+        self.conn = None
+        self.sender = None
+        self.receiver_a = None
+        self.receiver_b = None
+        self.receiver_c = None
+        self.num_messages = 250
+        self.n_received_a = 0
+        self.n_received_b = 0
+        self.n_received_c = 0
+        self.error = None
+        self.n_sent = 0
+        self.rx_set = []
+        self.custom_timer = None
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.custom_timer = event.reactor.schedule(2, CustomTimeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # This receiver is on the same router as the sender
+        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
+
+        # These two receivers are connected to a different router than the sender
+        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
+        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
+
+        self.receiver_a.flow(100)
+        self.receiver_b.flow(100)
+        self.receiver_c.flow(100)
+
+    def cancel_custom(self):
+        self.custom_timer.cancel()
+
+    def create_sender(self, event):
+        self.sender = event.container.create_sender(self.conn, self.dest)
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
+        self.conn.close()
+
+    def check_if_done(self):
+        if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages and \
+                self.n_received_a > 0 and self.n_received_b > 0 and self.n_received_c > 0:
+            self.rx_set.sort()
+            all_messages_received = True
+            for i in range(self.num_messages):
+                if not i == self.rx_set[i]:
+                    all_messages_received = False
+
+            if all_messages_received:
+                self.timer.cancel()
+                self.conn.close()
+
+    def on_sendable(self, event):
+        if self.n_sent < self.num_messages:
+            msg = Message(body={'number': self.n_sent})
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver_a:
+            self.n_received_a += 1
+            self.rx_set.append(event.message.body['number'])
+        elif event.receiver == self.receiver_b:
+            self.n_received_b += 1
+            self.rx_set.append(event.message.body['number'])
+        elif event.receiver == self.receiver_c:
+            self.n_received_c += 1
+            self.rx_set.append(event.message.body['number'])
+
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 class Timeout(object):
     def __init__(self, parent):
         self.parent = parent


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org