You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/12/06 21:52:55 UTC
qpid-dispatch git commit: DISPATCH-1197 - Added system test to make
sure that streaming deliveries are handled without stalling when receiver
goes away causing messages to be released
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 8701feb7f -> 58567f979
DISPATCH-1197 - Added system test to make sure that streaming deliveries are handled without stalling when receiver goes away causing messages to be released
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/58567f97
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/58567f97
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/58567f97
Branch: refs/heads/master
Commit: 58567f979deef3ecc4d797610f92539396de5f40
Parents: 8701feb
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Thu Dec 6 16:52:25 2018 -0500
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Dec 6 16:52:25 2018 -0500
----------------------------------------------------------------------
tests/system_tests_one_router.py | 92 +++++++++++++++++++++++++++++++++++
1 file changed, 92 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58567f97/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index dd7cd31..fbddc49 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -422,6 +422,11 @@ class OneRouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_42_unsettled_large_message_test(self):
+ test = UnsettledLargeMessageTest(self.address, 250)
+ test.run()
+ self.assertEqual(None, test.error)
+
class Entity(object):
def __init__(self, status_code, status_description, attrs):
@@ -2856,5 +2861,92 @@ class RejectDispositionTest(MessagingHandler):
Container(self).run()
+class UnsettledLargeMessageTest(MessagingHandler):
+ def __init__(self, addr, n_messages):
+ super (UnsettledLargeMessageTest, self).__init__()
+ self.addr = addr
+ self.n_messages = n_messages
+ self.sender = None
+ self.receiver = None
+ self.sender_conn = None
+ self.recv_conn = None
+ self.n_sent = 0
+ self.n_received = 0
+ self.error = None
+ self.test_timer = None
+ self.max_receive = 1
+ self.custom_timer = None
+ self.timer = None
+ self.n_accepted = 0
+ self.n_modified = 0
+ self.n_released = 0
+ self.str1 = "0123456789abcdef"
+ self.msg_str = ""
+ for i in range(16384):
+ self.msg_str += self.str1
+
+ def run (self):
+ Container(self).run()
+
+ def check_if_done(self):
+ # self.n_accepted + self.n_modified + self.n_released will never
+ # equal self.n_messages without the fix for DISPATCH-1197 because
+ # the router will never pull the data from the proton buffers once
+ # the router hits q2_holdoff
+ if self.n_accepted + self.n_modified + \
+ self.n_released == self.n_messages:
+ self.timer.cancel()
+ self.sender_conn.close()
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d accepted=%d " \
+ "released=%d modified=%d" % (self.n_messages,
+ self.n_accepted,
+ self.n_released,
+ self.n_modified)
+
+ def on_start (self, event):
+ self.sender_conn = event.container.connect(self.addr)
+ self.recv_conn = event.container.connect(self.addr)
+ self.receiver = event.container.create_receiver(self.recv_conn,
+ "test_42")
+ self.sender = event.container.create_sender(self.sender_conn,
+ "test_42")
+ self.timer = event.reactor.schedule(15, Timeout(self))
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+
+ def on_released(self, event):
+ if event.delivery.remote_state == Delivery.MODIFIED:
+ self.n_modified += 1
+ else:
+ self.n_released += 1
+
+ self.check_if_done()
+
+ def on_sendable(self, event):
+ while self.n_sent < self.n_messages:
+ msg = Message(id=(self.n_sent + 1),
+ body={'sequence': (self.n_sent + 1),
+ 'msg_str': self.msg_str})
+ # Presettle the delivery.
+ self.sender.send (msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ self.n_received += 1
+ if self.n_received == self.max_receive:
+ # Close the receiver connection after receiving just one message
+ # This will cause the release of multi-frame deliveries.
+ # Meanwhile the sender will keep sending but will run into
+ # the q2_holodd situation and never recover.
+ # The sender link will be stalled
+ # This test will NEVER pass without the fix to DISPATCH-1197
+ # Receiver bails after receiving max_receive messages.
+ self.receiver.close()
+ self.recv_conn.close()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org