You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/11/25 17:33:10 UTC
[qpid-dispatch] branch master updated: DISPATCH-1488: properly forward transaction coordinator disposition data
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 28c3a1b DISPATCH-1488: properly forward transaction coordinator disposition data
28c3a1b is described below
commit 28c3a1b6188f3907984c1e7c0765ee6c0249be92
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Nov 20 15:19:28 2019 -0500
DISPATCH-1488: properly forward transaction coordinator disposition data
This closes #625
---
src/router_core/delivery.c | 48 +++++++++++++++++++++++++---------------------
src/router_core/delivery.h | 6 +++---
src/router_core/transfer.c | 8 +++++---
3 files changed, 34 insertions(+), 28 deletions(-)
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index e1f20f2..5e3cc52 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -26,7 +26,6 @@ ALLOC_DEFINE(qdr_delivery_t);
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery);
static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery);
static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
@@ -184,7 +183,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
action->args.delivery.error = error;
// handle delivery-state extensions e.g. declared, transactional-state
- qdr_delivery_read_extension_state(delivery, disposition, ext_state, false);
+ qdr_delivery_set_extension_state(delivery, disposition, ext_state, false);
//
// The delivery's ref_count must be incremented to protect its travels into the
@@ -453,9 +452,9 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
qd_bitmask_free(delivery->link_exclusion);
qdr_error_free(delivery->error);
+ qdr_delivery_free_extension_state(delivery);
free_qdr_delivery_t(delivery);
-
}
@@ -694,7 +693,10 @@ static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv
peer->error = error;
push = true;
error_assigned = true;
- qdr_delivery_copy_extension_state(dlv, peer, false);
+ qdr_delivery_set_extension_state(peer,
+ dlv->remote_disposition,
+ dlv->extension_state,
+ false);
}
}
@@ -1127,15 +1129,15 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
qdr_connection_activate_CT(core, link->conn);
}
-pn_data_t* qdr_delivery_extension_state(qdr_delivery_t *delivery)
+
+pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery)
{
- if (!delivery->extension_state) {
- delivery->extension_state = pn_data(0);
- }
- pn_data_rewind(delivery->extension_state);
+ if (delivery->extension_state)
+ pn_data_rewind(delivery->extension_state);
return delivery->extension_state;
}
+
void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
{
if (delivery->extension_state) {
@@ -1144,12 +1146,17 @@ void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
}
}
+
+// copy local disposition data into proton delivery
void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition)
{
if (dlv->disposition > PN_MODIFIED) {
- pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), qdr_delivery_extension_state(dlv));
+ pn_data_t *src = dlv->extension_state;
+ if (src) {
+ pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), src);
+ qdr_delivery_free_extension_state(dlv);
+ }
if (update_disposition) pn_delivery_update(pdlv, dlv->disposition);
- qdr_delivery_free_extension_state(dlv);
}
}
@@ -1158,25 +1165,22 @@ void qdr_delivery_export_transfer_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv
qdr_delivery_write_extension_state(dlv, pdlv, true);
}
+
void qdr_delivery_export_disposition_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
{
qdr_delivery_write_extension_state(dlv, pdlv, false);
}
-void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition)
-{
- if (src->disposition > PN_MODIFIED) {
- pn_data_copy(qdr_delivery_extension_state(dest), qdr_delivery_extension_state(src));
- if (update_diposition) dest->disposition = src->disposition;
- qdr_delivery_free_extension_state(src);
- }
-}
-void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
+void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
{
if (disposition > PN_MODIFIED) {
- pn_data_rewind(disposition_data);
- pn_data_copy(qdr_delivery_extension_state(dlv), disposition_data);
+ if (disposition_data) {
+ pn_data_rewind(disposition_data);
+ if (!dlv->extension_state)
+ dlv->extension_state = pn_data(0);
+ pn_data_copy(dlv->extension_state, disposition_data);
+ }
if (update_disposition) dlv->disposition = disposition;
}
}
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index fe3c858..3e93970 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -47,7 +47,7 @@ struct qdr_delivery_t {
uint64_t remote_disposition; ///< disposition as set by remote endpoint
uint64_t mcast_disposition; ///< temporary terminal disposition while multicast fwding
uint32_t ingress_time;
- pn_data_t *extension_state;
+ pn_data_t *extension_state; ///< delivery-state in disposition performative
qdr_error_t *error;
bool settled;
bool presettled;
@@ -97,9 +97,9 @@ bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
-void qdr_delivery_read_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition);
+pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *dlv);
+void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition);
void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
-void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest, bool update_diposition);
//
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 60e0efa..1331f0c 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -110,7 +110,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
dlv->error = 0;
dlv->disposition = 0;
- qdr_delivery_read_extension_state(dlv, disposition, disposition_data, true);
+ qdr_delivery_set_extension_state(dlv, disposition, disposition_data, true);
qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - protect returned value");
@@ -673,8 +673,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
// If this is an attach-routed link, put the delivery directly onto the peer link
//
qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);
-
- qdr_delivery_copy_extension_state(dlv, peer, true);
+ qdr_delivery_set_extension_state(peer,
+ dlv->remote_disposition,
+ qdr_delivery_extension_state(dlv),
+ true);
//
// Copy the delivery tag. For link-routing, the delivery tag must be preserved.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org