You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/05/16 17:30:18 UTC
qpid-dispatch git commit: DISPATCH-334 - Ensure that undelivered
messages are only pushed out on outgoing links.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 33cbeede4 -> 522760691
DISPATCH-334 - Ensure that undelivered messages are only pushed out on outgoing links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/52276069
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/52276069
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/52276069
Branch: refs/heads/master
Commit: 522760691e20976e298352253d25c805881dcd5f
Parents: 33cbeed
Author: Ted Ross <tr...@redhat.com>
Authored: Mon May 16 13:23:30 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon May 16 13:26:15 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 13 +++++++++
src/router_core/transfer.c | 56 ++++++++++++++++++++------------------
2 files changed, 42 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/52276069/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 547b49e..229a49b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -440,7 +440,20 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(link->updated_deliveries, updated_deliveries);
DEQ_MOVE(link->undelivered, undelivered);
+ qdr_delivery_t *d = DEQ_HEAD(undelivered);
+ while (d) {
+ assert(d->where == QDR_DELIVERY_IN_UNDELIVERED);
+ d->where = QDR_DELIVERY_NOWHERE;
+ d = DEQ_NEXT(d);
+ }
+
DEQ_MOVE(link->unsettled, unsettled);
+ d = DEQ_HEAD(unsettled);
+ while (d) {
+ assert(d->where == QDR_DELIVERY_IN_UNSETTLED);
+ d->where = QDR_DELIVERY_NOWHERE;
+ d = DEQ_NEXT(d);
+ }
sys_mutex_unlock(conn->work_lock);
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/52276069/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 54a4448..8a03875 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -107,36 +107,38 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
int offer = -1;
bool settled = false;
- while (credit > 0 && !drained) {
- sys_mutex_lock(conn->work_lock);
- dlv = DEQ_HEAD(link->undelivered);
- if (dlv) {
- DEQ_REMOVE_HEAD(link->undelivered);
- settled = dlv->settled;
- if (!settled) {
- DEQ_INSERT_TAIL(link->unsettled, dlv);
- dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+ if (link->link_direction == QD_OUTGOING) {
+ while (credit > 0 && !drained) {
+ sys_mutex_lock(conn->work_lock);
+ dlv = DEQ_HEAD(link->undelivered);
+ if (dlv) {
+ DEQ_REMOVE_HEAD(link->undelivered);
+ settled = dlv->settled;
+ if (!settled) {
+ DEQ_INSERT_TAIL(link->unsettled, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+ } else
+ dlv->where = QDR_DELIVERY_NOWHERE;
+ credit--;
+ link->total_deliveries++;
+ offer = DEQ_SIZE(link->undelivered);
} else
- dlv->where = QDR_DELIVERY_NOWHERE;
- credit--;
- link->total_deliveries++;
- offer = DEQ_SIZE(link->undelivered);
- } else
- drained = true;
- sys_mutex_unlock(conn->work_lock);
-
- if (dlv) {
- link->credit_to_core--;
- core->deliver_handler(core->user_context, link, dlv, settled);
- if (settled)
- qdr_delivery_free(dlv);
+ drained = true;
+ sys_mutex_unlock(conn->work_lock);
+
+ if (dlv) {
+ link->credit_to_core--;
+ core->deliver_handler(core->user_context, link, dlv, settled);
+ if (settled)
+ qdr_delivery_free(dlv);
+ }
}
- }
- if (drained)
- core->drained_handler(core->user_context, link);
- else if (offer != -1)
- core->offer_handler(core->user_context, link, offer);
+ if (drained)
+ core->drained_handler(core->user_context, link);
+ else if (offer != -1)
+ core->offer_handler(core->user_context, link, offer);
+ }
//
// Handle disposition/settlement updates
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org