You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/17 21:40:00 UTC

[qpid-dispatch] branch master updated: DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure. Added reproducer.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new f126d2d  DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure.  Added reproducer.
f126d2d is described below

commit f126d2da2df972353a2c53594aeb6fc598697796
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Mon Dec 17 16:19:52 2018 -0500

    DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure.  Added reproducer.
---
 include/qpid/dispatch/alloc_malloc.h               |  3 ++
 include/qpid/dispatch/alloc_pool.h                 |  1 +
 src/alloc_pool.c                                   | 13 +++++++
 .../modules/address_lookup_client/lookup_client.c  | 17 +++++++++
 tests/system_tests_link_route_credit.py            | 41 ++++++++++++++++++++++
 5 files changed, 75 insertions(+)

diff --git a/include/qpid/dispatch/alloc_malloc.h b/include/qpid/dispatch/alloc_malloc.h
index 72c390c..3ba6cf3 100644
--- a/include/qpid/dispatch/alloc_malloc.h
+++ b/include/qpid/dispatch/alloc_malloc.h
@@ -19,6 +19,8 @@
  * under the License.
  */
 
+#include <stdint.h>
+
 /**
  *@file
  *
@@ -37,6 +39,7 @@
 
 #define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0)
 
+static inline uint32_t qd_alloc_sequence(void *p) { return 0; }
 static inline void qd_alloc_initialize(void) {}
 static inline void qd_alloc_debug_dump(const char *file) {}
 static inline void qd_alloc_finalize(void) {}
diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h
index 1d5decc..6b66e88 100644
--- a/include/qpid/dispatch/alloc_pool.h
+++ b/include/qpid/dispatch/alloc_pool.h
@@ -73,6 +73,7 @@ typedef struct {
 void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool);
 /** De-allocate from a thread pool. Use via ALLOC_DECLARE */
 void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p);
+uint32_t qd_alloc_sequence(void *p);
 
 /**
  * Declare functions new_T and alloc_T
diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index d4613b6..170fd57 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -49,6 +49,7 @@ DEQ_DECLARE(qd_alloc_type_t, qd_alloc_type_list_t);
 
 struct qd_alloc_item_t {
     DEQ_LINKS(qd_alloc_item_t);
+    uint32_t              sequence;
 #ifdef QD_MEMORY_DEBUG
     qd_alloc_type_desc_t *desc;
     uint32_t              header;
@@ -185,6 +186,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
                 break;
             DEQ_ITEM_INIT(item);
             DEQ_INSERT_TAIL(pool->free_list, item);
+            item->sequence = 0;
 #if QD_MEMORY_STATS
             desc->stats->held_by_threads++;
             desc->stats->total_alloc_from_heap++;
@@ -239,6 +241,7 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
 
     qd_alloc_pool_t *pool = *tpool;
 
+    item->sequence++;
     DEQ_INSERT_TAIL(pool->free_list, item);
 
     if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
@@ -278,6 +281,16 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
 }
 
 
+uint32_t qd_alloc_sequence(void *p)
+{
+    if (!p)
+        return 0;
+
+    qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
+    return item->sequence;
+}
+
+
 void qd_alloc_initialize(void)
 {
     init_lock = sys_mutex();
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 039387b..048a399 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -45,6 +45,8 @@ typedef struct qcm_addr_lookup_request_t {
     DEQ_LINKS(struct qcm_addr_lookup_request_t);
     qdr_connection_t  *conn;
     qdr_link_t        *link;
+    uint32_t           conn_sequence;
+    uint32_t           link_sequence;
     qd_direction_t     dir;
     qdr_terminus_t    *source;
     qdr_terminus_t    *target;
@@ -520,6 +522,9 @@ static void qcm_addr_lookup_CT(void             *context,
         request->source = source;
         request->target = target;
 
+        request->conn_sequence = qd_alloc_sequence(conn);
+        request->link_sequence = qd_alloc_sequence(link);
+
         DEQ_INSERT_TAIL(client->pending_requests, request);
         qcm_addr_lookup_process_pending_requests_CT(client);
         return;
@@ -601,6 +606,18 @@ static uint64_t on_reply(qdr_core_t    *core,
     bool                         is_link_route;
     bool                         has_destinations;
 
+    //
+    // If the pointer sequences mismatch for either the connection or link,
+    // exit without processing because either the connection or link has
+    // been freed while the request was in-flight.
+    //
+    if (request->conn_sequence != qd_alloc_sequence(request->conn) ||
+        request->link_sequence != qd_alloc_sequence(request->link)) {
+        qdr_terminus_free(request->source);
+        qdr_terminus_free(request->target);
+        return 0;
+    }
+
     status = qcm_link_route_lookup_decode(app_properties, body, &is_link_route, &has_destinations);
     if (status == QCM_ADDR_LOOKUP_OK) {
         //
diff --git a/tests/system_tests_link_route_credit.py b/tests/system_tests_link_route_credit.py
index 8674519..04c4131 100644
--- a/tests/system_tests_link_route_credit.py
+++ b/tests/system_tests_link_route_credit.py
@@ -319,6 +319,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_29_fast_teardown_test(self):
+        test = LRFastTeardownTest(self.routers[2].addresses[0], "normal.29")
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -579,5 +584,41 @@ class LRDestReceiverFlowTest(MessagingHandler):
         container.run()
 
 
+class LRFastTeardownTest(MessagingHandler):
+    def __init__(self, host, address):
+        super(LRFastTeardownTest, self).__init__(prefetch=0)
+        self.host    = host
+        self.address = address
+
+        self.conn        = None
+        self.sender      = None
+        self.error       = None
+        self.last_action = "Test initialization"
+
+    def fail(self, text):
+        self.error = text
+        self.conn.close()
+        self.timer.cancel()
+
+    def timeout(self):
+        self.error = "Timeout Expired - last_action: %s" % (self.last_action)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.reactor     = event.reactor
+        self.timer       = event.reactor.schedule(7.0, Timeout(self))
+        self.conn        = event.container.connect(self.host)
+        self.last_action = "on_start"
+
+    def on_connection_opened(self, event):
+        self.sender = event.container.create_sender(self.conn, self.address)
+        self.conn.close()
+        self.timer.cancel()
+
+    def run(self):
+        container = Container(self)
+        container.run()
+
+
 if __name__== '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org