You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/08/06 15:42:51 UTC
qpid-dispatch git commit: DISPATCH-1085 - Modified
AMQP_link_detach_handler to flush out the remaining bytes in the message
buffers before responding to detaches. This closes #345
Repository: qpid-dispatch
Updated Branches:
refs/heads/master a53179572 -> 22ef3d167
DISPATCH-1085 - Modified AMQP_link_detach_handler to flush out the remaining bytes in the message buffers before responding to detaches. This closes #345
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/22ef3d16
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/22ef3d16
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/22ef3d16
Branch: refs/heads/master
Commit: 22ef3d167b47fe243344908ef48fe44910e476ee
Parents: a531795
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Jul 23 15:21:44 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Mon Aug 6 11:40:56 2018 -0400
----------------------------------------------------------------------
src/message.c | 14 ++++----
src/router_node.c | 19 +++++++++++
tests/system_tests_one_router.py | 62 +++++++++++++++++++++++++++++++++++
3 files changed, 89 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 9c71b79..fce3394 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1128,13 +1128,15 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
return discard_receive(delivery, link, (qd_message_t *)msg);
}
+ // if q2 holdoff has been disabled (disable_q2_holdoff=true), we keep receiving.
+ // if q2 holdoff has been enabled (disable_q2_holdoff=false), if input is in holdoff then just exit.
+ // When enough buffers
+ // have been processed and freed by outbound processing then
+ // message holdoff is cleared and receiving may continue.
//
- // If input is in holdoff then just exit. When enough buffers
- // have been processed and freed by outbound processing then
- // message holdoff is cleared and receiving may continue.
- //
- if (msg->content->q2_input_holdoff) {
- return (qd_message_t*)msg;
+ if (!msg->content->disable_q2_holdoff) {
+ if (msg->content->q2_input_holdoff)
+ return (qd_message_t*)msg;
}
// Loop until msg is complete, error seen, or incoming bytes are consumed
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 585dd6f..dbc7b59 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -752,6 +752,25 @@ static int AMQP_link_flow_handler(void* context, qd_link_t *link)
*/
static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
+ if (!link)
+ return 0;
+
+ pn_link_t *pn_link = qd_link_pn(link);
+
+ if (!pn_link)
+ return 0;
+
+ pn_delivery_t *pnd = pn_link_current(pn_link);
+
+ if (pnd) {
+ qd_message_t *msg = qd_message_receive(pnd);
+
+ if (!qd_message_receive_complete(msg)) {
+ qd_message_Q2_holdoff_disable(msg);
+ deferred_AMQP_rx_handler((void *)link, false);
+ }
+ }
+
qd_router_t *router = (qd_router_t*) context;
qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index afadf81..b40a95b 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -417,6 +417,11 @@ class OneRouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_41_large_streaming_close_conn_test(self):
+ test = LargeMessageStreamCloseConnTest(self.address)
+ test.run()
+ self.assertEqual(None, test.error)
+
class Entity(object):
def __init__(self, status_code, status_description, attrs):
@@ -2232,6 +2237,63 @@ class MulticastUnsettledTest(MessagingHandler):
Container(self).run()
+class LargeMessageStreamCloseConnTest(MessagingHandler):
+ def __init__(self, address):
+ super(LargeMessageStreamCloseConnTest, self).__init__()
+ self.address = address
+ self.dest = "LargeMessageStreamCloseConnTest"
+ self.error = None
+ self.timer = None
+ self.sender_conn = None
+ self.receiver_conn = None
+ self.sender = None
+ self.receiver = None
+ self.body = ""
+ self.aborted = False
+ for i in range(20000):
+ self.body += "0123456789101112131415"
+
+ def timeout(self):
+ if self.aborted:
+ self.error = "Message has been aborted. Test failed"
+ else:
+ self.error = "Message not received. test failed"
+ self.receiver_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.sender_conn = event.container.connect(self.address)
+ self.receiver_conn = event.container.connect(self.address)
+ self.sender = event.container.create_sender(self.sender_conn, self.dest)
+ self.receiver = event.container.create_receiver(self.receiver_conn,
+ self.dest, name="A")
+
+ def on_sendable(self, event):
+ msg = Message(body=self.body)
+ # send(msg) calls the stream function which streams data
+ # from sender to the router
+ event.sender.send(msg)
+
+ # Close the connection immediately after sending the message
+ # Without the fix for DISPATCH-1085, this test will fail
+ # one in five times with an abort
+ # With the fix in place, this test will never fail (the
+ # on_aborted will never be called).
+ self.sender_conn.close()
+
+ def on_message(self, event):
+ self.timer.cancel()
+ self.receiver_conn.close()
+
+ def on_aborted(self, event):
+ self.aborted = True
+ self.timer.cancel()
+ self.timeout()
+
+ def run(self):
+ Container(self).run()
+
+
class LargeMessageStreamTest(MessagingHandler):
def __init__(self, address):
super(LargeMessageStreamTest, self).__init__()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org