You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/23 21:12:33 UTC
qpid-dispatch git commit: DISPATCH-523 - Make sure deliveries that
_should_ be deliverable but are not get released.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 152185eb9 -> 0e2a34abd
DISPATCH-523 - Make sure deliveries that _should_ be deliverable but are not get released.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0e2a34ab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0e2a34ab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0e2a34ab
Branch: refs/heads/master
Commit: 0e2a34abd3387f95c53325ddc92cf781070b4330
Parents: 152185e
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 23 17:11:36 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 23 17:11:36 2016 -0400
----------------------------------------------------------------------
src/router_core/transfer.c | 65 ++++++++++++++++++++++++-----------------
1 file changed, 39 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e2a34ab/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 312b866..00ec719 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -483,8 +483,36 @@ static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, boo
}
-static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
+/**
+ * Return the number of outbound paths to destinations that this address has.
+ * Note that even if there are more than zero paths, the destination still may
+ * be unreachable (e.g. an rnode next hop with no link).
+ */
+static long qdr_addr_path_count_CT(qdr_address_t *addr)
+{
+ return (long) DEQ_SIZE(addr->subscriptions) + (long) DEQ_SIZE(addr->rlinks) +
+ (long) qd_bitmask_cardinality(addr->rnodes);
+}
+
+
+static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
{
+ if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) {
+ //
+ // We are trying to forward a delivery on an address that has no outbound paths
+ // AND the incoming link is targeted (not anonymous). In this case, we must put
+ // the delivery on the incoming link's undelivered list. Note that it is safe
+ // to do this because the undelivered list will be flushed once the number of
+ // paths transitions from zero to one.
+ //
+ // Use the action-reference as the reference for undelivered rather
+ // than decrementing and incrementing the delivery ref_count.
+ //
+ DEQ_INSERT_TAIL(link->undelivered, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
+ return;
+ }
+
int fanout = 0;
if (addr) {
@@ -495,28 +523,15 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
}
if (fanout == 0) {
- if (link->owning_addr) {
- //
- // Message was not delivered and the link is not anonymous.
- // Queue the message for later delivery (when the address gets
- // a valid destination).
- //
- // Use the action-reference as the reference for undelivered rather
- // than decrementing and incrementing the delivery ref_count.
- //
- DEQ_INSERT_TAIL(link->undelivered, dlv);
- dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
- } else {
- //
- // Message was not delivered and the link is anonymous, drop the delivery.
- //
- // If the delivery is not settled, release it.
- //
- if (!dlv->settled)
- qdr_delivery_release_CT(core, dlv);
- qdr_delivery_decref(dlv);
- qdr_link_issue_credit_CT(core, link, 1, false);
- }
+ //
+ // Message was not delivered, drop the delivery.
+ //
+ // If the delivery is not settled, release it.
+ //
+ if (!dlv->settled)
+ qdr_delivery_release_CT(core, dlv);
+ qdr_delivery_decref(dlv);
+ qdr_link_issue_credit_CT(core, link, 1, false);
} else if (fanout > 0) {
if (dlv->settled) {
//
@@ -547,8 +562,6 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
qdr_link_issue_credit_CT(core, link, 1, false);
}
}
-
- return fanout;
}
@@ -769,7 +782,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
if (DEQ_SIZE(addr->inlinks) == 0)
return;
- if (DEQ_SIZE(addr->subscriptions) + DEQ_SIZE(addr->rlinks) + qd_bitmask_cardinality(addr->rnodes) == 1) {
+ if (qdr_addr_path_count_CT(addr) == 1) {
qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks);
while (ref) {
qdr_link_t *link = ref->link;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org