You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/23 21:12:33 UTC

qpid-dispatch git commit: DISPATCH-523 - Make sure deliveries that _should_ be deliverable but are not get released.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 152185eb9 -> 0e2a34abd


DISPATCH-523 - Make sure deliveries that _should_ be deliverable but are not get released.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0e2a34ab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0e2a34ab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0e2a34ab

Branch: refs/heads/master
Commit: 0e2a34abd3387f95c53325ddc92cf781070b4330
Parents: 152185e
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 23 17:11:36 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 23 17:11:36 2016 -0400

----------------------------------------------------------------------
 src/router_core/transfer.c | 65 ++++++++++++++++++++++++-----------------
 1 file changed, 39 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e2a34ab/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 312b866..00ec719 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -483,8 +483,36 @@ static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, boo
 }
 
 
-static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
+/**
+ * Return the number of outbound paths to destinations that this address has.
+ * Note that even if there are more than zero paths, the destination still may
+ * be unreachable (e.g. an rnode next hop with no link).
+ */
+static long qdr_addr_path_count_CT(qdr_address_t *addr)
+{
+    return (long) DEQ_SIZE(addr->subscriptions) + (long) DEQ_SIZE(addr->rlinks) +
+        (long) qd_bitmask_cardinality(addr->rnodes);
+}
+
+
+static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
 {
+    if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) {
+        //
+        // We are trying to forward a delivery on an address that has no outbound paths
+        // AND the incoming link is targeted (not anonymous).  In this case, we must put
+        // the delivery on the incoming link's undelivered list.  Note that it is safe
+        // to do this because the undelivered list will be flushed once the number of
+        // paths transitions from zero to one.
+        //
+        // Use the action-reference as the reference for undelivered rather
+        // than decrementing and incrementing the delivery ref_count.
+        //
+        DEQ_INSERT_TAIL(link->undelivered, dlv);
+        dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
+        return;
+    }
+
     int fanout = 0;
 
     if (addr) {
@@ -495,28 +523,15 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
     }
 
     if (fanout == 0) {
-        if (link->owning_addr) {
-            //
-            // Message was not delivered and the link is not anonymous.
-            // Queue the message for later delivery (when the address gets
-            // a valid destination).
-            //
-            // Use the action-reference as the reference for undelivered rather
-            // than decrementing and incrementing the delivery ref_count.
-            //
-            DEQ_INSERT_TAIL(link->undelivered, dlv);
-            dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
-        } else {
-            //
-            // Message was not delivered and the link is anonymous, drop the delivery.
-            //
-            // If the delivery is not settled, release it.
-            //
-            if (!dlv->settled)
-                qdr_delivery_release_CT(core, dlv);
-            qdr_delivery_decref(dlv);
-            qdr_link_issue_credit_CT(core, link, 1, false);
-        }
+        //
+        // Message was not delivered, drop the delivery.
+        //
+        // If the delivery is not settled, release it.
+        //
+        if (!dlv->settled)
+            qdr_delivery_release_CT(core, dlv);
+        qdr_delivery_decref(dlv);
+        qdr_link_issue_credit_CT(core, link, 1, false);
     } else if (fanout > 0) {
         if (dlv->settled) {
             //
@@ -547,8 +562,6 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
                 qdr_link_issue_credit_CT(core, link, 1, false);
         }
     }
-
-    return fanout;
 }
 
 
@@ -769,7 +782,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
     if (DEQ_SIZE(addr->inlinks) == 0)
         return;
 
-    if (DEQ_SIZE(addr->subscriptions) + DEQ_SIZE(addr->rlinks) + qd_bitmask_cardinality(addr->rnodes) == 1) {
+    if (qdr_addr_path_count_CT(addr) == 1) {
         qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks);
         while (ref) {
             qdr_link_t *link = ref->link;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org