You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/11/15 15:18:03 UTC
[qpid-dispatch] branch master updated: DISPATCH-1481: Multicast links should block until a consumer is present
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new dee4313 DISPATCH-1481: Multicast links should block until a consumer is present
dee4313 is described below
commit dee4313a30b4e275c0be589ca346780bcd5c8712
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Nov 7 09:03:43 2019 -0500
DISPATCH-1481: Multicast links should block until a consumer is present
This fixes an infinite loop bug which occurs when using multicast
addresses on inbound waypoint (broker) links.
This closes #618
---
docs/books/modules/user-guide/routing.adoc | 8 ++
.../modules/address_lookup_client/lookup_client.c | 2 -
src/router_core/transfer.c | 10 +-
tests/system_tests_global_delivery_counts.py | 7 +-
tests/system_tests_multicast.py | 105 +++++++++++++++++----
tests/system_tests_one_router.py | 84 +----------------
6 files changed, 110 insertions(+), 106 deletions(-)
diff --git a/docs/books/modules/user-guide/routing.adoc b/docs/books/modules/user-guide/routing.adoc
index c0d6577..5a21834 100644
--- a/docs/books/modules/user-guide/routing.adoc
+++ b/docs/books/modules/user-guide/routing.adoc
@@ -116,6 +116,14 @@ This involves configuring an address with a routing pattern. All messages sent t
+
This involves configuring a waypoint address to identify the broker queue and then connecting the router to the broker. All messages sent to the waypoint address will be routed to the broker queue.
+=== Message Routing Flow Control
+
+{RouterName} uses a _credit-based_ flow control mechanism to ensure that producers can only send messages to a router if at least one consumer is available to receive them. Because {RouterName} does not store messages, this credit-based flow control prevents producers from sending messages when there are no consumers present.
+
+A client wishing to send a message to the router must wait until the router has provided it with credit. Attempting to publish a message without credit available will cause the client to block. Once credit is made available, the client will unblock, and the message will be sent to the router.
+
+NOTE: Most AMQP client libraries enable you to determine the amount of credit available to a producer. For more information, consult your client's documentation.
+
=== Addresses
Addresses determine how messages flow through your router network. An address designates an endpoint in your messaging network, such as:
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 6bed399..ba4ffd8 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -406,14 +406,12 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core,
// Issue the initial credit only if one of the following
// holds:
// - there are destinations for the address
- // - if the address treatment is multicast
// - the address is that of an exchange (no subscribers allowed)
//
if (dir == QD_INCOMING
&& (DEQ_SIZE(addr->subscriptions)
|| DEQ_SIZE(addr->rlinks)
|| qd_bitmask_cardinality(addr->rnodes)
- || qdr_is_addr_treatment_multicast(addr)
|| !!addr->exchange
|| (!!addr->fallback
&& (DEQ_SIZE(addr->fallback->subscriptions)
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 58ad85e..56a064d 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -453,10 +453,9 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
//
// Credit update: since this is a targeted link to an address for which
// there is no consumers then do not replenish credit - drain instead.
- // However edge and multicast are special snowflakes. We cannot block
- // credit on either (see DISPATCH-779 - mcast is effectively a topic)
+ // However edge is a special snowflake which always has credit available.
//
- if (link->edge || qdr_is_addr_treatment_multicast(addr)) {
+ if (link->edge) {
qdr_link_issue_credit_CT(core, link, 1, false);
} else {
qdr_link_issue_credit_CT(core, link, 0, true); // drain
@@ -573,6 +572,11 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
//
if (!dlv->settled)
qdr_delivery_release_CT(core, dlv);
+ else {
+ link->dropped_presettled_deliveries++;
+ if (dlv_link->link_type == QD_LINK_ENDPOINT)
+ core->dropped_presettled_deliveries++;
+ }
//
// Decrementing the delivery ref count for the action
diff --git a/tests/system_tests_global_delivery_counts.py b/tests/system_tests_global_delivery_counts.py
index ff6475a..a20652b 100644
--- a/tests/system_tests_global_delivery_counts.py
+++ b/tests/system_tests_global_delivery_counts.py
@@ -784,13 +784,16 @@ class ReleasedDroppedPresettledCountTest(MessagingHandler):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.sender_conn = event.container.connect(self.sender_addr)
- # Note that there are no receivers. All messages sent to this address will be dropped
- self.sender = event.container.create_sender(self.sender_conn, self.dest)
+ # Note that this is an anonymous link which will be granted credit w/o
+ # blocking for consumers. Therefore all messages sent to this address
+ # will be dropped
+ self.sender = event.container.create_sender(self.sender_conn)
def on_sendable(self, event):
# We are sending a total of 20 deliveries. 10 unsettled and 10 pre-settled to a multicast address
if self.n_sent < self.num_messages:
msg = Message(body={'number': self.n_sent})
+ msg.address = self.dest
dlv = self.sender.send(msg)
if self.n_sent < 10:
dlv.settle()
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index d331c01..1a34cd3 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -38,6 +38,7 @@ from proton import Message
from proton import Delivery
from qpid_dispatch.management.client import Node
from system_test import AsyncTestSender
+from system_test import AsyncTestReceiver
from system_test import TestCase
from system_test import Qdrouterd
from system_test import main_module
@@ -395,24 +396,50 @@ class MulticastLinearTest(TestCase):
test.run()
self.assertEqual(None, test.error)
- def test_90_no_subscribers(self):
- # DISPATCH-779: ensure credit is available even if there are no
- # subscribers
- tx = AsyncTestSender(address=self.EA1.listener,
- target='multicast/no/subscriber',
- count=LINK_CAPACITY * 2,
- presettle=True,
- container_id="test_90_presettled")
- tx.wait()
- self.assertEqual(None, tx.error)
+ def test_90_credit_no_subscribers(self):
+ """
+ Verify that multicast senders are blocked until a consumer is present.
+ """
+ test = MulticastCreditBlocked(address=self.EA1.listener,
+ target='multicast/no/subscriber1')
+
+ test.run()
+ self.assertEqual(None, test.error)
- tx = AsyncTestSender(address=self.EA1.listener,
- target='multicast/no/subscriber',
- count=LINK_CAPACITY * 2,
- presettle=False,
- container_id="test_90_unsettled")
+ test = MulticastCreditBlocked(address=self.INT_A.listener,
+ target='multicast/no/subscriber2')
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_91_anonymous_sender(self):
+ """
+ Verify that senders over anonymous links do not block waiting for
+ consumers.
+ """
+
+ # no receiver - should not block, return RELEASED
+ msg = Message(body="test_100_anonymous_sender")
+ msg.address = "multicast/test_100_anonymous_sender"
+ tx = AsyncTestSender(address=self.INT_B.listener,
+ count=5,
+ target=None,
+ message=msg,
+ container_id="test_100_anonymous_sender")
+ tx.wait()
+ self.assertEqual(5, tx.released)
+
+ # now add a receiver:
+ rx = AsyncTestReceiver(address=self.INT_A.listener,
+ source=msg.address)
+ self.INT_B.wait_address(msg.address)
+ tx = AsyncTestSender(address=self.INT_B.listener,
+ count=5,
+ target=None,
+ message=msg,
+ container_id="test_100_anonymous_sender")
tx.wait()
- self.assertEqual(500, tx.released)
+ self.assertEqual(5, tx.accepted)
+ rx.stop()
def test_999_check_for_leaks(self):
self._check_for_leaks()
@@ -911,5 +938,51 @@ class MulticastUnsettled3AckMA(MulticastUnsettled3Ack):
super(MulticastUnsettled3AckMA, self).on_message(event)
+class MulticastCreditBlocked(MessagingHandler):
+ """
+ Ensure that credit is not provided when there are no consumers present.
+ This client connects to 'address' and creates a sender to 'target'. Once
+ the sending link has opened a short timer is started. It is expected that
+ on_sendable() is NOT invoked before the timer expires.
+ """
+ def __init__(self, address, target=None, timeout=2, **handler_kwargs):
+ super(MulticastCreditBlocked, self).__init__(**handler_kwargs)
+ self.target = target
+ self.address = address
+ self.time_out = timeout
+
+ self.conn = None
+ self.sender = None
+ self.timer = None
+ self.error = "Timeout NOT triggered as expected!"
+
+ def done(self):
+ # stop the reactor and clean up the test
+ if self.timer:
+ self.timer.cancel()
+ if self.conn:
+ self.conn.close()
+
+ def timeout(self):
+ self.error = None
+ self.done()
+
+ def on_start(self, event):
+ self.conn = event.container.connect(self.address)
+ self.sender = event.container.create_sender(self.conn,
+ target=self.target,
+ name="McastBlocked")
+
+ def on_link_opened(self, event):
+ self.timer = event.reactor.schedule(self.time_out, TestTimeout(self))
+
+ def on_sendable(self, event):
+ self.error = "Unexpected call to on_sendable()!"
+ self.done()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 945b871..4a80c34 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -306,11 +306,6 @@ class OneRouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
- def test_26_multicast_no_receiver(self):
- test = MulticastUnsettledNoReceiverTest(self.address)
- test.run()
- self.assertEqual(None, test.error)
-
def test_27_released_vs_modified(self):
test = ReleasedVsModifiedTest(self.address)
test.run()
@@ -2611,84 +2606,6 @@ class MultiframePresettledTest(MessagingHandler):
Container(self).run()
-class MulticastUnsettledNoReceiverTest(MessagingHandler):
- """
- Creates a sender to a multicast address. Router provides a credit of 'linkCapacity' to this sender even
- if there are no receivers (The sender should be able to send messages to multicast addresses even when no receiver
- is connected). The router will send a disposition of released back to the sender and will end up dropping
- these messages since there is no receiver.
- """
- def __init__(self, address):
- super(MulticastUnsettledNoReceiverTest, self).__init__()
- self.address = address
- self.dest = "multicast.MulticastNoReceiverTest"
- self.error = None
- self.n_sent = 0
- self.max_send = 250
- self.n_released = 0
- self.n_accepted = 0
- self.timer = None
- self.conn = None
- self.sender = None
- self.query_sent = False
-
- def timeout(self):
- self.error = "Timeout expired: n_sent=%d n_released=%d n_accepted=%d" % \
- (self.n_sent, self.n_released, self.n_accepted)
- self.conn.close()
-
- def check_if_done(self):
- if self.n_accepted > 0:
- self.error = "Messages should not be accepted as there are no receivers"
- self.timer.cancel()
- self.conn.close()
- elif self.max_send == self.n_released and not self.query_sent:
- self.mgmt_tx.send(self.proxy.query_links())
- self.query_sent = True
-
- def on_start(self, event):
- self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
- self.conn = event.container.connect(self.address)
- self.mgmt_rx = event.container.create_receiver(self.conn, dynamic=True)
- self.mgmt_tx = event.container.create_sender(self.conn, '$management')
-
- def on_link_opened(self, event):
- if event.receiver == self.mgmt_rx:
- self.proxy = RouterProxy(self.mgmt_rx.remote_source.address)
- self.sender = event.container.create_sender(self.conn, self.dest)
-
- def on_message(self, event):
- if event.receiver == self.mgmt_rx:
- results = self.proxy.response(event.message)
- for link in results:
- if link.linkDir == 'in' and link.owningAddr == 'M0' + self.dest:
- if link.releasedCount != self.max_send:
- self.error = "Released count expected %d, got %d" % (self.max_send, link.droppedPresettledCount)
- self.timer.cancel()
- self.conn.close()
-
- def on_sendable(self, event):
- if event.sender == self.sender:
- if self.n_sent >= self.max_send:
- return
- self.n_sent += 1
- msg = Message(body=self.n_sent)
- event.sender.send(msg)
-
- def on_accepted(self, event):
- if event.sender == self.sender:
- self.n_accepted += 1
- self.check_if_done()
-
- def on_released(self, event):
- if event.sender == self.sender:
- self.n_released += 1
- self.check_if_done()
-
- def run(self):
- Container(self).run()
-
-
class UptimeLastDlvChecker(object):
def __init__(self, parent, lastDlv=None, uptime=0):
self.parent = parent
@@ -2784,6 +2701,7 @@ class ConnectionUptimeLastDlvTest(MessagingHandler):
container.container_id = self.container_id
container.run()
+
class AnonymousSenderNoRecvLargeMessagedTest(MessagingHandler):
def __init__(self, address):
super(AnonymousSenderNoRecvLargeMessagedTest, self).__init__(auto_accept=False)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org