You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/12/02 14:17:16 UTC
qpid-dispatch git commit: DISPATCH-582 - Fixed leak of qdr_delivery_t
objects when connections are lost. There were three problems that needed to
be solved: - In qdr_link_cleanup,
cross-linkage of unsettled and undelivered deliveries were not accou
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 69f52f283 -> 0e3d1431f
DISPATCH-582 - Fixed leak of qdr_delivery_t objects when connections are lost.
There were three problems that needed to be solved:
- In qdr_link_cleanup, cross-linkage of unsettled and undelivered deliveries were not
accounted for.
- With connection loss, linkage from abandoned pn_delivery_t to qdr_delivery_t was not
accounted for.
- The delivery refcount on forwarded deliveries was not properly initialized (this was only
a problem if atomics fell back to the default lock-protected-integers).
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0e3d1431
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0e3d1431
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0e3d1431
Branch: refs/heads/master
Commit: 0e3d1431fcdb5608428ec87e7c7b7687c6ec09ca
Parents: 69f52f2
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Dec 2 09:05:57 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Dec 2 09:05:57 2016 -0500
----------------------------------------------------------------------
src/router_core/connections.c | 29 ++++++++++++++++++---
src/router_core/forwarder.c | 10 ++++---
src/router_core/router_core_private.h | 2 +-
src/router_core/transfer.c | 42 +++++++-----------------------
4 files changed, 42 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 132be85..041e885 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -507,11 +507,24 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
DEQ_REMOVE_HEAD(undelivered);
peer = dlv->peer;
if (peer) {
+ dlv->peer = 0;
peer->peer = 0;
qdr_delivery_release_CT(core, peer);
qdr_delivery_decref(peer);
+ qdr_delivery_decref(dlv);
}
+
+ //
+ // Account for the undelivered-list reference
+ //
qdr_delivery_decref(dlv);
+
+ //
+ // Account for the lost reference from the Proton delivery
+ // for unsettled deliveries on incoming links
+ //
+ if (link->link_direction == QD_INCOMING && !dlv->settled)
+ qdr_delivery_decref(dlv);
dlv = DEQ_HEAD(undelivered);
}
@@ -523,20 +536,30 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
DEQ_REMOVE_HEAD(unsettled);
if (dlv->tracking_addr) {
- int link_bit = link->conn->mask_bit;
- assert(link_bit >= 0);
- dlv->tracking_addr->outstanding_deliveries[link_bit]--;
+ dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
dlv->tracking_addr->tracked_deliveries--;
dlv->tracking_addr = 0;
}
peer = dlv->peer;
if (peer) {
+ dlv->peer = 0;
peer->peer = 0;
if (link->link_direction == QD_OUTGOING)
qdr_delivery_failed_CT(core, peer);
+
qdr_delivery_decref(peer);
+ qdr_delivery_decref(dlv);
}
+
+ //
+ // Account for the unsettled-list reference
+ //
+ qdr_delivery_decref(dlv);
+
+ //
+ // Account for the lost reference from the Proton delivery
+ //
qdr_delivery_decref(dlv);
dlv = DEQ_HEAD(unsettled);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 3482068..7ece2f9 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -104,6 +104,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
uint64_t *tag = (uint64_t*) dlv->tag;
ZERO(dlv);
+ sys_atomic_init(&dlv->ref_count, 0);
dlv->link = link;
dlv->msg = qd_message_copy(msg);
dlv->settled = !in_dlv || in_dlv->settled;
@@ -119,7 +120,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
dlv->peer = in_dlv;
in_dlv->peer = dlv;
- sys_atomic_init(&dlv->ref_count, 1);
+ qdr_delivery_incref(dlv);
qdr_delivery_incref(in_dlv);
}
}
@@ -142,7 +143,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_link_t *link)
if (dlv->settled) {
DEQ_REMOVE(link->undelivered, dlv);
dlv->where = QDR_DELIVERY_NOWHERE;
- qdr_delivery_decref_LH(dlv);
+ qdr_delivery_decref(dlv);
}
dlv = next;
}
@@ -163,7 +164,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
DEQ_INSERT_TAIL(link->undelivered, dlv);
dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
- sys_atomic_inc(&dlv->ref_count);
+ qdr_delivery_incref(dlv);
//
// If the link isn't already on the links_with_deliveries list, put it there.
@@ -586,7 +587,8 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
//
if (in_delivery && !in_delivery->settled && chosen_link_bit >= 0) {
addr->outstanding_deliveries[chosen_link_bit]++;
- out_delivery->tracking_addr = addr;
+ out_delivery->tracking_addr = addr;
+ out_delivery->tracking_addr_bit = chosen_link_bit;
addr->tracked_deliveries++;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index cf94c75..5962a17 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -220,6 +220,7 @@ struct qdr_delivery_t {
int tag_length;
qd_bitmask_t *link_exclusion;
qdr_address_t *tracking_addr;
+ int tracking_addr_bit;
};
ALLOC_DECLARE(qdr_delivery_t);
@@ -616,7 +617,6 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-void qdr_delivery_decref_LH(qdr_delivery_t *delivery);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0ef54f1..2595a70 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -253,27 +253,17 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
void qdr_delivery_incref(qdr_delivery_t *delivery)
{
- qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0;
-
- if (!!conn) {
- sys_atomic_inc(&delivery->ref_count);
- }
+ sys_atomic_inc(&delivery->ref_count);
}
-static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_held)
+void qdr_delivery_decref(qdr_delivery_t *delivery)
{
- qdr_link_t *link = delivery->link;
- qdr_connection_t *conn = link ? link->conn : 0;
- bool delete = false;
-
- if (!!conn) {
- uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
- assert(ref_count > 0);
- delete = (ref_count - 1) == 0;
- }
+ qdr_link_t *link = delivery->link;
+ uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
+ assert(ref_count > 0);
- if (delete) {
+ if (ref_count == 1) {
if (delivery->msg)
qd_message_free(delivery->msg);
@@ -281,8 +271,7 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel
qd_iterator_free(delivery->to_addr);
if (delivery->tracking_addr) {
- int link_bit = conn->mask_bit;
- delivery->tracking_addr->outstanding_deliveries[link_bit]--;
+ delivery->tracking_addr->outstanding_deliveries[delivery->tracking_addr_bit]--;
delivery->tracking_addr->tracked_deliveries--;
delivery->tracking_addr = 0;
}
@@ -306,18 +295,6 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel
}
-void qdr_delivery_decref(qdr_delivery_t *delivery)
-{
- qdr_delivery_decref_internal(delivery, false);
-}
-
-
-void qdr_delivery_decref_LH(qdr_delivery_t *delivery)
-{
- qdr_delivery_decref_internal(delivery, true);
-}
-
-
void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length)
{
*tag = (const char*) delivery->tag;
@@ -402,8 +379,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
sys_mutex_unlock(conn->work_lock);
if (dlv->tracking_addr) {
- int link_bit = link->conn->mask_bit;
- dlv->tracking_addr->outstanding_deliveries[link_bit]--;
+ dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
dlv->tracking_addr->tracked_deliveries--;
dlv->tracking_addr = 0;
}
@@ -829,7 +805,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
sys_mutex_lock(link->conn->work_lock);
if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
- sys_atomic_inc(&dlv->ref_count);
+ qdr_delivery_incref(dlv);
qdr_add_delivery_ref(&link->updated_deliveries, dlv);
qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
activate = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org