You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/06 23:27:12 UTC
[2/3] qpid-dispatch git commit: DISPATCH-179 - Updated wiring of the
core management agent to use the core's Message sending facilities.
DISPATCH-179 - Updated wiring of the core management agent to use the core's
Message sending facilities.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bc3ffca5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bc3ffca5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bc3ffca5
Branch: refs/heads/tross-DISPATCH-179-1
Commit: bc3ffca5402d4b95ea303d63ea0a9beb057088f8
Parents: 0ba77e3
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Jan 6 16:20:28 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Jan 6 16:20:28 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 10 ++++-
src/CMakeLists.txt | 1 -
src/python_embedded.c | 2 +-
src/router_config.c | 10 ++++-
src/router_core/management_agent.c | 77 ++++++++++++++------------------
src/router_core/router_core.c | 2 +-
src/router_core/transfer.c | 14 +++++-
src/router_node.c | 62 -------------------------
src/router_private.h | 9 ----
9 files changed, 65 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 84c9274..25727c2 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -102,11 +102,14 @@ void qdr_core_unsubscribe(qdr_subscription_t *sub);
* @param core Pointer to the core module
* @param msg Pointer to the message to be sent. The message will be copied during the call
* and must be freed by the caller if the caller doesn't need to hold it for later use.
- * @param addr Null-terminated string containing the address to which the message should be delivered.
+ * @param addr Field iterator describing the address to which the message should be delivered.
* @param exclude_inprocess If true, the message will not be sent to in-process subscribers.
* @param control If true, this message is to be treated as control traffic and flow on a control link.
*/
-void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess, bool control);
+void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr,
+ bool exclude_inprocess, bool control);
+void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr,
+ bool exclude_inprocess, bool control);
/**
@@ -468,6 +471,9 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
* @param link Pointer to the link over which the message arrived.
* @param msg Pointer to the delivered message. The sender is giving this reference to the router
* core. The sender _must not_ free or otherwise use the message after invoking this function.
+ * @param ingress Field iterator referencing the value of the ingress-router header. NOTE: This
+ * iterator is assumed to reference content in the message that will stay valid
+ * through the lifetime of the message.
* @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
*/
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e1d8097..5301081 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -53,7 +53,6 @@ set(qpid_dispatch_SOURCES
dispatch.c
entity.c
entity_cache.c
- lrp.c
hash.c
iovec.c
iterator.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/python_embedded.c
----------------------------------------------------------------------
diff --git a/src/python_embedded.c b/src/python_embedded.c
index f8afe25..e5b88f8 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -605,7 +605,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
qd_message_compose_2(msg, field);
PyObject *address = PyObject_GetAttrString(message, "address");
if (address) {
- qdr_send_to(ioa->core, msg, PyString_AsString(address), (bool) no_echo, (bool) control);
+ qdr_send_to2(ioa->core, msg, PyString_AsString(address), (bool) no_echo, (bool) control);
Py_DECREF(address);
}
qd_compose_free(field);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/router_config.c
----------------------------------------------------------------------
diff --git a/src/router_config.c b/src/router_config.c
index 315a468..0d95f57 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -27,7 +27,9 @@
#include "entity_cache.h"
#include "schema_enum.h"
-qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) {
+qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity)
+{
+ /*
qd_error_clear();
int phase = qd_entity_opt_long(entity, "phase", 0); QD_ERROR_RET();
qd_schema_fixedAddress_fanout_t fanout = qd_entity_get_long(entity, "fanout"); QD_ERROR_RET();
@@ -107,6 +109,7 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity)
addr->last_phase = addr_phase->phase;
DEQ_INSERT_TAIL(addr->phases, addr_phase);
free(prefix);
+ */
return qd_error_code();
}
@@ -144,6 +147,7 @@ qd_error_t qd_router_configure_waypoint(qd_router_t *router, qd_entity_t *entity
qd_error_t qd_router_configure_lrp(qd_router_t *router, qd_entity_t *entity)
{
+ /*
char *prefix = qd_entity_get_string(entity, "prefix"); QD_ERROR_RET();
char *connector = qd_entity_get_string(entity, "connector"); QD_ERROR_RET();
char *direction = qd_entity_get_string(entity, "dir"); QD_ERROR_RET();
@@ -264,6 +268,7 @@ qd_error_t qd_router_configure_lrp(qd_router_t *router, qd_entity_t *entity)
qd_field_iterator_free(iter);
free(prefix);
free(connector);
+ */
return qd_error_code();
}
@@ -271,7 +276,7 @@ qd_error_t qd_router_configure_lrp(qd_router_t *router, qd_entity_t *entity)
void qd_router_configure_free(qd_router_t *router)
{
if (!router) return;
-
+ /*
for (qd_config_address_t *ca = DEQ_HEAD(router->config_addrs); ca; ca = DEQ_HEAD(router->config_addrs)) {
for (qd_config_phase_t *ap = DEQ_HEAD(ca->phases); ap; ap = DEQ_HEAD(ca->phases)) {
DEQ_REMOVE_HEAD(ca->phases);
@@ -296,6 +301,7 @@ void qd_router_configure_free(qd_router_t *router)
DEQ_REMOVE_HEAD(router->lrp_containers);
free(lrpc);
}
+ */
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index ffb40be..3aae26a 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -69,14 +69,14 @@ typedef enum {
typedef struct qd_management_context_t {
- qd_message_t *msg;
- qd_message_t *source;
- qd_composed_field_t *field;
- qdr_query_t *query;
- qd_dispatch_t *qd;
- int count;
- int current_count;
- qd_router_operation_type_t operation_type;
+ qd_message_t *msg;
+ qd_message_t *source;
+ qd_composed_field_t *field;
+ qdr_query_t *query;
+ qdr_core_t *core;
+ int count;
+ int current_count;
+ qd_router_operation_type_t operation_type;
} qd_management_context_t ;
ALLOC_DECLARE(qd_management_context_t);
@@ -89,7 +89,7 @@ static qd_management_context_t* qd_management_context(qd_message_t
qd_message_t *source,
qd_composed_field_t *field,
qdr_query_t *query,
- qd_dispatch_t *qd,
+ qdr_core_t *core,
qd_router_operation_type_t operation_type,
int count)
{
@@ -98,12 +98,9 @@ static qd_management_context_t* qd_management_context(qd_message_t
ctx->field = field;
ctx->msg = msg;
ctx->source = source;
- if (query)
- ctx->query = query;
- else
- ctx->query = 0;
+ ctx->query = query;
ctx->current_count = 0;
- ctx->qd = qd;
+ ctx->core = core;
ctx->operation_type = operation_type;
return ctx;
@@ -189,7 +186,7 @@ static void qd_manage_response_handler (void *context, const qd_amqp_error_t *st
// Finally, compose and send the message.
qd_message_compose_3(ctx->msg, fld, ctx->field);
- qd_router_send(ctx->qd, reply_to, ctx->msg);
+ qdr_send_to1(ctx->core, ctx->msg, reply_to, true, false);
// We have come to the very end. Free the appropriate memory.
// ctx->field has already been freed in the call to qd_compose_end_list(ctx->field)
@@ -202,15 +199,13 @@ static void qd_manage_response_handler (void *context, const qd_amqp_error_t *st
}
-static void qd_core_agent_query_handler(qd_dispatch_t *qd,
- qd_router_entity_type_t entity_type,
- qd_router_operation_type_t operation_type,
+static void qd_core_agent_query_handler(qdr_core_t *core,
+ qd_router_entity_type_t entity_type,
+ qd_router_operation_type_t operation_type,
qd_message_t *msg,
int *count,
int *offset)
{
- qdr_core_t *core = qd_router_core(qd);
-
//
// Add the Body
//
@@ -222,7 +217,7 @@ static void qd_core_agent_query_handler(qd_dispatch_t *qd,
qd_compose_insert_string(field, attribute_names_key); //add a "attributeNames" key
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
- qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, qd, operation_type, (*count));
+ qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, core, operation_type, (*count));
// Grab the attribute names from the incoming message body. The attribute names will be used later on in the response.
qd_parsed_field_t *attribute_names_parsed_field = 0;
@@ -243,15 +238,13 @@ static void qd_core_agent_query_handler(qd_dispatch_t *qd,
}
-static void qd_core_agent_read_handler(qd_dispatch_t *qd,
+static void qd_core_agent_read_handler(qdr_core_t *core,
qd_message_t *msg,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_field_iterator_t *identity_iter,
qd_field_iterator_t *name_iter)
{
- qdr_core_t *core = qd_router_core(qd);
-
//
// Add the Body
//
@@ -261,21 +254,19 @@ static void qd_core_agent_read_handler(qd_dispatch_t *qd,
qdr_manage_handler(core, qd_manage_response_handler);
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
- qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, qd, operation_type, 0);
+ qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
//Call the read API function
qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body);
}
-static void qd_core_agent_create_handler(qd_dispatch_t *qd,
+static void qd_core_agent_create_handler(qdr_core_t *core,
qd_message_t *msg,
qd_router_entity_type_t entity_type,
qd_router_operation_type_t operation_type,
qd_field_iterator_t *name_iter)
{
- qdr_core_t *core = qd_router_core(qd);
-
//
// Add the Body
//
@@ -285,7 +276,7 @@ static void qd_core_agent_create_handler(qd_dispatch_t *qd,
qdr_manage_handler(core, qd_manage_response_handler);
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
- qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, qd, operation_type, 0);
+ qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0);
qdr_manage_create(core, ctx, entity_type, name_iter, qd_parse(qd_message_field_iterator(msg, QD_FIELD_BODY)), out_body);
}
@@ -297,15 +288,13 @@ static void qd_core_agent_update_handler()
}
-static void qd_core_agent_delete_handler(qd_dispatch_t *qd,
+static void qd_core_agent_delete_handler(qdr_core_t *core,
qd_message_t *msg,
- qd_router_entity_type_t entity_type,
- qd_router_operation_type_t operation_type,
+ qd_router_entity_type_t entity_type,
+ qd_router_operation_type_t operation_type,
qd_field_iterator_t *identity_iter,
qd_field_iterator_t *name_iter)
{
- qdr_core_t *core = qd_router_core(qd);
-
//
// Add the Body
//
@@ -315,7 +304,7 @@ static void qd_core_agent_delete_handler(qd_dispatch_t *qd,
qdr_manage_handler(core, qd_manage_response_handler);
// Call local function that creates and returns a qd_management_context_t containing the values passed in.
- qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, qd, operation_type, 0);
+ qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter);
}
@@ -420,7 +409,7 @@ static bool qd_can_handle_request(qd_field_iterator_t *props,
*/
void management_agent_handler(void *context, qd_message_t *msg, int link_id)
{
- qd_dispatch_t *qd = (qd_dispatch_t*) context;
+ qdr_core_t *core = (qdr_core_t*) context;
qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
qd_router_entity_type_t entity_type = 0;
@@ -435,25 +424,27 @@ void management_agent_handler(void *context, qd_message_t *msg, int link_id)
if (qd_can_handle_request(app_properties_iter, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) {
switch (operation_type) {
case QD_ROUTER_OPERATION_QUERY:
- qd_core_agent_query_handler(qd, entity_type, operation_type, msg, &count, &offset);
+ qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset);
break;
case QD_ROUTER_OPERATION_CREATE:
- qd_core_agent_create_handler(qd, msg, entity_type, operation_type, name_iter);
+ qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter);
break;
case QD_ROUTER_OPERATION_READ:
- qd_core_agent_read_handler(qd, msg, entity_type, operation_type, identity_iter, name_iter);
+ qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
break;
case QD_ROUTER_OPERATION_UPDATE:
qd_core_agent_update_handler();
break;
case QD_ROUTER_OPERATION_DELETE:
- qd_core_agent_delete_handler(qd, msg, entity_type, operation_type, identity_iter, name_iter);
+ qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
break;
}
+ } else {
+ //
+ // The C management agent is not going to handle this request. Forward it off to Python.
+ //
+ qdr_send_to2(core, msg, MANAGEMENT_INTERNAL, false, false);
}
- else
- qd_router_send2(qd, MANAGEMENT_INTERNAL, msg); //the C management agent is not going to handle this request. Forward it off to Python.
- // TODO - This is wrong. Need to find out how I can forward off the message to $management_internal so it can be handled by Python.
qd_field_iterator_free(app_properties_iter);
qd_field_iterator_free(name_iter);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 0625784..c043e09 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -97,7 +97,7 @@ qdr_field_t *qdr_field(const char *text)
if (length == 0)
return 0;
- qdr_field_t *field = new_qdr_field_t();
+ qdr_field_t *field = new_qdr_field_t();
qd_buffer_t *buf;
ZERO(field);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ce7069f..0a2ae66 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -65,7 +65,19 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
}
-void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess, bool control)
+void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control)
+{
+ qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
+ //action->args.io.address = qdr_field(addr); // TODO - fix this
+ action->args.io.message = qd_message_copy(msg);
+ action->args.io.exclude_inprocess = exclude_inprocess;
+ action->args.io.control = control;
+
+ qdr_action_enqueue(core, action);
+}
+
+
+void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess, bool control)
{
qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
action->args.io.address = qdr_field(addr);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ce4bc52..c775434 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -39,30 +39,7 @@ static char *on_demand_role = "on-demand";
static char *direct_prefix;
static char *node_id;
-ALLOC_DEFINE(qd_routed_event_t);
-ALLOC_DEFINE(qd_router_ref_t);
-ALLOC_DEFINE(qd_router_link_ref_t);
ALLOC_DEFINE(qd_router_lrp_ref_t);
-ALLOC_DEFINE(qd_address_t);
-ALLOC_DEFINE(qd_router_conn_t);
-
-
-qd_address_t* qd_address(qd_address_semantics_t semantics)
-{
- qd_address_t* addr = new_qd_address_t();
- memset(addr, 0, sizeof(qd_address_t));
- DEQ_ITEM_INIT(addr);
- DEQ_INIT(addr->lrps);
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- addr->semantics = semantics;
- addr->forwarder = 0; //qd_router_get_forwarder(semantics);
- return addr;
-}
-
-const char* qd_address_logstr(qd_address_t* address) {
- return (char*)qd_hash_key_by_handle(address->hash_handle);
-}
void qd_router_add_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp)
{
@@ -526,13 +503,6 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
}
size_t dplen = 9 + strlen(area) + strlen(id);
- direct_prefix = (char*) malloc(dplen);
- strcpy(direct_prefix, "_topo/");
- strcat(direct_prefix, area);
- strcat(direct_prefix, "/");
- strcat(direct_prefix, id);
- strcat(direct_prefix, "/");
-
node_id = (char*) malloc(dplen);
strcpy(node_id, area);
strcat(node_id, "/");
@@ -551,19 +521,8 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
router->router_area = area;
router->router_id = id;
router->node = qd_container_set_default_node_type(qd, &router_node, (void*) router, QD_DIST_BOTH);
- DEQ_INIT(router->addrs);
- router->addr_hash = qd_hash(10, 32, 0);
-
DEQ_INIT(router->lrp_containers);
- router->out_links_by_mask_bit = NEW_PTR_ARRAY(qd_router_link_t, qd_bitmask_width());
- router->routers_by_mask_bit = NEW_PTR_ARRAY(qd_router_node_t, qd_bitmask_width());
- for (int idx = 0; idx < qd_bitmask_width(); idx++) {
- router->out_links_by_mask_bit[idx] = 0;
- router->routers_by_mask_bit[idx] = 0;
- }
-
- router->neighbor_free_mask = qd_bitmask(1);
router->lock = sys_mutex();
router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router);
router->dtag = 1;
@@ -673,30 +632,9 @@ void qd_router_free(qd_router_t *router)
qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
- for (qd_address_t *addr = DEQ_HEAD(router->addrs); addr; addr = DEQ_HEAD(router->addrs)) {
- for (qd_router_link_ref_t *rlink = DEQ_HEAD(addr->rlinks); rlink; rlink = DEQ_HEAD(addr->rlinks)) {
- DEQ_REMOVE_HEAD(addr->rlinks);
- free_qd_router_link_ref_t(rlink);
- }
-
- for (qd_router_ref_t *rnode = DEQ_HEAD(addr->rnodes); rnode; rnode = DEQ_HEAD(addr->rnodes)) {
- DEQ_REMOVE_HEAD(addr->rnodes);
- free_qd_router_ref_t(rnode);
- }
-
- qd_hash_handle_free(addr->hash_handle);
- DEQ_REMOVE_HEAD(router->addrs);
- qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, addr);
- free_qd_address_t(addr);
- }
-
qdr_core_free(router->router_core);
qd_timer_free(router->timer);
sys_mutex_free(router->lock);
- qd_bitmask_free(router->neighbor_free_mask);
- free(router->out_links_by_mask_bit);
- free(router->routers_by_mask_bit);
- qd_hash_free(router->addr_hash);
qd_router_configure_free(router);
qd_router_python_free(router);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bc3ffca5/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index f1e9749..760ffaf 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -225,17 +225,8 @@ struct qd_router_t {
const char *router_id;
qd_node_t *node;
- qd_address_list_t addrs;
- qd_hash_t *addr_hash;
- qd_address_t *router_addr;
- qd_address_t *routerma_addr;
- qd_address_t *hello_addr;
-
qd_lrp_container_list_t lrp_containers;
- qd_router_link_t **out_links_by_mask_bit;
- qd_router_node_t **routers_by_mask_bit;
- qd_bitmask_t *neighbor_free_mask;
sys_mutex_t *lock;
qd_timer_t *timer;
uint64_t dtag;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org