You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/11/15 15:18:03 UTC

[qpid-dispatch] branch master updated: DISPATCH-1481: Multicast links should block until a consumer is present

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new dee4313  DISPATCH-1481: Multicast links should block until a consumer is present
dee4313 is described below

commit dee4313a30b4e275c0be589ca346780bcd5c8712
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Nov 7 09:03:43 2019 -0500

    DISPATCH-1481: Multicast links should block until a consumer is present
    
    This fixes an infinite loop bug which occurs when using multicast
    addresses on inbound waypoint (broker) links.
    
    This closes #618
---
 docs/books/modules/user-guide/routing.adoc         |   8 ++
 .../modules/address_lookup_client/lookup_client.c  |   2 -
 src/router_core/transfer.c                         |  10 +-
 tests/system_tests_global_delivery_counts.py       |   7 +-
 tests/system_tests_multicast.py                    | 105 +++++++++++++++++----
 tests/system_tests_one_router.py                   |  84 +----------------
 6 files changed, 110 insertions(+), 106 deletions(-)

diff --git a/docs/books/modules/user-guide/routing.adoc b/docs/books/modules/user-guide/routing.adoc
index c0d6577..5a21834 100644
--- a/docs/books/modules/user-guide/routing.adoc
+++ b/docs/books/modules/user-guide/routing.adoc
@@ -116,6 +116,14 @@ This involves configuring an address with a routing pattern. All messages sent t
 +
 This involves configuring a waypoint address to identify the broker queue and then connecting the router to the broker. All messages sent to the waypoint address will be routed to the broker queue.
 
+=== Message Routing Flow Control
+
+{RouterName} uses a _credit-based_ flow control mechanism to ensure that producers can send messages to a router only if at least one consumer is available to receive them. Because {RouterName} does not store messages, this credit-based flow control prevents producers from sending messages when there are no consumers present.
+
+A client wishing to send a message to the router must wait until the router has provided it with credit. Attempting to publish a message without credit available will cause the client to block. Once credit is made available, the client will unblock, and the message will be sent to the router.
+
+NOTE: Most AMQP client libraries enable you to determine the amount of credit available to a producer. For more information, consult your client's documentation.
+
 === Addresses
 
 Addresses determine how messages flow through your router network. An address designates an endpoint in your messaging network, such as:
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 6bed399..ba4ffd8 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -406,14 +406,12 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t       *core,
         // Issue the initial credit only if one of the following
         // holds:
         // - there are destinations for the address
-        // - if the address treatment is multicast
         // - the address is that of an exchange (no subscribers allowed)
         //
         if (dir == QD_INCOMING
             && (DEQ_SIZE(addr->subscriptions)
                 || DEQ_SIZE(addr->rlinks)
                 || qd_bitmask_cardinality(addr->rnodes)
-                || qdr_is_addr_treatment_multicast(addr)
                 || !!addr->exchange
                 || (!!addr->fallback
                     && (DEQ_SIZE(addr->fallback->subscriptions)
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 58ad85e..56a064d 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -453,10 +453,9 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         //
         // Credit update: since this is a targeted link to an address for which
         // there is no consumers then do not replenish credit - drain instead.
-        // However edge and multicast are special snowflakes.  We cannot block
-        // credit on either (see DISPATCH-779 - mcast is effectively a topic)
+        // However edge is a special snowflake which always has credit available.
         //
-        if (link->edge || qdr_is_addr_treatment_multicast(addr)) {
+        if (link->edge) {
             qdr_link_issue_credit_CT(core, link, 1, false);
         } else {
             qdr_link_issue_credit_CT(core, link, 0, true);  // drain
@@ -573,6 +572,11 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         //
         if (!dlv->settled)
             qdr_delivery_release_CT(core, dlv);
+        else {
+            link->dropped_presettled_deliveries++;
+            if (dlv_link->link_type == QD_LINK_ENDPOINT)
+                core->dropped_presettled_deliveries++;
+        }
 
         //
         // Decrementing the delivery ref count for the action
diff --git a/tests/system_tests_global_delivery_counts.py b/tests/system_tests_global_delivery_counts.py
index ff6475a..a20652b 100644
--- a/tests/system_tests_global_delivery_counts.py
+++ b/tests/system_tests_global_delivery_counts.py
@@ -784,13 +784,16 @@ class ReleasedDroppedPresettledCountTest(MessagingHandler):
         self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.sender_conn = event.container.connect(self.sender_addr)
 
-        # Note that there are no receivers. All messages sent to this address will be dropped
-        self.sender = event.container.create_sender(self.sender_conn, self.dest)
+        # Note that this is an anonymous link, which is granted credit without
+        # waiting for consumers.  Since there are no receivers, all messages
+        # sent to this address will be dropped.
+        self.sender = event.container.create_sender(self.sender_conn)
 
     def on_sendable(self, event):
         # We are sending a total of 20 deliveries. 10 unsettled and 10 pre-settled to a multicast address
         if self.n_sent < self.num_messages:
             msg = Message(body={'number': self.n_sent})
+            msg.address = self.dest
             dlv = self.sender.send(msg)
             if self.n_sent < 10:
                 dlv.settle()
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index d331c01..1a34cd3 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -38,6 +38,7 @@ from proton import Message
 from proton import Delivery
 from qpid_dispatch.management.client import Node
 from system_test import AsyncTestSender
+from system_test import AsyncTestReceiver
 from system_test import TestCase
 from system_test import Qdrouterd
 from system_test import main_module
@@ -395,24 +396,50 @@ class MulticastLinearTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
-    def test_90_no_subscribers(self):
-        # DISPATCH-779: ensure credit is available even if there are no
-        # subscribers
-        tx = AsyncTestSender(address=self.EA1.listener,
-                             target='multicast/no/subscriber',
-                             count=LINK_CAPACITY * 2,
-                             presettle=True,
-                             container_id="test_90_presettled")
-        tx.wait()
-        self.assertEqual(None, tx.error)
+    def test_90_credit_no_subscribers(self):
+        """
+        Verify that multicast senders are blocked until a consumer is present.
+        """
+        test = MulticastCreditBlocked(address=self.EA1.listener,
+                                      target='multicast/no/subscriber1')
+
+        test.run()
+        self.assertEqual(None, test.error)
 
-        tx = AsyncTestSender(address=self.EA1.listener,
-                             target='multicast/no/subscriber',
-                             count=LINK_CAPACITY * 2,
-                             presettle=False,
-                             container_id="test_90_unsettled")
+        test = MulticastCreditBlocked(address=self.INT_A.listener,
+                                      target='multicast/no/subscriber2')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_91_anonymous_sender(self):
+        """
+        Verify that senders over anonymous links do not block waiting for
+        consumers.
+        """
+
+        # no receiver - should not block, return RELEASED
+        msg = Message(body="test_100_anonymous_sender")
+        msg.address = "multicast/test_100_anonymous_sender"
+        tx = AsyncTestSender(address=self.INT_B.listener,
+                             count=5,
+                             target=None,
+                             message=msg,
+                             container_id="test_100_anonymous_sender")
+        tx.wait()
+        self.assertEqual(5, tx.released)
+
+        # now add a receiver:
+        rx = AsyncTestReceiver(address=self.INT_A.listener,
+                               source=msg.address)
+        self.INT_B.wait_address(msg.address)
+        tx = AsyncTestSender(address=self.INT_B.listener,
+                             count=5,
+                             target=None,
+                             message=msg,
+                             container_id="test_100_anonymous_sender")
         tx.wait()
-        self.assertEqual(500, tx.released)
+        self.assertEqual(5, tx.accepted)
+        rx.stop()
 
     def test_999_check_for_leaks(self):
         self._check_for_leaks()
@@ -911,5 +938,51 @@ class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
         super(MulticastUnsettled3AckMA, self).on_message(event)
 
 
+class MulticastCreditBlocked(MessagingHandler):
+    """
+    Ensure that credit is not provided when there are no consumers present.
+    This client connects to 'address' and creates a sender to 'target'.  Once
+    the sending link has opened, a short timer is started.  It is expected that
+    on_sendable() is NOT invoked before the timer expires.
+    """
+    def __init__(self, address, target=None, timeout=2, **handler_kwargs):
+        super(MulticastCreditBlocked, self).__init__(**handler_kwargs)
+        self.target = target
+        self.address = address
+        self.time_out = timeout
+
+        self.conn = None
+        self.sender = None
+        self.timer = None
+        self.error = "Timeout NOT triggered as expected!"
+
+    def done(self):
+        # stop the reactor and clean up the test
+        if self.timer:
+            self.timer.cancel()
+        if self.conn:
+            self.conn.close()
+
+    def timeout(self):
+        self.error = None
+        self.done()
+
+    def on_start(self, event):
+        self.conn = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.conn,
+                                                    target=self.target,
+                                                    name="McastBlocked")
+
+    def on_link_opened(self, event):
+        self.timer = event.reactor.schedule(self.time_out, TestTimeout(self))
+
+    def on_sendable(self, event):
+        self.error = "Unexpected call to on_sendable()!"
+        self.done()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 945b871..4a80c34 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -306,11 +306,6 @@ class OneRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
-    def test_26_multicast_no_receiver(self):
-        test = MulticastUnsettledNoReceiverTest(self.address)
-        test.run()
-        self.assertEqual(None, test.error)
-
     def test_27_released_vs_modified(self):
         test = ReleasedVsModifiedTest(self.address)
         test.run()
@@ -2611,84 +2606,6 @@ class MultiframePresettledTest(MessagingHandler):
         Container(self).run()
 
 
-class MulticastUnsettledNoReceiverTest(MessagingHandler):
-    """
-    Creates a sender to a multicast address. Router provides a credit of 'linkCapacity' to this sender even
-    if there are no receivers (The sender should be able to send messages to multicast addresses even when no receiver
-    is connected). The router will send a disposition of released back to the sender and will end up dropping
-    these messages since there is no receiver.
-    """
-    def __init__(self, address):
-        super(MulticastUnsettledNoReceiverTest, self).__init__()
-        self.address = address
-        self.dest = "multicast.MulticastNoReceiverTest"
-        self.error = None
-        self.n_sent = 0
-        self.max_send = 250
-        self.n_released = 0
-        self.n_accepted = 0
-        self.timer = None
-        self.conn = None
-        self.sender = None
-        self.query_sent = False
-
-    def timeout(self):
-        self.error = "Timeout expired: n_sent=%d n_released=%d n_accepted=%d" % \
-                     (self.n_sent, self.n_released, self.n_accepted)
-        self.conn.close()
-
-    def check_if_done(self):
-        if self.n_accepted > 0:
-            self.error = "Messages should not be accepted as there are no receivers"
-            self.timer.cancel()
-            self.conn.close()
-        elif self.max_send == self.n_released and not self.query_sent:
-            self.mgmt_tx.send(self.proxy.query_links())
-            self.query_sent = True
-
-    def on_start(self, event):
-        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.conn = event.container.connect(self.address)
-        self.mgmt_rx = event.container.create_receiver(self.conn, dynamic=True)
-        self.mgmt_tx = event.container.create_sender(self.conn, '$management')
-
-    def on_link_opened(self, event):
-        if event.receiver == self.mgmt_rx:
-            self.proxy  = RouterProxy(self.mgmt_rx.remote_source.address)
-            self.sender = event.container.create_sender(self.conn, self.dest)
-
-    def on_message(self, event):
-        if event.receiver == self.mgmt_rx:
-            results = self.proxy.response(event.message)
-            for link in results:
-                if link.linkDir == 'in' and link.owningAddr == 'M0' + self.dest:
-                    if link.releasedCount != self.max_send:
-                        self.error = "Released count expected %d, got %d" % (self.max_send, link.droppedPresettledCount)
-            self.timer.cancel()
-            self.conn.close()
-
-    def on_sendable(self, event):
-        if event.sender == self.sender:
-            if self.n_sent >= self.max_send:
-                return
-            self.n_sent += 1
-            msg = Message(body=self.n_sent)
-            event.sender.send(msg)
-
-    def on_accepted(self, event):
-        if event.sender == self.sender:
-            self.n_accepted += 1
-        self.check_if_done()
-
-    def on_released(self, event):
-        if event.sender == self.sender:
-            self.n_released += 1
-        self.check_if_done()
-
-    def run(self):
-        Container(self).run()
-
-
 class UptimeLastDlvChecker(object):
     def __init__(self, parent, lastDlv=None, uptime=0):
         self.parent = parent
@@ -2784,6 +2701,7 @@ class ConnectionUptimeLastDlvTest(MessagingHandler):
         container.container_id = self.container_id
         container.run()
 
+
 class AnonymousSenderNoRecvLargeMessagedTest(MessagingHandler):
     def __init__(self, address):
         super(AnonymousSenderNoRecvLargeMessagedTest, self).__init__(auto_accept=False)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org