You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/10/04 15:48:18 UTC

[qpid-dispatch] branch master updated: DISPATCH-1423: restore original mcast credit replenish behavior

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 366c6e6  DISPATCH-1423: restore original mcast credit replenish behavior
366c6e6 is described below

commit 366c6e61a299a6a672b6ea81974d96f465499e71
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Oct 3 13:14:09 2019 -0400

    DISPATCH-1423: restore original mcast credit replenish behavior
    
    Prior to DISPATCH-1266 credit was issued for multicast senders
    unconditionally.  DISPATCH-1266 changed this to only grant credit when
    subscribers were present.  This broke DISPATCH-779.
    
    This patch fixes this by issuing credit on multicast links regardless
    of the presence of subscribers.  Note well that unsettled mcast
    messages that are successfully forwarded do not have their credit
    replenished until after the receivers settle the message.
    
    This closes #580
---
 src/router_core/transfer.c      | 12 +++++++++---
 tests/system_test.py            | 14 ++++++++++----
 tests/system_tests_multicast.py | 25 ++++++++++++++++++++++++-
 3 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6ee2624..3ee9843 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -449,11 +449,17 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         //
         qdr_delivery_release_CT(core, dlv);
 
-        if (!link->edge) {
+        //
+        // Credit update: since this is a targeted link to an address for which
+        // there is no consumers then do not replenish credit - drain instead.
+        // However edge and multicast are special snowflakes.  We cannot block
+        // credit on either (see DISPATCH-779 - mcast is effectively a topic)
+        //
+        if (link->edge || qdr_is_addr_treatment_multicast(addr)) {
+            qdr_link_issue_credit_CT(core, link, 1, false);
+        } else {
             qdr_link_issue_credit_CT(core, link, 0, true);  // drain
             link->credit_pending++;
-        } else {
-            qdr_link_issue_credit_CT(core, link, 1, false);
         }
 
         qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
diff --git a/tests/system_test.py b/tests/system_test.py
index 6619d0e..e65f920 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -51,6 +51,7 @@ from proton import Delivery
 from proton.handlers import MessagingHandler
 from proton.utils import BlockingConnection
 from proton.reactor import AtLeastOnce, Container
+from proton.reactor import AtMostOnce
 from qpid_dispatch.management.client import Node
 from qpid_dispatch_internal.compat import dict_iteritems
 
@@ -840,12 +841,14 @@ class AsyncTestSender(MessagingHandler):
         def __init__(self, error=None):
             super(AsyncTestSender.TestSenderException, self).__init__(error)
 
-    def __init__(self, address, target, count=1, message=None, container_id=None):
+    def __init__(self, address, target, count=1, message=None,
+                 container_id=None, presettle=False):
         super(AsyncTestSender, self).__init__(auto_accept=False,
                                               auto_settle=False)
         self.address = address
         self.target = target
         self.total = count
+        self.presettle = presettle
         self.accepted = 0
         self.released = 0
         self.modified = 0
@@ -877,19 +880,22 @@ class AsyncTestSender(MessagingHandler):
         self._conn = self._container.connect(self.address)
 
     def on_connection_opened(self, event):
+        option = AtMostOnce if self.presettle else AtLeastOnce
         self._sender = self._container.create_sender(self._conn,
                                                      target=self.target,
-                                                     options=AtLeastOnce())
+                                                     options=option())
 
     def on_sendable(self, event):
         if self.sent < self.total:
             self._sender.send(self._message)
             self.sent += 1
+        self._check_if_done()
 
     def _check_if_done(self):
         done = (self.sent == self.total
-                and (self.accepted + self.released + self.modified
-                     + self.rejected) == self.sent)
+                and (self.presettle
+                     or (self.accepted + self.released + self.modified
+                         + self.rejected == self.sent)))
         if done:
             self._conn.close()
             self._conn = None
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 08bf0cb..13df3aa 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -38,6 +38,7 @@ from proton import Link
 from proton import Message
 from proton import Delivery
 from qpid_dispatch.management.client import Node
+from system_test import AsyncTestSender
 from system_test import TestCase
 from system_test import Qdrouterd
 from system_test import main_module
@@ -46,6 +47,7 @@ from system_test import TestTimeout
 
 
 MAX_FRAME=1023
+LINK_CAPACITY=250
 W_THREADS=2
 LARGE_PAYLOAD = ("X" * MAX_FRAME) * 19
 
@@ -73,7 +75,8 @@ class MulticastLinearTest(TestCase):
                             'workerThreads': W_THREADS}),
                 ('listener', {'role': 'normal',
                               'port': cls.tester.get_port(),
-                              'maxFrameSize': MAX_FRAME}),
+                              'maxFrameSize': MAX_FRAME,
+                              'linkCapacity': LINK_CAPACITY}),
                 ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
             ]
 
@@ -392,6 +395,25 @@ class MulticastLinearTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_90_no_subscribers(self):
+        # DISPATCH-779: ensure credit is available even if there are no
+        # subscribers
+        tx = AsyncTestSender(address=self.EA1.listener,
+                             target='multicast/no/subscriber',
+                             count=LINK_CAPACITY * 2,
+                             presettle=True,
+                             container_id="test_90_presettled")
+        tx.wait()
+        self.assertEqual(None, tx.error)
+
+        tx = AsyncTestSender(address=self.EA1.listener,
+                             target='multicast/no/subscriber',
+                             count=LINK_CAPACITY * 2,
+                             presettle=False,
+                             container_id="test_90_unsettled")
+        tx.wait()
+        self.assertEqual(500, tx.released)
+
     def test_999_check_for_leaks(self):
         self._check_for_leaks()
 
@@ -888,5 +910,6 @@ class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
             return
         super(MulticastUnsettled3AckMA, self).on_message(event)
 
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org