You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/10/04 15:48:18 UTC
[qpid-dispatch] branch master updated: DISPATCH-1423: restore
original mcast credit replenish behavior
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 366c6e6 DISPATCH-1423: restore original mcast credit replenish behavior
366c6e6 is described below
commit 366c6e61a299a6a672b6ea81974d96f465499e71
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Oct 3 13:14:09 2019 -0400
DISPATCH-1423: restore original mcast credit replenish behavior
Prior to DISPATCH-1266 credit was issued for multicast senders
unconditionally. DISPATCH-1266 changed this to only grant credit when
subscribers were present. This broke DISPATCH-779.
This patch fixes this by issuing credit on multicast links regardless
of the presence of subscribers. Note well that unsettled mcast
messages that are successfully forwarded do not have their credit
replenished until after the receivers settle the message.
This closes #580
---
src/router_core/transfer.c | 12 +++++++++---
tests/system_test.py | 14 ++++++++++----
tests/system_tests_multicast.py | 25 ++++++++++++++++++++++++-
3 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6ee2624..3ee9843 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -449,11 +449,17 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
//
qdr_delivery_release_CT(core, dlv);
- if (!link->edge) {
+ //
+ // Credit update: since this is a targeted link to an address for which
+ // there is no consumers then do not replenish credit - drain instead.
+ // However edge and multicast are special snowflakes. We cannot block
+ // credit on either (see DISPATCH-779 - mcast is effectively a topic)
+ //
+ if (link->edge || qdr_is_addr_treatment_multicast(addr)) {
+ qdr_link_issue_credit_CT(core, link, 1, false);
+ } else {
qdr_link_issue_credit_CT(core, link, 0, true); // drain
link->credit_pending++;
- } else {
- qdr_link_issue_credit_CT(core, link, 1, false);
}
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
diff --git a/tests/system_test.py b/tests/system_test.py
index 6619d0e..e65f920 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -51,6 +51,7 @@ from proton import Delivery
from proton.handlers import MessagingHandler
from proton.utils import BlockingConnection
from proton.reactor import AtLeastOnce, Container
+from proton.reactor import AtMostOnce
from qpid_dispatch.management.client import Node
from qpid_dispatch_internal.compat import dict_iteritems
@@ -840,12 +841,14 @@ class AsyncTestSender(MessagingHandler):
def __init__(self, error=None):
super(AsyncTestSender.TestSenderException, self).__init__(error)
- def __init__(self, address, target, count=1, message=None, container_id=None):
+ def __init__(self, address, target, count=1, message=None,
+ container_id=None, presettle=False):
super(AsyncTestSender, self).__init__(auto_accept=False,
auto_settle=False)
self.address = address
self.target = target
self.total = count
+ self.presettle = presettle
self.accepted = 0
self.released = 0
self.modified = 0
@@ -877,19 +880,22 @@ class AsyncTestSender(MessagingHandler):
self._conn = self._container.connect(self.address)
def on_connection_opened(self, event):
+ option = AtMostOnce if self.presettle else AtLeastOnce
self._sender = self._container.create_sender(self._conn,
target=self.target,
- options=AtLeastOnce())
+ options=option())
def on_sendable(self, event):
if self.sent < self.total:
self._sender.send(self._message)
self.sent += 1
+ self._check_if_done()
def _check_if_done(self):
done = (self.sent == self.total
- and (self.accepted + self.released + self.modified
- + self.rejected) == self.sent)
+ and (self.presettle
+ or (self.accepted + self.released + self.modified
+ + self.rejected == self.sent)))
if done:
self._conn.close()
self._conn = None
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 08bf0cb..13df3aa 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -38,6 +38,7 @@ from proton import Link
from proton import Message
from proton import Delivery
from qpid_dispatch.management.client import Node
+from system_test import AsyncTestSender
from system_test import TestCase
from system_test import Qdrouterd
from system_test import main_module
@@ -46,6 +47,7 @@ from system_test import TestTimeout
MAX_FRAME=1023
+LINK_CAPACITY=250
W_THREADS=2
LARGE_PAYLOAD = ("X" * MAX_FRAME) * 19
@@ -73,7 +75,8 @@ class MulticastLinearTest(TestCase):
'workerThreads': W_THREADS}),
('listener', {'role': 'normal',
'port': cls.tester.get_port(),
- 'maxFrameSize': MAX_FRAME}),
+ 'maxFrameSize': MAX_FRAME,
+ 'linkCapacity': LINK_CAPACITY}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]
@@ -392,6 +395,25 @@ class MulticastLinearTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_90_no_subscribers(self):
+ # DISPATCH-779: ensure credit is available even if there are no
+ # subscribers
+ tx = AsyncTestSender(address=self.EA1.listener,
+ target='multicast/no/subscriber',
+ count=LINK_CAPACITY * 2,
+ presettle=True,
+ container_id="test_90_presettled")
+ tx.wait()
+ self.assertEqual(None, tx.error)
+
+ tx = AsyncTestSender(address=self.EA1.listener,
+ target='multicast/no/subscriber',
+ count=LINK_CAPACITY * 2,
+ presettle=False,
+ container_id="test_90_unsettled")
+ tx.wait()
+ self.assertEqual(500, tx.released)
+
def test_999_check_for_leaks(self):
self._check_for_leaks()
@@ -888,5 +910,6 @@ class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
return
super(MulticastUnsettled3AckMA, self).on_message(event)
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org