You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/04/11 17:52:16 UTC

[qpid-dispatch] branch master updated: DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e8ef6  DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer.
67e8ef6 is described below

commit 67e8ef6f10a4a3b2f92f39aa63f9aa0f51800000
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Apr 11 13:03:53 2019 -0400

    DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer.
---
 include/qpid/dispatch/alloc_malloc.h  |  1 +
 include/qpid/dispatch/alloc_pool.h    |  1 +
 include/qpid/dispatch/router_core.h   |  2 +-
 src/router_core/connections.c         |  8 ++--
 src/router_core/core_link_endpoint.c  |  4 +-
 src/router_core/forwarder.c           | 12 +++---
 src/router_core/router_core_private.h | 26 ++++++------
 src/router_core/transfer.c            | 80 ++++++++++++++++++++---------------
 src/router_node.c                     |  4 +-
 9 files changed, 79 insertions(+), 59 deletions(-)

diff --git a/include/qpid/dispatch/alloc_malloc.h b/include/qpid/dispatch/alloc_malloc.h
index d89ed28..ffc0d84 100644
--- a/include/qpid/dispatch/alloc_malloc.h
+++ b/include/qpid/dispatch/alloc_malloc.h
@@ -57,6 +57,7 @@ typedef struct {
 #define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0)
 
 static inline uint32_t qd_alloc_sequence(void *p) { return 0; }
+static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { }
 static inline void qd_alloc_initialize(void) {}
 static inline void qd_alloc_debug_dump(const char *file) {}
 static inline void qd_alloc_finalize(void) {}
diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h
index 4f6931c..14641da 100644
--- a/include/qpid/dispatch/alloc_pool.h
+++ b/include/qpid/dispatch/alloc_pool.h
@@ -79,6 +79,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool);
 /** De-allocate from a thread pool. Use via ALLOC_DECLARE */
 void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p);
 uint32_t qd_alloc_sequence(void *p);
+static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; }
 
 /**
  * Declare functions new_T and alloc_T
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9f9567d..1e48c20 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -660,7 +660,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
                                                 const uint8_t *tag, int tag_length,
                                                 uint64_t disposition, pn_data_t* disposition_state);
-qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *delivery);
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
 
 int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2143580..4956c7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -706,7 +706,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
         // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
         //
         qdr_increment_delivery_counters_CT(core, ref->dlv);
-        ref->dlv->link = 0;
+        qd_nullify_safe_ptr(&ref->dlv->link_sp);
         //
         // Now our reference
         //
@@ -747,7 +747,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
         // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
         //
         qdr_increment_delivery_counters_CT(core, dlv);
-        dlv->link = 0;
+        qd_nullify_safe_ptr(&dlv->link_sp);
 
         //
         // Now the undelivered-list reference
@@ -791,7 +791,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
         // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
         //
         qdr_increment_delivery_counters_CT(core, dlv);
-        dlv->link = 0;
+        qd_nullify_safe_ptr(&dlv->link_sp);
 
         //
         // Now the unsettled-list reference
@@ -823,7 +823,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
         // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
         //
         qdr_increment_delivery_counters_CT(core, dlv);
-        dlv->link = 0;
+        qd_nullify_safe_ptr(&dlv->link_sp);
 
         // This decref is for the removing the delivery from the settled list
         qdr_delivery_decref_CT(core, dlv, "qdr_link_cleanup_deliveries_CT - remove from settled list");
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 6ee4922..92260b3 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -115,7 +115,7 @@ void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t
 {
     uint64_t *tag = (uint64_t*) dlv->tag;
 
-    dlv->link          = ep->link;
+    set_safe_ptr_qdr_link_t(ep->link, &dlv->link_sp);
     dlv->settled       = presettled;
     dlv->presettled    = presettled;
     *tag               = core->next_tag++;
@@ -133,7 +133,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end
     uint64_t       *tag = (uint64_t*) dlv->tag;
 
     ZERO(dlv);
-    dlv->link           = endpoint->link;
+    set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp);
     dlv->msg            = message;
     *tag                = core->next_tag++;
     dlv->tag_length = 8;
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index a58e165..b7f2ad9 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -117,7 +117,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
     uint64_t       *tag = (uint64_t*) out_dlv->tag;
 
     ZERO(out_dlv);
-    out_dlv->link       = link;
+    set_safe_ptr_qdr_link_t(link, &out_dlv->link_sp);
     out_dlv->msg        = qd_message_copy(msg);
     out_dlv->settled    = !in_dlv || in_dlv->settled;
     out_dlv->presettled = out_dlv->settled;
@@ -298,7 +298,8 @@ static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t *
  */
 static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t *in_dlv, qdr_link_t *out_link)
 {
-    return (in_dlv && in_dlv->via_edge && in_dlv->link->conn == out_link->conn);
+    qdr_link_t *link = in_dlv ? safe_deref_qdr_link_t(in_dlv->link_sp) : 0;
+    return (in_dlv && in_dlv->via_edge && link && link->conn == out_link->conn);
 }
 
 
@@ -313,9 +314,10 @@ static void qdr_forward_to_subscriber(qdr_core_t *core, qdr_subscription_t *sub,
     // Only if the message has been completely received, forward it to the subscription
     // Subscriptions, at the moment, dont have the ability to deal with partial messages
     //
-    if (receive_complete)
-        qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, in_msg);
-    else {
+    if (receive_complete) {
+        qdr_link_t *link = in_dlv ? safe_deref_qdr_link_t(in_dlv->link_sp) : 0;
+        qdr_forward_on_message_CT(core, sub, link, in_msg);
+    } else {
         //
         // Receive is not complete, we will store the sub in
         // in_dlv->subscriptions so we can send the message to the subscription
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 1e4f69c..e4666a5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -40,6 +40,19 @@ typedef struct qdr_connection_ref_t  qdr_connection_ref_t;
 typedef struct qdr_exchange          qdr_exchange_t;
 typedef struct qdr_edge_t            qdr_edge_t;
 
+ALLOC_DECLARE(qdr_address_t);
+ALLOC_DECLARE(qdr_address_config_t);
+ALLOC_DECLARE(qdr_node_t);
+ALLOC_DECLARE(qdr_router_ref_t);
+ALLOC_DECLARE(qdr_link_ref_t);
+ALLOC_DECLARE(qdr_link_route_t);
+ALLOC_DECLARE(qdr_auto_link_t);
+ALLOC_DECLARE(qdr_conn_identifier_t);
+ALLOC_DECLARE(qdr_connection_ref_t);
+
+ALLOC_DECLARE(qdr_connection_t);
+ALLOC_DECLARE(qdr_link_t);
+
 
 #include "core_link_endpoint.h"
 #include "core_events.h"
@@ -338,7 +351,6 @@ struct qdr_node_t {
     int               cost;
 };
 
-ALLOC_DECLARE(qdr_node_t);
 DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
 void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
 
@@ -352,7 +364,6 @@ struct qdr_router_ref_t {
     qdr_node_t *router;
 };
 
-ALLOC_DECLARE(qdr_router_ref_t);
 DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
 
 typedef enum {
@@ -386,7 +397,7 @@ struct qdr_delivery_t {
     void                   *context;
     sys_atomic_t            ref_count;
     bool                    ref_counted;   /// Used to protect against ref count going 1 -> 0 -> 1
-    qdr_link_t             *link;
+    qdr_link_t_sp           link_sp;       /// Safe pointer to the link
     qdr_delivery_t         *peer;          /// Use this peer if the delivery has one and only one peer.
     qdr_delivery_ref_t     *next_peer_ref;
     qd_message_t           *msg;
@@ -494,7 +505,6 @@ struct qdr_link_t {
     uint32_t  core_ticks;
 };
 
-ALLOC_DECLARE(qdr_link_t);
 DEQ_DECLARE(qdr_link_t, qdr_link_list_t);
 
 struct qdr_link_ref_t {
@@ -502,7 +512,6 @@ struct qdr_link_ref_t {
     qdr_link_t *link;
 };
 
-ALLOC_DECLARE(qdr_link_ref_t);
 DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
 
 void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
@@ -515,7 +524,6 @@ struct qdr_connection_ref_t {
     qdr_connection_t *conn;
 };
 
-ALLOC_DECLARE(qdr_connection_ref_t);
 DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t);
 
 void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
@@ -580,7 +588,6 @@ struct qdr_address_t {
     int priority;
 };
 
-ALLOC_DECLARE(qdr_address_t);
 DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
 
 qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config);
@@ -605,7 +612,6 @@ struct qdr_address_config_t {
     int                     priority;
 };
 
-ALLOC_DECLARE(qdr_address_config_t);
 DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t);
 void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr);
 bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
@@ -681,7 +687,6 @@ struct qdr_connection_t {
     bool                        closed; // This bit is used in the case where a client is trying to force close this connection.
 };
 
-ALLOC_DECLARE(qdr_connection_t);
 DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
 
 #define QDR_IS_LINK_ROUTE_PREFIX(p) ((p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_IN || (p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_OUT)
@@ -709,7 +714,6 @@ struct qdr_link_route_t {
     qdr_connection_t       *parent_conn;
 };
 
-ALLOC_DECLARE(qdr_link_route_t);
 void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr);
 void qdr_core_delete_auto_link (qdr_core_t *core,  qdr_auto_link_t *al);
 
@@ -755,7 +759,6 @@ struct qdr_auto_link_t {
     char                  *last_error;
 };
 
-ALLOC_DECLARE(qdr_auto_link_t);
 DEQ_DECLARE(qdr_auto_link_t, qdr_auto_link_list_t);
 
 
@@ -767,7 +770,6 @@ struct qdr_conn_identifier_t {
     qdr_auto_link_list_t       auto_link_refs;
 };
 
-ALLOC_DECLARE(qdr_conn_identifier_t);
 DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
 
 typedef struct qdr_priority_sheaf_t {
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7b1af31..3e6fe73 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -49,7 +49,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->link           = link;
+    set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
     dlv->msg            = msg;
     dlv->to_addr        = 0;
     dlv->origin         = ingress;
@@ -78,7 +78,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->link           = link;
+    set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
     dlv->msg            = msg;
     dlv->to_addr        = addr;
     dlv->origin         = ingress;
@@ -110,7 +110,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->link         = link;
+    set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
     dlv->msg          = msg;
     dlv->settled      = settled;
     dlv->presettled   = settled;
@@ -130,7 +130,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
 }
 
 
-qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv)
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *in_dlv)
 {
     qdr_action_t   *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
     action->args.connection.delivery = in_dlv;
@@ -140,7 +140,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv)
 
     // This incref is for the action reference
     qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
-    qdr_action_enqueue(in_dlv->link->core, action);
+    qdr_action_enqueue(core, action);
     return in_dlv;
 }
 
@@ -337,7 +337,7 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 
 qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery)
 {
-    return delivery ? delivery->link : 0;
+    return delivery ? safe_deref_qdr_link_t(delivery->link_sp) : 0;
 }
 
 
@@ -391,8 +391,9 @@ void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label)
     uint32_t rc = sys_atomic_inc(&delivery->ref_count);
     assert(rc > 0 || !delivery->ref_counted);
     delivery->ref_counted = true;
-    if (delivery->link)
-        qd_log(delivery->link->core->log, QD_LOG_DEBUG,
+    qdr_link_t *link = qdr_delivery_link(delivery);
+    if (link)
+        qd_log(link->core->log, QD_LOG_DEBUG,
                "Delivery incref:    dlv:%lx rc:%"PRIu32" %s", (long) delivery, rc + 1, label);
 }
 
@@ -515,7 +516,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     // Remove a delivery from its unsettled list.  Side effects include issuing
     // replacement credit and visiting the link-quiescence algorithm
     //
-    qdr_link_t       *link  = dlv->link;
+    qdr_link_t       *link  = qdr_delivery_link(dlv);
     qdr_connection_t *conn  = link ? link->conn : 0;
     bool              moved = false;
 
@@ -561,7 +562,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 
 void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery)
 {
-    qdr_link_t *link = delivery->link;
+    qdr_link_t *link = qdr_delivery_link(delivery);
     if (link) {
         bool do_rate = false;
 
@@ -915,7 +916,12 @@ static long qdr_addr_path_count_CT(qdr_address_t *addr)
 
 static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr, bool more)
 {
-    if (dlv->link->link_type == QD_LINK_ENDPOINT)
+    qdr_link_t *dlv_link = qdr_delivery_link(dlv);
+
+    if (!dlv_link)
+        return;
+
+    if (dlv_link->link_type == QD_LINK_ENDPOINT)
         core->deliveries_ingress++;
 
     if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) {
@@ -930,7 +936,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         if (dlv->settled) {
             // Increment the presettled_dropped_deliveries on the in_link
             link->dropped_presettled_deliveries++;
-            if (dlv->link->link_type == QD_LINK_ENDPOINT)
+            if (dlv_link->link_type == QD_LINK_ENDPOINT)
                 core->dropped_presettled_deliveries++;
 
             //
@@ -1062,7 +1068,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
 
     qdr_delivery_t *dlv  = action->args.connection.delivery;
     bool            more = action->args.connection.more;
-    qdr_link_t     *link = dlv->link;
+    qdr_link_t     *link = qdr_delivery_link(dlv);
+
+    if (!link)
+        return;
 
     //
     // Record the ingress time so we can track the age of this delivery.
@@ -1203,6 +1212,9 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
     qdr_error_t    *error      = action->args.delivery.error;
     bool error_unassigned      = true;
 
+    qdr_link_t *dlv_link  = qdr_delivery_link(dlv);
+    qdr_link_t *peer_link = qdr_delivery_link(peer);
+
     //
     // Logic:
     //
@@ -1228,7 +1240,7 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
     if (settled) {
         if (peer) {
             peer->settled = true;
-            if (peer->link) {
+            if (peer_link) {
                 peer_moved = qdr_delivery_settled_CT(core, peer);
                 if (peer_moved)
                     push = true;
@@ -1236,15 +1248,15 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
             qdr_delivery_unlink_peers_CT(core, dlv, peer);
         }
 
-        if (dlv->link)
+        if (dlv_link)
             dlv_moved = qdr_delivery_settled_CT(core, dlv);
     }
 
     //
     // If the delivery's link has a core endpoint, notify the endpoint of the update
     //
-    if (dlv->link && dlv->link->core_endpoint)
-        qdrc_endpoint_do_update_CT(core, dlv->link->core_endpoint, dlv, settled);
+    if (dlv_link && dlv_link->core_endpoint)
+        qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
 
     if (push)
         qdr_delivery_push_CT(core, peer);
@@ -1276,29 +1288,30 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
 void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
 {
     qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+
     while (peer) {
-        qdr_link_work_t *work = peer->link_work;
+        qdr_link_work_t *work      = peer->link_work;
+        qdr_link_t      *peer_link = qdr_delivery_link(peer);
+
         //
         // Determines if the peer connection can be activated.
         // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
         // after the streaming message has been sent.
         //
-        if (work) {
-            sys_mutex_lock(peer->link->conn->work_lock);
-            if (work->processing || work == DEQ_HEAD(peer->link->work_list)) {
+        if (!!work && !!peer_link) {
+            sys_mutex_lock(peer_link->conn->work_lock);
+            if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
                 // Adding this work at priority 0.
-                qdr_add_link_ref(peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK);
-                sys_mutex_unlock(peer->link->conn->work_lock);
+                qdr_add_link_ref(peer_link->conn->links_with_work, peer_link, QDR_LINK_LIST_CLASS_WORK);
+                sys_mutex_unlock(peer_link->conn->work_lock);
 
                 //
                 // Activate the outgoing connection for later processing.
                 //
-                qdr_connection_activate_CT(core, peer->link->conn);
-            }
-            else {
-                sys_mutex_unlock(peer->link->conn->work_lock);
-
+                qdr_connection_activate_CT(core, peer_link->conn);
             }
+            else
+                sys_mutex_unlock(peer_link->conn->work_lock);
         }
 
         peer = qdr_delivery_next_peer_CT(in_dlv);
@@ -1313,11 +1326,12 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
 
     qdr_delivery_t *in_dlv  = action->args.connection.delivery;
     bool more = action->args.connection.more;
+    qdr_link_t *link = qdr_delivery_link(in_dlv);
 
     //
     // If it is already in the undelivered list, don't try to deliver this again.
     //
-    if (in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
+    if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
         qdr_deliver_continue_peers_CT(core, in_dlv);
 
         qd_message_t *msg = qdr_delivery_message(in_dlv);
@@ -1330,7 +1344,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
             qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions);
             while (sub) {
                 DEQ_REMOVE_HEAD(in_dlv->subscriptions);
-                qdr_forward_on_message_CT(core, sub, in_dlv->link, in_dlv->msg);
+                qdr_forward_on_message_CT(core, sub, link, in_dlv->msg);
                 sub = DEQ_HEAD(in_dlv->subscriptions);
             }
 
@@ -1367,7 +1381,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
 
                 // Remove the delivery from the settled list and decref the in_dlv.
                 in_dlv->where = QDR_DELIVERY_NOWHERE;
-                DEQ_REMOVE(in_dlv->link->settled, in_dlv);
+                DEQ_REMOVE(link->settled, in_dlv);
                 qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list");
             }
         }
@@ -1472,10 +1486,10 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
 
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
-    if (!dlv || !dlv->link)
+    qdr_link_t *link = qdr_delivery_link(dlv);
+    if (!link)
         return;
 
-    qdr_link_t *link = dlv->link;
     bool activate = false;
 
     sys_mutex_lock(link->conn->work_lock);
diff --git a/src/router_node.c b/src/router_node.c
index 2114ce8..19d2ae2 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -375,7 +375,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
             // We should not continue processing the message after it has been discarded
             //
             if (!qd_message_is_discard(msg)) {
-                qdr_deliver_continue(delivery);
+                qdr_deliver_continue(router->router_core, delivery);
             }
         }
         else {
@@ -452,7 +452,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         // We should not continue processing the message after it has been discarded
         //
         if (!qd_message_is_discard(msg)) {
-            qdr_deliver_continue(delivery);
+            qdr_deliver_continue(router->router_core, delivery);
         }
 
         return next_delivery;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org