You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2017/05/24 13:42:02 UTC
qpid-dispatch git commit: DISPATCH-209 -- three-router test #3
dynamic reply-to. This closes #163
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 8c9f4a581 -> 5fd599d4d
DISPATCH-209 -- three-router test #3 dynamic reply-to. This closes #163
(cherry picked from commit b66a5c7620409252287ae2e3993a17c56758beae)
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5fd599d4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5fd599d4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5fd599d4
Branch: refs/heads/master
Commit: 5fd599d4df3e0dfef377419636e5fffe9537b296
Parents: 8c9f4a5
Author: mick goulish <mg...@redhat.com>
Authored: Wed May 17 14:12:40 2017 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed May 24 09:38:55 2017 -0400
----------------------------------------------------------------------
tests/system_tests_three_routers.py | 354 ++++++++++++++++++++++---------
1 file changed, 253 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5fd599d4/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py
index 83719e5..cfc210c 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -22,7 +22,7 @@ from subprocess import PIPE, STDOUT
from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
from proton.handlers import MessagingHandler
-from proton.reactor import Container, AtMostOnce, AtLeastOnce
+from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption
import time
@@ -39,56 +39,121 @@ class RouterTest(TestCase):
@classmethod
def setUpClass(cls):
- """Start a router and a messenger"""
+ """Start a router and a sender-listener client"""
super(RouterTest, cls).setUpClass()
- def router(name, connection_1, connection_2=None):
+ def router ( name, connection_1, connection_2=None ):
config = [
- ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
-
- ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
-
- ('address', {'prefix': 'closest', 'distribution': 'closest'}),
- ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
- ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+ ('router',
+ {'mode' : 'interior',
+ 'id' : 'QDR.%s' % name
+ }
+ ),
+ ('listener',
+ {'port' : cls.tester.get_port(),
+ 'stripAnnotations' : 'no'
+ }
+ ),
+ ('address',
+ { 'prefix' : 'closest',
+ 'distribution' : 'closest'
+ }
+ ),
]
- config.append(connection_1)
+ config.append ( connection_1 )
if None != connection_2:
- config.append(connection_2)
+ config.append ( connection_2 )
- config = Qdrouterd.Config(config)
+ config = Qdrouterd.Config ( config )
- cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+ cls.routers.append ( cls.tester.qdrouterd(name, config, wait=True) )
cls.routers = []
- inter_router_port_1 = cls.tester.get_port()
- inter_router_port_2 = cls.tester.get_port()
-
- # A <--- B <--- C
- router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port_1}) )
-
- router('B', ('listener', {'role': 'inter-router', 'port': inter_router_port_2}),
- ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port_1, 'verifyHostName': 'no'}))
+ inter_router_port_A = cls.tester.get_port()
+ inter_router_port_B = cls.tester.get_port()
+ port_for_sender = cls.tester.get_port()
+
+
+ router ( 'A',
+ ( 'listener',
+ {'role': 'inter-router',
+ 'port': inter_router_port_A
+ }
+ )
+ )
+
+ router ( 'B',
+ ( 'listener',
+ { 'role': 'inter-router',
+ 'port': inter_router_port_B
+ }
+ ),
+ ( 'connector',
+ { 'name': 'connectorToA',
+ 'role': 'inter-router',
+ 'port': inter_router_port_A,
+ 'verifyHostName': 'no'
+ }
+ )
+ )
+
+ router ( 'C',
+ ( 'connector',
+ { 'name': 'connectorToB',
+ 'role': 'inter-router',
+ 'port': inter_router_port_B,
+ 'verifyHostName': 'no'
+ }
+ ),
+ ( 'listener',
+ { 'role': 'normal',
+ 'port': port_for_sender
+ }
+ )
+ )
+
+
+ cls.router_A = cls.routers[0]
+ cls.router_B = cls.routers[1]
+ cls.router_C = cls.routers[2]
+
+ #----------------------------------------------
+ # Wait until everybody can see everybody,
+ # to minimize the time when the network
+ # doesn't know how to route my messages.
+ #----------------------------------------------
+ cls.router_C.wait_router_connected('QDR.B')
+ cls.router_B.wait_router_connected('QDR.A')
+ cls.router_A.wait_router_connected('QDR.C')
+
+ #------------------------------------------------
+ # In these tests, the first address will be used
+ # by the sender, the second by the receiver.
+ #
+ # receiver <--- A <--- B <--- C <--- sender
+ #
+ #------------------------------------------------
+ cls.send_addr = cls.router_C.addresses[1]
+ cls.recv_addr = cls.router_A.addresses[0]
- router('C', ('connector', {'name': 'connectorToB', 'role': 'inter-router', 'port': inter_router_port_2, 'verifyHostName': 'no'}))
-
- cls.routers[0].wait_router_connected('QDR.C')
- cls.routers[1].wait_router_connected('QDR.B')
- cls.routers[2].wait_router_connected('QDR.A')
+ def test_01_targeted_sender(self):
+ test = TargetedSenderTest ( self.send_addr, self.recv_addr )
+ test.run()
+ self.assertEqual(None, test.error)
+ def test_02_anonymous_sender(self):
+ test = AnonymousSenderTest ( self.send_addr, self.recv_addr )
+ test.run()
+ self.assertEqual(None, test.error)
- def test_01_targeted_sender(self):
- test = TargetedSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0])
+ def test_03_dynamic_reply_to(self):
+ test = DynamicReplyTo ( self.send_addr, self.recv_addr )
test.run()
self.assertEqual(None, test.error)
-# def test_02_anonymous_sender(self):
-# test = AnonymousSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0])
-# test.run()
-# self.assertEqual(None, test.error)
class Timeout(object):
@@ -100,10 +165,10 @@ class Timeout(object):
class TargetedSenderTest(MessagingHandler):
- def __init__(self, address1, address2):
+ def __init__(self, send_addr, recv_addr):
super(TargetedSenderTest, self).__init__(prefetch=0)
- self.address1 = address1
- self.address2 = address2
+ self.send_addr = send_addr
+ self.recv_addr = recv_addr
self.dest = "closest.Targeted"
self.error = None
self.sender = None
@@ -114,24 +179,29 @@ class TargetedSenderTest(MessagingHandler):
self.n_accepted = 0
def timeout(self):
- self.error = "Timeout Expired"
- self.conn1.close()
- self.conn2.close()
+ self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \
+ (self.n_sent, self.n_received, self.n_accepted)
+ self.send_conn.close()
+ self.recv_conn.close()
def on_start(self, event):
- # receiver <--- A <--- B <---- C <--- sender
- self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
- self.conn1 = event.container.connect(self.address1)
- self.conn2 = event.container.connect(self.address2)
- self.sender = event.container.create_sender(self.conn1, self.dest)
- self.receiver = event.container.create_receiver(self.conn2, self.dest)
+ self.timer = event.reactor.schedule(10, Timeout(self))
+ self.send_conn = event.container.connect(self.send_addr)
+ self.recv_conn = event.container.connect(self.recv_addr)
+ self.sender = event.container.create_sender(self.send_conn, self.dest)
+ self.receiver = event.container.create_receiver(self.recv_conn, self.dest)
self.receiver.flow(self.n_expected)
+
+ def send(self):
+ while self.sender.credit > 0 and self.n_sent < self.n_expected:
+ msg = Message(body=self.n_sent)
+ self.sender.send(msg)
+ self.n_sent += 1
+
def on_sendable(self, event):
if self.n_sent < self.n_expected:
- msg = Message(body=self.n_sent)
- event.sender.send(msg)
- self.n_sent += 1
+ self.send()
def on_accepted(self, event):
self.n_accepted += 1
@@ -140,8 +210,8 @@ class TargetedSenderTest(MessagingHandler):
self.n_received += 1
if self.n_received == self.n_expected:
self.receiver.close()
- self.conn1.close()
- self.conn2.close()
+ self.send_conn.close()
+ self.recv_conn.close()
self.timer.cancel()
def run(self):
@@ -149,77 +219,159 @@ class TargetedSenderTest(MessagingHandler):
+class DynamicTarget(LinkOption):
+ def apply(self, link):
+ link.target.dynamic = True
+ link.target.address = None
+
+
+
class AnonymousSenderTest(MessagingHandler):
- def __init__(self, address1, address2):
- super(AnonymousSenderTest, self).__init__(prefetch=0)
- self.address1 = address1
- self.address2 = address2
- self.dest = "closest.Anonymous"
- self.error = None
- self.sender = None
- self.receiver = None
- self.n_expected = 10
+
+ def __init__(self, send_addr, recv_addr):
+ super(AnonymousSenderTest, self).__init__()
+ self.send_addr = send_addr
+ self.recv_addr = recv_addr
+
+ self.error = None
+ self.recv_conn = None
+ self.send_conn = None
+ self.sender = None
+ self.receiver = None
+ self.address = None
+
+ self.expected = 10
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
- def timeout(self):
- self.error = "Timeout Expired %d messages received." % self.n_received
- self.conn1.close()
- self.conn2.close()
-
- # The problem with using an anonymous sender in a router
- # network is that it takes finite time for endpoint information
- # to propagate around the network. It is possible for me to
- # start sending before my router knows how to route my messages,
- # which will cause them to get released, and my test will hang,
- # doomed to wait eternally for the tenth message to be received.
- # To fix this, we will detect released messages here, and decrement
- # the sent message counter, forcing a resend for each drop.
- # And also pause for a moment, since we know that the network is
- # not yet ready.
- def on_released(self, event):
- self.n_sent -= 1
- time.sleep(0.1)
- def on_link_opened(self, event):
- if event.receiver:
- # This sender has no destination addr, so we will have to
- # address each message individually.
- # Also -- Create the sender here, when we know that the
- # receiver link has opened, because then we are at least
- # close to being able to send. (See comment above.)
- self.sender = event.container.create_sender(self.conn1, None)
+ def timeout ( self ):
+ self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \
+ (self.n_sent, self.n_received, self.n_accepted)
+ self.send_conn.close()
+ self.recv_conn.close()
def on_start(self, event):
- self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
- self.conn1 = event.container.connect(self.address1)
- self.conn2 = event.container.connect(self.address2)
- self.receiver = event.container.create_receiver(self.conn2, self.dest)
- self.receiver.flow(self.n_expected)
+ self.timer = event.reactor.schedule(10, Timeout(self))
+ self.send_conn = event.container.connect(self.send_addr)
+ self.recv_conn = event.container.connect(self.recv_addr)
+ self.sender = event.container.create_sender(self.send_conn, options=DynamicTarget())
- def on_sendable(self, event):
- if self.n_sent < self.n_expected:
- # Add the destination addr to each message.
- msg = Message(body=self.n_sent, address=self.dest)
- event.sender.send(msg)
+ def send(self):
+ while self.sender.credit > 0 and self.n_sent < self.expected:
self.n_sent += 1
+ m = Message(address=self.address, body="Message %d of %d" % (self.n_sent, self.expected))
+ self.sender.send(m)
+
+ def on_link_opened(self, event):
+ if event.sender == self.sender:
+ self.address = self.sender.remote_target.address
+ self.receiver = event.container.create_receiver(self.recv_conn, self.address)
+
+ def on_sendable(self, event):
+ self.send()
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ self.n_received += 1
def on_accepted(self, event):
self.n_accepted += 1
+ if self.n_accepted == self.expected:
+ self.send_conn.close()
+ self.recv_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+
+
+#=======================================================================
+# In this test we have a separate 'client' and 'server' with separate
+# connections. The client sends requests to the server, and embeds in
+# them its desired reply-to address. The server uses that address to
+# send back ambiguous and noncommittal messages. The test ends with
+# success if the client receives the expected number of replies, or
+# with failure if we time out.
+#=======================================================================
+class DynamicReplyTo(MessagingHandler):
+ def __init__(self, client_addr, server_addr):
+ super(DynamicReplyTo, self).__init__(prefetch=10)
+ self.client_addr = client_addr
+ self.server_addr = server_addr
+ self.dest = "closest.dynamicRequestResponse"
+ self.error = None
+ self.server_receiver = None
+ self.client_receiver = None
+ self.sender = None
+ self.server_sender = None
+ self.n_expected = 10
+ self.n_sent = 0
+ self.received_by_server = 0
+ self.received_by_client = 0
+
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent=%d received_by_server=%d received_by_client=%d" % \
+ (self.n_sent, self.received_by_server, self.received_by_client)
+ self.client_connection.close()
+ self.server_connection.close()
+
+
+ def on_start ( self, event ):
+ self.timer = event.reactor.schedule ( 10, Timeout(self) )
+
+ # separate connections to simulate client and server.
+ self.client_connection = event.container.connect(self.client_addr)
+ self.server_connection = event.container.connect(self.server_addr)
+
+ self.sender = event.container.create_sender(self.client_connection, self.dest)
+ self.server_sender = event.container.create_sender(self.server_connection, None)
+
+ self.server_receiver = event.container.create_receiver(self.server_connection, self.dest)
+ self.client_receiver = event.container.create_receiver(self.client_connection, None, dynamic=True)
+
+
+
+ def on_sendable(self, event):
+ while event.sender.credit > 0 and self.n_sent < self.n_expected:
+ # We send to server, and tell it how to reply to the client.
+ reply_to_addr = self.client_receiver.remote_source.address
+
+ request = Message ( body=self.n_sent,
+ address=self.dest,
+ reply_to = reply_to_addr )
+ event.sender.send ( request )
+ self.n_sent += 1
+
def on_message(self, event):
- self.n_received += 1
- if self.n_received == self.n_expected:
- self.receiver.close()
- self.conn1.close()
- self.conn2.close()
- self.timer.cancel()
+ # Server gets a request and responds to
+ # the address that is embedded in the message.
+ if event.receiver == self.server_receiver :
+ self.server_sender.send ( Message(address=event.message.reply_to,
+ body="Reply hazy, try again later.") )
+ self.received_by_server += 1
+
+ # Client gets a response and counts it.
+ elif event.receiver == self.client_receiver :
+ self.received_by_client += 1
+ if self.received_by_client == self.n_expected:
+ self.timer.cancel()
+ self.server_receiver.close()
+ self.client_receiver.close()
+ self.client_connection.close()
+ self.server_connection.close()
+
def run(self):
Container(self).run()
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org