You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2017/11/28 14:54:37 UTC
qpid-dispatch git commit: DISPATCH-882: delay settlement until after the i/o thread puts the delivery on the proper list
Repository: qpid-dispatch
Updated Branches:
refs/heads/master e2a05e14e -> 6166f21ac
DISPATCH-882: delay settlement until after the i/o thread puts the delivery on the proper list
Closes #225
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6166f21a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6166f21a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6166f21a
Branch: refs/heads/master
Commit: 6166f21accec8c305e9e3b378e76bdbdaefd57e9
Parents: e2a05e1
Author: Kenneth Giusti <kg...@apache.org>
Authored: Wed Nov 22 09:55:41 2017 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Tue Nov 28 09:51:08 2017 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 +-
src/router_core/transfer.c | 7 ++++++-
src/router_node.c | 22 +++++++++++++---------
3 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6166f21a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 81e0dd3..18dbc34 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -587,7 +587,7 @@ typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int d
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit);
-typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
+typedef uint64_t (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
void qdr_connection_handlers(qdr_core_t *core,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6166f21a/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index d15b7bc..ab11f15 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -149,7 +149,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
sys_mutex_unlock(conn->work_lock);
if (dlv) {
settled = dlv->settled;
- core->deliver_handler(core->user_context, link, dlv, settled);
+ uint64_t new_disp = core->deliver_handler(core->user_context, link, dlv, settled);
sys_mutex_lock(conn->work_lock);
send_complete = qdr_delivery_send_complete(dlv);
if (send_complete) {
@@ -195,6 +195,11 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
return num_deliveries_completed;
}
sys_mutex_unlock(conn->work_lock);
+
+ // the core will need to update the delivery's disposition
+ if (new_disp)
+ qdr_delivery_update_disposition(((qd_router_t *)core->user_context)->router_core,
+ dlv, new_disp, true, 0, 0, false);
} else
break;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6166f21a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index e6081a8..fc9235a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1298,17 +1298,18 @@ static int CORE_link_push(void *context, qdr_link_t *link, int limit)
return 0;
}
-static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
+static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
{
qd_router_t *router = (qd_router_t*) context;
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ uint64_t update = 0;
if (!qlink)
- return;
+ return 0;
pn_link_t *plink = qd_link_pn(qlink);
if (!plink)
- return;
+ return 0;
//
// If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver.
@@ -1370,10 +1371,10 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
if (send_complete) {
if (qd_message_aborted(msg_out)) {
- // This message has been aborted.
- // When a sender aborts a message the message is implicitly settled.
- // Tell the core that the delivery has been rejected and settled.
- qdr_delivery_update_disposition(router->router_core, dlv, PN_REJECTED, true, 0, 0, false);
+ // This message has been aborted. When a sender aborts a message
+ // the message is implicitly settled. The caller will need to tell
+ // the core that the delivery has been rejected and settled.
+ update = PN_REJECTED;
// Aborted messages must be settled locally
// Settling does not produce any disposition to message sender.
@@ -1386,8 +1387,10 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
} else {
if (!settled && remote_snd_settled) {
- // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
- qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false);
+ // The caller must tell the core that the delivery has been
+ // accepted and settled, since we are settling on behalf of the
+ // receiver
+ update = PN_ACCEPTED; // schedule the settle
}
pn_link_advance(plink);
@@ -1399,6 +1402,7 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
}
}
+ return update;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org