You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/05/02 17:09:54 UTC

qpid-dispatch git commit: DISPATCH-295 - Ensure that unsettled messages are properly tracked across routed links.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 375c3edcd -> 00b47f15b


DISPATCH-295 - Ensure that unsettled messages are properly tracked across routed links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/00b47f15
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/00b47f15
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/00b47f15

Branch: refs/heads/master
Commit: 00b47f15b150d9e4f0838795404779b22609e2b2
Parents: 375c3ed
Author: Ted Ross <tr...@redhat.com>
Authored: Mon May 2 11:04:31 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon May 2 11:04:31 2016 -0400

----------------------------------------------------------------------
 src/router_core/transfer.c        | 10 ++++++
 tests/system_tests_link_routes.py | 64 ++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00b47f15/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index acb0b1b..ffe11c9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -410,11 +410,21 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     //
     if (link->connected_link) {
         qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);
+
+        //
+        // Copy the delivery tag.  For link-routing, the delivery tag must be preserved.
+        //
         peer->tag_length = action->args.connection.tag_length;
         memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
+
         qdr_forward_deliver_CT(core, link->connected_link, peer);
         qd_message_free(dlv->msg);
         dlv->msg = 0;
+        link->total_deliveries++;
+        if (!dlv->settled) {
+            DEQ_INSERT_TAIL(link->unsettled, dlv);
+            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+        }
         return;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00b47f15/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 9475460..6f49c12 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -447,6 +447,12 @@ class LinkRoutePatternTest(TestCase):
         self.assertTrue(test.message_received)
         self.assertTrue(test.delivery_tag_verified)
 
+    def test_close_with_unsettled(self):
+        test = CloseWithUnsettledTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
+
 class DeliveryTagsTest(MessagingHandler):
     def __init__(self, sender_address, listening_address, qdstat_address):
         super(DeliveryTagsTest, self).__init__()
@@ -512,5 +518,63 @@ class DeliveryTagsTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class CloseWithUnsettledTest(MessagingHandler):
+    ##
+    ## This test sends a message across an attach-routed link.  While the message
+    ## is unsettled, the sending client's connection is closed.  The test ensures
+    ## that the router does not crash while the links are being closed.
+    ##
+    def __init__(self, normal_addr, route_addr):
+        super(CloseWithUnsettledTest, self).__init__(prefetch=0, auto_accept=False)
+        self.normal_addr = normal_addr
+        self.route_addr  = route_addr
+        self.dest = "pulp.task.CWUtest"
+        self.error = None
+
+    def timeout(self):
+        self.error = "Timeout Expired - Check for cores"
+        self.conn_normal.close()
+        self.conn_route.close()
+
+    def on_start(self, event):
+        self.timer      = event.reactor.schedule(5, Timeout(self))
+        self.conn_route = event.container.connect(self.route_addr)
+
+    def on_connection_opened(self, event):
+        if event.connection == self.conn_route:
+            self.conn_normal = event.container.connect(self.normal_addr)
+        elif event.connection == self.conn_normal:
+            self.sender = event.container.create_sender(self.conn_normal, self.dest)
+
+    def on_connection_closed(self, event):
+        self.conn_route.close()
+        self.timer.cancel()
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            self.receiver = event.receiver
+            self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        msg = Message(body="CloseWithUnsettled")
+        event.sender.send(msg)
+
+    def on_message(self, event):
+        self.conn_normal.close()
+
+    def run(self):
+        Container(self).run()
+
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org