You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/06/28 13:31:11 UTC

qpid-dispatch git commit: DISPATCH-1045 - Release the delivery only after the entire message has been received

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 0f8fb609e -> 5d7304f2c


DISPATCH-1045 - Release the delivery only after the entire message has been received


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5d7304f2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5d7304f2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5d7304f2

Branch: refs/heads/master
Commit: 5d7304f2c4caf206c9304fcbd22be0a972116e3d
Parents: 0f8fb60
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Jun 22 16:01:51 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Jun 28 09:22:49 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h |  1 +
 src/router_core/transfer.c          | 41 ++++++++++++++++++-----
 src/router_node.c                   | 57 +++++++++++++++++++++++---------
 tests/system_tests_one_router.py    | 51 ++++++++++++++++++++++++++--
 4 files changed, 124 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 17d3002..e0a1f41 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -660,6 +660,7 @@ bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
 bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
 void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
 bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
 void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted);
 bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7729a8e..5ee4ae3 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -58,6 +58,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
     dlv->link_exclusion = link_exclusion;
     dlv->ingress_index  = ingress_index;
     dlv->error          = 0;
+    dlv->disposition    = 0;
 
     qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
     qdr_delivery_incref(dlv, "qdr_link_deliver - protect returned value");
@@ -86,6 +87,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
     dlv->link_exclusion = link_exclusion;
     dlv->ingress_index  = ingress_index;
     dlv->error          = 0;
+    dlv->disposition    = 0;
 
     qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");
     qdr_delivery_incref(dlv, "qdr_link_deliver_to - protect returned value");
@@ -108,11 +110,12 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->link       = link;
-    dlv->msg        = msg;
-    dlv->settled    = settled;
-    dlv->presettled = settled;
-    dlv->error      = 0;
+    dlv->link         = link;
+    dlv->msg          = msg;
+    dlv->settled      = settled;
+    dlv->presettled   = settled;
+    dlv->error        = 0;
+    dlv->disposition  = 0;
 
     qdr_delivery_read_extension_state(dlv, disposition, disposition_data, true);
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
@@ -339,6 +342,13 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
     return qd_message_receive_complete(delivery->msg);
 }
 
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
+{
+    if (!delivery)
+        return 0;
+    return delivery->disposition;
+}
+
 
 void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
 {
@@ -569,6 +579,7 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
 
     qd_bitmask_free(delivery->link_exclusion);
     qdr_error_free(delivery->error);
+
     free_qdr_delivery_t(delivery);
 
 }
@@ -812,8 +823,12 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
             link->dropped_presettled_deliveries++;
             if (dlv->link->link_type == QD_LINK_ENDPOINT)
                 core->dropped_presettled_deliveries++;
-        } else
-            qdr_delivery_release_CT(core, dlv);
+        } else {
+            if (more)
+                dlv->disposition = PN_RELEASED;
+            else
+                qdr_delivery_release_CT(core, dlv);
+        }
 
         if (qdr_is_addr_treatment_multicast(link->owning_addr))
             qdr_link_issue_credit_CT(core, link, 1, false);
@@ -870,13 +885,21 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         // If the delivery is not settled, release it.
         //
         if (!dlv->settled) {
-            qdr_delivery_release_CT(core, dlv);
 
             //
             // Set the discard flag on the message only if the message is not completely received yet.
             //
-            if (more)
+            if (more) {
+                //
+                // Since more of the message is still arriving, we want to wait until after the entire message arrives to release it.
+                // Don't release it now.
+                //
                 qd_message_set_discard(dlv->msg, true);
+                dlv->disposition = PN_RELEASED;
+            }
+            else {
+                qdr_delivery_release_CT(core, dlv);
+            }
         }
 
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 39da87d..c305e54 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -288,11 +288,6 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
     bool receive_complete = qd_message_receive_complete(msg);
 
     if (receive_complete) {
-        //
-        // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
-        //
-        pn_link_advance(pn_link);
-
         if (!qd_message_aborted(msg)) {
             // Since the entire message has been received, we can print out its contents to the log if necessary.
             if (cf->log_message) {
@@ -312,12 +307,31 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
                        pn_link_name(pn_link));
         }
 
+        //
+        // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
+        //
+        pn_link_advance(pn_link);
+
+        //
+        // The entire message has been received but this message needs to be discarded
+        //
+        if (qd_message_is_discard(msg)) {
+            if (qdr_delivery_disposition(delivery) != 0)
+                pn_delivery_update(pnd, qdr_delivery_disposition(delivery));
+            pn_delivery_settle(pnd);
+            qdr_delivery_decref(router->router_core, delivery, "release protection of return from delivery discard");
+        }
+
         // Link stalling may have ignored some delivery events.
         // If there's another delivery pending then reschedule this.
         pn_delivery_t *npnd = pn_link_current(pn_link);
         if (npnd) {
             qd_connection_invoke_deferred(conn, deferred_AMQP_rx_handler, link);
         }
+
+        if (qd_message_is_discard(msg)) {
+            return;
+        }
     }
 
     //
@@ -339,14 +353,21 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
         // A delivery object was already available via pn_delivery_get_context. This means a qdr_delivery was already created. Use it to continue delivery.
         //
         if (delivery) {
-            qdr_deliver_continue(delivery);
 
             //
-            // Settle the proton delivery only if all the data has been received.
+            // Call continue only if the discard flag on the message is not set
+            // We should not continue processing the message after it has been discarded
             //
-            if (pn_delivery_settled(pnd) && receive_complete) {
-                qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
-                pn_delivery_settle(pnd);
+            if (!qd_message_is_discard(msg)) {
+                qdr_deliver_continue(delivery);
+
+                //
+                // Settle the proton delivery only if all the data has been received.
+                //
+                if (pn_delivery_settled(pnd) && receive_complete) {
+                    qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+                    pn_delivery_settle(pnd);
+                }
             }
         }
         else {
@@ -418,11 +439,17 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
     }
 
     if (delivery) {
-        qdr_deliver_continue(delivery);
-        if (receive_complete) {
-            if (pn_delivery_settled(pnd)) {
-                qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
-                pn_delivery_settle(pnd);
+        //
+        // Call continue only if the discard flag on the message is not set
+        // We should not continue processing the message after it has been discarded
+        //
+        if (!qd_message_is_discard(msg)) {
+            qdr_deliver_continue(delivery);
+            if (receive_complete) {
+                if (pn_delivery_settled(pnd)) {
+                    qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+                    pn_delivery_settle(pnd);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 50b3390..afadf81 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -52,8 +52,6 @@ class MultiTimeout ( object ):
         self.parent.timeout ( self.name )
 
 
-
-
 class OneRouterTest(TestCase):
     """System tests involving a single router"""
     @classmethod
@@ -414,6 +412,11 @@ class OneRouterTest(TestCase):
 
         client.connection.close()
 
+    def test_40_anonymous_sender_no_receiver(self):
+        test = AnonymousSenderNoRecvLargeMessagedTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -2228,6 +2231,7 @@ class MulticastUnsettledTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
 class LargeMessageStreamTest(MessagingHandler):
     def __init__(self, address):
         super(LargeMessageStreamTest, self).__init__()
@@ -2322,6 +2326,7 @@ class MultiframePresettledTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
 class MulticastUnsettledNoReceiverTest(MessagingHandler):
     """
     Creates a sender to a multicast address. Router provides a credit of 'linkCapacity' to this sender even
@@ -2400,6 +2405,48 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler):
         Container(self).run()
 
 
+class AnonymousSenderNoRecvLargeMessagedTest(MessagingHandler):
+    def __init__(self, address):
+        super(AnonymousSenderNoRecvLargeMessagedTest, self).__init__(auto_accept=False)
+        self.timer = None
+        self.conn = None
+        self.sender = None
+        self.address = address
+        self.released = False
+        self.error = None
+        self.body = ""
+        for i in range(20000):
+            self.body += "0123456789101112131415"
+
+    def timeout(self):
+        self.error = "Timeout Expired:, delivery not released. "
+        self.conn.close()
+
+    def check_if_done(self):
+        if self.released:
+            self.sender.close()
+            self.conn.close()
+            self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        # This sender is an anonymous sender
+        self.sender = event.container.create_sender(self.conn)
+
+    def on_sendable(self, event):
+        msg = Message(body=self.body, address="someaddress")
+        # send(msg) calls the stream function which streams data from sender to the router
+        event.sender.send(msg)
+
+    def on_released(self, event):
+        self.released = True
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 class ReleasedVsModifiedTest(MessagingHandler):
     def __init__(self, address):
         super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org