You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/12/02 14:17:16 UTC

qpid-dispatch git commit: DISPATCH-582 - Fixed leak of qdr_delivery_t objects when connections are lost. There were three problems that needed to be solved: - In qdr_link_cleanup_CT, cross-linkage of unsettled and undelivered deliveries was not accounted for. [...]

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 69f52f283 -> 0e3d1431f


DISPATCH-582 - Fixed leak of qdr_delivery_t objects when connections are lost.
There were three problems that needed to be solved:
  - In qdr_link_cleanup_CT, cross-linkage of unsettled and undelivered deliveries was not
    accounted for.
  - With connection loss, linkage from abandoned pn_delivery_t to qdr_delivery_t was not
    accounted for.
  - The delivery refcount on forwarded deliveries was not properly initialized (this was only
    a problem when atomics fell back to the default lock-protected integer implementation).


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0e3d1431
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0e3d1431
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0e3d1431

Branch: refs/heads/master
Commit: 0e3d1431fcdb5608428ec87e7c7b7687c6ec09ca
Parents: 69f52f2
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Dec 2 09:05:57 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Dec 2 09:05:57 2016 -0500

----------------------------------------------------------------------
 src/router_core/connections.c         | 29 ++++++++++++++++++---
 src/router_core/forwarder.c           | 10 ++++---
 src/router_core/router_core_private.h |  2 +-
 src/router_core/transfer.c            | 42 +++++++-----------------------
 4 files changed, 42 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 132be85..041e885 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -507,11 +507,24 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         DEQ_REMOVE_HEAD(undelivered);
         peer = dlv->peer;
         if (peer) {
+            dlv->peer  = 0;
             peer->peer = 0;
             qdr_delivery_release_CT(core, peer);
             qdr_delivery_decref(peer);
+            qdr_delivery_decref(dlv);
         }
+
+        //
+        // Account for the undelivered-list reference
+        //
         qdr_delivery_decref(dlv);
+
+        //
+        // Account for the lost reference from the Proton delivery
+        // for unsettled deliveries on incoming links
+        //
+        if (link->link_direction == QD_INCOMING && !dlv->settled)
+            qdr_delivery_decref(dlv);
         dlv = DEQ_HEAD(undelivered);
     }
 
@@ -523,20 +536,30 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         DEQ_REMOVE_HEAD(unsettled);
 
         if (dlv->tracking_addr) {
-            int link_bit = link->conn->mask_bit;
-            assert(link_bit >= 0);
-            dlv->tracking_addr->outstanding_deliveries[link_bit]--;
+            dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
             dlv->tracking_addr->tracked_deliveries--;
             dlv->tracking_addr = 0;
         }
 
         peer = dlv->peer;
         if (peer) {
+            dlv->peer  = 0;
             peer->peer = 0;
             if (link->link_direction == QD_OUTGOING)
                 qdr_delivery_failed_CT(core, peer);
+
             qdr_delivery_decref(peer);
+            qdr_delivery_decref(dlv);
         }
+
+        //
+        // Account for the unsettled-list reference
+        //
+        qdr_delivery_decref(dlv);
+
+        //
+        // Account for the lost reference from the Proton delivery
+        //
         qdr_delivery_decref(dlv);
         dlv = DEQ_HEAD(unsettled);
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 3482068..7ece2f9 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -104,6 +104,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
     uint64_t       *tag = (uint64_t*) dlv->tag;
 
     ZERO(dlv);
+    sys_atomic_init(&dlv->ref_count, 0);
     dlv->link       = link;
     dlv->msg        = qd_message_copy(msg);
     dlv->settled    = !in_dlv || in_dlv->settled;
@@ -119,7 +120,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
             dlv->peer = in_dlv;
             in_dlv->peer = dlv;
 
-            sys_atomic_init(&dlv->ref_count, 1);
+            qdr_delivery_incref(dlv);
             qdr_delivery_incref(in_dlv);
         }
     }
@@ -142,7 +143,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_link_t *link)
         if (dlv->settled) {
             DEQ_REMOVE(link->undelivered, dlv);
             dlv->where = QDR_DELIVERY_NOWHERE;
-            qdr_delivery_decref_LH(dlv);
+            qdr_delivery_decref(dlv);
         }
         dlv = next;
     }
@@ -163,7 +164,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
 
     DEQ_INSERT_TAIL(link->undelivered, dlv);
     dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
-    sys_atomic_inc(&dlv->ref_count);
+    qdr_delivery_incref(dlv);
 
     //
     // If the link isn't already on the links_with_deliveries list, put it there.
@@ -586,7 +587,8 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
         //
         if (in_delivery && !in_delivery->settled && chosen_link_bit >= 0) {
             addr->outstanding_deliveries[chosen_link_bit]++;
-            out_delivery->tracking_addr = addr;
+            out_delivery->tracking_addr     = addr;
+            out_delivery->tracking_addr_bit = chosen_link_bit;
             addr->tracked_deliveries++;
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index cf94c75..5962a17 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -220,6 +220,7 @@ struct qdr_delivery_t {
     int                  tag_length;
     qd_bitmask_t        *link_exclusion;
     qdr_address_t       *tracking_addr;
+    int                  tracking_addr_bit;
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
@@ -616,7 +617,6 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-void qdr_delivery_decref_LH(qdr_delivery_t *delivery);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
 void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e3d1431/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0ef54f1..2595a70 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -253,27 +253,17 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 
 void qdr_delivery_incref(qdr_delivery_t *delivery)
 {
-    qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0;
-
-    if (!!conn) {
-        sys_atomic_inc(&delivery->ref_count);
-    }
+    sys_atomic_inc(&delivery->ref_count);
 }
 
 
-static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_held)
+void qdr_delivery_decref(qdr_delivery_t *delivery)
 {
-    qdr_link_t       *link   = delivery->link;
-    qdr_connection_t *conn   = link ? link->conn : 0;
-    bool              delete = false;
-    
-    if (!!conn) {
-        uint32_t ref_count = sys_atomic_dec(&delivery->ref_count);
-        assert(ref_count > 0);
-        delete = (ref_count - 1) == 0;
-    }
+    qdr_link_t *link      = delivery->link;
+    uint32_t    ref_count = sys_atomic_dec(&delivery->ref_count);
+    assert(ref_count > 0);
 
-    if (delete) {
+    if (ref_count == 1) {
         if (delivery->msg)
             qd_message_free(delivery->msg);
 
@@ -281,8 +271,7 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel
             qd_iterator_free(delivery->to_addr);
 
         if (delivery->tracking_addr) {
-            int link_bit = conn->mask_bit;
-            delivery->tracking_addr->outstanding_deliveries[link_bit]--;
+            delivery->tracking_addr->outstanding_deliveries[delivery->tracking_addr_bit]--;
             delivery->tracking_addr->tracked_deliveries--;
             delivery->tracking_addr = 0;
         }
@@ -306,18 +295,6 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel
 }
 
 
-void qdr_delivery_decref(qdr_delivery_t *delivery)
-{
-    qdr_delivery_decref_internal(delivery, false);
-}
-
-
-void qdr_delivery_decref_LH(qdr_delivery_t *delivery)
-{
-    qdr_delivery_decref_internal(delivery, true);
-}
-
-
 void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length)
 {
     *tag    = (const char*) delivery->tag;
@@ -402,8 +379,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
         sys_mutex_unlock(conn->work_lock);
 
     if (dlv->tracking_addr) {
-        int link_bit = link->conn->mask_bit;
-        dlv->tracking_addr->outstanding_deliveries[link_bit]--;
+        dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
         dlv->tracking_addr->tracked_deliveries--;
         dlv->tracking_addr = 0;
     }
@@ -829,7 +805,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 
     sys_mutex_lock(link->conn->work_lock);
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
-        sys_atomic_inc(&dlv->ref_count);
+        qdr_delivery_incref(dlv);
         qdr_add_delivery_ref(&link->updated_deliveries, dlv);
         qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
         activate = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org