You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/05/16 17:30:18 UTC

qpid-dispatch git commit: DISPATCH-334 - Ensure that undelivered messages are only pushed out on outgoing links.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 33cbeede4 -> 522760691


DISPATCH-334 - Ensure that undelivered messages are only pushed out on outgoing links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/52276069
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/52276069
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/52276069

Branch: refs/heads/master
Commit: 522760691e20976e298352253d25c805881dcd5f
Parents: 33cbeed
Author: Ted Ross <tr...@redhat.com>
Authored: Mon May 16 13:23:30 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon May 16 13:26:15 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c | 13 +++++++++
 src/router_core/transfer.c    | 56 ++++++++++++++++++++------------------
 2 files changed, 42 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/52276069/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 547b49e..229a49b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -440,7 +440,20 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(link->updated_deliveries, updated_deliveries);
     DEQ_MOVE(link->undelivered, undelivered);
+    qdr_delivery_t *d = DEQ_HEAD(undelivered);
+    while (d) {
+        assert(d->where == QDR_DELIVERY_IN_UNDELIVERED);
+        d->where = QDR_DELIVERY_NOWHERE;
+        d = DEQ_NEXT(d);
+    }
+
     DEQ_MOVE(link->unsettled, unsettled);
+    d = DEQ_HEAD(unsettled);
+    while (d) {
+        assert(d->where == QDR_DELIVERY_IN_UNSETTLED);
+        d->where = QDR_DELIVERY_NOWHERE;
+        d = DEQ_NEXT(d);
+    }
     sys_mutex_unlock(conn->work_lock);
 
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/52276069/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 54a4448..8a03875 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -107,36 +107,38 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
     int               offer   = -1;
     bool              settled = false;
 
-    while (credit > 0 && !drained) {
-        sys_mutex_lock(conn->work_lock);
-        dlv = DEQ_HEAD(link->undelivered);
-        if (dlv) {
-            DEQ_REMOVE_HEAD(link->undelivered);
-            settled = dlv->settled;
-            if (!settled) {
-                DEQ_INSERT_TAIL(link->unsettled, dlv);
-                dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+    if (link->link_direction == QD_OUTGOING) {
+        while (credit > 0 && !drained) {
+            sys_mutex_lock(conn->work_lock);
+            dlv = DEQ_HEAD(link->undelivered);
+            if (dlv) {
+                DEQ_REMOVE_HEAD(link->undelivered);
+                settled = dlv->settled;
+                if (!settled) {
+                    DEQ_INSERT_TAIL(link->unsettled, dlv);
+                    dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+                } else
+                    dlv->where = QDR_DELIVERY_NOWHERE;
+                credit--;
+                link->total_deliveries++;
+                offer = DEQ_SIZE(link->undelivered);
             } else
-                dlv->where = QDR_DELIVERY_NOWHERE;
-            credit--;
-            link->total_deliveries++;
-            offer = DEQ_SIZE(link->undelivered);
-        } else
-            drained = true;
-        sys_mutex_unlock(conn->work_lock);
-
-        if (dlv) {
-            link->credit_to_core--;
-            core->deliver_handler(core->user_context, link, dlv, settled);
-            if (settled)
-                qdr_delivery_free(dlv);
+                drained = true;
+            sys_mutex_unlock(conn->work_lock);
+
+            if (dlv) {
+                link->credit_to_core--;
+                core->deliver_handler(core->user_context, link, dlv, settled);
+                if (settled)
+                    qdr_delivery_free(dlv);
+            }
         }
-    }
 
-    if (drained)
-        core->drained_handler(core->user_context, link);
-    else if (offer != -1)
-        core->offer_handler(core->user_context, link, offer);
+        if (drained)
+            core->drained_handler(core->user_context, link);
+        else if (offer != -1)
+            core->offer_handler(core->user_context, link, offer);
+    }
 
     //
     // Handle disposition/settlement updates


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org