You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/10/18 14:43:30 UTC

[08/15] qpid-dispatch git commit: DISPATCH-829 - Handle truncated messages on routed links.

DISPATCH-829 - Handle truncated messages on routed links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/653ed7c9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/653ed7c9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/653ed7c9

Branch: refs/heads/master
Commit: 653ed7c9249241ebddf3d21e26d89aaaacc005fa
Parents: 294baf3
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 11 12:39:43 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 18 08:11:40 2017 -0400

----------------------------------------------------------------------
 src/router_core/connections.c        |  25 ++++++
 tests/system_tests_delivery_abort.py | 137 +++++++++++++++++++++++++++++-
 2 files changed, 160 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/653ed7c9/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 817978d..c239507 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -764,6 +764,23 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
 }
 
 
+static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
+{
+    assert(link->link_direction == QD_OUTGOING);
+
+    qdr_connection_t *conn = link->conn;
+
+    sys_mutex_lock(conn->work_lock);
+    qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
+    while (dlv) {
+        if (!qdr_delivery_receive_complete(dlv))
+            qdr_delivery_set_aborted(dlv, true);
+        dlv = DEQ_NEXT(dlv);
+    }
+    sys_mutex_unlock(conn->work_lock);
+}
+
+
 static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     //
@@ -1638,6 +1655,14 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
     // For routed links, propagate the detach
     //
     if (link->connected_link) {
+        //
+        // If the connected link is outgoing and there is a delivery on the connected link's undelivered
+        // list that is not receive-complete, we must flag that delivery as aborted or it will forever
+        // block the propagation of the detach.
+        //
+        if (link->connected_link->link_direction == QD_OUTGOING)
+            qdr_link_abort_undelivered_CT(core, link->connected_link);
+
         if (dt != QD_LOST)
             qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
         else {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/653ed7c9/tests/system_tests_delivery_abort.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_delivery_abort.py b/tests/system_tests_delivery_abort.py
index b0869e5..3128086 100644
--- a/tests/system_tests_delivery_abort.py
+++ b/tests/system_tests_delivery_abort.py
@@ -85,6 +85,15 @@ class RouterTest(TestCase):
         self.assertEqual(None, test.error)
 
 
+    def test_03_link_route_interrupted_stream_one_router(self):
+        test = LinkRouteAbortTest(self.routers[0].addresses[0],
+                                  self.routers[0].addresses[1],
+                                  "link.addr_03",
+                                  self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
         self.status_code        = status_code
@@ -140,6 +149,7 @@ class MessageRouteAbortTest(MessagingHandler):
         self.error         = None
         self.sender1       = None
         self.sender2       = None
+        self.sender3       = None
         self.receiver      = None
         self.streaming     = False
         self.delivery      = None
@@ -150,7 +160,8 @@ class MessageRouteAbortTest(MessagingHandler):
         self.program       = ['Send_Short_1', 'Send_Long_Truncated', 'Send_Short_2', 'Send_Short_3']
         self.result        = []
         self.expected_result = ['Send_Short_1', 'Aborted_Delivery', '2', '2', '2', '2', '2',
-                                '2', '2', '2', '2', '2', 'Send_Short_2', 'Send_Short_3']
+                                '2', '2', '2', '2', '2', 'Send_Short_2', '3', '3', '3', '3',
+                                '3', '3', '3', '3', '3', '3', 'Send_Short_3']
 
     def timeout(self):
         self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
@@ -193,8 +204,12 @@ class MessageRouteAbortTest(MessagingHandler):
             self.sender2.send(m)
             self.sender2.close()
         elif next_op == 'Send_Short_3':
+            m = Message(body="3")
+            for i in range(10):
+                self.sender3.send(m)
             m = Message(body="%s" % next_op)
             self.sender3.send(m)
+            self.sender_conn.close()
 
     def on_sendable(self, event):
         if event.sender == self.sender1 and self.program[0] == 'Send_Short_1':
@@ -212,7 +227,6 @@ class MessageRouteAbortTest(MessagingHandler):
         elif m.body == 'Send_Short_3':
             if self.result != self.expected_result:
                 self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
-            self.sender_conn.close()
             self.receiver_conn.close()
             self.timer.cancel()
 
@@ -224,5 +238,124 @@ class MessageRouteAbortTest(MessagingHandler):
         Container(self).run()
 
 
+class LinkRouteAbortTest(MessagingHandler):
+    def __init__(self, sender_host, receiver_host, address, query_host):
+        super(LinkRouteAbortTest, self).__init__()
+        self.sender_host      = sender_host
+        self.receiver_host    = receiver_host
+        self.address          = address
+        self.query_host       = query_host
+
+        self.sender_conn   = None
+        self.receiver_conn = None
+        self.query_conn    = None
+        self.error         = None
+        self.sender1       = None
+        self.receiver      = None
+        self.poll_timer    = None
+        self.streaming     = False
+        self.delivery      = None
+        self.data          = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+        self.long_data     = ""
+
+        self.sent_stream   = 0
+        self.program       = ['Send_Short_1', 'Send_Long_Truncated']
+        self.result        = []
+        self.expected_result = ['Send_Short_1', 'Aborted_Delivery']
+
+    def timeout(self):
+        self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(10.0, Timeout(self))
+        self.sender_conn    = event.container.connect(self.sender_host)
+        self.receiver_conn  = event.container.connect(self.receiver_host)
+        self.query_conn     = event.container.connect(self.query_host)
+        self.reply_receiver = event.container.create_receiver(self.query_conn, dynamic=True)
+        self.agent_sender   = event.container.create_sender(self.query_conn, "$management")
+
+    def setup_first_links(self, event):
+        self.sender1 = event.container.create_sender(self.sender_conn, self.address, name="S1")
+
+    def stream(self):
+        self.sender1.stream(self.long_data)
+        self.sent_stream += len(self.long_data)
+        if self.sent_stream >= 100000:
+            self.streaming = False
+            self.sender1.close()
+
+    def send(self):
+        next_op = self.program.pop(0) if len(self.program) > 0 else None
+        if next_op == 'Send_Short_1':
+            m = Message(body="%s" % next_op)
+            self.sender1.send(m)
+        elif next_op == 'Send_Long_Truncated':
+            for i in range(100):
+                self.long_data += self.data
+            self.delivery  = self.sender1.delivery(self.sender1.delivery_tag())
+            self.streaming = True
+            self.stream()
+
+    def poll_timeout(self):
+        self.poll()
+
+    def poll(self):
+        request = self.proxy.read_address('Clink')
+        self.agent_sender.send(request)
+
+    def on_sendable(self, event):
+        if event.sender == self.sender1 and len(self.program) > 0 and self.program[0] == 'Send_Short_1':
+            self.send()
+        if event.sender == self.sender1 and self.streaming:
+            self.stream()
+
+    def on_link_opening(self, event):
+        if event.receiver:
+            self.receiver = event.receiver
+            event.receiver.target.address = self.address
+            event.receiver.open()
+
+    def on_link_opened(self, event):
+        if event.receiver == self.reply_receiver:
+            self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
+            self.poll()
+
+    def on_message(self, event):
+        if event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            if response.status_code == 200 and (response.remoteCount + response.containerCount) > 0:
+                if self.poll_timer:
+                    self.poll_timer.cancel()
+                    self.poll_timer = None
+                self.setup_first_links(event)
+            else:
+                self.poll_timer = event.reactor.schedule(0.25, PollTimeout(self))
+            return
+
+        m = event.message
+        self.result.append(m.body)
+        if m.body == 'Send_Short_1':
+            self.send()
+
+    def on_aborted(self, event):
+        self.result.append('Aborted_Delivery')
+        if self.result != self.expected_result:
+            self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+        self.query_conn.close()
+        self.timer.cancel()
+
+    def run(self):
+        container = Container(self)
+        container.container_id="LRC"
+        container.run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org