You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/14 12:57:23 UTC

[2/8] qpid-dispatch git commit: DISPATCH-341 - Added two tests for message-routed drain. Added timeouts for tests.

DISPATCH-341 - Added two tests for message-routed drain.  Added timeouts for tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/789b73e2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/789b73e2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/789b73e2

Branch: refs/heads/0.6.x
Commit: 789b73e2fdf72911182f194c60a8a89b05dd94ea
Parents: 28a4025
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 10:52:24 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jun 13 17:22:06 2016 -0400

----------------------------------------------------------------------
 tests/system_tests_drain.py         |  17 ++++-
 tests/system_tests_drain_support.py | 105 ++++++++++++++++++++++++++++++-
 2 files changed, 116 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/789b73e2/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index ba503a1..9747995 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -21,6 +21,7 @@ import unittest
 
 from system_test import TestCase, Qdrouterd, main_module
 from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
 
 class DrainSupportTest(TestCase):
 
@@ -44,12 +45,22 @@ class DrainSupportTest(TestCase):
     def test_drain_support_all_messages(self):
         drain_support = DrainMessagesHandler(self.address)
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(drain_support.error, None)
 
     def test_drain_support_one_message(self):
         drain_support = DrainOneMessageHandler(self.address)
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(drain_support.error, None)
+
+    def test_drain_support_no_messages(self):
+        drain_support = DrainNoMessagesHandler(self.address)
+        drain_support.run()
+        self.assertEqual(drain_support.error, None)
+
+    def test_drain_support_no_more_messages(self):
+        drain_support = DrainNoMoreMessagesHandler(self.address)
+        drain_support.run()
+        self.assertEqual(drain_support.error, None)
 
 if __name__ == '__main__':
-    unittest.main(main_module())
\ No newline at end of file
+    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/789b73e2/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index 6192a7c..f11b8b8 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -21,6 +21,14 @@ from proton.handlers import MessagingHandler
 from proton.reactor import Container
 from proton import Message
 
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
 class DrainMessagesHandler(MessagingHandler):
     def __init__(self, address):
         # prefetch is set to zero so that proton does not automatically issue 10 credits.
@@ -31,9 +39,14 @@ class DrainMessagesHandler(MessagingHandler):
         self.sent_count = 0
         self.received_count = 0
         self.address = address
-        self.drain_successful = False
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.conn.close()
 
     def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
         self.conn = event.container.connect(self.address)
 
         # Create a sender and a receiver. They are both listening on the same address
@@ -66,7 +79,8 @@ class DrainMessagesHandler(MessagingHandler):
             # messages. That along with 10 messages received indicates that the drain worked and we can
             # declare that the test is successful
             if self.received_count == 10 and event.link.credit == 0:
-                self.drain_successful = True
+                self.error = None
+                self.timer.cancel()
                 self.receiver.close()
                 self.sender.close()
                 self.conn.close()
@@ -74,6 +88,7 @@ class DrainMessagesHandler(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
 class DrainOneMessageHandler(DrainMessagesHandler):
     def __init__(self, address):
         super(DrainOneMessageHandler, self).__init__(address)
@@ -94,8 +109,92 @@ class DrainOneMessageHandler(DrainMessagesHandler):
             # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
             # indicates that the drain worked and we can declare that the test is successful
             if self.received_count == 5 and event.link.credit == 0:
-                self.drain_successful = True
+                self.error = None
+                self.timer.cancel()
                 self.receiver.close()
                 self.sender.close()
                 self.conn.close()
 
+
+class DrainNoMessagesHandler(MessagingHandler):
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainNoMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.address = address
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        self.receiver.drain(1)
+
+    def on_drained(self, event):
+        if sender.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.conn.close()
+
+    def run(self):
+        Container(self).run()
+
+
+class DrainNoMoreMessagesHandler(MessagingHandler):
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.address = address
+        self.sent = 0
+        self.rcvd = 0
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        if self.sent == 0:
+            msg = Message(body="Hello World")
+            event.sender.send(msg)
+            self.sent += 1
+
+    def on_message(self, event):
+        self.rcvd += 1
+
+    def on_settled(self, event):
+        self.receiver.drain(1)
+
+    def on_drained(self, event):
+        if sender.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.conn.close()
+
+    def run(self):
+        Container(self).run()
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org