You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/11/23 15:40:48 UTC
qpid-dispatch git commit: DISPATCH-573 - From Dan Skarbek - Numerous
memory leak fixes, bug fixes, and code consolidations. This closes #108
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 6a783b07f -> fbf6cfca4
DISPATCH-573 - From Dan Skarbek - Numerous memory leak fixes, bug fixes, and code consolidations.
This closes #108
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fbf6cfca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fbf6cfca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fbf6cfca
Branch: refs/heads/master
Commit: fbf6cfca44acc06b4aa2be958825a55b0bc1943c
Parents: 6a783b0
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Nov 23 10:37:10 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Nov 23 10:37:10 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 3 +-
src/connection_manager.c | 19 +++-
src/dispatch.c | 37 +++++--
src/dispatch_private.h | 12 +++
src/hash.c | 71 +++++++------
src/router_config.c | 158 +++++-----------------------
src/router_core/agent.c | 18 ++--
src/router_core/agent_config_address.c | 14 +--
src/router_core/connections.c | 10 +-
src/router_core/management_agent.c | 5 +-
src/router_core/route_tables.c | 13 +--
src/router_core/router_core.c | 78 +++++++++++++-
src/router_core/router_core_private.h | 4 +
src/trace_mask.c | 2 +
14 files changed, 219 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index b3f293b..292c497 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -594,7 +594,8 @@ typedef struct qdr_query_t qdr_query_t;
* @param out_body A composed field for the body of the response message
*/
void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t type,
- qd_iterator_t *name, qd_parsed_field_t *in_body, qd_composed_field_t *out_body);
+ qd_iterator_t *name, qd_parsed_field_t *in_body, qd_composed_field_t *out_body,
+ qd_buffer_list_t body_buffers);
/**
* qdr_manage_delete
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 1438760..f737b73 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -98,9 +98,13 @@ static void qd_server_config_free(qd_server_config_t *cf)
if (!cf) return;
free(cf->host);
free(cf->port);
- free(cf->name);
free(cf->role);
- free(cf->sasl_mechanisms);
+ if (cf->name) free(cf->name);
+ if (cf->protocol_family) free(cf->protocol_family);
+ if (cf->sasl_username) free(cf->sasl_username);
+ if (cf->sasl_password) free(cf->sasl_password);
+ if (cf->sasl_mechanisms) free(cf->sasl_mechanisms);
+ if (cf->ssl_profile) free(cf->ssl_profile);
memset(cf, 0, sizeof(*cf));
}
@@ -164,6 +168,10 @@ static void set_config_host(qd_server_config_t *config, qd_entity_t* entity)
config->host = host;
free(addr);
}
+ else {
+ free(host);
+ free(addr);
+ }
assert(config->host);
}
@@ -174,7 +182,6 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
bool authenticatePeer = qd_entity_opt_bool(entity, "authenticatePeer", false); CHECK();
bool verifyHostName = qd_entity_opt_bool(entity, "verifyHostName", true); CHECK();
- char *stripAnnotations = qd_entity_opt_string(entity, "stripAnnotations", 0); CHECK();
bool requireEncryption = qd_entity_opt_bool(entity, "requireEncryption", false); CHECK();
bool requireSsl = qd_entity_opt_bool(entity, "requireSsl", false); CHECK();
bool depRequirePeerAuth = qd_entity_opt_bool(entity, "requirePeerAuth", false); CHECK();
@@ -240,12 +247,15 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
config->allowInsecureAuthentication = true;
config->verify_host_name = verifyHostName;
+ char *stripAnnotations = qd_entity_opt_string(entity, "stripAnnotations", 0);
load_strip_annotations(config, stripAnnotations);
+ free(stripAnnotations);
+ stripAnnotations = 0;
+ CHECK();
config->requireAuthentication = authenticatePeer || depRequirePeerAuth;
config->requireEncryption = requireEncryption || !depAllowUnsecured;
-
if (config->ssl_profile) {
config->ssl_required = requireSsl || !depAllowUnsecured;
config->ssl_require_peer_authentication = config->sasl_mechanisms &&
@@ -264,7 +274,6 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
sys_atomic_inc(&(*ssl_profile)->ref_count);
}
- free(stripAnnotations);
return QD_ERROR_NONE;
error:
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 16b2a40..15f9ca8 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -61,8 +61,8 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
qd_error_initialize();
if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
- qd->router_area = strdup("0");
- qd->router_id = strdup("0");
+ qd_dispatch_set_router_area(qd, strdup("0"));
+ qd_dispatch_set_router_id(qd, strdup("0"));
qd->router_mode = QD_ROUTER_MODE_ENDPOINT;
qd_python_initialize(qd, python_pkgdir);
@@ -154,17 +154,20 @@ qd_error_t qd_dispatch_configure_container(qd_dispatch_t *qd, qd_entity_t *entit
qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
{
- qd->router_id = qd_entity_opt_string(entity, "routerId", 0); QD_ERROR_RET();
- if (! qd->router_id)
- qd->router_id = qd_entity_opt_string(entity, "id", 0); QD_ERROR_RET();
+ qd_dispatch_set_router_id(qd, qd_entity_opt_string(entity, "routerId", 0)); QD_ERROR_RET();
+ if (! qd->router_id) {
+ qd_dispatch_set_router_id(qd, qd_entity_opt_string(entity, "id", 0)); QD_ERROR_RET();
+ }
assert(qd->router_id);
qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET();
qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
- if (! qd->sasl_config_path)
+ if (! qd->sasl_config_path) {
qd->sasl_config_path = qd_entity_opt_string(entity, "saslConfigPath", 0); QD_ERROR_RET();
- if (! qd->sasl_config_name)
+ }
+ if (! qd->sasl_config_name) {
qd->sasl_config_name = qd_entity_opt_string(entity, "saslConfigName", "qdrouterd"); QD_ERROR_RET();
+ }
char *dump_file = qd_entity_opt_string(entity, "debugDump", 0); QD_ERROR_RET();
if (dump_file) {
@@ -266,11 +269,27 @@ void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent) {
qd->agent = agent;
}
+void qd_dispatch_set_router_id(qd_dispatch_t *qd, char *_id) {
+ if (qd->router_id) {
+ free(qd->router_id);
+ }
+ qd->router_id = _id;
+}
+
+void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area) {
+ if (qd->router_area) {
+ free(qd->router_area);
+ }
+ qd->router_area = _area;
+}
+
void qd_dispatch_free(qd_dispatch_t *qd)
{
if (!qd) return;
- free(qd->router_id);
- free(qd->router_area);
+ qd_dispatch_set_router_id(qd, NULL);
+ qd_dispatch_set_router_area(qd, NULL);
+ free(qd->sasl_config_path);
+ free(qd->sasl_config_name);
qd_connection_manager_free(qd->connection_manager);
qd_policy_free(qd->policy);
Py_XDECREF((PyObject*) qd->agent);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index 99c35d7..9e62085 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -141,4 +141,16 @@ void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void *impl);
/** Set the agent */
void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent);
+/**
+ * Set a new router id, freeing the prior id string
+ * TAKES OWNERSHIP OF THE POINTER PASSED TO IT
+ */
+void qd_dispatch_set_router_id(qd_dispatch_t *qd, char *_id);
+
+/**
+ * Set a new router area, freeing the prior area string
+ * TAKES OWNERSHIP OF THE POINTER PASSED TO IT
+ */
+void qd_dispatch_set_router_area(qd_dispatch_t *qd, char *_area);
+
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/hash.c
----------------------------------------------------------------------
diff --git a/src/hash.c b/src/hash.c
index 8e27a23..7b87c80 100644
--- a/src/hash.c
+++ b/src/hash.c
@@ -84,6 +84,17 @@ qd_hash_t *qd_hash(int bucket_exponent, int batch_size, int value_is_const)
return h;
}
+//remove the given item from the given bucket of the given hash
+//return the key if non-null key pointer given, otherwise, free the memory
+static void qd_hash_internal_remove_item(qd_hash_t *h, bucket_t *bucket, qd_hash_item_t *item, unsigned char **key) {
+ if (key)
+ *key = item->key;
+ else
+ free(item->key);
+ DEQ_REMOVE(bucket->items, item);
+ free_qd_hash_item_t(item);
+ h->size--;
+}
void qd_hash_free(qd_hash_t *h)
{
@@ -94,9 +105,7 @@ void qd_hash_free(qd_hash_t *h)
for (idx = 0; idx < h->bucket_count; idx++) {
item = DEQ_HEAD(h->buckets[idx].items);
while (item) {
- free(item->key);
- DEQ_REMOVE_HEAD(h->buckets[idx].items);
- free_qd_hash_item_t(item);
+ qd_hash_internal_remove_item(h, &h->buckets[idx], item, 0);
item = DEQ_HEAD(h->buckets[idx].items);
}
}
@@ -119,7 +128,7 @@ static qd_hash_item_t *qd_hash_internal_insert(qd_hash_t *h, qd_iterator_t *key,
while (item) {
if (qd_iterator_equal(key, item->key))
break;
- item = item->next;
+ item = DEQ_NEXT(item);
}
if (item) {
@@ -174,25 +183,30 @@ qd_error_t qd_hash_insert_const(qd_hash_t *h, qd_iterator_t *key, const void *va
{
assert(h->is_const);
- int error = 0;
- qd_hash_item_t *item = qd_hash_internal_insert(h, key, &error, handle);
+ int exists = 0;
+ qd_hash_item_t *item = qd_hash_internal_insert(h, key, &exists, handle);
- if (item)
- item->v.val_const = val;
- return error;
+ if (!item)
+ return QD_ERROR_ALLOC;
+
+ if (exists)
+ return QD_ERROR_ALREADY_EXISTS;
+
+ item->v.val_const = val;
+
+ return QD_ERROR_NONE;
}
static qd_hash_item_t *qd_hash_internal_retrieve_with_hash(qd_hash_t *h, uint32_t hash, qd_iterator_t *key)
{
- uint32_t idx = hash & h->bucket_mask;
-
+ uint32_t idx = hash & h->bucket_mask;
qd_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
while (item) {
if (qd_iterator_equal(key, item->key))
break;
- item = item->next;
+ item = DEQ_NEXT(item);
}
return item;
@@ -231,6 +245,9 @@ void qd_hash_retrieve_prefix_const(qd_hash_t *h, qd_iterator_t *iter, const void
{
assert(h->is_const);
+ //Hash individual segments by iterating thru the octets in the iterator.
+ qd_iterator_hash_view_segments(iter);
+
uint32_t hash = 0;
qd_hash_item_t *item;
@@ -276,24 +293,15 @@ qd_error_t qd_hash_retrieve_const(qd_hash_t *h, qd_iterator_t *key, const void *
qd_error_t qd_hash_remove(qd_hash_t *h, qd_iterator_t *key)
{
- uint32_t idx = qd_iterator_hash_view(key) & h->bucket_mask;
- qd_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
-
- while (item) {
- if (qd_iterator_equal(key, item->key))
- break;
- item = item->next;
- }
-
- if (item) {
- free(item->key);
- DEQ_REMOVE(h->buckets[idx].items, item);
- free_qd_hash_item_t(item);
- h->size--;
- return QD_ERROR_NONE;
- }
+ //the retrieve function will re-apply the bucket_mask, but that is ok
+ //we apply it here because we need the bucket index to do the remove
+ uint32_t idx = qd_iterator_hash_view(key) & h->bucket_mask;
+ qd_hash_item_t *item = qd_hash_internal_retrieve_with_hash(h, idx, key);
+ if (!item)
+ return QD_ERROR_NOT_FOUND;
- return QD_ERROR_NOT_FOUND;
+ qd_hash_internal_remove_item(h, &h->buckets[idx], item, 0);
+ return QD_ERROR_NONE;
}
@@ -326,9 +334,6 @@ qd_error_t qd_hash_remove_by_handle2(qd_hash_t *h, qd_hash_handle_t *handle, uns
{
if (!handle)
return QD_ERROR_NOT_FOUND;
- *key = handle->item->key;
- DEQ_REMOVE(handle->bucket->items, handle->item);
- free_qd_hash_item_t(handle->item);
- h->size--;
+ qd_hash_internal_remove_item(h, handle->bucket, handle->item, key);
return QD_ERROR_NONE;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_config.c
----------------------------------------------------------------------
diff --git a/src/router_config.c b/src/router_config.c
index c6c9847..a0b68c4 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -26,6 +26,27 @@
#include "entity_cache.h"
#include "schema_enum.h"
+static void qdi_router_configure_body(qdr_core_t *core,
+ qd_composed_field_t *body,
+ qd_router_entity_type_t type,
+ char *name)
+{
+ qd_buffer_list_t buffers;
+ qd_compose_take_buffers(body, &buffers);
+
+ qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, qd_buffer_list_length(&buffers), ITER_VIEW_ALL);
+ qd_parsed_field_t *in_body = qd_parse(iter);
+ qd_iterator_free(iter);
+
+ qd_iterator_t *name_iter = 0;
+ if (name)
+ name_iter = qd_iterator_string(name, ITER_VIEW_ALL);
+
+ qdr_manage_create(core, 0, type, name_iter, in_body, 0, buffers);
+
+ qd_iterator_free(name_iter);
+}
+
qd_error_t qd_router_configure_fixed_address(qd_router_t *router, qd_entity_t *entity)
{
static bool deprecate_warning = true;
@@ -79,24 +100,9 @@ qd_error_t qd_router_configure_fixed_address(qd_router_t *router, qd_entity_t *e
qd_compose_insert_string(body, distrib);
qd_compose_end_map(body);
- int length = 0;
- qd_buffer_list_t buffers;
-
- qd_compose_take_buffers(body, &buffers);
+ qdi_router_configure_body(router->router_core, body, QD_ROUTER_CONFIG_ADDRESS, 0);
qd_compose_free(body);
- qd_buffer_t *buf = DEQ_HEAD(buffers);
- while (buf) {
- length += qd_buffer_size(buf);
- buf = DEQ_NEXT(buf);
- }
-
- qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, length, ITER_VIEW_ALL);
- qd_parsed_field_t *in_body = qd_parse(iter);
- qd_iterator_free(iter);
-
- qdr_manage_create(router->router_core, 0, QD_ROUTER_CONFIG_ADDRESS, 0, in_body, 0);
-
free(prefix);
return qd_error_code();
}
@@ -109,33 +115,6 @@ qd_error_t qd_router_configure_waypoint(qd_router_t *router, qd_entity_t *entity
qd_log(router->log_source, QD_LOG_WARNING, "waypoint configuration is deprecated, switch to using autoLink instead.");
}
- /*
- char *address = qd_entity_get_string(entity, "address"); QD_ERROR_RET();
- char *connector = qd_entity_get_string(entity, "connector"); QD_ERROR_RET();
- int in_phase = qd_entity_opt_long(entity, "inPhase", 0); QD_ERROR_RET();
- int out_phase = qd_entity_opt_long(entity, "outPhase", 0); QD_ERROR_RET();
-
- if (in_phase > 9 || out_phase > 9) {
- qd_error_t err = qd_error(QD_ERROR_CONFIG,
- "Phases for waypoint '%s' must be between 0 and 9.", address);
- free(address);
- free(connector);
- return err;
- }
- qd_waypoint_t *waypoint = NEW(qd_waypoint_t);
- memset(waypoint, 0, sizeof(qd_waypoint_t));
- DEQ_ITEM_INIT(waypoint);
- waypoint->address = address;
- waypoint->in_phase = in_phase >= 0 ? (char) in_phase + '0' : '\0';
- waypoint->out_phase = out_phase >= 0 ? (char) out_phase + '0' : '\0';
- waypoint->connector_name = connector;
-
- DEQ_INSERT_TAIL(router->waypoints, waypoint);
-
- qd_log(router->log_source, QD_LOG_INFO,
- "Configured Waypoint: address=%s in_phase=%d out_phase=%d connector=%s",
- address, in_phase, out_phase, connector);
- */
return qd_error_code();
}
@@ -160,23 +139,8 @@ static void qd_router_add_link_route(qdr_core_t *core, const char *prefix, const
qd_compose_end_map(body);
- int length = 0;
- qd_buffer_list_t buffers;
-
- qd_compose_take_buffers(body, &buffers);
+ qdi_router_configure_body(core, body, QD_ROUTER_CONFIG_LINK_ROUTE, 0);
qd_compose_free(body);
-
- qd_buffer_t *buf = DEQ_HEAD(buffers);
- while (buf) {
- length += qd_buffer_size(buf);
- buf = DEQ_NEXT(buf);
- }
-
- qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, length, ITER_VIEW_ALL);
- qd_parsed_field_t *in_body = qd_parse(iter);
- qd_iterator_free(iter);
-
- qdr_manage_create(core, 0, QD_ROUTER_CONFIG_LINK_ROUTE, 0, in_body, 0);
}
@@ -264,32 +228,8 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity)
qd_compose_end_map(body);
- int length = 0;
- qd_buffer_list_t buffers;
-
- qd_compose_take_buffers(body, &buffers);
+ qdi_router_configure_body(router->router_core, body, QD_ROUTER_CONFIG_ADDRESS, name);
qd_compose_free(body);
-
- qd_buffer_t *buf = DEQ_HEAD(buffers);
- while (buf) {
- length += qd_buffer_size(buf);
- buf = DEQ_NEXT(buf);
- }
-
- qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, length, ITER_VIEW_ALL);
- qd_parsed_field_t *in_body = qd_parse(iter);
- qd_iterator_free(iter);
-
- qd_iterator_t *name_iter = 0;
-
- if (name)
- name_iter = qd_iterator_string(name, ITER_VIEW_ALL);
-
- qdr_manage_create(router->router_core, 0, QD_ROUTER_CONFIG_ADDRESS, name_iter, in_body, 0);
-
- qd_iterator_free(name_iter);
-
-
} while(0);
free(name);
@@ -356,31 +296,8 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti
qd_compose_end_map(body);
- int length = 0;
- qd_buffer_list_t buffers;
-
- qd_compose_take_buffers(body, &buffers);
+ qdi_router_configure_body(router->router_core, body, QD_ROUTER_CONFIG_LINK_ROUTE, name);
qd_compose_free(body);
-
- qd_buffer_t *buf = DEQ_HEAD(buffers);
- while (buf) {
- length += qd_buffer_size(buf);
- buf = DEQ_NEXT(buf);
- }
-
- qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, length, ITER_VIEW_ALL);
- qd_parsed_field_t *in_body = qd_parse(iter);
- qd_iterator_free(iter);
-
- qd_iterator_t *name_iter = 0;
-
- if (name)
- name_iter = qd_iterator_string(name, ITER_VIEW_ALL);
-
- qdr_manage_create(router->router_core, 0, QD_ROUTER_CONFIG_LINK_ROUTE, name_iter, in_body, 0);
-
- qd_iterator_free(name_iter);
-
} while(0);
free(name);
@@ -455,31 +372,8 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit
qd_compose_end_map(body);
- int length = 0;
- qd_buffer_list_t buffers;
-
- qd_compose_take_buffers(body, &buffers);
+ qdi_router_configure_body(router->router_core, body, QD_ROUTER_CONFIG_AUTO_LINK, name);
qd_compose_free(body);
-
- qd_buffer_t *buf = DEQ_HEAD(buffers);
- while (buf) {
- length += qd_buffer_size(buf);
- buf = DEQ_NEXT(buf);
- }
-
- qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, length, ITER_VIEW_ALL);
- qd_parsed_field_t *in_body = qd_parse(iter);
- qd_iterator_free(iter);
-
- qd_iterator_t *name_iter = 0;
-
- if (name)
- name_iter = qd_iterator_string(name, ITER_VIEW_ALL);
-
- qdr_manage_create(router->router_core, 0, QD_ROUTER_CONFIG_AUTO_LINK, name_iter, in_body, 0);
-
- qd_iterator_free(name_iter);
-
} while (0);
free(name);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index dc92e77..a96abb2 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -106,14 +106,16 @@ void qdr_manage_create(qdr_core_t *core,
qd_router_entity_type_t type,
qd_iterator_t *name,
qd_parsed_field_t *in_body,
- qd_composed_field_t *out_body)
+ qd_composed_field_t *out_body,
+ qd_buffer_list_t body_buffers)
{
qdr_action_t *action = qdr_action(qdr_manage_create_CT, "manage_create");
// Create a query object here
- action->args.agent.query = qdr_query(core, context, type, out_body);
- action->args.agent.name = qdr_field_from_iter(name);
- action->args.agent.in_body = in_body;
+ action->args.agent.query = qdr_query(core, context, type, out_body);
+ action->args.agent.name = qdr_field_from_iter(name);
+ action->args.agent.in_body = in_body;
+ action->args.agent.body_buffers = body_buffers;
qdr_action_enqueue(core, action);
}
@@ -354,9 +356,10 @@ static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool disc
static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- qd_iterator_t *name = qdr_field_iterator(action->args.agent.name);
- qdr_query_t *query = action->args.agent.query;
- qd_parsed_field_t *in_body = action->args.agent.in_body;
+ qd_iterator_t *name = qdr_field_iterator(action->args.agent.name);
+ qdr_query_t *query = action->args.agent.query;
+ qd_parsed_field_t *in_body = action->args.agent.in_body;
+ qd_buffer_list_t body_buffers = action->args.agent.body_buffers;
switch (query->entity_type) {
case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_create_CT(core, name, query, in_body); break;
@@ -373,6 +376,7 @@ static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool di
qdr_field_free(action->args.agent.name);
qd_parse_free(in_body);
+ qd_buffer_list_free_buffers(&body_buffers);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/agent_config_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_address.c b/src/router_core/agent_config_address.c
index e3037ba..c10f80e 100644
--- a/src/router_core/agent_config_address.c
+++ b/src/router_core/agent_config_address.c
@@ -272,19 +272,7 @@ void qdra_config_address_delete_CT(qdr_core_t *core,
addr = qdr_address_config_find_by_name_CT(core, name);
if (addr) {
- //
- // Remove the address from the list and the hash index.
- //
- qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
- DEQ_REMOVE(core->addr_config, addr);
-
- //
- // Free resources associated with this address.
- //
- if (addr->name)
- free(addr->name);
- free_qdr_address_config_t(addr);
-
+ qdr_core_remove_address_config(core, addr);
query->status = QD_AMQP_NO_CONTENT;
} else
query->status = QD_AMQP_NOT_FOUND;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index fc41f57..231095a 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -752,15 +752,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion &&
addr->tracked_deliveries == 0) {
- qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
- DEQ_REMOVE(core->addrs, addr);
- qd_hash_handle_free(addr->hash_handle);
- qd_bitmask_free(addr->rnodes);
- if (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST)
- qd_bitmask_free(addr->closest_remotes);
- else if (addr->treatment == QD_TREATMENT_ANYCAST_BALANCED)
- free(addr->outstanding_deliveries);
- free_qdr_address_t(addr);
+ qdr_core_remove_address(core, addr);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index 141cf02..9fb6503 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -303,8 +303,9 @@ static void qd_core_agent_create_handler(qdr_core_t *core,
qd_parsed_field_t *in_body = qd_parse(body_iter);
- qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body);
-
+ qd_buffer_list_t empty_list;
+ DEQ_INIT(empty_list);
+ qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body, empty_list);
qd_iterator_free(body_iter);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 2c80cc6..1110985 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -379,17 +379,8 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
//
// Free the router node and the owning address records.
//
- qd_bitmask_free(rnode->valid_origins);
- DEQ_REMOVE(core->routers, rnode);
- core->cost_epoch++;
- free_qdr_node_t(rnode);
-
- qd_hash_remove_by_handle(core->addr_hash, oaddr->hash_handle);
- DEQ_REMOVE(core->addrs, oaddr);
- qd_hash_handle_free(oaddr->hash_handle);
- core->routers_by_mask_bit[router_maskbit] = 0;
- qd_bitmask_free(oaddr->rnodes);
- free_qdr_address_t(oaddr);
+ qdr_router_node_free(core, rnode);
+ qdr_core_remove_address(core, oaddr);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 89d8960..635d85c 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -98,17 +98,59 @@ void qdr_core_free(qdr_core_t *core)
//
// Free the core resources
//
- qdr_core_unsubscribe(core->agent_subscription_mobile);
- qdr_core_unsubscribe(core->agent_subscription_local);
sys_thread_free(core->thread);
sys_cond_free(core->action_cond);
sys_mutex_free(core->action_lock);
sys_mutex_free(core->work_lock);
sys_mutex_free(core->id_lock);
qd_timer_free(core->work_timer);
+ //we can't call qdr_core_unsubscribe on the subscriptions because the action processing thread has
+ //already been shut down. But, all the action would have done at this point is free the subscriptions
+ //so we just do that directly.
+ free(core->agent_subscription_mobile);
+ free(core->agent_subscription_local);
+
+ for (int i = 0; i <= QD_TREATMENT_LINK_BALANCED; ++i) {
+ if (core->forwarders[i]) {
+ free(core->forwarders[i]);
+ }
+ }
+
+ qdr_address_t *addr = 0;
+ while ( (addr = DEQ_HEAD(core->addrs)) ) {
+ qdr_core_remove_address(core, addr);
+ }
+ qdr_address_config_t *addr_config = 0;
+ while ( (addr_config = DEQ_HEAD(core->addr_config))) {
+ qdr_core_remove_address_config(core, addr_config);
+ }
+ qd_hash_free(core->addr_hash);
+
+ qd_hash_free(core->conn_id_hash);
+ //TODO what about the actual connection identifier objects?
+
+ qdr_node_t *rnode = 0;
+ while ( (rnode = DEQ_HEAD(core->routers)) ) {
+ qdr_router_node_free(core, rnode);
+ }
+
+ if (core->query_lock) sys_mutex_free(core->query_lock);
+ if (core->routers_by_mask_bit) free(core->routers_by_mask_bit);
+ if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
+ if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit);
+ if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
+
free(core);
}
+void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode)
+{
+ qd_bitmask_free(rnode->valid_origins);
+ DEQ_REMOVE(core->routers, rnode);
+ core->routers_by_mask_bit[rnode->mask_bit] = 0;
+ core->cost_epoch++;
+ free_qdr_node_t(rnode);
+}
ALLOC_DECLARE(qdr_field_t);
ALLOC_DEFINE(qdr_field_t);
@@ -241,7 +283,6 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha
if (!addr) {
addr = qdr_address_CT(core, treatment);
qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(core->addrs, addr);
addr->block_deletion = true;
addr->local = (aclass == 'L');
@@ -250,6 +291,37 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha
return addr;
}
+void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr)
+{
+ // Remove the address from the list and hash index
+ qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
+ DEQ_REMOVE(core->addrs, addr);
+
+ // Free resources associated with this address
+ qd_hash_handle_free(addr->hash_handle);
+ qd_bitmask_free(addr->rnodes);
+ if (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST) {
+ qd_bitmask_free(addr->closest_remotes);
+ }
+ else if (addr->treatment == QD_TREATMENT_ANYCAST_BALANCED) {
+ free(addr->outstanding_deliveries);
+ }
+ free_qdr_address_t(addr);
+}
+
+void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr)
+{
+ // Remove the address from the list and the hash index.
+ qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
+ DEQ_REMOVE(core->addr_config, addr);
+
+ // Free resources associated with this address.
+ if (addr->name) {
+ free(addr->name);
+ }
+ qd_hash_handle_free(addr->hash_handle);
+ free_qdr_address_config_t(addr);
+}
void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
{
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 24c76b9..61d86a2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -142,6 +142,7 @@ struct qdr_action_t {
qdr_field_t *identity;
qdr_field_t *name;
qd_parsed_field_t *in_body;
+ qd_buffer_list_t body_buffers;
} agent;
} args;
@@ -182,6 +183,7 @@ struct qdr_node_t {
ALLOC_DECLARE(qdr_node_t);
DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
+void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
#define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0)
#define PEER_DATA_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->data_links_by_mask_bit[n->link_mask_bit] : 0)
@@ -364,6 +366,7 @@ DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment);
qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_treatment_t treatment);
+void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr);
struct qdr_address_config_t {
DEQ_LINKS(qdr_address_config_t);
@@ -377,6 +380,7 @@ struct qdr_address_config_t {
ALLOC_DECLARE(qdr_address_config_t);
DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t);
+void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr);
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fbf6cfca/src/trace_mask.c
----------------------------------------------------------------------
diff --git a/src/trace_mask.c b/src/trace_mask.c
index 7fc9882..5c52d9b 100644
--- a/src/trace_mask.c
+++ b/src/trace_mask.c
@@ -58,6 +58,7 @@ void qd_tracemask_free(qd_tracemask_t *tm)
if (tm->router_by_mask_bit[i])
qd_tracemask_del_router(tm, i);
}
+ free(tm->router_by_mask_bit);
qd_hash_free(tm->hash);
sys_rwlock_free(tm->lock);
@@ -89,6 +90,7 @@ void qd_tracemask_del_router(qd_tracemask_t *tm, int maskbit)
if (maskbit < qd_bitmask_width() && tm->router_by_mask_bit[maskbit] != 0) {
qdtm_router_t *router = tm->router_by_mask_bit[maskbit];
qd_hash_remove_by_handle(tm->hash, router->hash_handle);
+ qd_hash_handle_free(router->hash_handle);
tm->router_by_mask_bit[maskbit] = 0;
free_qdtm_router_t(router);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org