You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/14 12:57:23 UTC
[2/8] qpid-dispatch git commit: DISPATCH-341 - Added two tests for
message-routed drain. Added timeouts for tests.
DISPATCH-341 - Added two tests for message-routed drain. Added timeouts for tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/789b73e2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/789b73e2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/789b73e2
Branch: refs/heads/0.6.x
Commit: 789b73e2fdf72911182f194c60a8a89b05dd94ea
Parents: 28a4025
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 10:52:24 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jun 13 17:22:06 2016 -0400
----------------------------------------------------------------------
tests/system_tests_drain.py | 17 ++++-
tests/system_tests_drain_support.py | 105 ++++++++++++++++++++++++++++++-
2 files changed, 116 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/789b73e2/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index ba503a1..9747995 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -21,6 +21,7 @@ import unittest
from system_test import TestCase, Qdrouterd, main_module
from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
class DrainSupportTest(TestCase):
@@ -44,12 +45,22 @@ class DrainSupportTest(TestCase):
def test_drain_support_all_messages(self):
drain_support = DrainMessagesHandler(self.address)
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(drain_support.error, None)
def test_drain_support_one_message(self):
drain_support = DrainOneMessageHandler(self.address)
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(drain_support.error, None)
+
+    def test_drain_support_no_messages(self):
+        """Run DrainNoMessagesHandler on self.address and require that it ends with no error."""
+        handler = DrainNoMessagesHandler(self.address)
+        handler.run()
+        self.assertEqual(handler.error, None)
+
+    def test_drain_support_no_more_messages(self):
+        """Run DrainNoMoreMessagesHandler on self.address and require that it ends with no error."""
+        handler = DrainNoMoreMessagesHandler(self.address)
+        handler.run()
+        self.assertEqual(handler.error, None)
if __name__ == '__main__':
- unittest.main(main_module())
\ No newline at end of file
+ unittest.main(main_module())
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/789b73e2/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index 6192a7c..f11b8b8 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -21,6 +21,14 @@ from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Message
+class Timeout(object):
+    """Timer callback object that forwards a timer expiry to its parent."""
+
+    def __init__(self, parent):
+        # The parent must provide a timeout() method; on_timer_task calls it.
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        """Call timeout() on the parent; the event argument is not used."""
+        owner = self.parent
+        owner.timeout()
+
+
class DrainMessagesHandler(MessagingHandler):
def __init__(self, address):
# prefetch is set to zero so that proton does not automatically issue 10 credits.
@@ -31,9 +39,14 @@ class DrainMessagesHandler(MessagingHandler):
self.sent_count = 0
self.received_count = 0
self.address = address
- self.drain_successful = False
+ self.error = "Unexpected Exit"
+
+ def timeout(self):
+ self.error = "Timeout Expired"
+ self.conn.close()
def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
self.conn = event.container.connect(self.address)
# Create a sender and a receiver. They are both listening on the same address
@@ -66,7 +79,8 @@ class DrainMessagesHandler(MessagingHandler):
# messages. That along with 10 messages received indicates that the drain worked and we can
# declare that the test is successful
if self.received_count == 10 and event.link.credit == 0:
- self.drain_successful = True
+ self.error = None
+ self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
@@ -74,6 +88,7 @@ class DrainMessagesHandler(MessagingHandler):
def run(self):
Container(self).run()
+
class DrainOneMessageHandler(DrainMessagesHandler):
def __init__(self, address):
super(DrainOneMessageHandler, self).__init__(address)
@@ -94,8 +109,92 @@ class DrainOneMessageHandler(DrainMessagesHandler):
# messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
# indicates that the drain worked and we can declare that the test is successful
if self.received_count == 5 and event.link.credit == 0:
- self.drain_successful = True
+ self.error = None
+ self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
+
+class DrainNoMessagesHandler(MessagingHandler):
+    """Drain test in which no message is ever sent.
+
+    A receiver and a sender are attached to the same address on one
+    connection. The receiver first issues one credit and, once the sender
+    becomes sendable, calls drain(1). The run counts as successful
+    (self.error is None) when on_drained fires and the receiver has no
+    credit left; otherwise self.error holds a description of the failure.
+    """
+
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainNoMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.address = address
+        # Stays set unless the success path in on_drained clears it.
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        """Record the timeout as the error and close the connection.
+
+        Invoked through the Timeout object scheduled in on_start.
+        """
+        self.error = "Timeout Expired"
+        self.conn.close()
+
+    def on_start(self, event):
+        # Bound the run: the scheduled Timeout calls self.timeout() after 5 seconds
+        # unless on_drained cancels the timer first.
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        # Nothing is sent; the sender becoming sendable is only the trigger
+        # for requesting the drain on the receiver.
+        self.receiver.drain(1)
+
+    def on_drained(self, event):
+        # Check the receiver held on self: the drain was issued on
+        # self.receiver, and no bare `sender` name exists in this scope
+        # (referencing one raises NameError).
+        # NOTE(review): confirm that the proton version in use dispatches
+        # on_drained to this handler; if it does not, the run ends via timeout().
+        if self.receiver.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.conn.close()
+
+    def run(self):
+        """Run this handler in a proton Container until the container exits."""
+        Container(self).run()
+
+
+class DrainNoMoreMessagesHandler(MessagingHandler):
+    """Drain test in which exactly one message is sent before draining.
+
+    A receiver and a sender are attached to the same address on one
+    connection. The receiver issues one credit, the sender sends a single
+    message, and when that delivery is settled the receiver calls drain(1).
+    The run counts as successful (self.error is None) when on_drained fires
+    and the receiver has no credit left; otherwise self.error holds a
+    description of the failure.
+    """
+
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.address = address
+        # Counters reported in the timeout error text.
+        self.sent = 0
+        self.rcvd = 0
+        # Stays set unless the success path in on_drained clears it.
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        """Record the timeout (with send/receive counts) and close the connection.
+
+        Invoked through the Timeout object scheduled in on_start.
+        """
+        self.error = "Timeout Expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
+        self.conn.close()
+
+    def on_start(self, event):
+        # Bound the run: the scheduled Timeout calls self.timeout() after 5 seconds
+        # unless on_drained cancels the timer first.
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        # Send only once, even if on_sendable fires again.
+        if self.sent == 0:
+            msg = Message(body="Hello World")
+            event.sender.send(msg)
+            self.sent += 1
+
+    def on_message(self, event):
+        self.rcvd += 1
+
+    def on_settled(self, event):
+        # The one message has been settled; now ask for a drain while
+        # nothing further is going to be sent.
+        self.receiver.drain(1)
+
+    def on_drained(self, event):
+        # Check the receiver held on self: the drain was issued on
+        # self.receiver, and no bare `sender` name exists in this scope
+        # (referencing one raises NameError).
+        # NOTE(review): confirm that the proton version in use dispatches
+        # on_drained to this handler; if it does not, the run ends via timeout().
+        if self.receiver.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.conn.close()
+
+    def run(self):
+        """Run this handler in a proton Container until the container exits."""
+        Container(self).run()
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org