You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/21 23:42:50 UTC

qpid-dispatch git commit: DISPATCH-179 - Added the remainder of the router-engine functions.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 382fb1b03 -> 946a9e69b


DISPATCH-179 - Added the remainder of the router-engine functions.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/946a9e69
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/946a9e69
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/946a9e69

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 946a9e69b1f7b0f49edaedccb94c5a56cf6b264a
Parents: 382fb1b
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 21 17:42:07 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 21 17:42:07 2015 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |   2 +-
 src/router_core/route_tables.c        | 337 ++++++++++++++++++++++++-----
 src/router_core/router_core.c         |   1 +
 src/router_core/router_core_private.h |  21 +-
 4 files changed, 300 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 87bcd5c..11f030f 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -53,7 +53,7 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit);
 void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit);
 void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
 void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem);
 void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase);
 
 typedef void (*qdr_mobile_added_t)   (void *context, const char *address);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index d039ff1..137d0e6 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -99,13 +99,14 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask
 }
 
 
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase)
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem)
 {
     qdr_action_t *action = qdr_action(qdrh_map_destination);
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.address        = qdr_field(address);
     action->args.route_table.address_phase  = phase;
     action->args.route_table.address_class  = aclass;
+    action->args.route_table.semantics      = sem;
     qdr_action_enqueue(core, action);
 }
 
@@ -160,7 +161,7 @@ static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
 void qdr_route_table_setup(qdr_core_t *core)
 {
     DEQ_INIT(core->addrs);
-    //DEQ_INIT(core->links);
+    DEQ_INIT(core->links);
     DEQ_INIT(core->routers);
     core->addr_hash = qd_hash(10, 32, 0);
 
@@ -168,116 +169,348 @@ void qdr_route_table_setup(qdr_core_t *core)
     core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
     core->hello_addr    = qdr_add_local_address(core, "qdhello",     QD_SEMANTICS_ROUTER_CONTROL);
 
-    core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
-    for (int idx = 0; idx < qd_bitmask_width(); idx++)
-        core->routers_by_mask_bit[idx] = 0;
+    core->routers_by_mask_bit   = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
+    core->out_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
+    for (int idx = 0; idx < qd_bitmask_width(); idx++) {
+        core->routers_by_mask_bit[idx]   = 0;
+        core->out_links_by_mask_bit[idx] = 0;
+    }
 }
 
 
 static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action)
 {
+    int          router_maskbit = action->args.route_table.router_maskbit;
+    qdr_field_t *address        = action->args.route_table.address;
+
+    do {
+        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit);
+            break;
+        }
+
+        if (core->routers_by_mask_bit[router_maskbit] != 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit);
+            break;
+        }
+
+        //
+        // Hash lookup the address to ensure there isn't an existing router address.
+        //
+        qd_field_iterator_t *iter = address->iterator;
+        qdr_address_t       *addr;
+
+        qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+        qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+
+        if (addr) {
+            qd_log(core->log, QD_LOG_CRITICAL, "add_router: Data inconsistency for router-maskbit %d", router_maskbit);
+            assert(addr == 0);  // Crash in debug mode.  This should never happen
+            break;
+        }
+
+        //
+        // Create an address record for this router and insert it in the hash table.
+        // This record will be found whenever a "foreign" topological address to this
+        // remote router is looked up.
+        //
+        addr = qdr_address(router_addr_semantics);
+        qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+        DEQ_INSERT_TAIL(core->addrs, addr);
+
+        //
+        // Create a router-node record to represent the remote router.
+        //
+        qdr_node_t *rnode = new_qdr_node_t();
+        DEQ_ITEM_INIT(rnode);
+        rnode->owning_addr   = addr;
+        rnode->mask_bit      = router_maskbit;
+        rnode->next_hop      = 0;
+        rnode->peer_link     = 0;
+        rnode->ref_count     = 0;
+        rnode->valid_origins = qd_bitmask(0);
+
+        DEQ_INSERT_TAIL(core->routers, rnode);
+
+        //
+        // Link the router record to the address record.
+        //
+        qdr_add_node_ref(&addr->rnodes, rnode);
+
+        //
+        // Link the router record to the router address records.
+        //
+        qdr_add_node_ref(&core->router_addr->rnodes, rnode);
+        qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
+
+        //
+        // Add the router record to the mask-bit index.
+        //
+        core->routers_by_mask_bit[router_maskbit] = rnode;
+    } while (false);
+
+    qdr_field_free(address);
+}
+
+
+static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action)
+{
     int router_maskbit = action->args.route_table.router_maskbit;
 
     if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
-        qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit);
+        qd_log(core->log, QD_LOG_CRITICAL, "del_router: Router maskbit out of range: %d", router_maskbit);
         return;
     }
 
-    if (core->routers_by_mask_bit[router_maskbit] != 0) {
-        qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit);
+    if (core->routers_by_mask_bit[router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "del_router: Deleting nonexistent router: %d", router_maskbit);
         return;
     }
 
-    //
-    // Hash lookup the address to ensure there isn't an existing router address.
-    //
-    qd_field_iterator_t *iter = action->args.route_table.address->iterator;
-    qdr_address_t       *addr;
-
-    qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-    qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
-    assert(addr == 0);
+    qdr_node_t    *rnode = core->routers_by_mask_bit[router_maskbit];
+    qdr_address_t *oaddr = rnode->owning_addr;
+    assert(oaddr);
 
     //
-    // Create an address record for this router and insert it in the hash table.
-    // This record will be found whenever a "foreign" topological address to this
-    // remote router is looked up.
+    // Unlink the router node from the address record
     //
-    addr = qdr_address(router_addr_semantics);
-    qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
-    DEQ_INSERT_TAIL(core->addrs, addr);
+    qdr_del_node_ref(&oaddr->rnodes, rnode);
 
     //
-    // Create a router-node record to represent the remote router.
+    // While the router node has a non-zero reference count, look for addresses
+    // to unlink the node from.
     //
-    qdr_node_t *rnode = new_qdr_node_t();
-    DEQ_ITEM_INIT(rnode);
-    rnode->owning_addr   = addr;
-    rnode->mask_bit      = router_maskbit;
-    rnode->next_hop      = 0;
-    rnode->peer_link     = 0;
-    rnode->ref_count     = 0;
-    rnode->valid_origins = qd_bitmask(0);
-
-    DEQ_INSERT_TAIL(core->routers, rnode);
+    qdr_address_t *addr = DEQ_HEAD(core->addrs);
+    while (addr && rnode->ref_count > 0) {
+        qdr_del_node_ref(&addr->rnodes, rnode);
+        addr = DEQ_NEXT(addr);
+    }
+    assert(rnode->ref_count == 0);
 
     //
-    // Link the router record to the address record.
+    // Free the router node and the owning address records.
     //
-    qdr_add_node_ref(&addr->rnodes, rnode);
+    qd_bitmask_free(rnode->valid_origins);
+    DEQ_REMOVE(core->routers, rnode);
+    free_qdr_node_t(rnode);
+
+    qd_hash_remove_by_handle(core->addr_hash, oaddr->hash_handle);
+    DEQ_REMOVE(core->addrs, oaddr);
+    qd_hash_handle_free(oaddr->hash_handle);
+    core->routers_by_mask_bit[router_maskbit] = 0;
+    free_qdr_address_t(oaddr);
+}
 
-    //
-    // Link the router record to the router address records.
-    //
-    qdr_add_node_ref(&core->router_addr->rnodes, rnode);
-    qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
 
-    //
-    // Add the router record to the mask-bit index.
-    //
-    core->routers_by_mask_bit[router_maskbit] = rnode;
+static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action)
+{
+    int router_maskbit = action->args.route_table.router_maskbit;
+    int link_maskbit   = action->args.route_table.link_maskbit;
 
-    qdr_field_free(action->args.route_table.address);
-}
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
 
+    if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Link maskbit out of range: %d", link_maskbit);
+        return;
+    }
 
-static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action)
-{
-}
+    if (core->out_links_by_mask_bit[link_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", link_maskbit);
+        return;
+    }
 
+    if (core->routers_by_mask_bit[router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Router not found");
+        return;
+    }
 
-static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action)
-{
+    //
+    // Add the peer_link reference to the router record.
+    //
+    qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+    rnode->peer_link = core->out_links_by_mask_bit[link_maskbit];
 }
 
 
 static void qdrh_remove_link(qdr_core_t *core, qdr_action_t *action)
 {
+    int router_maskbit = action->args.route_table.router_maskbit;
+
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_link: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    if (core->routers_by_mask_bit[router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_link: Router not found");
+        return;
+    }
+
+    qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+    rnode->peer_link = 0;
 }
 
 
 static void qdrh_set_next_hop(qdr_core_t *core, qdr_action_t *action)
 {
+    int router_maskbit    = action->args.route_table.router_maskbit;
+    int nh_router_maskbit = action->args.route_table.nh_router_maskbit;
+
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    if (nh_router_maskbit >= qd_bitmask_width() || nh_router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Next hop router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    if (core->routers_by_mask_bit[router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Router not found");
+        return;
+    }
+
+    if (core->routers_by_mask_bit[nh_router_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_next_hop: Next hop router not found");
+        return;
+    }
+
+    if (router_maskbit != nh_router_maskbit) {
+        qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+        rnode->next_hop   = core->routers_by_mask_bit[nh_router_maskbit];
+    }
 }
 
 
 static void qdrh_remove_next_hop(qdr_core_t *core, qdr_action_t *action)
 {
+    int router_maskbit = action->args.route_table.router_maskbit;
+
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "remove_next_hop: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+    rnode->next_hop = 0;
 }
 
 
 static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action)
 {
+    int           router_maskbit = action->args.route_table.router_maskbit;
+    qd_bitmask_t *valid_origins  = action->args.route_table.router_set;
+
+    do {
+        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "set_valid_origins: Router maskbit out of range: %d", router_maskbit);
+            break;
+        }
+
+        if (core->routers_by_mask_bit[router_maskbit] == 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "set_valid_origins: Router not found");
+            break;
+        }
+
+        qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+        if (rnode->valid_origins)
+            qd_bitmask_free(rnode->valid_origins);
+        rnode->valid_origins = valid_origins;
+        valid_origins = 0;
+    } while (false);
+
+    if (valid_origins)
+        qd_bitmask_free(valid_origins);
 }
 
 
 static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action)
 {
+    //
+    // TODO - handle the class-prefix and phase explicitly
+    //
+
+    int          router_maskbit = action->args.route_table.router_maskbit;
+    qdr_field_t *address        = action->args.route_table.address;
+
+    do {
+        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router maskbit out of range: %d", router_maskbit);
+            break;
+        }
+
+        if (core->routers_by_mask_bit[router_maskbit] == 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router not found");
+            break;
+        }
+
+        qd_field_iterator_t *iter = address->iterator;
+        qdr_address_t       *addr = 0;
+
+        qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+        if (!addr) {
+            addr = qdr_address(action->args.route_table.semantics);
+            qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+            DEQ_ITEM_INIT(addr);
+            DEQ_INSERT_TAIL(core->addrs, addr);
+        }
+
+        qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
+        qdr_add_node_ref(&addr->rnodes, rnode);
+
+        //
+        // TODO - If this affects a waypoint, create the proper side effects
+        //
+    } while (false);
+
+    qdr_field_free(address);
 }
 
 
 static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action)
 {
+    int          router_maskbit = action->args.route_table.router_maskbit;
+    qdr_field_t *address        = action->args.route_table.address;
+
+    do {
+        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router maskbit out of range: %d", router_maskbit);
+            break;
+        }
+
+        if (core->routers_by_mask_bit[router_maskbit] == 0) {
+            qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router not found");
+            break;
+        }
+
+        qdr_node_t          *rnode = core->routers_by_mask_bit[router_maskbit];
+        qd_field_iterator_t *iter  = address->iterator;
+        qdr_address_t       *addr  = 0;
+
+        qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+
+        if (!addr) {
+            qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Address not found");
+            break;
+        }
+
+        qdr_del_node_ref(&addr->rnodes, rnode);
+
+        //
+        // TODO - If this affects a waypoint, create the proper side effects
+        //
+
+        //
+        // TODO - Port "check-addr" into this module
+        //
+        //qd_router_check_addr(router, addr, 0);
+    } while (false);
+
+    qdr_field_free(address);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index d713583..0b94b4e 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -23,6 +23,7 @@
 
 ALLOC_DEFINE(qdr_address_t);
 ALLOC_DEFINE(qdr_node_t);
+ALLOC_DEFINE(qdr_link_t);
 ALLOC_DEFINE(qdr_link_ref_t);
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/946a9e69/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5803299..0837f58 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -49,13 +49,14 @@ struct qdr_action_t {
     qdr_action_handler_t action_handler;
     union {
         struct {
-            int           link_maskbit;
-            int           router_maskbit;
-            int           nh_router_maskbit;
-            qd_bitmask_t *router_set;
-            qdr_field_t  *address;
-            char          address_class;
-            char          address_phase;
+            int                     link_maskbit;
+            int                     router_maskbit;
+            int                     nh_router_maskbit;
+            qd_bitmask_t           *router_set;
+            qdr_field_t            *address;
+            char                    address_class;
+            char                    address_phase;
+            qd_address_semantics_t  semantics;
         } route_table;
     } args;
 };
@@ -111,6 +112,9 @@ struct qdr_link_t {
     bool                      strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router
 };
 
+ALLOC_DECLARE(qdr_link_t);
+DEQ_DECLARE(qdr_link_t, qdr_link_list_t);
+
 struct qdr_link_ref_t {
     DEQ_LINKS(qdr_link_ref_t);
     qdr_link_t *link;
@@ -198,9 +202,10 @@ struct qdr_core_t {
     qdr_address_t        *routerma_addr;
     qdr_address_t        *hello_addr;
 
-    //qdr_link_list_t       links;
+    qdr_link_list_t       links;
     qdr_node_list_t       routers;
     qdr_node_t          **routers_by_mask_bit;
+    qdr_link_t          **out_links_by_mask_bit;
 };
 
 void *router_core_thread(void *arg);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org