You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/06/28 13:31:11 UTC
qpid-dispatch git commit: DISPATCH-1045 - Release the delivery only
after the entire message has been received
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 0f8fb609e -> 5d7304f2c
DISPATCH-1045 - Release the delivery only after the entire message has been received
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5d7304f2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5d7304f2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5d7304f2
Branch: refs/heads/master
Commit: 5d7304f2c4caf206c9304fcbd22be0a972116e3d
Parents: 0f8fb60
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Jun 22 16:01:51 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Jun 28 09:22:49 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 1 +
src/router_core/transfer.c | 41 ++++++++++++++++++-----
src/router_node.c | 57 +++++++++++++++++++++++---------
tests/system_tests_one_router.py | 51 ++++++++++++++++++++++++++--
4 files changed, 124 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 17d3002..e0a1f41 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -660,6 +660,7 @@ bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted);
bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7729a8e..5ee4ae3 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -58,6 +58,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
dlv->link_exclusion = link_exclusion;
dlv->ingress_index = ingress_index;
dlv->error = 0;
+ dlv->disposition = 0;
qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver - protect returned value");
@@ -86,6 +87,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
dlv->link_exclusion = link_exclusion;
dlv->ingress_index = ingress_index;
dlv->error = 0;
+ dlv->disposition = 0;
qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver_to - protect returned value");
@@ -108,11 +110,12 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->link = link;
- dlv->msg = msg;
- dlv->settled = settled;
- dlv->presettled = settled;
- dlv->error = 0;
+ dlv->link = link;
+ dlv->msg = msg;
+ dlv->settled = settled;
+ dlv->presettled = settled;
+ dlv->error = 0;
+ dlv->disposition = 0;
qdr_delivery_read_extension_state(dlv, disposition, disposition_data, true);
qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
@@ -339,6 +342,13 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
return qd_message_receive_complete(delivery->msg);
}
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
+{
+ if (!delivery)
+ return 0;
+ return delivery->disposition;
+}
+
void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
{
@@ -569,6 +579,7 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
qd_bitmask_free(delivery->link_exclusion);
qdr_error_free(delivery->error);
+
free_qdr_delivery_t(delivery);
}
@@ -812,8 +823,12 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
link->dropped_presettled_deliveries++;
if (dlv->link->link_type == QD_LINK_ENDPOINT)
core->dropped_presettled_deliveries++;
- } else
- qdr_delivery_release_CT(core, dlv);
+ } else {
+ if (more)
+ dlv->disposition = PN_RELEASED;
+ else
+ qdr_delivery_release_CT(core, dlv);
+ }
if (qdr_is_addr_treatment_multicast(link->owning_addr))
qdr_link_issue_credit_CT(core, link, 1, false);
@@ -870,13 +885,21 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
// If the delivery is not settled, release it.
//
if (!dlv->settled) {
- qdr_delivery_release_CT(core, dlv);
//
// Set the discard flag on the message only if the message is not completely received yet.
//
- if (more)
+ if (more) {
+ //
+ // Since more of the messgae is still arriving, we want to wait until after the enter message arrives to release it.
+ // Dont release it now.
+ //
qd_message_set_discard(dlv->msg, true);
+ dlv->disposition = PN_RELEASED;
+ }
+ else {
+ qdr_delivery_release_CT(core, dlv);
+ }
}
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 39da87d..c305e54 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -288,11 +288,6 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
bool receive_complete = qd_message_receive_complete(msg);
if (receive_complete) {
- //
- // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
- //
- pn_link_advance(pn_link);
-
if (!qd_message_aborted(msg)) {
// Since the entire message has been received, we can print out its contents to the log if necessary.
if (cf->log_message) {
@@ -312,12 +307,31 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
pn_link_name(pn_link));
}
+ //
+ // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
+ //
+ pn_link_advance(pn_link);
+
+ //
+ // The entire message has been received but this message needs to be discarded
+ //
+ if (qd_message_is_discard(msg)) {
+ if (qdr_delivery_disposition(delivery) != 0)
+ pn_delivery_update(pnd, qdr_delivery_disposition(delivery));
+ pn_delivery_settle(pnd);
+ qdr_delivery_decref(router->router_core, delivery, "release protection of return from delivery discard");
+ }
+
// Link stalling may have ignored some delivery events.
// If there's another delivery pending then reschedule this.
pn_delivery_t *npnd = pn_link_current(pn_link);
if (npnd) {
qd_connection_invoke_deferred(conn, deferred_AMQP_rx_handler, link);
}
+
+ if (qd_message_is_discard(msg)) {
+ return;
+ }
}
//
@@ -339,14 +353,21 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
// A delivery object was already available via pn_delivery_get_context. This means a qdr_delivery was already created. Use it to continue delivery.
//
if (delivery) {
- qdr_deliver_continue(delivery);
//
- // Settle the proton delivery only if all the data has been received.
+ // Call continue only if the discard flag on the message is not set
+ // We should not continue processing the message after it has been discarded
//
- if (pn_delivery_settled(pnd) && receive_complete) {
- qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
- pn_delivery_settle(pnd);
+ if (!qd_message_is_discard(msg)) {
+ qdr_deliver_continue(delivery);
+
+ //
+ // Settle the proton delivery only if all the data has been received.
+ //
+ if (pn_delivery_settled(pnd) && receive_complete) {
+ qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+ pn_delivery_settle(pnd);
+ }
}
}
else {
@@ -418,11 +439,17 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
}
if (delivery) {
- qdr_deliver_continue(delivery);
- if (receive_complete) {
- if (pn_delivery_settled(pnd)) {
- qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
- pn_delivery_settle(pnd);
+ //
+ // Call continue only if the discard flag on the message is not set
+ // We should not continue processing the message after it has been discarded
+ //
+ if (!qd_message_is_discard(msg)) {
+ qdr_deliver_continue(delivery);
+ if (receive_complete) {
+ if (pn_delivery_settled(pnd)) {
+ qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+ pn_delivery_settle(pnd);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 50b3390..afadf81 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -52,8 +52,6 @@ class MultiTimeout ( object ):
self.parent.timeout ( self.name )
-
-
class OneRouterTest(TestCase):
"""System tests involving a single router"""
@classmethod
@@ -414,6 +412,11 @@ class OneRouterTest(TestCase):
client.connection.close()
+ def test_40_anonymous_sender_no_receiver(self):
+ test = AnonymousSenderNoRecvLargeMessagedTest(self.address)
+ test.run()
+ self.assertEqual(None, test.error)
+
class Entity(object):
def __init__(self, status_code, status_description, attrs):
@@ -2228,6 +2231,7 @@ class MulticastUnsettledTest(MessagingHandler):
def run(self):
Container(self).run()
+
class LargeMessageStreamTest(MessagingHandler):
def __init__(self, address):
super(LargeMessageStreamTest, self).__init__()
@@ -2322,6 +2326,7 @@ class MultiframePresettledTest(MessagingHandler):
def run(self):
Container(self).run()
+
class MulticastUnsettledNoReceiverTest(MessagingHandler):
"""
Creates a sender to a multicast address. Router provides a credit of 'linkCapacity' to this sender even
@@ -2400,6 +2405,48 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler):
Container(self).run()
+class AnonymousSenderNoRecvLargeMessagedTest(MessagingHandler):
+ def __init__(self, address):
+ super(AnonymousSenderNoRecvLargeMessagedTest, self).__init__(auto_accept=False)
+ self.timer = None
+ self.conn = None
+ self.sender = None
+ self.address = address
+ self.released = False
+ self.error = None
+ self.body = ""
+ for i in range(20000):
+ self.body += "0123456789101112131415"
+
+ def timeout(self):
+ self.error = "Timeout Expired:, delivery not released. "
+ self.conn.close()
+
+ def check_if_done(self):
+ if self.released:
+ self.sender.close()
+ self.conn.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.conn = event.container.connect(self.address)
+ # This sender is an anonymous sender
+ self.sender = event.container.create_sender(self.conn)
+
+ def on_sendable(self, event):
+ msg = Message(body=self.body, address="someaddress")
+ # send(msg) calls the stream function which streams data from sender to the router
+ event.sender.send(msg)
+
+ def on_released(self, event):
+ self.released = True
+ self.check_if_done()
+
+ def run(self):
+ Container(self).run()
+
+
class ReleasedVsModifiedTest(MessagingHandler):
def __init__(self, address):
super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org