You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/17 21:40:00 UTC
[qpid-dispatch] branch master updated: DISPATCH-1214 - Fixed race
between asynchronous address lookups and connection/link closure. Added
reproducer.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new f126d2d DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure. Added reproducer.
f126d2d is described below
commit f126d2da2df972353a2c53594aeb6fc598697796
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Mon Dec 17 16:19:52 2018 -0500
DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure. Added reproducer.
---
include/qpid/dispatch/alloc_malloc.h | 3 ++
include/qpid/dispatch/alloc_pool.h | 1 +
src/alloc_pool.c | 13 +++++++
.../modules/address_lookup_client/lookup_client.c | 17 +++++++++
tests/system_tests_link_route_credit.py | 41 ++++++++++++++++++++++
5 files changed, 75 insertions(+)
diff --git a/include/qpid/dispatch/alloc_malloc.h b/include/qpid/dispatch/alloc_malloc.h
index 72c390c..3ba6cf3 100644
--- a/include/qpid/dispatch/alloc_malloc.h
+++ b/include/qpid/dispatch/alloc_malloc.h
@@ -19,6 +19,8 @@
* under the License.
*/
+#include <stdint.h>
+
/**
*@file
*
@@ -37,6 +39,7 @@
#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0)
+static inline uint32_t qd_alloc_sequence(void *p) { return 0; }  // Stub variant in this header: ignores p and always returns 0, so sequence comparisons made through it always match.
static inline void qd_alloc_initialize(void) {}
static inline void qd_alloc_debug_dump(const char *file) {}
static inline void qd_alloc_finalize(void) {}
diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h
index 1d5decc..6b66e88 100644
--- a/include/qpid/dispatch/alloc_pool.h
+++ b/include/qpid/dispatch/alloc_pool.h
@@ -73,6 +73,7 @@ typedef struct {
void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool);
/** De-allocate from a thread pool. Use via ALLOC_DECLARE */
void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p);
+uint32_t qd_alloc_sequence(void *p);
/**
* Declare functions new_T and alloc_T
diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index d4613b6..170fd57 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -49,6 +49,7 @@ DEQ_DECLARE(qd_alloc_type_t, qd_alloc_type_list_t);
struct qd_alloc_item_t {
DEQ_LINKS(qd_alloc_item_t);
+ uint32_t sequence;
#ifdef QD_MEMORY_DEBUG
qd_alloc_type_desc_t *desc;
uint32_t header;
@@ -185,6 +186,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
break;
DEQ_ITEM_INIT(item);
DEQ_INSERT_TAIL(pool->free_list, item);
+ item->sequence = 0;
#if QD_MEMORY_STATS
desc->stats->held_by_threads++;
desc->stats->total_alloc_from_heap++;
@@ -239,6 +241,7 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
qd_alloc_pool_t *pool = *tpool;
+ item->sequence++;
DEQ_INSERT_TAIL(pool->free_list, item);
if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
@@ -278,6 +281,16 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
}
+uint32_t qd_alloc_sequence(void *p)  // Returns the sequence counter kept in the qd_alloc_item_t header located directly before p.
+{
+ if (!p)
+ return 0;  // NULL has no header to read; report 0.
+
+ qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;  // Step back one qd_alloc_item_t from p to reach its header.
+ return item->sequence;  // Per the other hunks in this patch: zeroed when qd_alloc creates the item, incremented in qd_dealloc, so a changed value indicates p was freed since it was sampled.
+}
+
+
void qd_alloc_initialize(void)
{
init_lock = sys_mutex();
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 039387b..048a399 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -45,6 +45,8 @@ typedef struct qcm_addr_lookup_request_t {
DEQ_LINKS(struct qcm_addr_lookup_request_t);
qdr_connection_t *conn;
qdr_link_t *link;
+ uint32_t conn_sequence;
+ uint32_t link_sequence;
qd_direction_t dir;
qdr_terminus_t *source;
qdr_terminus_t *target;
@@ -520,6 +522,9 @@ static void qcm_addr_lookup_CT(void *context,
request->source = source;
request->target = target;
+ request->conn_sequence = qd_alloc_sequence(conn);
+ request->link_sequence = qd_alloc_sequence(link);
+
DEQ_INSERT_TAIL(client->pending_requests, request);
qcm_addr_lookup_process_pending_requests_CT(client);
return;
@@ -601,6 +606,18 @@ static uint64_t on_reply(qdr_core_t *core,
bool is_link_route;
bool has_destinations;
+ //
+ // If the pointer sequences mismatch for either the connection or link,
+ // exit without processing because either the connection or link has
+ // been freed while the request was in-flight.
+ //
+ if (request->conn_sequence != qd_alloc_sequence(request->conn) ||
+ request->link_sequence != qd_alloc_sequence(request->link)) {
+ qdr_terminus_free(request->source);
+ qdr_terminus_free(request->target);
+ return 0;
+ }
+
status = qcm_link_route_lookup_decode(app_properties, body, &is_link_route, &has_destinations);
if (status == QCM_ADDR_LOOKUP_OK) {
//
diff --git a/tests/system_tests_link_route_credit.py b/tests/system_tests_link_route_credit.py
index 8674519..04c4131 100644
--- a/tests/system_tests_link_route_credit.py
+++ b/tests/system_tests_link_route_credit.py
@@ -319,6 +319,11 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_29_fast_teardown_test(self):  # Runs LRFastTeardownTest against routers[2] on address "normal.29".
+ test = LRFastTeardownTest(self.routers[2].addresses[0], "normal.29")
+ test.run()
+ self.assertEqual(None, test.error)  # Passes only when the handler recorded no error.
+
class Entity(object):
def __init__(self, status_code, status_description, attrs):
@@ -579,5 +584,41 @@ class LRDestReceiverFlowTest(MessagingHandler):
container.run()
+class LRFastTeardownTest(MessagingHandler):  # Reproducer client: connects, creates a sender, then closes the connection straight away.
+ def __init__(self, host, address):  # host: address to connect to; address: target address for the sender.
+ super(LRFastTeardownTest, self).__init__(prefetch=0)
+ self.host = host
+ self.address = address
+
+ self.conn = None
+ self.sender = None
+ self.error = None  # Stays None unless fail() or timeout() stores a message.
+ self.last_action = "Test initialization"  # Included in the timeout error text.
+
+ def fail(self, text):  # Record the failure text, then close the connection and cancel the timer.
+ self.error = text
+ self.conn.close()
+ self.timer.cancel()
+
+ def timeout(self):  # Presumably invoked by the Timeout helper scheduled in on_start (defined outside this view) — confirm.
+ self.error = "Timeout Expired - last_action: %s" % (self.last_action)
+ self.conn.close()
+
+ def on_start(self, event):  # Schedule the timer and open the connection.
+ self.reactor = event.reactor
+ self.timer = event.reactor.schedule(7.0, Timeout(self))  # Timer set for 7.0 (seconds, presumably).
+ self.conn = event.container.connect(self.host)
+ self.last_action = "on_start"
+
+ def on_connection_opened(self, event):
+ self.sender = event.container.create_sender(self.conn, self.address)
+ self.conn.close()  # Close immediately after requesting the sender; per the commit message this targets closure racing an in-flight address lookup.
+ self.timer.cancel()  # No error is recorded on this path, so the test passes if the run completes.
+
+ def run(self):  # Build a Container around this handler and run it.
+ container = Container(self)
+ container.run()
+
+
if __name__== '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org