You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/08/06 15:42:51 UTC

qpid-dispatch git commit: DISPATCH-1085 - Modified AMQP_link_detach_handler to flush out the remaining bytes in the message buffers before responding to detaches. This closes #345

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master a53179572 -> 22ef3d167


DISPATCH-1085 - Modified AMQP_link_detach_handler to flush out the remaining bytes in the message buffers before responding to detaches. This closes #345


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/22ef3d16
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/22ef3d16
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/22ef3d16

Branch: refs/heads/master
Commit: 22ef3d167b47fe243344908ef48fe44910e476ee
Parents: a531795
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Jul 23 15:21:44 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Mon Aug 6 11:40:56 2018 -0400

----------------------------------------------------------------------
 src/message.c                    | 14 ++++----
 src/router_node.c                | 19 +++++++++++
 tests/system_tests_one_router.py | 62 +++++++++++++++++++++++++++++++++++
 3 files changed, 89 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 9c71b79..fce3394 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1128,13 +1128,15 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         return discard_receive(delivery, link, (qd_message_t *)msg);
     }
 
+    // if q2 holdoff has been disabled (disable_q2_holdoff=true), we keep receiving.
+    // if q2 holdoff has been enabled (disable_q2_holdoff=false), if input is in holdoff then just exit.
+    //      When enough buffers
+    //      have been processed and freed by outbound processing then
+    //      message holdoff is cleared and receiving may continue.
     //
-    // If input is in holdoff then just exit. When enough buffers
-    // have been processed and freed by outbound processing then
-    // message holdoff is cleared and receiving may continue.
-    //
-    if (msg->content->q2_input_holdoff) {
-        return (qd_message_t*)msg;
+    if (!msg->content->disable_q2_holdoff) {
+        if (msg->content->q2_input_holdoff)
+            return (qd_message_t*)msg;
     }
 
     // Loop until msg is complete, error seen, or incoming bytes are consumed

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 585dd6f..dbc7b59 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -752,6 +752,25 @@ static int AMQP_link_flow_handler(void* context, qd_link_t *link)
  */
 static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
 {
+    if (!link)
+        return 0;
+
+    pn_link_t      *pn_link      = qd_link_pn(link);
+
+    if (!pn_link)
+        return 0;
+
+    pn_delivery_t  *pnd          = pn_link_current(pn_link);
+
+    if (pnd) {
+        qd_message_t   *msg   = qd_message_receive(pnd);
+
+        if (!qd_message_receive_complete(msg)) {
+            qd_message_Q2_holdoff_disable(msg);
+            deferred_AMQP_rx_handler((void *)link, false);
+        }
+    }
+
     qd_router_t    *router = (qd_router_t*) context;
     qdr_link_t     *rlink  = (qdr_link_t*) qd_link_get_context(link);
     pn_condition_t *cond   = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index afadf81..b40a95b 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -417,6 +417,11 @@ class OneRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_41_large_streaming_close_conn_test(self):
+        test = LargeMessageStreamCloseConnTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -2232,6 +2237,63 @@ class MulticastUnsettledTest(MessagingHandler):
         Container(self).run()
 
 
+class LargeMessageStreamCloseConnTest(MessagingHandler):
+    def __init__(self, address):
+        super(LargeMessageStreamCloseConnTest, self).__init__()
+        self.address = address
+        self.dest = "LargeMessageStreamCloseConnTest"
+        self.error = None
+        self.timer = None
+        self.sender_conn = None
+        self.receiver_conn = None
+        self.sender = None
+        self.receiver = None
+        self.body = ""
+        self.aborted = False
+        for i in range(20000):
+            self.body += "0123456789101112131415"
+
+    def timeout(self):
+        if self.aborted:
+            self.error = "Message has been aborted. Test failed"
+        else:
+            self.error = "Message not received. test failed"
+        self.receiver_conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.sender_conn = event.container.connect(self.address)
+        self.receiver_conn = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.sender_conn, self.dest)
+        self.receiver = event.container.create_receiver(self.receiver_conn,
+                                                        self.dest, name="A")
+
+    def on_sendable(self, event):
+        msg = Message(body=self.body)
+        # send(msg) calls the stream function which streams data
+        # from sender to the router
+        event.sender.send(msg)
+
+        # Close the connection immediately after sending the message
+        # Without the fix for DISPATCH-1085, this test will fail
+        # one in five times with an abort
+        # With the fix in place, this test will never fail (the
+        # on_aborted will never be called).
+        self.sender_conn.close()
+
+    def on_message(self, event):
+        self.timer.cancel()
+        self.receiver_conn.close()
+
+    def on_aborted(self, event):
+        self.aborted = True
+        self.timer.cancel()
+        self.timeout()
+
+    def run(self):
+        Container(self).run()
+
+
 class LargeMessageStreamTest(MessagingHandler):
     def __init__(self, address):
         super(LargeMessageStreamTest, self).__init__()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org