You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2017/11/28 14:54:37 UTC

qpid-dispatch git commit: DISPATCH-882: delay settlement until after the i/o thread puts the delivery on the proper list

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master e2a05e14e -> 6166f21ac


DISPATCH-882: delay settlement until after the i/o thread puts the delivery on the proper list

Closes #225


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6166f21a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6166f21a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6166f21a

Branch: refs/heads/master
Commit: 6166f21accec8c305e9e3b378e76bdbdaefd57e9
Parents: e2a05e1
Author: Kenneth Giusti <kg...@apache.org>
Authored: Wed Nov 22 09:55:41 2017 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Tue Nov 28 09:51:08 2017 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h |  2 +-
 src/router_core/transfer.c          |  7 ++++++-
 src/router_node.c                   | 22 +++++++++++++---------
 3 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6166f21a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 81e0dd3..18dbc34 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -587,7 +587,7 @@ typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int d
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
 typedef void (*qdr_link_drain_t)         (void *context, qdr_link_t *link, bool mode);
 typedef int  (*qdr_link_push_t)          (void *context, qdr_link_t *link, int limit);
-typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
+typedef uint64_t (*qdr_link_deliver_t)   (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
 typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
 
 void qdr_connection_handlers(qdr_core_t             *core,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6166f21a/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index d15b7bc..ab11f15 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -149,7 +149,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
             sys_mutex_unlock(conn->work_lock);
             if (dlv) {
                 settled = dlv->settled;
-                core->deliver_handler(core->user_context, link, dlv, settled);
+                uint64_t new_disp = core->deliver_handler(core->user_context, link, dlv, settled);
                 sys_mutex_lock(conn->work_lock);
                 send_complete = qdr_delivery_send_complete(dlv);
                 if (send_complete) {
@@ -195,6 +195,11 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                     return num_deliveries_completed;
                 }
                 sys_mutex_unlock(conn->work_lock);
+
+                // the core will need to update the delivery's disposition
+                if (new_disp)
+                    qdr_delivery_update_disposition(((qd_router_t *)core->user_context)->router_core,
+                                                    dlv, new_disp, true, 0, 0, false);
             } else
                 break;
         }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6166f21a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index e6081a8..fc9235a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1298,17 +1298,18 @@ static int CORE_link_push(void *context, qdr_link_t *link, int limit)
     return 0;
 }
 
-static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
+static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
 {
     qd_router_t *router = (qd_router_t*) context;
     qd_link_t   *qlink  = (qd_link_t*) qdr_link_get_context(link);
+    uint64_t update = 0;
 
     if (!qlink)
-        return;
+        return 0;
 
     pn_link_t *plink = qd_link_pn(qlink);
     if (!plink)
-        return;
+        return 0;
 
     //
     // If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver.
@@ -1370,10 +1371,10 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
     if (send_complete) {
         if (qd_message_aborted(msg_out)) {
 
-            // This message has been aborted.
-            // When a sender aborts a message the message is implicitly settled.
-            // Tell the core that the delivery has been rejected and settled.
-            qdr_delivery_update_disposition(router->router_core, dlv, PN_REJECTED, true, 0, 0, false);
+            // This message has been aborted.  When a sender aborts a message
+            // the message is implicitly settled.  The caller will need to tell
+            // the core that the delivery has been rejected and settled.
+            update = PN_REJECTED;
 
             // Aborted messages must be settled locally
             // Settling does not produce any disposition to message sender.
@@ -1386,8 +1387,10 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
 
         } else {
             if (!settled && remote_snd_settled) {
-                // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
-                qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false);
+                // The caller must tell the core that the delivery has been
+                // accepted and settled, since we are settling on behalf of the
+                // receiver
+                update = PN_ACCEPTED;  // schedule the settle
             }
 
             pn_link_advance(plink);
@@ -1399,6 +1402,7 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
 
         }
     }
+    return update;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org