You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/10/18 14:43:30 UTC
[08/15] qpid-dispatch git commit: DISPATCH-829 - Handle truncated messages on routed links.
DISPATCH-829 - Handle truncated messages on routed links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/653ed7c9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/653ed7c9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/653ed7c9
Branch: refs/heads/master
Commit: 653ed7c9249241ebddf3d21e26d89aaaacc005fa
Parents: 294baf3
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 11 12:39:43 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 18 08:11:40 2017 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 25 ++++++
tests/system_tests_delivery_abort.py | 137 +++++++++++++++++++++++++++++-
2 files changed, 160 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/653ed7c9/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 817978d..c239507 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -764,6 +764,23 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
}
+static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
+{
+ assert(link->link_direction == QD_OUTGOING);
+
+ qdr_connection_t *conn = link->conn;
+
+ sys_mutex_lock(conn->work_lock);
+ qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
+ while (dlv) {
+ if (!qdr_delivery_receive_complete(dlv))
+ qdr_delivery_set_aborted(dlv, true);
+ dlv = DEQ_NEXT(dlv);
+ }
+ sys_mutex_unlock(conn->work_lock);
+}
+
+
static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
//
@@ -1638,6 +1655,14 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
// For routed links, propagate the detach
//
if (link->connected_link) {
+ //
+ // If the connected link is outgoing and there is a delivery on the connected link's undelivered
+ // list that is not receive-complete, we must flag that delivery as aborted or it will forever
+ // block the propagation of the detach.
+ //
+ if (link->connected_link->link_direction == QD_OUTGOING)
+ qdr_link_abort_undelivered_CT(core, link->connected_link);
+
if (dt != QD_LOST)
qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
else {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/653ed7c9/tests/system_tests_delivery_abort.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_delivery_abort.py b/tests/system_tests_delivery_abort.py
index b0869e5..3128086 100644
--- a/tests/system_tests_delivery_abort.py
+++ b/tests/system_tests_delivery_abort.py
@@ -85,6 +85,15 @@ class RouterTest(TestCase):
self.assertEqual(None, test.error)
+ def test_03_link_route_interrupted_stream_one_router(self):
+ test = LinkRouteAbortTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[1],
+ "link.addr_03",
+ self.routers[0].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+
class Entity(object):
def __init__(self, status_code, status_description, attrs):
self.status_code = status_code
@@ -140,6 +149,7 @@ class MessageRouteAbortTest(MessagingHandler):
self.error = None
self.sender1 = None
self.sender2 = None
+ self.sender3 = None
self.receiver = None
self.streaming = False
self.delivery = None
@@ -150,7 +160,8 @@ class MessageRouteAbortTest(MessagingHandler):
self.program = ['Send_Short_1', 'Send_Long_Truncated', 'Send_Short_2', 'Send_Short_3']
self.result = []
self.expected_result = ['Send_Short_1', 'Aborted_Delivery', '2', '2', '2', '2', '2',
- '2', '2', '2', '2', '2', 'Send_Short_2', 'Send_Short_3']
+ '2', '2', '2', '2', '2', 'Send_Short_2', '3', '3', '3', '3',
+ '3', '3', '3', '3', '3', '3', 'Send_Short_3']
def timeout(self):
self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
@@ -193,8 +204,12 @@ class MessageRouteAbortTest(MessagingHandler):
self.sender2.send(m)
self.sender2.close()
elif next_op == 'Send_Short_3':
+ m = Message(body="3")
+ for i in range(10):
+ self.sender3.send(m)
m = Message(body="%s" % next_op)
self.sender3.send(m)
+ self.sender_conn.close()
def on_sendable(self, event):
if event.sender == self.sender1 and self.program[0] == 'Send_Short_1':
@@ -212,7 +227,6 @@ class MessageRouteAbortTest(MessagingHandler):
elif m.body == 'Send_Short_3':
if self.result != self.expected_result:
self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
- self.sender_conn.close()
self.receiver_conn.close()
self.timer.cancel()
@@ -224,5 +238,124 @@ class MessageRouteAbortTest(MessagingHandler):
Container(self).run()
+class LinkRouteAbortTest(MessagingHandler):
+ def __init__(self, sender_host, receiver_host, address, query_host):
+ super(LinkRouteAbortTest, self).__init__()
+ self.sender_host = sender_host
+ self.receiver_host = receiver_host
+ self.address = address
+ self.query_host = query_host
+
+ self.sender_conn = None
+ self.receiver_conn = None
+ self.query_conn = None
+ self.error = None
+ self.sender1 = None
+ self.receiver = None
+ self.poll_timer = None
+ self.streaming = False
+ self.delivery = None
+ self.data = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+ self.long_data = ""
+
+ self.sent_stream = 0
+ self.program = ['Send_Short_1', 'Send_Long_Truncated']
+ self.result = []
+ self.expected_result = ['Send_Short_1', 'Aborted_Delivery']
+
+ def timeout(self):
+ self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.query_conn.close()
+ if self.poll_timer:
+ self.poll_timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.receiver_conn = event.container.connect(self.receiver_host)
+ self.query_conn = event.container.connect(self.query_host)
+ self.reply_receiver = event.container.create_receiver(self.query_conn, dynamic=True)
+ self.agent_sender = event.container.create_sender(self.query_conn, "$management")
+
+ def setup_first_links(self, event):
+ self.sender1 = event.container.create_sender(self.sender_conn, self.address, name="S1")
+
+ def stream(self):
+ self.sender1.stream(self.long_data)
+ self.sent_stream += len(self.long_data)
+ if self.sent_stream >= 100000:
+ self.streaming = False
+ self.sender1.close()
+
+ def send(self):
+ next_op = self.program.pop(0) if len(self.program) > 0 else None
+ if next_op == 'Send_Short_1':
+ m = Message(body="%s" % next_op)
+ self.sender1.send(m)
+ elif next_op == 'Send_Long_Truncated':
+ for i in range(100):
+ self.long_data += self.data
+ self.delivery = self.sender1.delivery(self.sender1.delivery_tag())
+ self.streaming = True
+ self.stream()
+
+ def poll_timeout(self):
+ self.poll()
+
+ def poll(self):
+ request = self.proxy.read_address('Clink')
+ self.agent_sender.send(request)
+
+ def on_sendable(self, event):
+ if event.sender == self.sender1 and len(self.program) > 0 and self.program[0] == 'Send_Short_1':
+ self.send()
+ if event.sender == self.sender1 and self.streaming:
+ self.stream()
+
+ def on_link_opening(self, event):
+ if event.receiver:
+ self.receiver = event.receiver
+ event.receiver.target.address = self.address
+ event.receiver.open()
+
+ def on_link_opened(self, event):
+ if event.receiver == self.reply_receiver:
+ self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
+ self.poll()
+
+ def on_message(self, event):
+ if event.receiver == self.reply_receiver:
+ response = self.proxy.response(event.message)
+ if response.status_code == 200 and (response.remoteCount + response.containerCount) > 0:
+ if self.poll_timer:
+ self.poll_timer.cancel()
+ self.poll_timer = None
+ self.setup_first_links(event)
+ else:
+ self.poll_timer = event.reactor.schedule(0.25, PollTimeout(self))
+ return
+
+ m = event.message
+ self.result.append(m.body)
+ if m.body == 'Send_Short_1':
+ self.send()
+
+ def on_aborted(self, event):
+ self.result.append('Aborted_Delivery')
+ if self.result != self.expected_result:
+ self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.query_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ container = Container(self)
+ container.container_id="LRC"
+ container.run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org