You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/12/18 15:23:44 UTC
[qpid-dispatch] branch master updated: DISPATCH-1213 - Prevent
stalling of presettled large message senders by calling the AMQP_rx_handler
which starts emptying proton buffers again. This closes #425.
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new ee5c5cb DISPATCH-1213 - Prevent stalling of presettled large message senders by calling the AMQP_rx_handler which starts emptying proton buffers again. This closes #425.
ee5c5cb is described below
commit ee5c5cb286d27ca7c7785097078764ac2ee8af28
Author: Ganesh Murthy <gm...@redhat.com>
AuthorDate: Thu Dec 6 15:00:30 2018 -0500
DISPATCH-1213 - Prevent stalling of presettled large message senders by calling the AMQP_rx_handler which starts emptying proton buffers again. This closes #425.
---
include/qpid/dispatch/router_core.h | 1 +
src/router_core/transfer.c | 33 +++++++++--
src/router_node.c | 4 +-
tests/system_tests_one_router.py | 108 ++++++++++++++++++++++++++++++++++++
4 files changed, 140 insertions(+), 6 deletions(-)
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 136ad6a..a86428a 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -693,6 +693,7 @@ void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char
void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length);
qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index c4b90c9..49c1442 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -434,6 +434,11 @@ qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery)
return delivery->error;
}
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
+{
+ return delivery->presettled;
+}
+
//==================================================================================
// In-Thread Functions
@@ -441,11 +446,24 @@ qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery)
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
- bool push = dlv->disposition != PN_RELEASED;
+ bool push = false;
+ bool moved = false;
- dlv->disposition = PN_RELEASED;
- dlv->settled = true;
- bool moved = qdr_delivery_settled_CT(core, dlv);
+ if (dlv->presettled) {
+ //
+ // The delivery is presettled. We simply want to call CORE_delivery_update which in turn will
+ // restart stalled links if the q2_holdoff has been hit.
+ // For single frame presettled deliveries, calling CORE_delivery_update does not do anything.
+ //
+ push = true;
+ }
+ else {
+ push = dlv->disposition != PN_RELEASED;
+ dlv->disposition = PN_RELEASED;
+ dlv->settled = true;
+ moved = qdr_delivery_settled_CT(core, dlv);
+
+ }
if (push || moved)
qdr_delivery_push_CT(core, dlv);
@@ -855,6 +873,13 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
link->dropped_presettled_deliveries++;
if (dlv->link->link_type == QD_LINK_ENDPOINT)
core->dropped_presettled_deliveries++;
+
+ //
+ // The delivery is pre-settled. Call the qdr_delivery_release_CT so if this delivery is multi-frame
+ // we can restart receiving the delivery in case it is stalled. Note that messages will not
+ // *actually* be released because these are presettled messages.
+ //
+ qdr_delivery_release_CT(core, dlv);
} else {
qdr_delivery_release_CT(core, dlv);
diff --git a/src/router_node.c b/src/router_node.c
index 3c304b5..7ab9d0a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1605,7 +1605,7 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
//
// If the disposition has changed, update the proton delivery.
//
- if (disp != pn_delivery_remote_state(pnd)) {
+ if (disp != pn_delivery_remote_state(pnd) && !qdr_delivery_presettled(dlv)) {
qd_message_t *msg = qdr_delivery_message(dlv);
if (disp == PN_MODIFIED)
@@ -1635,7 +1635,7 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
// If the delivery is settled and it is still arriving, defer the settlement
// until the content has fully arrived.
//
- if (disp == PN_RELEASED || disp == PN_MODIFIED) {
+ if (disp == PN_RELEASED || disp == PN_MODIFIED || qdr_delivery_presettled(dlv)) {
//
// If the disposition is RELEASED or MODIFIED, set the message to discard
// and if it is blocked by holdoff, get the link rolling again.
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index fbddc49..3c828d5 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -427,6 +427,16 @@ class OneRouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_43_dropped_presettled_receiver_stops(self):
+ local_node = Node.connect(self.address, timeout=TIMEOUT)
+ res = local_node.query('org.apache.qpid.dispatch.router')
+ deliveries_ingress = res.attribute_names.index(
+ 'deliveriesIngress')
+ ingress_delivery_count = res.results[0][deliveries_ingress]
+ test = DroppedPresettledTest(self.address, 200, ingress_delivery_count)
+ test.run()
+ self.assertEqual(None, test.error)
+
class Entity(object):
def __init__(self, status_code, status_description, attrs):
@@ -1006,6 +1016,104 @@ class PreSettled ( MessagingHandler ) :
self.bail ( None )
+class PresettledCustomTimeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
+ res = local_node.query('org.apache.qpid.dispatch.router')
+ deliveries_ingress = res.attribute_names.index(
+ 'deliveriesIngress')
+ ingress_delivery_count = res.results[0][deliveries_ingress]
+ self.parent.cancel_custom()
+
+ # Without the fix for DISPATCH-1213 the ingress count will be less than
+ # 200 because the sender link has stalled. The q2_holdoff happened
+ # and so all the remaining messages are still in the
+ # proton buffers.
+
+ if ingress_delivery_count - self.parent.begin_ingress_count > self.parent.n_messages:
+ self.parent.bail(None)
+ else:
+ self.parent.bail("Messages sent to the router is %d, "
+ "Messages processed by the router is %d",
+ (self.parent.n_messages,
+ ingress_delivery_count - self.parent.begin_ingress_count))
+
+
+class DroppedPresettledTest(MessagingHandler):
+ def __init__(self, addr, n_messages, begin_ingress_count):
+ super (DroppedPresettledTest, self).__init__()
+ self.addr = addr
+ self.n_messages = n_messages
+ self.sender = None
+ self.receiver = None
+ self.sender_conn = None
+ self.recv_conn = None
+ self.n_sent = 0
+ self.n_received = 0
+ self.error = None
+ self.test_timer = None
+ self.max_receive = 10
+ self.custom_timer = None
+ self.timer = None
+ self.begin_ingress_count = begin_ingress_count
+ self.str1 = "0123456789abcdef"
+ self.msg_str = ""
+ for i in range(8192):
+ self.msg_str += self.str1
+
+ def run (self):
+ Container(self).run()
+
+ def bail(self, travail):
+ self.error = travail
+ self.sender_conn.close()
+ if self.recv_conn:
+ self.recv_conn.close()
+ self.timer.cancel()
+
+ def timeout(self,):
+ self.bail("Timeout Expired: %d messages received, %d expected." %
+ (self.n_received, self.n_messages))
+
+ def on_start (self, event):
+ self.sender_conn = event.container.connect(self.addr)
+ self.recv_conn = event.container.connect(self.addr)
+ self.receiver = event.container.create_receiver(self.recv_conn,
+ "test_43")
+ self.sender = event.container.create_sender(self.sender_conn,
+ "test_43")
+ self.timer = event.reactor.schedule(10, Timeout(self))
+
+ def cancel_custom(self):
+ self.custom_timer.cancel()
+
+ def on_sendable(self, event):
+ while self.n_sent < self.n_messages:
+ msg = Message(id=(self.n_sent + 1),
+ body={'sequence': (self.n_sent + 1),
+ 'msg_str': self.msg_str})
+ # Presettle the delivery.
+ dlv = self.sender.send (msg)
+ dlv.settle()
+ self.n_sent += 1
+
+ def on_message(self, event):
+ self.n_received += 1
+ if self.n_received == self.max_receive:
+ # Receiver bails after receiving max_receive messages.
+ self.receiver.close()
+ self.recv_conn.close()
+
+ # The sender is only sending 200 large messages which is less
+ # than the initial credit of 250 that the router gives.
+ # Let's do a qdstat to find out if all 200 messages are handled
+ # by the router.
+ self.custom_timer = event.reactor.schedule(1,
+ PresettledCustomTimeout(
+ self))
class MulticastUnsettled ( MessagingHandler ) :
def __init__ ( self,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org