You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/21 23:42:50 UTC
qpid-dispatch git commit: DISPATCH-179 - Added the remainder of the
router-engine functions.
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 382fb1b03 -> 946a9e69b
DISPATCH-179 - Added the remainder of the router-engine functions.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/946a9e69
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/946a9e69
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/946a9e69
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 946a9e69b1f7b0f49edaedccb94c5a56cf6b264a
Parents: 382fb1b
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 21 17:42:07 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 21 17:42:07 2015 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 +-
src/router_core/route_tables.c | 337 ++++++++++++++++++++++++-----
src/router_core/router_core.c | 1 +
src/router_core/router_core_private.h | 21 +-
4 files changed, 300 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 87bcd5c..11f030f 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -53,7 +53,7 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit);
void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit);
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem);
void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase);
typedef void (*qdr_mobile_added_t) (void *context, const char *address);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index d039ff1..137d0e6 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -99,13 +99,14 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask
}
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase)
+//
+// Queue a request to map an address onto a remote router.
+//
+// The arguments are copied into a new action whose handler is
+// qdrh_map_destination; the address string is wrapped with qdr_field() and
+// that field is released by the handler once the action has been processed.
+//
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem)
{
qdr_action_t *action = qdr_action(qdrh_map_destination);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address);
action->args.route_table.address_phase = phase;
action->args.route_table.address_class = aclass;
+ // Semantics are only consulted by the handler when the address record has to be created.
+ action->args.route_table.semantics = sem;
qdr_action_enqueue(core, action);
}
@@ -160,7 +161,7 @@ static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
void qdr_route_table_setup(qdr_core_t *core)
{
DEQ_INIT(core->addrs);
- //DEQ_INIT(core->links);
+ DEQ_INIT(core->links);
DEQ_INIT(core->routers);
core->addr_hash = qd_hash(10, 32, 0);
@@ -168,116 +169,348 @@ void qdr_route_table_setup(qdr_core_t *core)
core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
core->hello_addr = qdr_add_local_address(core, "qdhello", QD_SEMANTICS_ROUTER_CONTROL);
- core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
- for (int idx = 0; idx < qd_bitmask_width(); idx++)
- core->routers_by_mask_bit[idx] = 0;
+ core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
+ core->out_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
+ for (int idx = 0; idx < qd_bitmask_width(); idx++) {
+ core->routers_by_mask_bit[idx] = 0;
+ core->out_links_by_mask_bit[idx] = 0;
+ }
}
+//
+// Route-table action handler: register a remote router.
+//
+// Reads router_maskbit and address from action->args.route_table.  On success
+// a new address record is inserted into core->addr_hash / core->addrs, a new
+// router node is appended to core->routers and stored in
+// core->routers_by_mask_bit[router_maskbit], and the node is referenced from
+// its own address as well as from core->router_addr and core->routerma_addr.
+//
+// Failures (mask bit out of range, mask bit already in use, address already
+// present in the hash) are logged at CRITICAL level and leave the tables
+// untouched.  The address field is freed on every path.
+//
static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action)
{
+    int          router_maskbit = action->args.route_table.router_maskbit;
+    qdr_field_t *address        = action->args.route_table.address;
+
+    do {
+        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit);
+            break;
+        }
+
+        if (core->routers_by_mask_bit[router_maskbit] != 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit);
+            break;
+        }
+
+        //
+        // Hash lookup the address to ensure there isn't an existing router address.
+        //
+        // addr starts out null, as in the map/unmap handlers, so the test below
+        // never depends on qd_hash_retrieve storing a value when the key is absent.
+        //
+        qd_field_iterator_t *iter = address->iterator;
+        qdr_address_t       *addr = 0;
+
+        qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+        qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+
+        if (addr) {
+            qd_log(core->log, QD_LOG_CRITICAL, "add_router: Data inconsistency for router-maskbit %d", router_maskbit);
+            assert(addr == 0);  // Crash in debug mode.  This should never happen
+            break;
+        }
+
+        //
+        // Create an address record for this router and insert it in the hash table.
+        // This record will be found whenever a "foreign" topological address to this
+        // remote router is looked up.
+        //
+        addr = qdr_address(router_addr_semantics);
+        qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+        DEQ_INSERT_TAIL(core->addrs, addr);
+
+        //
+        // Create a router-node record to represent the remote router.
+        //
+        qdr_node_t *rnode = new_qdr_node_t();
+        DEQ_ITEM_INIT(rnode);
+        rnode->owning_addr   = addr;
+        rnode->mask_bit      = router_maskbit;
+        rnode->next_hop      = 0;
+        rnode->peer_link     = 0;
+        rnode->ref_count     = 0;
+        rnode->valid_origins = qd_bitmask(0);
+
+        DEQ_INSERT_TAIL(core->routers, rnode);
+
+        //
+        // Link the router record to the address record.
+        //
+        qdr_add_node_ref(&addr->rnodes, rnode);
+
+        //
+        // Link the router record to the router address records.
+        //
+        qdr_add_node_ref(&core->router_addr->rnodes, rnode);
+        qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
+
+        //
+        // Add the router record to the mask-bit index.
+        //
+        core->routers_by_mask_bit[router_maskbit] = rnode;
+    } while (false);
+
+    qdr_field_free(address);
+}
+
+
+//
+// Route-table action handler: remove the remote router identified by
+// action->args.route_table.router_maskbit.
+//
+// An out-of-range mask bit, or a mask bit with no router recorded in
+// core->routers_by_mask_bit, is logged at CRITICAL level and otherwise ignored.
+//
+// NOTE(review): nothing in this function clears next_hop pointers held by
+// other router nodes that may still refer to the node freed here -- confirm
+// that the caller removes those references first.
+//
+static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action)
+{
int router_maskbit = action->args.route_table.router_maskbit;
if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit);
+ qd_log(core->log, QD_LOG_CRITICAL, "del_router: Router maskbit out of range: %d", router_maskbit);
return;
}
- if (core->routers_by_mask_bit[router_maskbit] != 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit);
+ if (core->routers_by_mask_bit[router_maskbit] == 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "del_router: Deleting nonexistent router: %d", router_maskbit);
return;
}
- //
- // Hash lookup the address to ensure there isn't an existing router address.
- //
- qd_field_iterator_t *iter = action->args.route_table.address->iterator;
- qdr_address_t *addr;
-
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
- assert(addr == 0);
+ qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+ qdr_address_t *oaddr = rnode->owning_addr;
+ assert(oaddr);
//
- // Create an address record for this router and insert it in the hash table.
- // This record will be found whenever a "foreign" topological address to this
- // remote router is looked up.
+ // Unlink the router node from the address record
//
- addr = qdr_address(router_addr_semantics);
- qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addrs, addr);
+ qdr_del_node_ref(&oaddr->rnodes, rnode);
//
- // Create a router-node record to represent the remote router.
+ // While the router node has a non-zero reference count, look for addresses
+ // to unlink the node from.
//
- qdr_node_t *rnode = new_qdr_node_t();
- DEQ_ITEM_INIT(rnode);
- rnode->owning_addr = addr;
- rnode->mask_bit = router_maskbit;
- rnode->next_hop = 0;
- rnode->peer_link = 0;
- rnode->ref_count = 0;
- rnode->valid_origins = qd_bitmask(0);
-
- DEQ_INSERT_TAIL(core->routers, rnode);
+ // The walk covers core->addrs only and stops as soon as ref_count reaches zero.
+ qdr_address_t *addr = DEQ_HEAD(core->addrs);
+ while (addr && rnode->ref_count > 0) {
+ qdr_del_node_ref(&addr->rnodes, rnode);
+ addr = DEQ_NEXT(addr);
+ }
+ assert(rnode->ref_count == 0);
//
- // Link the router record to the address record.
+ // Free the router node and the owning address records.
//
- qdr_add_node_ref(&addr->rnodes, rnode);
+ qd_bitmask_free(rnode->valid_origins);
+ DEQ_REMOVE(core->routers, rnode);
+ free_qdr_node_t(rnode);
+
+ // The owning address leaves the hash and the address list before it is freed;
+ // the mask-bit slot is zeroed so a later add_router may use it again.
+ qd_hash_remove_by_handle(core->addr_hash, oaddr->hash_handle);
+ DEQ_REMOVE(core->addrs, oaddr);
+ qd_hash_handle_free(oaddr->hash_handle);
+ core->routers_by_mask_bit[router_maskbit] = 0;
+ free_qdr_address_t(oaddr);
+}
- //
- // Link the router record to the router address records.
- //
- qdr_add_node_ref(&core->router_addr->rnodes, rnode);
- qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
- //
- // Add the router record to the mask-bit index.
- //
- core->routers_by_mask_bit[router_maskbit] = rnode;
+//
+// Route-table action handler: attach an outgoing link to a remote router.
+//
+// Reads router_maskbit and link_maskbit from action->args.route_table.  Both
+// must lie in [0, qd_bitmask_width()), the link slot in
+// core->out_links_by_mask_bit and the router slot in core->routers_by_mask_bit
+// must both be populated; any violation is logged at CRITICAL level and the
+// router record is left unchanged.
+//
+static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action)
+{
+ int router_maskbit = action->args.route_table.router_maskbit;
+ int link_maskbit = action->args.route_table.link_maskbit;
- qdr_field_free(action->args.route_table.address);
-}
+ if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "set_link: Router maskbit out of range: %d", router_maskbit);
+ return;
+ }
+ if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "set_link: Link maskbit out of range: %d", link_maskbit);
+ return;
+ }
-static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action)
-{
-}
+ // The link must already be registered in the out-link index.
+ if (core->out_links_by_mask_bit[link_maskbit] == 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", link_maskbit);
+ return;
+ }
+ if (core->routers_by_mask_bit[router_maskbit] == 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "set_link: Router not found");
+ return;
+ }
-static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action)
-{
+ //
+ // Add the peer_link reference to the router record.
+ //
+ qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+ rnode->peer_link = core->out_links_by_mask_bit[link_maskbit];
}
+//
+// Route-table action handler: detach the peer link from a remote router.
+//
+// The router is selected by action->args.route_table.router_maskbit.  A mask
+// bit outside [0, qd_bitmask_width()) or an empty router slot is logged at
+// CRITICAL level and nothing is modified.
+//
static void qdrh_remove_link(qdr_core_t *core, qdr_action_t *action)
{
+    const int maskbit = action->args.route_table.router_maskbit;
+
+    if (maskbit >= qd_bitmask_width() || maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_link: Router maskbit out of range: %d", maskbit);
+        return;
+    }
+
+    qdr_node_t *node = core->routers_by_mask_bit[maskbit];
+    if (!node) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_link: Router not found");
+        return;
+    }
+
+    // Only the reference is dropped; the link object itself is not touched here.
+    node->peer_link = 0;
}
+//
+// Route-table action handler: record the next-hop router for a remote router.
+//
+// Reads router_maskbit and nh_router_maskbit from action->args.route_table.
+// Both must lie in [0, qd_bitmask_width()) and both must name routers present
+// in core->routers_by_mask_bit; otherwise a CRITICAL message is logged and
+// nothing is changed.  When the two mask bits are equal the call is a no-op,
+// so a router is never stored as its own next hop.
+//
static void qdrh_set_next_hop(qdr_core_t *core, qdr_action_t *action)
{
+    int router_maskbit    = action->args.route_table.router_maskbit;
+    int nh_router_maskbit = action->args.route_table.nh_router_maskbit;
+
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    if (nh_router_maskbit >= qd_bitmask_width() || nh_router_maskbit < 0) {
+        // Report the offending next-hop value, not the (valid) router mask bit.
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Next hop router maskbit out of range: %d", nh_router_maskbit);
+        return;
+    }
+
+    if (core->routers_by_mask_bit[router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Router not found");
+        return;
+    }
+
+    if (core->routers_by_mask_bit[nh_router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Next hop router not found");
+        return;
+    }
+
+    if (router_maskbit != nh_router_maskbit) {
+        qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+        rnode->next_hop   = core->routers_by_mask_bit[nh_router_maskbit];
+    }
}
+//
+// Route-table action handler: clear the next-hop reference of a remote router.
+//
+// The router is selected by action->args.route_table.router_maskbit.  A mask
+// bit outside [0, qd_bitmask_width()) or an empty slot in
+// core->routers_by_mask_bit is logged at CRITICAL level and nothing is
+// modified.
+//
static void qdrh_remove_next_hop(qdr_core_t *core, qdr_action_t *action)
{
+    int router_maskbit = action->args.route_table.router_maskbit;
+
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_next_hop: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    //
+    // The slot is empty when the router was never added or has already been
+    // deleted; writing through it would dereference a null pointer.
+    //
+    qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+    if (rnode == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_next_hop: Router not found");
+        return;
+    }
+
+    rnode->next_hop = 0;
}
+//
+// Route-table action handler: replace the valid-origins bitmask of a router.
+//
+// Takes the bitmask from action->args.route_table.router_set.  On success the
+// router node adopts the bitmask, after its previous one (if any) has been
+// freed.  When the mask bit is out of range or no router occupies the slot,
+// a CRITICAL message is logged and the supplied bitmask is freed here instead.
+//
static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action)
{
+    int           maskbit = action->args.route_table.router_maskbit;
+    qd_bitmask_t *origins = action->args.route_table.router_set;
+
+    if (maskbit >= qd_bitmask_width() || maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_valid_origins: Router maskbit out of range: %d", maskbit);
+    } else if (core->routers_by_mask_bit[maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_valid_origins: Router not found");
+    } else {
+        qdr_node_t *node = core->routers_by_mask_bit[maskbit];
+
+        if (node->valid_origins)
+            qd_bitmask_free(node->valid_origins);
+
+        // The node now holds the bitmask; null the local so it is not freed below.
+        node->valid_origins = origins;
+        origins             = 0;
+    }
+
+    if (origins)
+        qd_bitmask_free(origins);
}
+//
+// Route-table action handler: associate an address with a remote router.
+//
+// Looks the address up in core->addr_hash; when it is absent a new address
+// record is created with action->args.route_table.semantics, inserted in the
+// hash and appended to core->addrs.  The router node selected by
+// router_maskbit is then referenced from that address.  An out-of-range mask
+// bit or an empty router slot is logged at CRITICAL level.  The address field
+// is freed on every path.
+//
static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action)
{
+    //
+    // TODO - handle the class-prefix and phase explicitly
+    //
+
+    int          maskbit = action->args.route_table.router_maskbit;
+    qdr_field_t *field   = action->args.route_table.address;
+
+    if (maskbit >= qd_bitmask_width() || maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router maskbit out of range: %d", maskbit);
+    } else if (core->routers_by_mask_bit[maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router not found");
+    } else {
+        qd_field_iterator_t *key   = field->iterator;
+        qdr_address_t       *entry = 0;
+
+        qd_hash_retrieve(core->addr_hash, key, (void**) &entry);
+        if (entry == 0) {
+            // First mapping for this address: create and index its record.
+            entry = qdr_address(action->args.route_table.semantics);
+            qd_hash_insert(core->addr_hash, key, entry, &entry->hash_handle);
+            DEQ_ITEM_INIT(entry);
+            DEQ_INSERT_TAIL(core->addrs, entry);
+        }
+
+        qdr_add_node_ref(&entry->rnodes, core->routers_by_mask_bit[maskbit]);
+
+        //
+        // TODO - If this affects a waypoint, create the proper side effects
+        //
+    }
+
+    qdr_field_free(field);
}
+//
+// Route-table action handler: remove the association between an address and
+// a remote router.
+//
+// The address is looked up in core->addr_hash and the router node selected by
+// router_maskbit is dereferenced from it.  An out-of-range mask bit, an empty
+// router slot or an unknown address is logged at CRITICAL level.  The address
+// record itself is not removed here.  The address field is freed on every path.
+//
static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action)
{
+    int          maskbit = action->args.route_table.router_maskbit;
+    qdr_field_t *field   = action->args.route_table.address;
+
+    if (maskbit >= qd_bitmask_width() || maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router maskbit out of range: %d", maskbit);
+    } else if (core->routers_by_mask_bit[maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router not found");
+    } else {
+        qdr_node_t          *node = core->routers_by_mask_bit[maskbit];
+        qd_field_iterator_t *key  = field->iterator;
+        qdr_address_t       *addr = 0;
+
+        qd_hash_retrieve(core->addr_hash, key, (void**) &addr);
+
+        if (addr) {
+            qdr_del_node_ref(&addr->rnodes, node);
+
+            //
+            // TODO - If this affects a waypoint, create the proper side effects
+            //
+
+            //
+            // TODO - Port "check-addr" into this module
+            //
+            //qd_router_check_addr(router, addr, 0);
+        } else {
+            qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Address not found");
+        }
+    }
+
+    qdr_field_free(field);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index d713583..0b94b4e 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -23,6 +23,7 @@
ALLOC_DEFINE(qdr_address_t);
ALLOC_DEFINE(qdr_node_t);
+ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_link_ref_t);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5803299..0837f58 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -49,13 +49,14 @@ struct qdr_action_t {
qdr_action_handler_t action_handler;
union {
struct {
- int link_maskbit;
- int router_maskbit;
- int nh_router_maskbit;
- qd_bitmask_t *router_set;
- qdr_field_t *address;
- char address_class;
- char address_phase;
+ int link_maskbit;
+ int router_maskbit;
+ int nh_router_maskbit;
+ qd_bitmask_t *router_set;
+ qdr_field_t *address;
+ char address_class;
+ char address_phase;
+ qd_address_semantics_t semantics;
} route_table;
} args;
};
@@ -111,6 +112,9 @@ struct qdr_link_t {
bool strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router
};
+ALLOC_DECLARE(qdr_link_t);
+DEQ_DECLARE(qdr_link_t, qdr_link_list_t);
+
struct qdr_link_ref_t {
DEQ_LINKS(qdr_link_ref_t);
qdr_link_t *link;
@@ -198,9 +202,10 @@ struct qdr_core_t {
qdr_address_t *routerma_addr;
qdr_address_t *hello_addr;
- //qdr_link_list_t links;
+ qdr_link_list_t links;
qdr_node_list_t routers;
qdr_node_t **routers_by_mask_bit;
+ qdr_link_t **out_links_by_mask_bit;
};
void *router_core_thread(void *arg);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org