You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/12/06 21:52:55 UTC

qpid-dispatch git commit: DISPATCH-1197 - Added system test to make sure that streaming deliveries are handled without stalling when receiver goes away causing messages to be released

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 8701feb7f -> 58567f979


DISPATCH-1197 - Added system test to make sure that streaming deliveries are handled without stalling when receiver goes away causing messages to be released


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/58567f97
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/58567f97
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/58567f97

Branch: refs/heads/master
Commit: 58567f979deef3ecc4d797610f92539396de5f40
Parents: 8701feb
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Thu Dec 6 16:52:25 2018 -0500
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Dec 6 16:52:25 2018 -0500

----------------------------------------------------------------------
 tests/system_tests_one_router.py | 92 +++++++++++++++++++++++++++++++++++
 1 file changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58567f97/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index dd7cd31..fbddc49 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -422,6 +422,11 @@ class OneRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_42_unsettled_large_message_test(self):
+        test = UnsettledLargeMessageTest(self.address, 250)  # n_messages=250
+        test.run()
+        self.assertEqual(None, test.error)  # error is only assigned in timeout()
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -2856,5 +2861,92 @@ class RejectDispositionTest(MessagingHandler):
         Container(self).run()
 
 
+class UnsettledLargeMessageTest(MessagingHandler):
+    def __init__(self, addr, n_messages):
+        super (UnsettledLargeMessageTest, self).__init__()
+        self.addr = addr
+        self.n_messages = n_messages
+        self.sender = None
+        self.receiver = None
+        self.sender_conn = None
+        self.recv_conn = None
+        self.n_sent = 0
+        self.n_received = 0
+        self.error = None  # stays None unless timeout() runs
+        self.test_timer = None  # not referenced elsewhere in this class
+        self.max_receive = 1  # receiver bails after this many messages
+        self.custom_timer = None  # not referenced elsewhere in this class
+        self.timer = None  # task scheduled in on_start
+        self.n_accepted = 0
+        self.n_modified = 0
+        self.n_released = 0
+        self.str1 = "0123456789abcdef"
+        self.msg_str = ""
+        for i in range(16384):  # 16384 * 16 chars = 262144-char body string
+            self.msg_str += self.str1
+
+    def run (self):
+        Container(self).run()
+
+    def check_if_done(self):
+        # self.n_accepted + self.n_modified + self.n_released will never
+        # equal self.n_messages without the fix for DISPATCH-1197 because
+        # the router will never pull the data from the proton buffers once
+        # the router hits q2_holdoff.
+        if self.n_accepted + self.n_modified + \
+                self.n_released == self.n_messages:
+            self.timer.cancel()
+            self.sender_conn.close()  # recv_conn is closed in on_message
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d accepted=%d " \
+                     "released=%d modified=%d" % (self.n_messages,  # NOTE(review): "sent=" is filled with n_messages, not n_sent
+                                                  self.n_accepted,
+                                                  self.n_released,
+                                                  self.n_modified)
+
+    def on_start (self, event):
+        self.sender_conn = event.container.connect(self.addr)
+        self.recv_conn = event.container.connect(self.addr)
+        self.receiver = event.container.create_receiver(self.recv_conn,
+                                                        "test_42")
+        self.sender = event.container.create_sender(self.sender_conn,
+                                                    "test_42")
+        self.timer = event.reactor.schedule(15, Timeout(self))  # cancelled in check_if_done
+
+    def on_accepted(self, event):
+        self.n_accepted += 1  # NOTE(review): check_if_done() is not called here
+
+    def on_released(self, event):
+        if event.delivery.remote_state == Delivery.MODIFIED:
+            self.n_modified += 1
+        else:
+            self.n_released += 1
+
+        self.check_if_done()
+
+    def on_sendable(self, event):
+        while self.n_sent < self.n_messages:
+            msg = Message(id=(self.n_sent + 1),
+                          body={'sequence': (self.n_sent + 1),
+                                'msg_str': self.msg_str})
+            # Sent unsettled: the test counts the resulting dispositions.
+            self.sender.send (msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == self.max_receive:
+            # Close the receiver and its connection after max_receive
+            # messages (just one). This causes the release of multi-frame
+            # deliveries.
+            # Meanwhile the sender keeps sending. Without the fix for
+            # DISPATCH-1197 it runs into the q2_holdoff situation and never
+            # recovers: the sender link stalls, the dispositions stop
+            # arriving and the test times out.
+            self.receiver.close()
+            self.recv_conn.close()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org