You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/04/11 17:52:16 UTC
[qpid-dispatch] branch master updated: DISPATCH-1309 - Convert the
delivery->link pointer to a safe pointer.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 67e8ef6 DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer.
67e8ef6 is described below
commit 67e8ef6f10a4a3b2f92f39aa63f9aa0f51800000
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Apr 11 13:03:53 2019 -0400
DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer.
---
include/qpid/dispatch/alloc_malloc.h | 1 +
include/qpid/dispatch/alloc_pool.h | 1 +
include/qpid/dispatch/router_core.h | 2 +-
src/router_core/connections.c | 8 ++--
src/router_core/core_link_endpoint.c | 4 +-
src/router_core/forwarder.c | 12 +++---
src/router_core/router_core_private.h | 26 ++++++------
src/router_core/transfer.c | 80 ++++++++++++++++++++---------------
src/router_node.c | 4 +-
9 files changed, 79 insertions(+), 59 deletions(-)
diff --git a/include/qpid/dispatch/alloc_malloc.h b/include/qpid/dispatch/alloc_malloc.h
index d89ed28..ffc0d84 100644
--- a/include/qpid/dispatch/alloc_malloc.h
+++ b/include/qpid/dispatch/alloc_malloc.h
@@ -57,6 +57,7 @@ typedef struct {
#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0)
static inline uint32_t qd_alloc_sequence(void *p) { return 0; }
+static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { }
static inline void qd_alloc_initialize(void) {}
static inline void qd_alloc_debug_dump(const char *file) {}
static inline void qd_alloc_finalize(void) {}
diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h
index 4f6931c..14641da 100644
--- a/include/qpid/dispatch/alloc_pool.h
+++ b/include/qpid/dispatch/alloc_pool.h
@@ -79,6 +79,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool);
/** De-allocate from a thread pool. Use via ALLOC_DECLARE */
void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p);
uint32_t qd_alloc_sequence(void *p);
+static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; }
/**
* Declare functions new_T and alloc_T
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9f9567d..1e48c20 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -660,7 +660,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t disposition, pn_data_t* disposition_state);
-qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *delivery);
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2143580..4956c7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -706,7 +706,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_increment_delivery_counters_CT(core, ref->dlv);
- ref->dlv->link = 0;
+ qd_nullify_safe_ptr(&ref->dlv->link_sp);
//
// Now our reference
//
@@ -747,7 +747,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_increment_delivery_counters_CT(core, dlv);
- dlv->link = 0;
+ qd_nullify_safe_ptr(&dlv->link_sp);
//
// Now the undelivered-list reference
@@ -791,7 +791,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_increment_delivery_counters_CT(core, dlv);
- dlv->link = 0;
+ qd_nullify_safe_ptr(&dlv->link_sp);
//
// Now the unsettled-list reference
@@ -823,7 +823,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_increment_delivery_counters_CT(core, dlv);
- dlv->link = 0;
+ qd_nullify_safe_ptr(&dlv->link_sp);
// This decref is for the removing the delivery from the settled list
qdr_delivery_decref_CT(core, dlv, "qdr_link_cleanup_deliveries_CT - remove from settled list");
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 6ee4922..92260b3 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -115,7 +115,7 @@ void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t
{
uint64_t *tag = (uint64_t*) dlv->tag;
- dlv->link = ep->link;
+ set_safe_ptr_qdr_link_t(ep->link, &dlv->link_sp);
dlv->settled = presettled;
dlv->presettled = presettled;
*tag = core->next_tag++;
@@ -133,7 +133,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end
uint64_t *tag = (uint64_t*) dlv->tag;
ZERO(dlv);
- dlv->link = endpoint->link;
+ set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp);
dlv->msg = message;
*tag = core->next_tag++;
dlv->tag_length = 8;
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index a58e165..b7f2ad9 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -117,7 +117,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
uint64_t *tag = (uint64_t*) out_dlv->tag;
ZERO(out_dlv);
- out_dlv->link = link;
+ set_safe_ptr_qdr_link_t(link, &out_dlv->link_sp);
out_dlv->msg = qd_message_copy(msg);
out_dlv->settled = !in_dlv || in_dlv->settled;
out_dlv->presettled = out_dlv->settled;
@@ -298,7 +298,8 @@ static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t *
*/
static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t *in_dlv, qdr_link_t *out_link)
{
- return (in_dlv && in_dlv->via_edge && in_dlv->link->conn == out_link->conn);
+ qdr_link_t *link = in_dlv ? safe_deref_qdr_link_t(in_dlv->link_sp) : 0;
+ return (in_dlv && in_dlv->via_edge && link && link->conn == out_link->conn);
}
@@ -313,9 +314,10 @@ static void qdr_forward_to_subscriber(qdr_core_t *core, qdr_subscription_t *sub,
// Only if the message has been completely received, forward it to the subscription
// Subscriptions, at the moment, dont have the ability to deal with partial messages
//
- if (receive_complete)
- qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, in_msg);
- else {
+ if (receive_complete) {
+ qdr_link_t *link = in_dlv ? safe_deref_qdr_link_t(in_dlv->link_sp) : 0;
+ qdr_forward_on_message_CT(core, sub, link, in_msg);
+ } else {
//
// Receive is not complete, we will store the sub in
// in_dlv->subscriptions so we can send the message to the subscription
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 1e4f69c..e4666a5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -40,6 +40,19 @@ typedef struct qdr_connection_ref_t qdr_connection_ref_t;
typedef struct qdr_exchange qdr_exchange_t;
typedef struct qdr_edge_t qdr_edge_t;
+ALLOC_DECLARE(qdr_address_t);
+ALLOC_DECLARE(qdr_address_config_t);
+ALLOC_DECLARE(qdr_node_t);
+ALLOC_DECLARE(qdr_router_ref_t);
+ALLOC_DECLARE(qdr_link_ref_t);
+ALLOC_DECLARE(qdr_link_route_t);
+ALLOC_DECLARE(qdr_auto_link_t);
+ALLOC_DECLARE(qdr_conn_identifier_t);
+ALLOC_DECLARE(qdr_connection_ref_t);
+
+ALLOC_DECLARE(qdr_connection_t);
+ALLOC_DECLARE(qdr_link_t);
+
#include "core_link_endpoint.h"
#include "core_events.h"
@@ -338,7 +351,6 @@ struct qdr_node_t {
int cost;
};
-ALLOC_DECLARE(qdr_node_t);
DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
@@ -352,7 +364,6 @@ struct qdr_router_ref_t {
qdr_node_t *router;
};
-ALLOC_DECLARE(qdr_router_ref_t);
DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
typedef enum {
@@ -386,7 +397,7 @@ struct qdr_delivery_t {
void *context;
sys_atomic_t ref_count;
bool ref_counted; /// Used to protect against ref count going 1 -> 0 -> 1
- qdr_link_t *link;
+ qdr_link_t_sp link_sp; /// Safe pointer to the link
qdr_delivery_t *peer; /// Use this peer if the delivery has one and only one peer.
qdr_delivery_ref_t *next_peer_ref;
qd_message_t *msg;
@@ -494,7 +505,6 @@ struct qdr_link_t {
uint32_t core_ticks;
};
-ALLOC_DECLARE(qdr_link_t);
DEQ_DECLARE(qdr_link_t, qdr_link_list_t);
struct qdr_link_ref_t {
@@ -502,7 +512,6 @@ struct qdr_link_ref_t {
qdr_link_t *link;
};
-ALLOC_DECLARE(qdr_link_ref_t);
DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
@@ -515,7 +524,6 @@ struct qdr_connection_ref_t {
qdr_connection_t *conn;
};
-ALLOC_DECLARE(qdr_connection_ref_t);
DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t);
void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
@@ -580,7 +588,6 @@ struct qdr_address_t {
int priority;
};
-ALLOC_DECLARE(qdr_address_t);
DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config);
@@ -605,7 +612,6 @@ struct qdr_address_config_t {
int priority;
};
-ALLOC_DECLARE(qdr_address_config_t);
DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t);
void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr);
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
@@ -681,7 +687,6 @@ struct qdr_connection_t {
bool closed; // This bit is used in the case where a client is trying to force close this connection.
};
-ALLOC_DECLARE(qdr_connection_t);
DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
#define QDR_IS_LINK_ROUTE_PREFIX(p) ((p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_IN || (p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_OUT)
@@ -709,7 +714,6 @@ struct qdr_link_route_t {
qdr_connection_t *parent_conn;
};
-ALLOC_DECLARE(qdr_link_route_t);
void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr);
void qdr_core_delete_auto_link (qdr_core_t *core, qdr_auto_link_t *al);
@@ -755,7 +759,6 @@ struct qdr_auto_link_t {
char *last_error;
};
-ALLOC_DECLARE(qdr_auto_link_t);
DEQ_DECLARE(qdr_auto_link_t, qdr_auto_link_list_t);
@@ -767,7 +770,6 @@ struct qdr_conn_identifier_t {
qdr_auto_link_list_t auto_link_refs;
};
-ALLOC_DECLARE(qdr_conn_identifier_t);
DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
typedef struct qdr_priority_sheaf_t {
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7b1af31..3e6fe73 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -49,7 +49,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->link = link;
+ set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
dlv->msg = msg;
dlv->to_addr = 0;
dlv->origin = ingress;
@@ -78,7 +78,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->link = link;
+ set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
dlv->msg = msg;
dlv->to_addr = addr;
dlv->origin = ingress;
@@ -110,7 +110,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
qdr_delivery_t *dlv = new_qdr_delivery_t();
ZERO(dlv);
- dlv->link = link;
+ set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
dlv->msg = msg;
dlv->settled = settled;
dlv->presettled = settled;
@@ -130,7 +130,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
}
-qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv)
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *in_dlv)
{
qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
action->args.connection.delivery = in_dlv;
@@ -140,7 +140,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv)
// This incref is for the action reference
qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
- qdr_action_enqueue(in_dlv->link->core, action);
+ qdr_action_enqueue(core, action);
return in_dlv;
}
@@ -337,7 +337,7 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery)
{
- return delivery ? delivery->link : 0;
+ return delivery ? safe_deref_qdr_link_t(delivery->link_sp) : 0;
}
@@ -391,8 +391,9 @@ void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
uint32_t rc = sys_atomic_inc(&delivery->ref_count);
assert(rc > 0 || !delivery->ref_counted);
delivery->ref_counted = true;
- if (delivery->link)
- qd_log(delivery->link->core->log, QD_LOG_DEBUG,
+ qdr_link_t *link = qdr_delivery_link(delivery);
+ if (link)
+ qd_log(link->core->log, QD_LOG_DEBUG,
"Delivery incref: dlv:%lx rc:%"PRIu32" %s", (long) delivery, rc + 1, label);
}
@@ -515,7 +516,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
// Remove a delivery from its unsettled list. Side effects include issuing
// replacement credit and visiting the link-quiescence algorithm
//
- qdr_link_t *link = dlv->link;
+ qdr_link_t *link = qdr_delivery_link(dlv);
qdr_connection_t *conn = link ? link->conn : 0;
bool moved = false;
@@ -561,7 +562,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery)
{
- qdr_link_t *link = delivery->link;
+ qdr_link_t *link = qdr_delivery_link(delivery);
if (link) {
bool do_rate = false;
@@ -915,7 +916,12 @@ static long qdr_addr_path_count_CT(qdr_address_t *addr)
static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr, bool more)
{
- if (dlv->link->link_type == QD_LINK_ENDPOINT)
+ qdr_link_t *dlv_link = qdr_delivery_link(dlv);
+
+ if (!dlv_link)
+ return;
+
+ if (dlv_link->link_type == QD_LINK_ENDPOINT)
core->deliveries_ingress++;
if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) {
@@ -930,7 +936,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
if (dlv->settled) {
// Increment the presettled_dropped_deliveries on the in_link
link->dropped_presettled_deliveries++;
- if (dlv->link->link_type == QD_LINK_ENDPOINT)
+ if (dlv_link->link_type == QD_LINK_ENDPOINT)
core->dropped_presettled_deliveries++;
//
@@ -1062,7 +1068,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
qdr_delivery_t *dlv = action->args.connection.delivery;
bool more = action->args.connection.more;
- qdr_link_t *link = dlv->link;
+ qdr_link_t *link = qdr_delivery_link(dlv);
+
+ if (!link)
+ return;
//
// Record the ingress time so we can track the age of this delivery.
@@ -1203,6 +1212,9 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
qdr_error_t *error = action->args.delivery.error;
bool error_unassigned = true;
+ qdr_link_t *dlv_link = qdr_delivery_link(dlv);
+ qdr_link_t *peer_link = qdr_delivery_link(peer);
+
//
// Logic:
//
@@ -1228,7 +1240,7 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
if (settled) {
if (peer) {
peer->settled = true;
- if (peer->link) {
+ if (peer_link) {
peer_moved = qdr_delivery_settled_CT(core, peer);
if (peer_moved)
push = true;
@@ -1236,15 +1248,15 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
- if (dlv->link)
+ if (dlv_link)
dlv_moved = qdr_delivery_settled_CT(core, dlv);
}
//
// If the delivery's link has a core endpoint, notify the endpoint of the update
//
- if (dlv->link && dlv->link->core_endpoint)
- qdrc_endpoint_do_update_CT(core, dlv->link->core_endpoint, dlv, settled);
+ if (dlv_link && dlv_link->core_endpoint)
+ qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
if (push)
qdr_delivery_push_CT(core, peer);
@@ -1276,29 +1288,30 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
{
qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+
while (peer) {
- qdr_link_work_t *work = peer->link_work;
+ qdr_link_work_t *work = peer->link_work;
+ qdr_link_t *peer_link = qdr_delivery_link(peer);
+
//
// Determines if the peer connection can be activated.
// For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
// after the streaming message has been sent.
//
- if (work) {
- sys_mutex_lock(peer->link->conn->work_lock);
- if (work->processing || work == DEQ_HEAD(peer->link->work_list)) {
+ if (!!work && !!peer_link) {
+ sys_mutex_lock(peer_link->conn->work_lock);
+ if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
// Adding this work at priority 0.
- qdr_add_link_ref(peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK);
- sys_mutex_unlock(peer->link->conn->work_lock);
+ qdr_add_link_ref(peer_link->conn->links_with_work, peer_link, QDR_LINK_LIST_CLASS_WORK);
+ sys_mutex_unlock(peer_link->conn->work_lock);
//
// Activate the outgoing connection for later processing.
//
- qdr_connection_activate_CT(core, peer->link->conn);
- }
- else {
- sys_mutex_unlock(peer->link->conn->work_lock);
-
+ qdr_connection_activate_CT(core, peer_link->conn);
}
+ else
+ sys_mutex_unlock(peer_link->conn->work_lock);
}
peer = qdr_delivery_next_peer_CT(in_dlv);
@@ -1313,11 +1326,12 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
qdr_delivery_t *in_dlv = action->args.connection.delivery;
bool more = action->args.connection.more;
+ qdr_link_t *link = qdr_delivery_link(in_dlv);
//
// If it is already in the undelivered list, don't try to deliver this again.
//
- if (in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
+ if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
qdr_deliver_continue_peers_CT(core, in_dlv);
qd_message_t *msg = qdr_delivery_message(in_dlv);
@@ -1330,7 +1344,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions);
while (sub) {
DEQ_REMOVE_HEAD(in_dlv->subscriptions);
- qdr_forward_on_message_CT(core, sub, in_dlv->link, in_dlv->msg);
+ qdr_forward_on_message_CT(core, sub, link, in_dlv->msg);
sub = DEQ_HEAD(in_dlv->subscriptions);
}
@@ -1367,7 +1381,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
// Remove the delivery from the settled list and decref the in_dlv.
in_dlv->where = QDR_DELIVERY_NOWHERE;
- DEQ_REMOVE(in_dlv->link->settled, in_dlv);
+ DEQ_REMOVE(link->settled, in_dlv);
qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list");
}
}
@@ -1472,10 +1486,10 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
- if (!dlv || !dlv->link)
+ qdr_link_t *link = qdr_delivery_link(dlv);
+ if (!link)
return;
- qdr_link_t *link = dlv->link;
bool activate = false;
sys_mutex_lock(link->conn->work_lock);
diff --git a/src/router_node.c b/src/router_node.c
index 2114ce8..19d2ae2 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -375,7 +375,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
// We should not continue processing the message after it has been discarded
//
if (!qd_message_is_discard(msg)) {
- qdr_deliver_continue(delivery);
+ qdr_deliver_continue(router->router_core, delivery);
}
}
else {
@@ -452,7 +452,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
// We should not continue processing the message after it has been discarded
//
if (!qd_message_is_discard(msg)) {
- qdr_deliver_continue(delivery);
+ qdr_deliver_continue(router->router_core, delivery);
}
return next_delivery;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org