You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/11/25 17:33:10 UTC

[qpid-dispatch] branch master updated: DISPATCH-1488: properly forward transaction coordinator disposition data

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 28c3a1b  DISPATCH-1488: properly forward transaction coordinator disposition data
28c3a1b is described below

commit 28c3a1b6188f3907984c1e7c0765ee6c0249be92
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Nov 20 15:19:28 2019 -0500

    DISPATCH-1488: properly forward transaction coordinator disposition data
    
    This closes #625
---
 src/router_core/delivery.c | 48 +++++++++++++++++++++++++---------------------
 src/router_core/delivery.h |  6 +++---
 src/router_core/transfer.c |  8 +++++---
 3 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index e1f20f2..5e3cc52 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -26,7 +26,6 @@ ALLOC_DEFINE(qdr_delivery_t);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery);
 static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery);
 static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
@@ -184,7 +183,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
     action->args.delivery.error       = error;
 
     // handle delivery-state extensions e.g. declared, transactional-state
-    qdr_delivery_read_extension_state(delivery, disposition, ext_state, false);
+    qdr_delivery_set_extension_state(delivery, disposition, ext_state, false);
 
     //
     // The delivery's ref_count must be incremented to protect its travels into the
@@ -453,9 +452,9 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
 
     qd_bitmask_free(delivery->link_exclusion);
     qdr_error_free(delivery->error);
+    qdr_delivery_free_extension_state(delivery);
 
     free_qdr_delivery_t(delivery);
-
 }
 
 
@@ -694,7 +693,10 @@ static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv
             peer->error       = error;
             push = true;
             error_assigned = true;
-            qdr_delivery_copy_extension_state(dlv, peer, false);
+            qdr_delivery_set_extension_state(peer,
+                                             dlv->remote_disposition,
+                                             dlv->extension_state,
+                                             false);
         }
     }
 
@@ -1127,15 +1129,15 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
         qdr_connection_activate_CT(core, link->conn);
 }
 
-pn_data_t* qdr_delivery_extension_state(qdr_delivery_t *delivery)
+
+pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery)
 {
-    if (!delivery->extension_state) {
-        delivery->extension_state = pn_data(0);
-    }
-    pn_data_rewind(delivery->extension_state);
+    if (delivery->extension_state)
+        pn_data_rewind(delivery->extension_state);
     return delivery->extension_state;
 }
 
+
 void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
 {
     if (delivery->extension_state) {
@@ -1144,12 +1146,17 @@ void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
     }
 }
 
+
+// copy local disposition data into proton delivery
 void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition)
 {
     if (dlv->disposition > PN_MODIFIED) {
-        pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), qdr_delivery_extension_state(dlv));
+        pn_data_t *src = dlv->extension_state;
+        if (src) {
+            pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), src);
+            qdr_delivery_free_extension_state(dlv);
+        }
         if (update_disposition) pn_delivery_update(pdlv, dlv->disposition);
-        qdr_delivery_free_extension_state(dlv);
     }
 }
 
@@ -1158,25 +1165,22 @@ void qdr_delivery_export_transfer_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv
     qdr_delivery_write_extension_state(dlv, pdlv, true);
 }
 
+
 void qdr_delivery_export_disposition_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
 {
     qdr_delivery_write_extension_state(dlv, pdlv, false);
 }
 
-void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition)
-{
-    if (src->disposition > PN_MODIFIED) {
-        pn_data_copy(qdr_delivery_extension_state(dest), qdr_delivery_extension_state(src));
-        if (update_diposition) dest->disposition = src->disposition;
-        qdr_delivery_free_extension_state(src);
-    }
-}
 
-void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
+void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
 {
     if (disposition > PN_MODIFIED) {
-        pn_data_rewind(disposition_data);
-        pn_data_copy(qdr_delivery_extension_state(dlv), disposition_data);
+        if (disposition_data) {
+            pn_data_rewind(disposition_data);
+            if (!dlv->extension_state)
+                dlv->extension_state = pn_data(0);
+            pn_data_copy(dlv->extension_state, disposition_data);
+        }
         if (update_disposition) dlv->disposition = disposition;
     }
 }
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index fe3c858..3e93970 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -47,7 +47,7 @@ struct qdr_delivery_t {
     uint64_t                remote_disposition;  ///< disposition as set by remote endpoint
     uint64_t                mcast_disposition;   ///< temporary terminal disposition while multicast fwding
     uint32_t                ingress_time;
-    pn_data_t              *extension_state;
+    pn_data_t              *extension_state;     ///< delivery-state in disposition performative
     qdr_error_t            *error;
     bool                    settled;
     bool                    presettled;
@@ -97,9 +97,9 @@ bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
 
 void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
 
-void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition);
+pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *dlv);
+void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition);
 void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
-void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition);
 
 
 //
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 60e0efa..1331f0c 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -110,7 +110,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
     dlv->error        = 0;
     dlv->disposition  = 0;
 
-    qdr_delivery_read_extension_state(dlv, disposition, disposition_data, true);
+    qdr_delivery_set_extension_state(dlv, disposition, disposition_data, true);
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - protect returned value");
 
@@ -673,8 +673,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         // If this is an attach-routed link, put the delivery directly onto the peer link
         //
         qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);
-
-        qdr_delivery_copy_extension_state(dlv, peer, true);
+        qdr_delivery_set_extension_state(peer,
+                                         dlv->remote_disposition,
+                                         qdr_delivery_extension_state(dlv),
+                                         true);
 
         //
         // Copy the delivery tag.  For link-routing, the delivery tag must be preserved.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org