You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/03/27 19:32:22 UTC

qpid-dispatch git commit: DISPATCH-947: fix system_tests_two_routers.test_17_address_wildcard

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 95c374139 -> 0402e4c80


DISPATCH-947: fix system_tests_two_routers.test_17_address_wildcard


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0402e4c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0402e4c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0402e4c8

Branch: refs/heads/master
Commit: 0402e4c802c7ce5521c6d21da8eaad990cd9443d
Parents: 95c3741
Author: Kenneth Giusti <kg...@apache.org>
Authored: Tue Mar 27 15:06:25 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Mar 27 15:31:16 2018 -0400

----------------------------------------------------------------------
 tests/system_test.py                    | 73 +++++++++++++++++++++++++++-
 tests/system_tests_exchange_bindings.py | 72 ++++++---------------------
 tests/system_tests_two_routers.py       | 48 +++++++++---------
 3 files changed, 108 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0402e4c8/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index a6937ce..4f25fdd 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -30,8 +30,15 @@ Features:
 
 import errno, os, time, socket, random, subprocess, shutil, unittest, __main__, re
 from copy import copy
+try:
+    import queue as Queue   # 3.x
+except ImportError:
+    import Queue as Queue   # 2.7
+from threading import Thread
+
 import proton
-from proton import Message
+from proton import Message, Timeout
+from proton.utils import BlockingConnection
 from qpid_dispatch.management.client import Node
 
 # Optional modules
@@ -648,3 +655,67 @@ def main_module():
             unittest.main(module=main_module())
     """
     return os.path.splitext(os.path.basename(__main__.__file__))[0]
+
+
+class AsyncTestReceiver(object):
+    """
+    A simple receiver that runs in the background and queues any received
+    messages.  Messages can be retrieved from this thread via the queue member
+    """
+    Empty = Queue.Empty
+
+    def __init__(self, address, source, credit=100, timeout=0.1,
+                 conn_args=None, link_args=None):
+        """
+        Runs a BlockingReceiver in a separate thread.
+
+        :param address: address of router (URL)
+        :param source: the node address to consume from
+        :param credit: max credit for receiver
+        :param timeout: receive poll frequency in seconds
+        :param conn_args: map of BlockingConnection arguments
+        :param link_args: map of BlockingReceiver arguments
+        """
+        super(AsyncTestReceiver, self).__init__()
+        self.queue = Queue.Queue()
+        kwargs = {'url': address}
+        if conn_args:
+            kwargs.update(conn_args)
+        self._conn = BlockingConnection(**kwargs)
+        kwargs = {'address': source,
+                  'credit': credit}
+        if link_args:
+            kwargs.update(link_args)
+        self._rcvr = self._conn.create_receiver(**kwargs)
+        self._thread = Thread(target=self._poll)
+        self._run = True
+        self._timeout = timeout
+        self._thread.start()
+
+    def _poll(self):
+        """
+        Thread main loop
+        """
+
+        while self._run:
+            try:
+                msg = self._rcvr.receive(timeout=self._timeout)
+            except Timeout:
+                continue
+            try:
+                self._rcvr.accept()
+            except IndexError:
+                # PROTON-1743
+                pass
+            self.queue.put(msg)
+        self._rcvr.close()
+        self._conn.close()
+
+    def stop(self, timeout=10):
+        """
+        Called to terminate the AsyncTestReceiver
+        """
+        self._run = False
+        self._thread.join(timeout=timeout)
+
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0402e4c8/tests/system_tests_exchange_bindings.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_exchange_bindings.py b/tests/system_tests_exchange_bindings.py
index 921f1ba..36dd3a3 100644
--- a/tests/system_tests_exchange_bindings.py
+++ b/tests/system_tests_exchange_bindings.py
@@ -19,16 +19,11 @@
 
 import ast
 import unittest2 as unittest
-from threading import Thread
 from time import sleep
 from subprocess import PIPE, STDOUT
 
-try:
-    import Queue as Queue   # 2.7
-except ImportError:
-    import queue as Queue   # 3.x
-
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
+from system_test import AsyncTestReceiver
 from proton import Message, Timeout
 from proton.reactor import AtMostOnce, AtLeastOnce
 from proton.utils import BlockingConnection, SendException
@@ -38,45 +33,6 @@ _EXCHANGE_TYPE = "org.apache.qpid.dispatch.router.config.exchange"
 _BINDING_TYPE  = "org.apache.qpid.dispatch.router.config.binding"
 
 
-class _AsyncReceiver(object):
-    def __init__(self, address, source, credit=100, timeout=0.1,
-                 conn_args=None, link_args=None):
-        super(_AsyncReceiver, self).__init__()
-        kwargs = {'url': address}
-        if conn_args:
-            kwargs.update(conn_args)
-        self.conn = BlockingConnection(**kwargs)
-        kwargs = {'address': source,
-                  'credit': credit}
-        if link_args:
-            kwargs.update(link_args)
-        self.rcvr = self.conn.create_receiver(**kwargs)
-        self.thread = Thread(target=self._poll)
-        self.queue = Queue.Queue()
-        self._run = True
-        self._timeout = timeout
-        self.thread.start()
-
-    def _poll(self):
-        while self._run:
-            try:
-                msg = self.rcvr.receive(timeout=self._timeout)
-            except Timeout:
-                continue
-            try:
-                self.rcvr.accept()
-            except IndexError:
-                # PROTON-1743
-                pass
-            self.queue.put(msg)
-        self.rcvr.close()
-        self.conn.close()
-
-    def stop(self):
-        self._run = False
-        self.thread.join(timeout=TIMEOUT)
-
-
 class ExchangeBindingsTest(TestCase):
     """
     Tests the exchange/bindings of the dispatch router.
@@ -529,10 +485,10 @@ class ExchangeBindingsTest(TestCase):
         # create clients for message transfer
         conn = BlockingConnection(router.addresses[0])
         sender = conn.create_sender(address="Address3", options=AtLeastOnce())
-        nhop1 = _AsyncReceiver(address=router.addresses[0], source="nextHop1")
-        nhop2A = _AsyncReceiver(address=router.addresses[0], source="nextHop2")
-        nhop2B = _AsyncReceiver(address=router.addresses[0], source="nextHop2")
-        alt = _AsyncReceiver(address=router.addresses[0], source="altNextHop")
+        nhop1 = AsyncTestReceiver(address=router.addresses[0], source="nextHop1")
+        nhop2A = AsyncTestReceiver(address=router.addresses[0], source="nextHop2")
+        nhop2B = AsyncTestReceiver(address=router.addresses[0], source="nextHop2")
+        alt = AsyncTestReceiver(address=router.addresses[0], source="altNextHop")
 
         sender.send(Message(subject='a.b', body='A'))
         sender.send(Message(subject='x.y', body='B'))
@@ -572,7 +528,7 @@ class ExchangeBindingsTest(TestCase):
         # create clients for message transfer
         conn = BlockingConnection(router.addresses[0])
         sender = conn.create_sender(address="Address4", options=AtLeastOnce())
-        nhop1 = _AsyncReceiver(address=router.addresses[0], source="nextHop1")
+        nhop1 = AsyncTestReceiver(address=router.addresses[0], source="nextHop1")
 
         self.assertRaises(SendException,
                           sender.send,
@@ -652,10 +608,10 @@ class ExchangeBindingsTest(TestCase):
         self.routers[1].wait_address('AddressA')
 
         # connect clients to router B (no exchange)
-        nhop1A = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop1')
-        nhop1B = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop1')
-        nhop2  = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop2')
-        nhop3  = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop3')
+        nhop1A = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop1')
+        nhop1B = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop1')
+        nhop2  = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop2')
+        nhop3  = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop3')
 
         self.routers[0].wait_address('nextHop1', remotes=1)
         self.routers[0].wait_address('nextHop2', remotes=1)
@@ -708,10 +664,10 @@ class ExchangeBindingsTest(TestCase):
                                        wait=True)
 
         # connect clients to router B (no exchange)
-        nhop1A = _AsyncReceiver(router.addresses[0], 'nextHop1',
-                                conn_args={'max_frame_size': MAX_FRAME})
-        nhop1B = _AsyncReceiver(router.addresses[0], 'nextHop1',
-                                conn_args={'max_frame_size': MAX_FRAME})
+        nhop1A = AsyncTestReceiver(router.addresses[0], 'nextHop1',
+                                   conn_args={'max_frame_size': MAX_FRAME})
+        nhop1B = AsyncTestReceiver(router.addresses[0], 'nextHop1',
+                                   conn_args={'max_frame_size': MAX_FRAME})
 
         conn = BlockingConnection(router.addresses[0],
                                   max_frame_size=MAX_FRAME)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0402e4c8/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 637bd7e..7a32b3f 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -22,8 +22,11 @@ import unittest2 as unittest
 import logging
 from proton import Message, PENDING, ACCEPTED, REJECTED, Timeout, Delivery
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
+from system_test import AsyncTestReceiver
+
 from proton.handlers import MessagingHandler
-from proton.reactor import Container
+from proton.reactor import Container, AtLeastOnce
+from proton.utils import BlockingConnection
 from qpid_dispatch.management.client import Node
 
 
@@ -263,39 +266,32 @@ class TwoRouterTest(TestCase):
         receivers = []
         for a in addresses:
             for x in range(2):
-                M = self.messenger(timeout=0.1)
-                M.route("amqp:/*", self.routers[x].addresses[0]+"/$1")
-                M.start()
-                M.subscribe('amqp:/' + a[0])
-                receivers.append(M)
+                ar = AsyncTestReceiver(address=self.routers[x].addresses[0],
+                                       source=a[0])
+                receivers.append(ar)
+
+        # wait for the consumer info to propagate
+        for a in addresses:
             self.routers[0].wait_address(a[0], 1, 1)
             self.routers[1].wait_address(a[0], 1, 1)
 
-        # single sender sends one message to each address
-        M1 = self.messenger()
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M1.start()
+        # send one message to each address
+        conn = BlockingConnection(self.routers[0].addresses[0])
+        sender = conn.create_sender(address=None, options=AtLeastOnce())
         for a in addresses:
-            tm = Message()
-            tm.address = 'amqp:/' + a[0]
-            tm.body = {'address': a[0]}
-            M1.put(tm)
-            M1.send()
+            sender.send(Message(address=a[0], body={'address': a[0]}))
 
-        # gather all received messages
+        # count received messages by address
         msgs_recvd = {}
-        rm = Message()
         for M in receivers:
             try:
                 while True:
-                    M.recv(1)
-                    M.get(rm)
-                    index = rm.body.get('address', "ERROR")
-                    if index not in msgs_recvd:
-                        msgs_recvd[index] = 0
-                    msgs_recvd[index] += 1
-            except Exception as exc:
-                self.assertTrue("None" in str(exc))
+                    i = M.queue.get(timeout=0.2).body.get('address', "ERROR")
+                    if i not in msgs_recvd:
+                        msgs_recvd[i] = 0
+                    msgs_recvd[i] += 1
+            except AsyncTestReceiver.Empty:
+                pass
 
         # verify expected count == actual count
         self.assertTrue("ERROR" not in msgs_recvd)
@@ -303,9 +299,9 @@ class TwoRouterTest(TestCase):
             self.assertTrue(a[0] in msgs_recvd)
             self.assertEqual(a[1], msgs_recvd[a[0]])
 
-        M1.stop()
         for M in receivers:
             M.stop()
+        conn.close()
 
     def test_17_large_streaming_test(self):
         test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org