You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/12/19 14:23:00 UTC
qpid-dispatch git commit: DISPATCH-209 - From Mick Goulish - Added
anonymous sender for 3-router test. This closes #126
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 21a32cad7 -> 3086de102
DISPATCH-209 - From Mick Goulish - Added anonymous sender for 3-router test.
This closes #126
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3086de10
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3086de10
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3086de10
Branch: refs/heads/master
Commit: 3086de102ff125fc9a218c15140db4f858210da9
Parents: 21a32ca
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Dec 19 09:21:52 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Dec 19 09:21:52 2016 -0500
----------------------------------------------------------------------
tests/system_tests_three_routers.py | 84 +++++++++++++++++++++++++++++++-
1 file changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3086de10/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py
index def31ff..2414c92 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -23,6 +23,7 @@ from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SS
from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
from proton.handlers import MessagingHandler
from proton.reactor import Container, AtMostOnce, AtLeastOnce
+import time
# PROTON-828:
@@ -65,6 +66,7 @@ class RouterTest(TestCase):
inter_router_port_1 = cls.tester.get_port()
inter_router_port_2 = cls.tester.get_port()
+ # A <--- B <--- C
router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port_1}) )
router('B', ('listener', {'role': 'inter-router', 'port': inter_router_port_2}),
@@ -83,6 +85,11 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_02_anonymous_sender(self):
+ test = AnonymousSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
class Timeout(object):
def __init__(self, parent):
@@ -112,7 +119,8 @@ class TargetedSenderTest(MessagingHandler):
self.conn2.close()
def on_start(self, event):
- self.timer = event.reactor.schedule(10, Timeout(self))
+ # receiver <--- A <--- B <---- C <--- sender
+ self.timer = event.reactor.schedule(5, Timeout(self))
self.conn1 = event.container.connect(self.address1)
self.conn2 = event.container.connect(self.address2)
self.sender = event.container.create_sender(self.conn1, self.dest)
@@ -136,10 +144,82 @@ class TargetedSenderTest(MessagingHandler):
self.conn2.close()
self.timer.cancel()
-
def run(self):
Container(self).run()
+
+
+class AnonymousSenderTest(MessagingHandler):
+ def __init__(self, address1, address2):
+ super(AnonymousSenderTest, self).__init__(prefetch=0)
+ self.address1 = address1
+ self.address2 = address2
+ self.dest = "closest.Anonymous"
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.n_expected = 10
+ self.n_sent = 0
+ self.n_received = 0
+ self.n_accepted = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired %d messages received." % self.n_received
+ self.conn1.close()
+ self.conn2.close()
+
+ # The problem with using an anonymous sender in a router
+ # network is that it takes finite time for endpoint information
+ # to propagate around the network. It is possible for me to
+ # start sending before my router knows how to route my messages,
+ # which will cause them to get released, and my test will hang,
+ # doomed to wait eternally for the tenth message to be received.
+ # To fix this, we will detect released messages here, and decrement
+ # the sent message counter, forcing a resend for each drop.
+ # And also pause for a moment, since we know that the network is
+ # not yet ready.
+ def on_released(self, event):
+ self.n_sent -= 1
+ time.sleep(0.1)
+
+ def on_link_opened(self, event):
+ if event.receiver:
+ # This sender has no destination addr, so we will have to
+ # address each message individually.
+ # Also -- Create the sender here, when we know that the
+ # receiver link has opened, because then we are at least
+ # close to being able to send. (See comment above.)
+ self.sender = event.container.create_sender(self.conn1, None)
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn1 = event.container.connect(self.address1)
+ self.conn2 = event.container.connect(self.address2)
+ self.receiver = event.container.create_receiver(self.conn2, self.dest)
+ self.receiver.flow(self.n_expected)
+
+ def on_sendable(self, event):
+ if self.n_sent < self.n_expected:
+ # Add the destination addr to each message.
+ msg = Message(body=self.n_sent, address=self.dest)
+ event.sender.send(msg)
+ self.n_sent += 1
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+
+ def on_message(self, event):
+ self.n_received += 1
+ if self.n_received == self.n_expected:
+ self.receiver.close()
+ self.conn1.close()
+ self.conn2.close()
+ self.timer.cancel()
+ def run(self):
+ Container(self).run()
+
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org