You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/03/27 19:32:22 UTC
qpid-dispatch git commit: DISPATCH-947: fix
system_tests_two_routers.test_17_address_wildcard
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 95c374139 -> 0402e4c80
DISPATCH-947: fix system_tests_two_routers.test_17_address_wildcard
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0402e4c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0402e4c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0402e4c8
Branch: refs/heads/master
Commit: 0402e4c802c7ce5521c6d21da8eaad990cd9443d
Parents: 95c3741
Author: Kenneth Giusti <kg...@apache.org>
Authored: Tue Mar 27 15:06:25 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Mar 27 15:31:16 2018 -0400
----------------------------------------------------------------------
tests/system_test.py | 73 +++++++++++++++++++++++++++-
tests/system_tests_exchange_bindings.py | 72 ++++++---------------------
tests/system_tests_two_routers.py | 48 +++++++++---------
3 files changed, 108 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0402e4c8/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index a6937ce..4f25fdd 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -30,8 +30,15 @@ Features:
import errno, os, time, socket, random, subprocess, shutil, unittest, __main__, re
from copy import copy
+try:
+ import queue as Queue # 3.x
+except ImportError:
+ import Queue as Queue # 2.7
+from threading import Thread
+
import proton
-from proton import Message
+from proton import Message, Timeout
+from proton.utils import BlockingConnection
from qpid_dispatch.management.client import Node
# Optional modules
@@ -648,3 +655,67 @@ def main_module():
unittest.main(module=main_module())
"""
return os.path.splitext(os.path.basename(__main__.__file__))[0]
+
+
+class AsyncTestReceiver(object):
+ """
+ A simple receiver that runs in the background and queues any received
+ messages. Messages can be retrieved from this thread via the queue member
+ """
+ Empty = Queue.Empty
+
+ def __init__(self, address, source, credit=100, timeout=0.1,
+ conn_args=None, link_args=None):
+ """
+ Runs a BlockingReceiver in a separate thread.
+
+ :param address: address of router (URL)
+ :param source: the node address to consume from
+ :param credit: max credit for receiver
+ :param timeout: receive poll frequency in seconds
+ :param conn_args: map of BlockingConnection arguments
+ :param link_args: map of BlockingReceiver arguments
+ """
+ super(AsyncTestReceiver, self).__init__()
+ self.queue = Queue.Queue()
+ kwargs = {'url': address}
+ if conn_args:
+ kwargs.update(conn_args)
+ self._conn = BlockingConnection(**kwargs)
+ kwargs = {'address': source,
+ 'credit': credit}
+ if link_args:
+ kwargs.update(link_args)
+ self._rcvr = self._conn.create_receiver(**kwargs)
+ self._thread = Thread(target=self._poll)
+ self._run = True
+ self._timeout = timeout
+ self._thread.start()
+
+ def _poll(self):
+ """
+ Thread main loop
+ """
+
+ while self._run:
+ try:
+ msg = self._rcvr.receive(timeout=self._timeout)
+ except Timeout:
+ continue
+ try:
+ self._rcvr.accept()
+ except IndexError:
+ # PROTON-1743
+ pass
+ self.queue.put(msg)
+ self._rcvr.close()
+ self._conn.close()
+
+ def stop(self, timeout=10):
+ """
+ Called to terminate the AsyncTestReceiver
+ """
+ self._run = False
+ self._thread.join(timeout=timeout)
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0402e4c8/tests/system_tests_exchange_bindings.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_exchange_bindings.py b/tests/system_tests_exchange_bindings.py
index 921f1ba..36dd3a3 100644
--- a/tests/system_tests_exchange_bindings.py
+++ b/tests/system_tests_exchange_bindings.py
@@ -19,16 +19,11 @@
import ast
import unittest2 as unittest
-from threading import Thread
from time import sleep
from subprocess import PIPE, STDOUT
-try:
- import Queue as Queue # 2.7
-except ImportError:
- import queue as Queue # 3.x
-
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
+from system_test import AsyncTestReceiver
from proton import Message, Timeout
from proton.reactor import AtMostOnce, AtLeastOnce
from proton.utils import BlockingConnection, SendException
@@ -38,45 +33,6 @@ _EXCHANGE_TYPE = "org.apache.qpid.dispatch.router.config.exchange"
_BINDING_TYPE = "org.apache.qpid.dispatch.router.config.binding"
-class _AsyncReceiver(object):
- def __init__(self, address, source, credit=100, timeout=0.1,
- conn_args=None, link_args=None):
- super(_AsyncReceiver, self).__init__()
- kwargs = {'url': address}
- if conn_args:
- kwargs.update(conn_args)
- self.conn = BlockingConnection(**kwargs)
- kwargs = {'address': source,
- 'credit': credit}
- if link_args:
- kwargs.update(link_args)
- self.rcvr = self.conn.create_receiver(**kwargs)
- self.thread = Thread(target=self._poll)
- self.queue = Queue.Queue()
- self._run = True
- self._timeout = timeout
- self.thread.start()
-
- def _poll(self):
- while self._run:
- try:
- msg = self.rcvr.receive(timeout=self._timeout)
- except Timeout:
- continue
- try:
- self.rcvr.accept()
- except IndexError:
- # PROTON-1743
- pass
- self.queue.put(msg)
- self.rcvr.close()
- self.conn.close()
-
- def stop(self):
- self._run = False
- self.thread.join(timeout=TIMEOUT)
-
-
class ExchangeBindingsTest(TestCase):
"""
Tests the exchange/bindings of the dispatch router.
@@ -529,10 +485,10 @@ class ExchangeBindingsTest(TestCase):
# create clients for message transfer
conn = BlockingConnection(router.addresses[0])
sender = conn.create_sender(address="Address3", options=AtLeastOnce())
- nhop1 = _AsyncReceiver(address=router.addresses[0], source="nextHop1")
- nhop2A = _AsyncReceiver(address=router.addresses[0], source="nextHop2")
- nhop2B = _AsyncReceiver(address=router.addresses[0], source="nextHop2")
- alt = _AsyncReceiver(address=router.addresses[0], source="altNextHop")
+ nhop1 = AsyncTestReceiver(address=router.addresses[0], source="nextHop1")
+ nhop2A = AsyncTestReceiver(address=router.addresses[0], source="nextHop2")
+ nhop2B = AsyncTestReceiver(address=router.addresses[0], source="nextHop2")
+ alt = AsyncTestReceiver(address=router.addresses[0], source="altNextHop")
sender.send(Message(subject='a.b', body='A'))
sender.send(Message(subject='x.y', body='B'))
@@ -572,7 +528,7 @@ class ExchangeBindingsTest(TestCase):
# create clients for message transfer
conn = BlockingConnection(router.addresses[0])
sender = conn.create_sender(address="Address4", options=AtLeastOnce())
- nhop1 = _AsyncReceiver(address=router.addresses[0], source="nextHop1")
+ nhop1 = AsyncTestReceiver(address=router.addresses[0], source="nextHop1")
self.assertRaises(SendException,
sender.send,
@@ -652,10 +608,10 @@ class ExchangeBindingsTest(TestCase):
self.routers[1].wait_address('AddressA')
# connect clients to router B (no exchange)
- nhop1A = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop1')
- nhop1B = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop1')
- nhop2 = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop2')
- nhop3 = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop3')
+ nhop1A = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop1')
+ nhop1B = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop1')
+ nhop2 = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop2')
+ nhop3 = AsyncTestReceiver(self.routers[1].addresses[0], 'nextHop3')
self.routers[0].wait_address('nextHop1', remotes=1)
self.routers[0].wait_address('nextHop2', remotes=1)
@@ -708,10 +664,10 @@ class ExchangeBindingsTest(TestCase):
wait=True)
# connect clients to router B (no exchange)
- nhop1A = _AsyncReceiver(router.addresses[0], 'nextHop1',
- conn_args={'max_frame_size': MAX_FRAME})
- nhop1B = _AsyncReceiver(router.addresses[0], 'nextHop1',
- conn_args={'max_frame_size': MAX_FRAME})
+ nhop1A = AsyncTestReceiver(router.addresses[0], 'nextHop1',
+ conn_args={'max_frame_size': MAX_FRAME})
+ nhop1B = AsyncTestReceiver(router.addresses[0], 'nextHop1',
+ conn_args={'max_frame_size': MAX_FRAME})
conn = BlockingConnection(router.addresses[0],
max_frame_size=MAX_FRAME)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0402e4c8/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 637bd7e..7a32b3f 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -22,8 +22,11 @@ import unittest2 as unittest
import logging
from proton import Message, PENDING, ACCEPTED, REJECTED, Timeout, Delivery
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
+from system_test import AsyncTestReceiver
+
from proton.handlers import MessagingHandler
-from proton.reactor import Container
+from proton.reactor import Container, AtLeastOnce
+from proton.utils import BlockingConnection
from qpid_dispatch.management.client import Node
@@ -263,39 +266,32 @@ class TwoRouterTest(TestCase):
receivers = []
for a in addresses:
for x in range(2):
- M = self.messenger(timeout=0.1)
- M.route("amqp:/*", self.routers[x].addresses[0]+"/$1")
- M.start()
- M.subscribe('amqp:/' + a[0])
- receivers.append(M)
+ ar = AsyncTestReceiver(address=self.routers[x].addresses[0],
+ source=a[0])
+ receivers.append(ar)
+
+ # wait for the consumer info to propagate
+ for a in addresses:
self.routers[0].wait_address(a[0], 1, 1)
self.routers[1].wait_address(a[0], 1, 1)
- # single sender sends one message to each address
- M1 = self.messenger()
- M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
- M1.start()
+ # send one message to each address
+ conn = BlockingConnection(self.routers[0].addresses[0])
+ sender = conn.create_sender(address=None, options=AtLeastOnce())
for a in addresses:
- tm = Message()
- tm.address = 'amqp:/' + a[0]
- tm.body = {'address': a[0]}
- M1.put(tm)
- M1.send()
+ sender.send(Message(address=a[0], body={'address': a[0]}))
- # gather all received messages
+ # count received messages by address
msgs_recvd = {}
- rm = Message()
for M in receivers:
try:
while True:
- M.recv(1)
- M.get(rm)
- index = rm.body.get('address', "ERROR")
- if index not in msgs_recvd:
- msgs_recvd[index] = 0
- msgs_recvd[index] += 1
- except Exception as exc:
- self.assertTrue("None" in str(exc))
+ i = M.queue.get(timeout=0.2).body.get('address', "ERROR")
+ if i not in msgs_recvd:
+ msgs_recvd[i] = 0
+ msgs_recvd[i] += 1
+ except AsyncTestReceiver.Empty:
+ pass
# verify expected count == actual count
self.assertTrue("ERROR" not in msgs_recvd)
@@ -303,9 +299,9 @@ class TwoRouterTest(TestCase):
self.assertTrue(a[0] in msgs_recvd)
self.assertEqual(a[1], msgs_recvd[a[0]])
- M1.stop()
for M in receivers:
M.stop()
+ conn.close()
def test_17_large_streaming_test(self):
test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org