You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/05/29 20:28:17 UTC
qpid-dispatch git commit: DISPATCH-1012 - For anycast addresses,
deliveries to an address that has no attached destinations shall be
released by the router network and credit not replenished to the sender. Once
a destination becomes reachable, the sende
Repository: qpid-dispatch
Updated Branches:
refs/heads/master c728d7f41 -> ab4421db6
DISPATCH-1012 - For anycast addresses, deliveries to an address that has no attached destinations shall be released by the router network and credit not replenished to the sender. Once a destination becomes reachable, the sender's credit shall be topped back up to the original capacity.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ab4421db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ab4421db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ab4421db
Branch: refs/heads/master
Commit: ab4421db6b1371e5ba902bc441ccc332352e7516
Parents: c728d7f
Author: Ted Ross <tr...@redhat.com>
Authored: Tue May 29 16:25:00 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue May 29 16:25:00 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 2 +
src/router_core/router_core_private.h | 2 +-
src/router_core/transfer.c | 56 ++++++------
tests/system_tests_one_router.py | 132 ++++++++++++++++++-----------
4 files changed, 114 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0e655d3..5fdc3bf 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -442,6 +442,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
strcpy(link->name, name);
link->link_direction = dir;
link->capacity = conn->link_capacity;
+ link->credit_pending = conn->link_capacity;
link->admin_enabled = true;
link->oper_status = QDR_LINK_OPER_DOWN;
@@ -867,6 +868,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->link_type = link_type;
link->link_direction = dir;
link->capacity = conn->link_capacity;
+ link->credit_pending = conn->link_capacity;
link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
link->disambiguated_name = 0;
link->terminus_addr = 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index b3c9798..8017c67 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -398,10 +398,10 @@ struct qdr_link_t {
qdr_link_oper_status_t oper_status;
int capacity;
int credit_to_core; ///< Number of the available credits incrementally given to the core
+ int credit_pending; ///< Number of credits to be issued once consumers are available
bool admin_enabled;
bool strip_annotations_in;
bool strip_annotations_out;
- bool flow_started; ///< for incoming, true iff initial credit has been granted
bool drain_mode;
bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 706a26d..d92cc2b 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -795,32 +795,30 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
//
// We are trying to forward a delivery on an address that has no outbound paths
// AND the incoming link is targeted (not anonymous).
-
- // If the owning_addr is a multicast addr and there are no outbound paths, we will release this delivery
- // and replenish the credit.
-
- // For non multicast addresses, put the delivery on the incoming link's undelivered list. Note that it is safe
- // to do this because the undelivered list will be flushed once the number of
- // paths transitions from zero to one.
//
- // Use the action-reference as the reference for undelivered rather
- // than decrementing and incrementing the delivery ref_count.
+ // We shall release the delivery (it is currently undeliverable). If the distribution is
+ // multicast, we will replenish the credit. If it is anycast, we will allow the credit to
+ // drain.
//
- if (qdr_is_addr_treatment_multicast(link->owning_addr)) {
+ if (dlv->settled) {
+ // Increment the presettled_dropped_deliveries on the in_link
+ link->dropped_presettled_deliveries++;
+ core->dropped_presettled_deliveries++;
+ } else
qdr_delivery_release_CT(core, dlv);
+
+ if (qdr_is_addr_treatment_multicast(link->owning_addr))
qdr_link_issue_credit_CT(core, link, 1, false);
- qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
- if (dlv->settled) {
- // Increment the presettled_dropped_deliveries on the in_link
- link->dropped_presettled_deliveries++;
- core->dropped_presettled_deliveries++;
- }
- }
- else {
- DEQ_INSERT_TAIL(link->undelivered, dlv);
- dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
- qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer: dlv:%lx qdr_link_forward_CT: action-list -> undelivered-list", (long) dlv);
- }
+ else
+ link->credit_pending++;
+
+ //
+ // Set the discard flag on the message only if the message is not completely received yet.
+ //
+ if (!receive_complete)
+ qd_message_set_discard(dlv->msg, true);
+
+ qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
return;
}
@@ -834,7 +832,6 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
if (qdr_connection_route_container(link->conn)) {
addr->deliveries_ingress_route_container++;
core->deliveries_ingress_route_container++;
-
}
}
@@ -1151,7 +1148,8 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
qdr_deliver_continue_peers_CT(core, in_dlv);
- if (qd_message_receive_complete(qdr_delivery_message(in_dlv))) {
+ qd_message_t *msg = qdr_delivery_message(in_dlv);
+ if (qd_message_receive_complete(msg) && !qd_message_is_discard(msg)) {
//
// The entire message has now been received. Check to see if there are in process subscriptions that need to
// receive this message. in process subscriptions, at this time, can deal only with full messages.
@@ -1203,12 +1201,12 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
bool drain_changed = link->drain_mode |= drain;
link->drain_mode = drain;
+ if (link->credit_pending > 0)
+ link->credit_pending = link->credit_pending > credit ? link->credit_pending - credit : 0;
+
if (!drain_changed && credit == 0)
return;
- if (credit > 0)
- link->flow_started = true;
-
qdr_link_work_t *work = new_qdr_link_work_t();
ZERO(work);
@@ -1271,8 +1269,8 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
//
// Issue credit to stalled links
//
- if (!link->flow_started)
- qdr_link_issue_credit_CT(core, link, link->capacity, false);
+ if (link->credit_pending > 0)
+ qdr_link_issue_credit_CT(core, link, link->credit_pending, false);
//
// Drain undelivered deliveries via the forwarder
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index ea9f3cf..3664acf 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -283,7 +283,7 @@ class OneRouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
- def test_26_multicast_no_receivcer(self):
+ def test_26_multicast_no_receiver(self):
test = MulticastUnsettledNoReceiverTest(self.address)
test.run()
self.assertEqual(None, test.error)
@@ -410,6 +410,51 @@ class OneRouterTest(TestCase):
client.connection.close()
+class Entity(object):
+ def __init__(self, status_code, status_description, attrs):
+ self.status_code = status_code
+ self.status_description = status_description
+ self.attrs = attrs
+
+ def __getattr__(self, key):
+ return self.attrs[key]
+
+
+class RouterProxy(object):
+ def __init__(self, reply_addr):
+ self.reply_addr = reply_addr
+
+ def response(self, msg):
+ ap = msg.properties
+ bd = msg.body
+ if bd.__class__ == dict and 'results' in bd and 'attributeNames' in bd:
+ ##
+ ## This is a query response
+ ##
+ response = []
+ anames = bd['attributeNames']
+ for row in bd['results']:
+ cols = {}
+ for i in range(len(row)):
+ cols[anames[i]] = row[i]
+ response.append(Entity(ap['statusCode'], ap['statusDescription'], cols))
+ return response
+
+ return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+ def read_address(self, name):
+ ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
+ return Message(properties=ap, reply_to=self.reply_addr)
+
+ def query_addresses(self):
+ ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
+ return Message(properties=ap, reply_to=self.reply_addr)
+
+ def query_links(self):
+ ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.link'}
+ return Message(properties=ap, reply_to=self.reply_addr)
+
+
class SemanticsClosest(MessagingHandler):
def __init__(self, address):
super(SemanticsClosest, self).__init__()
@@ -2282,10 +2327,10 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler):
these messages since there is no receiver.
"""
def __init__(self, address):
- super(MulticastUnsettledNoReceiverTest, self).__init__(prefetch=0)
+ super(MulticastUnsettledNoReceiverTest, self).__init__()
self.address = address
self.dest = "multicast.MulticastNoReceiverTest"
- self.error = "Some error"
+ self.error = None
self.n_sent = 0
self.max_send = 250
self.n_released = 0
@@ -2293,55 +2338,59 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler):
self.timer = None
self.conn = None
self.sender = None
+ self.query_sent = False
+
+ def timeout(self):
+ self.error = "Timeout expired: n_sent=%d n_released=%d n_accepted=%d" % \
+ (self.n_sent, self.n_released, self.n_accepted)
+ self.conn.close()
def check_if_done(self):
if self.n_accepted > 0:
self.error = "Messages should not be accepted as there are no receivers"
self.timer.cancel()
self.conn.close()
- elif self.n_sent == self.n_released:
- self.error = None
-
- if not self.error:
- local_node = Node.connect(self.address, timeout=TIMEOUT)
- for result in local_node.query(type='org.apache.qpid.dispatch.router.link').results:
- if result[5] == 'in' and 'multicast.MulticastNoReceiverTest' in result[6]:
- if result[16] != 250:
- self.error = "Expected 250 dropped presettled deliveries but got " + str(result[16])
- else:
- outs = local_node.query(type='org.apache.qpid.dispatch.router')
- pos = outs.attribute_names.index("droppedPresettledDeliveries")
- results = outs.results[0]
- if results[pos] != 250:
- self.error = "When querying router, expected 250 dropped presettled " \
- "deliveries but got " + str(results[pos])
- else:
- pos = outs.attribute_names.index("releasedDeliveries")
- if results[pos] < 250:
- self.error = "The number of released deliveries cannot be less that 250 " \
- "but it is " + str(results[pos])
-
- self.timer.cancel()
- self.conn.close()
+ elif self.max_send == self.n_released and not self.query_sent:
+ self.mgmt_tx.send(self.proxy.query_links())
+ self.query_sent = True
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn = event.container.connect(self.address)
- self.sender = event.container.create_sender(self.conn, self.dest)
+ self.mgmt_rx = event.container.create_receiver(self.conn, dynamic=True)
+ self.mgmt_tx = event.container.create_sender(self.conn, '$management')
+
+ def on_link_opened(self, event):
+ if event.receiver == self.mgmt_rx:
+ self.proxy = RouterProxy(self.mgmt_rx.remote_source.address)
+ self.sender = event.container.create_sender(self.conn, self.dest)
+
+ def on_message(self, event):
+ if event.receiver == self.mgmt_rx:
+ results = self.proxy.response(event.message)
+ for link in results:
+ if link.linkDir == 'in' and link.owningAddr == 'M0' + self.dest:
+ if link.releasedCount != self.max_send:
+ self.error = "Released count expected %d, got %d" % (self.max_send, link.droppedPresettledCount)
+ self.timer.cancel()
+ self.conn.close()
def on_sendable(self, event):
- if self.n_sent >= self.max_send:
- return
- self.n_sent += 1
- msg = Message(body=self.n_sent)
- event.sender.send(msg)
+ if event.sender == self.sender:
+ if self.n_sent >= self.max_send:
+ return
+ self.n_sent += 1
+ msg = Message(body=self.n_sent)
+ event.sender.send(msg)
def on_accepted(self, event):
- self.n_accepted += 1
+ if event.sender == self.sender:
+ self.n_accepted += 1
self.check_if_done()
def on_released(self, event):
- self.n_released += 1
+ if event.sender == self.sender:
+ self.n_released += 1
self.check_if_done()
def run(self):
@@ -2638,19 +2687,6 @@ class PresettledOverflowTest(MessagingHandler):
if result[5] == 'out' and 'balanced.PresettledOverflow' in result[6]:
if result[16] != 250:
self.error = "Expected 250 dropped presettled deliveries but got " + str(result[16])
- else:
- outs = local_node.query(type='org.apache.qpid.dispatch.router')
- pos_presett = outs.attribute_names.index("presettledDeliveries")
- pos = outs.attribute_names.index("droppedPresettledDeliveries")
- results = outs.results[0]
- # Enforce presettledDeliveries metric is updated
- if results[pos_presett] < 500:
- self.error = "When querying router, expected 500 presettled " \
- "deliveries but got " + str(results[pos_presett])
- # There is 250 from a previous test
- if results[pos] < 500:
- self.error = "When querying router, expected 500 dropped presettled " \
- "deliveries but got " + str(results[pos])
self.conn.close()
self.timer.cancel()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org