You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/12/18 15:23:44 UTC

[qpid-dispatch] branch master updated: DISPATCH-1213 - Prevent stalling of presettled large message senders by calling the AMQP_rx_handler which start emptying proton buffers again. This closes #425.

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5c5cb  DISPATCH-1213 - Prevent stalling of presettled large message senders by calling the AMQP_rx_handler which start emptying proton buffers again. This closes #425.
ee5c5cb is described below

commit ee5c5cb286d27ca7c7785097078764ac2ee8af28
Author: Ganesh Murthy <gm...@redhat.com>
AuthorDate: Thu Dec 6 15:00:30 2018 -0500

    DISPATCH-1213 - Prevent stalling of presettled large message senders by calling the AMQP_rx_handler which start emptying proton buffers again. This closes #425.
---
 include/qpid/dispatch/router_core.h |   1 +
 src/router_core/transfer.c          |  33 +++++++++--
 src/router_node.c                   |   4 +-
 tests/system_tests_one_router.py    | 108 ++++++++++++++++++++++++++++++++++++
 4 files changed, 140 insertions(+), 6 deletions(-)

diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 136ad6a..a86428a 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -693,6 +693,7 @@ void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char
 void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length);
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
 void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
 bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
 bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index c4b90c9..49c1442 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -434,6 +434,11 @@ qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery)
     return delivery->error;
 }
 
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
+{
+    return delivery->presettled;
+}
+
 
 //==================================================================================
 // In-Thread Functions
@@ -441,11 +446,24 @@ qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery)
 
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
-    bool push = dlv->disposition != PN_RELEASED;
+    bool push = false;
+    bool moved = false;
 
-    dlv->disposition = PN_RELEASED;
-    dlv->settled = true;
-    bool moved = qdr_delivery_settled_CT(core, dlv);
+    if (dlv->presettled) {
+        //
+        // The delivery is presettled. We simply want to call CORE_delivery_update which in turn will
+        // restart stalled links if the q2_holdoff has been hit.
+        // For single frame presettled deliveries, calling CORE_delivery_update does not do anything.
+        //
+        push = true;
+    }
+    else {
+        push = dlv->disposition != PN_RELEASED;
+        dlv->disposition = PN_RELEASED;
+        dlv->settled = true;
+        moved = qdr_delivery_settled_CT(core, dlv);
+
+    }
 
     if (push || moved)
         qdr_delivery_push_CT(core, dlv);
@@ -855,6 +873,13 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
             link->dropped_presettled_deliveries++;
             if (dlv->link->link_type == QD_LINK_ENDPOINT)
                 core->dropped_presettled_deliveries++;
+
+            //
+            // The delivery is pre-settled. Call the qdr_delivery_release_CT so if this delivery is multi-frame
+            // we can restart receiving the delivery in case it is stalled. Note that messages will not
+            // *actually* be released because these are presettled messages.
+            //
+            qdr_delivery_release_CT(core, dlv);
         } else {
             qdr_delivery_release_CT(core, dlv);
 
diff --git a/src/router_node.c b/src/router_node.c
index 3c304b5..7ab9d0a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1605,7 +1605,7 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
     //
     // If the disposition has changed, update the proton delivery.
     //
-    if (disp != pn_delivery_remote_state(pnd)) {
+    if (disp != pn_delivery_remote_state(pnd) && !qdr_delivery_presettled(dlv)) {
         qd_message_t *msg = qdr_delivery_message(dlv);
 
         if (disp == PN_MODIFIED)
@@ -1635,7 +1635,7 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
             // If the delivery is settled and it is still arriving, defer the settlement
             // until the content has fully arrived.
             //
-            if (disp == PN_RELEASED || disp == PN_MODIFIED) {
+            if (disp == PN_RELEASED || disp == PN_MODIFIED || qdr_delivery_presettled(dlv)) {
                 //
                 // If the disposition is RELEASED or MODIFIED, set the message to discard
                 // and if it is blocked by holdoff, get the link rolling again.
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index fbddc49..3c828d5 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -427,6 +427,16 @@ class OneRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_43_dropped_presettled_receiver_stops(self):
+        local_node = Node.connect(self.address, timeout=TIMEOUT)
+        res = local_node.query('org.apache.qpid.dispatch.router')
+        deliveries_ingress = res.attribute_names.index(
+            'deliveriesIngress')
+        ingress_delivery_count = res.results[0][deliveries_ingress]
+        test = DroppedPresettledTest(self.address, 200, ingress_delivery_count)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -1006,6 +1016,104 @@ class PreSettled ( MessagingHandler ) :
             self.bail ( None )
 
 
+class PresettledCustomTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        """Query the router's deliveriesIngress count and pass or fail the parent test."""
+        local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
+        res = local_node.query('org.apache.qpid.dispatch.router')
+        deliveries_ingress = res.attribute_names.index('deliveriesIngress')
+        ingress_delivery_count = res.results[0][deliveries_ingress]
+        self.parent.cancel_custom()
+
+        # Without the fix for DISPATCH-1213 the ingress count will be less than
+        # 200 because the sender link has stalled. The q2_holdoff happened
+        # and so all the remaining messages are still in the
+        # proton buffers.
+
+        if ingress_delivery_count - self.parent.begin_ingress_count > self.parent.n_messages:
+            self.parent.bail(None)
+        else:
+            self.parent.bail("Messages sent to the router is %d, "
+                             "Messages processed by the router is %d" %
+                             (self.parent.n_messages,
+                              ingress_delivery_count - self.parent.begin_ingress_count))
+
+
+class DroppedPresettledTest(MessagingHandler):
+    def __init__(self, addr, n_messages, begin_ingress_count):
+        super (DroppedPresettledTest, self).__init__()
+        self.addr = addr
+        self.n_messages = n_messages
+        self.sender = None
+        self.receiver = None
+        self.sender_conn = None
+        self.recv_conn = None
+        self.n_sent = 0
+        self.n_received = 0
+        self.error = None
+        self.test_timer = None
+        self.max_receive = 10
+        self.custom_timer = None
+        self.timer = None
+        self.begin_ingress_count = begin_ingress_count
+        self.str1 = "0123456789abcdef"
+        self.msg_str = ""
+        for i in range(8192):
+            self.msg_str += self.str1
+
+    def run (self):
+        Container(self).run()
+
+    def bail(self, travail):
+        self.error = travail
+        self.sender_conn.close()
+        if self.recv_conn:
+            self.recv_conn.close()
+        self.timer.cancel()
+
+    def timeout(self,):
+        self.bail("Timeout Expired: %d messages received, %d expected." %
+                  (self.n_received, self.n_messages))
+
+    def on_start (self, event):
+        self.sender_conn = event.container.connect(self.addr)
+        self.recv_conn = event.container.connect(self.addr)
+        self.receiver = event.container.create_receiver(self.recv_conn,
+                                                        "test_43")
+        self.sender = event.container.create_sender(self.sender_conn,
+                                                    "test_43")
+        self.timer = event.reactor.schedule(10, Timeout(self))
+
+    def cancel_custom(self):
+        self.custom_timer.cancel()
+
+    def on_sendable(self, event):
+        while self.n_sent < self.n_messages:
+            msg = Message(id=(self.n_sent + 1),
+                          body={'sequence': (self.n_sent + 1),
+                                'msg_str': self.msg_str})
+            # Presettle the delivery.
+            dlv = self.sender.send (msg)
+            dlv.settle()
+            self.n_sent += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == self.max_receive:
+            # Receiver bails after receiving max_receive messages.
+            self.receiver.close()
+            self.recv_conn.close()
+
+            # The sender is only sending 200 large messages, which is less
+            # than the initial credit of 250 that the router gives.
+            # Let's do a qdstat-style query to find out whether all 200
+            # messages are handled by the router.
+            self.custom_timer = event.reactor.schedule(1,
+                                                       PresettledCustomTimeout(
+                                                           self))
 
 class MulticastUnsettled ( MessagingHandler ) :
     def __init__ ( self,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org