You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/21 18:07:59 UTC

qpid-dispatch git commit: DISPATCH-179 - Ported in the first handler: add_router

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 f03cca6bb -> 382fb1b03


DISPATCH-179 - Ported in the first handler: add_router


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/382fb1b0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/382fb1b0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/382fb1b0

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 382fb1b030115bc173edd81119ececd76c215c3e
Parents: f03cca6
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 21 11:57:27 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 21 11:57:27 2015 -0400

----------------------------------------------------------------------
 src/router_core/route_tables.c        |  82 +++++++++++++++++-
 src/router_core/route_tables.h        |  22 -----
 src/router_core/router_core.c         |  81 +++++++++++++++--
 src/router_core/router_core_private.h | 134 +++++++++++++++++++++++++++++
 4 files changed, 290 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index bd102f0..d039ff1 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -18,7 +18,6 @@
  */
 
 #include "router_core_private.h"
-//#include "route_tables.h"
 
 static qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
 static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
@@ -33,6 +32,8 @@ static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action);
 static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action);
 static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action);
 
+static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
+
 
 //==================================================================================
 // Interface Functions
@@ -156,8 +157,87 @@ static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
 // In-Thread Functions
 //==================================================================================
 
+void qdr_route_table_setup(qdr_core_t *core)
+{
+    DEQ_INIT(core->addrs);
+    //DEQ_INIT(core->links);
+    DEQ_INIT(core->routers);
+    core->addr_hash = qd_hash(10, 32, 0);
+
+    core->router_addr   = qdr_add_local_address(core, "qdrouter",    QD_SEMANTICS_ROUTER_CONTROL);
+    core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
+    core->hello_addr    = qdr_add_local_address(core, "qdhello",     QD_SEMANTICS_ROUTER_CONTROL);
+
+    core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
+    for (int idx = 0; idx < qd_bitmask_width(); idx++)
+        core->routers_by_mask_bit[idx] = 0;
+}
+
+
 static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action)
 {
+    int router_maskbit = action->args.route_table.router_maskbit;
+
+    if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit);
+        return;
+    }
+
+    if (core->routers_by_mask_bit[router_maskbit] != 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit);
+        return;
+    }
+
+    //
+    // Hash lookup the address to ensure there isn't an existing router address.
+    //
+    qd_field_iterator_t *iter = action->args.route_table.address->iterator;
+    qdr_address_t       *addr;
+
+    qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+    qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+    assert(addr == 0);
+
+    //
+    // Create an address record for this router and insert it in the hash table.
+    // This record will be found whenever a "foreign" topological address to this
+    // remote router is looked up.
+    //
+    addr = qdr_address(router_addr_semantics);
+    qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+    DEQ_INSERT_TAIL(core->addrs, addr);
+
+    //
+    // Create a router-node record to represent the remote router.
+    //
+    qdr_node_t *rnode = new_qdr_node_t();
+    DEQ_ITEM_INIT(rnode);
+    rnode->owning_addr   = addr;
+    rnode->mask_bit      = router_maskbit;
+    rnode->next_hop      = 0;
+    rnode->peer_link     = 0;
+    rnode->ref_count     = 0;
+    rnode->valid_origins = qd_bitmask(0);
+
+    DEQ_INSERT_TAIL(core->routers, rnode);
+
+    //
+    // Link the router record to the address record.
+    //
+    qdr_add_node_ref(&addr->rnodes, rnode);
+
+    //
+    // Link the router record to the router address records.
+    //
+    qdr_add_node_ref(&core->router_addr->rnodes, rnode);
+    qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
+
+    //
+    // Add the router record to the mask-bit index.
+    //
+    core->routers_by_mask_bit[router_maskbit] = rnode;
+
+    qdr_field_free(action->args.route_table.address);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/route_tables.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.h b/src/router_core/route_tables.h
deleted file mode 100644
index 13980c4..0000000
--- a/src/router_core/route_tables.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#ifndef __core_route_tables_h__
-#define __core_route_tables_h__ 1
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 7293caf..d713583 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -18,6 +18,12 @@
  */
 
 #include "router_core_private.h"
+#include <stdio.h>
+
+
+ALLOC_DEFINE(qdr_address_t);
+ALLOC_DEFINE(qdr_node_t);
+ALLOC_DEFINE(qdr_link_ref_t);
 
 
 qdr_core_t *qdr_core(void)
@@ -67,12 +73,16 @@ ALLOC_DEFINE(qdr_field_t);
 
 qdr_field_t *qdr_field(const char *text)
 {
-    size_t       length  = strlen(text);
-    size_t       ilength = length;
+    size_t length  = strlen(text);
+    size_t ilength = length;
+
+    if (length == 0)
+        return 0;
+
     qdr_field_t *field   = new_qdr_field_t();
     qd_buffer_t *buf;
-    ZERO(field);
 
+    ZERO(field);
     while (length > 0) {
         buf = qd_buffer();
         size_t cap  = qd_buffer_capacity(buf);
@@ -92,8 +102,67 @@ qdr_field_t *qdr_field(const char *text)
 
 void qdr_field_free(qdr_field_t *field)
 {
-    qd_field_iterator_free(field->iterator);
-    qd_buffer_list_free_buffers(&field->buffers);
-    free_qdr_field_t(field);
+    if (field) {
+        qd_field_iterator_free(field->iterator);
+        qd_buffer_list_free_buffers(&field->buffers);
+        free_qdr_field_t(field);
+    }
+}
+
+
+qdr_address_t *qdr_address(qd_address_semantics_t semantics)
+{
+    qdr_address_t *addr = new_qdr_address_t();
+    ZERO(addr);
+    addr->semantics = semantics;
+    addr->forwarder = qd_router_get_forwarder(semantics);
+    return addr;
+}
+
+
+qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *address, qd_address_semantics_t semantics)
+{
+    char                 addr_string[1000];
+    qdr_address_t       *addr = 0;
+    qd_field_iterator_t *iter = 0;
+
+    snprintf(addr_string, sizeof(addr_string), "L%s", address);
+    iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
+
+    qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+    if (!addr) {
+        addr = qdr_address(semantics);
+        qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+        DEQ_ITEM_INIT(addr);
+        DEQ_INSERT_TAIL(core->addrs, addr);
+        addr->block_deletion = true;
+    }
+    qd_field_iterator_free(iter);
+    return addr;
+}
+
+
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+{
+    qdr_link_ref_t *ref = new_qdr_link_ref_t();
+    DEQ_ITEM_INIT(ref);
+    ref->link = link;
+    link->ref = ref;
+    DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+{
+}
+
+
+void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
+{
+}
+
+
+void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
+{
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 94ec4a1..5803299 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -25,6 +25,10 @@
 #include <qpid/dispatch/log.h>
 #include <memory.h>
 
+/**
+ * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into
+ *               and out of the router-core thread.
+ */
 typedef struct {
     qd_buffer_list_t     buffers;
     qd_field_iterator_t *iterator;
@@ -33,6 +37,10 @@ typedef struct {
 qdr_field_t *qdr_field(const char *string);
 void qdr_field_free(qdr_field_t *field);
 
+
+/**
+ * qdr_action_t - This type represents one work item to be performed by the router-core thread.
+ */
 typedef struct qdr_action_t qdr_action_t;
 typedef void (*qdr_action_handler_t) (qdr_core_t *core, qdr_action_t *action);
 
@@ -55,6 +63,119 @@ struct qdr_action_t {
 ALLOC_DECLARE(qdr_action_t);
 DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
 
+typedef struct qdr_address_t     qdr_address_t;
+typedef struct qdr_node_t        qdr_node_t;
+typedef struct qdr_router_ref_t  qdr_router_ref_t;
+typedef struct qdr_link_ref_t    qdr_link_ref_t;
+typedef struct qdr_lrp_t         qdr_lrp_t;
+typedef struct qdr_lrp_ref_t     qdr_lrp_ref_t;
+
+struct qdr_node_t {
+    DEQ_LINKS(qdr_node_t);
+    qdr_address_t    *owning_addr;
+    int               mask_bit;
+    qdr_node_t       *next_hop;   ///< Next hop node _if_ this is not a neighbor node
+    qdr_link_t       *peer_link;  ///< Outgoing link _if_ this is a neighbor node
+    uint32_t          ref_count;
+    qd_bitmask_t     *valid_origins;
+};
+
+ALLOC_DECLARE(qdr_node_t);
+DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
+
+
+struct qdr_router_ref_t {
+    DEQ_LINKS(qdr_router_ref_t);
+    qdr_node_t *router;
+};
+
+ALLOC_DECLARE(qdr_router_ref_t);
+DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
+
+
+struct qdr_link_t {
+    DEQ_LINKS(qdr_link_t);
+    int                       mask_bit;        ///< Unique mask bit if this is an inter-router link
+    qd_link_type_t            link_type;
+    qd_direction_t            link_direction;
+    qdr_address_t            *owning_addr;     ///< [ref] Address record that owns this link
+    //qd_waypoint_t            *waypoint;        ///< [ref] Waypoint that owns this link
+    qd_link_t                *link;            ///< [own] Link pointer
+    qdr_link_t               *connected_link;  ///< [ref] If this is a link-route, reference the connected link
+    qdr_link_ref_t           *ref;             ///< Pointer to a containing reference object
+    char                     *target;          ///< Target address for incoming links
+    qd_routed_event_list_t    event_fifo;      ///< FIFO of outgoing delivery/link events (no messages)
+    qd_routed_event_list_t    msg_fifo;        ///< FIFO of incoming or outgoing message deliveries
+    qd_router_delivery_list_t deliveries;      ///< [own] outstanding unsettled deliveries
+    bool                      strip_inbound_annotations;  ///<should the dispatch specific inbound annotations be stripped at the ingress router
+    bool                      strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router
+};
+
+struct qdr_link_ref_t {
+    DEQ_LINKS(qdr_link_ref_t);
+    qdr_link_t *link;
+};
+
+ALLOC_DECLARE(qdr_link_ref_t);
+DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
+
+
+struct qdr_lrp_t {
+    DEQ_LINKS(qdr_lrp_t);
+    char               *prefix;
+    bool                inbound;
+    bool                outbound;
+    qd_lrp_container_t *container;
+};
+
+DEQ_DECLARE(qdr_lrp_t, qdr_lrp_list_t);
+
+struct qdr_lrp_ref_t {
+    DEQ_LINKS(qdr_lrp_ref_t);
+    qdr_lrp_t *lrp;
+};
+
+ALLOC_DECLARE(qdr_lrp_ref_t);
+DEQ_DECLARE(qdr_lrp_ref_t, qdr_lrp_ref_list_t);
+
+
+struct qdr_address_t {
+    DEQ_LINKS(qdr_address_t);
+    qd_router_message_cb_t     on_message;          ///< In-Process Message Consumer
+    void                      *on_message_context;  ///< In-Process Consumer context
+    qdr_lrp_ref_list_t         lrps;                ///< Local link-route destinations
+    qdr_link_ref_list_t        rlinks;              ///< Locally-Connected Consumers
+    qdr_router_ref_list_t      rnodes;              ///< Remotely-Connected Consumers
+    qd_hash_handle_t          *hash_handle;         ///< Linkage back to the hash table entry
+    qd_address_semantics_t     semantics;
+    bool                       toggle;
+    bool                       waypoint;
+    bool                       block_deletion;
+    qd_router_forwarder_t     *forwarder;
+
+    /**@name Statistics */
+    ///@{
+    uint64_t deliveries_ingress;
+    uint64_t deliveries_egress;
+    uint64_t deliveries_transit;
+    uint64_t deliveries_to_container;
+    uint64_t deliveries_from_container;
+    ///@}
+};
+
+ALLOC_DECLARE(qdr_address_t);
+DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
+
+qdr_address_t *qdr_address(qd_address_semantics_t semantics);
+qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics);
+
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
+
+void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+
+
 struct qdr_core_t {
     qd_log_source_t   *log;
     sys_cond_t        *cond;
@@ -67,6 +188,19 @@ struct qdr_core_t {
     qdr_mobile_added_t    rt_mobile_added;
     qdr_mobile_removed_t  rt_mobile_removed;
     qdr_link_lost_t       rt_link_lost;
+
+    const char *router_area;
+    const char *router_id;
+
+    qdr_address_list_t    addrs;
+    qd_hash_t            *addr_hash;
+    qdr_address_t        *router_addr;
+    qdr_address_t        *routerma_addr;
+    qdr_address_t        *hello_addr;
+
+    //qdr_link_list_t       links;
+    qdr_node_list_t       routers;
+    qdr_node_t          **routers_by_mask_bit;
 };
 
 void *router_core_thread(void *arg);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org