You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/21 18:07:59 UTC
qpid-dispatch git commit: DISPATCH-179 - Ported in the first handler:
add_router
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 f03cca6bb -> 382fb1b03
DISPATCH-179 - Ported in the first handler: add_router
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/382fb1b0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/382fb1b0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/382fb1b0
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 382fb1b030115bc173edd81119ececd76c215c3e
Parents: f03cca6
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 21 11:57:27 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 21 11:57:27 2015 -0400
----------------------------------------------------------------------
src/router_core/route_tables.c | 82 +++++++++++++++++-
src/router_core/route_tables.h | 22 -----
src/router_core/router_core.c | 81 +++++++++++++++--
src/router_core/router_core_private.h | 134 +++++++++++++++++++++++++++++
4 files changed, 290 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index bd102f0..d039ff1 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -18,7 +18,6 @@
*/
#include "router_core_private.h"
-//#include "route_tables.h"
static qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
@@ -33,6 +32,8 @@ static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action);
static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action);
static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action);
+static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
+
//==================================================================================
// Interface Functions
@@ -156,8 +157,87 @@ static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
// In-Thread Functions
//==================================================================================
+void qdr_route_table_setup(qdr_core_t *core)
+{
+ DEQ_INIT(core->addrs);
+ //DEQ_INIT(core->links);
+ DEQ_INIT(core->routers);
+ core->addr_hash = qd_hash(10, 32, 0);
+
+ core->router_addr = qdr_add_local_address(core, "qdrouter", QD_SEMANTICS_ROUTER_CONTROL);
+ core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
+ core->hello_addr = qdr_add_local_address(core, "qdhello", QD_SEMANTICS_ROUTER_CONTROL);
+
+ core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
+ for (int idx = 0; idx < qd_bitmask_width(); idx++)
+ core->routers_by_mask_bit[idx] = 0;
+}
+
+
static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action)
{
+ int router_maskbit = action->args.route_table.router_maskbit;
+
+ if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit);
+ return;
+ }
+
+ if (core->routers_by_mask_bit[router_maskbit] != 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit);
+ return;
+ }
+
+ //
+ // Hash lookup the address to ensure there isn't an existing router address.
+ //
+ qd_field_iterator_t *iter = action->args.route_table.address->iterator;
+ qdr_address_t *addr;
+
+ qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+ assert(addr == 0);
+
+ //
+ // Create an address record for this router and insert it in the hash table.
+ // This record will be found whenever a "foreign" topological address to this
+ // remote router is looked up.
+ //
+ addr = qdr_address(router_addr_semantics);
+ qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+
+ //
+ // Create a router-node record to represent the remote router.
+ //
+ qdr_node_t *rnode = new_qdr_node_t();
+ DEQ_ITEM_INIT(rnode);
+ rnode->owning_addr = addr;
+ rnode->mask_bit = router_maskbit;
+ rnode->next_hop = 0;
+ rnode->peer_link = 0;
+ rnode->ref_count = 0;
+ rnode->valid_origins = qd_bitmask(0);
+
+ DEQ_INSERT_TAIL(core->routers, rnode);
+
+ //
+ // Link the router record to the address record.
+ //
+ qdr_add_node_ref(&addr->rnodes, rnode);
+
+ //
+ // Link the router record to the router address records.
+ //
+ qdr_add_node_ref(&core->router_addr->rnodes, rnode);
+ qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
+
+ //
+ // Add the router record to the mask-bit index.
+ //
+ core->routers_by_mask_bit[router_maskbit] = rnode;
+
+ qdr_field_free(action->args.route_table.address);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/route_tables.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.h b/src/router_core/route_tables.h
deleted file mode 100644
index 13980c4..0000000
--- a/src/router_core/route_tables.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#ifndef __core_route_tables_h__
-#define __core_route_tables_h__ 1
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 7293caf..d713583 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -18,6 +18,12 @@
*/
#include "router_core_private.h"
+#include <stdio.h>
+
+
+ALLOC_DEFINE(qdr_address_t);
+ALLOC_DEFINE(qdr_node_t);
+ALLOC_DEFINE(qdr_link_ref_t);
qdr_core_t *qdr_core(void)
@@ -67,12 +73,16 @@ ALLOC_DEFINE(qdr_field_t);
qdr_field_t *qdr_field(const char *text)
{
- size_t length = strlen(text);
- size_t ilength = length;
+ size_t length = strlen(text);
+ size_t ilength = length;
+
+ if (length == 0)
+ return 0;
+
qdr_field_t *field = new_qdr_field_t();
qd_buffer_t *buf;
- ZERO(field);
+ ZERO(field);
while (length > 0) {
buf = qd_buffer();
size_t cap = qd_buffer_capacity(buf);
@@ -92,8 +102,67 @@ qdr_field_t *qdr_field(const char *text)
void qdr_field_free(qdr_field_t *field)
{
- qd_field_iterator_free(field->iterator);
- qd_buffer_list_free_buffers(&field->buffers);
- free_qdr_field_t(field);
+ if (field) {
+ qd_field_iterator_free(field->iterator);
+ qd_buffer_list_free_buffers(&field->buffers);
+ free_qdr_field_t(field);
+ }
+}
+
+
+qdr_address_t *qdr_address(qd_address_semantics_t semantics)
+{
+ qdr_address_t *addr = new_qdr_address_t();
+ ZERO(addr);
+ addr->semantics = semantics;
+ addr->forwarder = qd_router_get_forwarder(semantics);
+ return addr;
+}
+
+
+qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *address, qd_address_semantics_t semantics)
+{
+ char addr_string[1000];
+ qdr_address_t *addr = 0;
+ qd_field_iterator_t *iter = 0;
+
+ snprintf(addr_string, sizeof(addr_string), "L%s", address);
+ iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
+
+ qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = qdr_address(semantics);
+ qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+ addr->block_deletion = true;
+ }
+ qd_field_iterator_free(iter);
+ return addr;
+}
+
+
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+{
+ qdr_link_ref_t *ref = new_qdr_link_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->link = link;
+ link->ref = ref;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+{
+}
+
+
+void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
+{
+}
+
+
+void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
+{
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 94ec4a1..5803299 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -25,6 +25,10 @@
#include <qpid/dispatch/log.h>
#include <memory.h>
+/**
+ * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into
+ * and out of the router-core thread.
+ */
typedef struct {
qd_buffer_list_t buffers;
qd_field_iterator_t *iterator;
@@ -33,6 +37,10 @@ typedef struct {
qdr_field_t *qdr_field(const char *string);
void qdr_field_free(qdr_field_t *field);
+
+/**
+ * qdr_action_t - This type represents one work item to be performed by the router-core thread.
+ */
typedef struct qdr_action_t qdr_action_t;
typedef void (*qdr_action_handler_t) (qdr_core_t *core, qdr_action_t *action);
@@ -55,6 +63,119 @@ struct qdr_action_t {
ALLOC_DECLARE(qdr_action_t);
DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
+typedef struct qdr_address_t qdr_address_t;
+typedef struct qdr_node_t qdr_node_t;
+typedef struct qdr_router_ref_t qdr_router_ref_t;
+typedef struct qdr_link_ref_t qdr_link_ref_t;
+typedef struct qdr_lrp_t qdr_lrp_t;
+typedef struct qdr_lrp_ref_t qdr_lrp_ref_t;
+
+struct qdr_node_t {
+ DEQ_LINKS(qdr_node_t);
+ qdr_address_t *owning_addr;
+ int mask_bit;
+ qdr_node_t *next_hop; ///< Next hop node _if_ this is not a neighbor node
+ qdr_link_t *peer_link; ///< Outgoing link _if_ this is a neighbor node
+ uint32_t ref_count;
+ qd_bitmask_t *valid_origins;
+};
+
+ALLOC_DECLARE(qdr_node_t);
+DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
+
+
+struct qdr_router_ref_t {
+ DEQ_LINKS(qdr_router_ref_t);
+ qdr_node_t *router;
+};
+
+ALLOC_DECLARE(qdr_router_ref_t);
+DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
+
+
+struct qdr_link_t {
+ DEQ_LINKS(qdr_link_t);
+ int mask_bit; ///< Unique mask bit if this is an inter-router link
+ qd_link_type_t link_type;
+ qd_direction_t link_direction;
+ qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
+ //qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns this link
+ qd_link_t *link; ///< [own] Link pointer
+ qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link
+ qdr_link_ref_t *ref; ///< Pointer to a containing reference object
+ char *target; ///< Target address for incoming links
+ qd_routed_event_list_t event_fifo; ///< FIFO of outgoing delivery/link events (no messages)
+ qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or outgoing message deliveries
+ qd_router_delivery_list_t deliveries; ///< [own] outstanding unsettled deliveries
+ bool strip_inbound_annotations; ///<should the dispatch specific inbound annotations be stripped at the ingress router
+ bool strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router
+};
+
+struct qdr_link_ref_t {
+ DEQ_LINKS(qdr_link_ref_t);
+ qdr_link_t *link;
+};
+
+ALLOC_DECLARE(qdr_link_ref_t);
+DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
+
+
+struct qdr_lrp_t {
+ DEQ_LINKS(qdr_lrp_t);
+ char *prefix;
+ bool inbound;
+ bool outbound;
+ qd_lrp_container_t *container;
+};
+
+DEQ_DECLARE(qdr_lrp_t, qdr_lrp_list_t);
+
+struct qdr_lrp_ref_t {
+ DEQ_LINKS(qdr_lrp_ref_t);
+ qdr_lrp_t *lrp;
+};
+
+ALLOC_DECLARE(qdr_lrp_ref_t);
+DEQ_DECLARE(qdr_lrp_ref_t, qdr_lrp_ref_list_t);
+
+
+struct qdr_address_t {
+ DEQ_LINKS(qdr_address_t);
+ qd_router_message_cb_t on_message; ///< In-Process Message Consumer
+ void *on_message_context; ///< In-Process Consumer context
+ qdr_lrp_ref_list_t lrps; ///< Local link-route destinations
+ qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers
+ qdr_router_ref_list_t rnodes; ///< Remotely-Connected Consumers
+ qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
+ qd_address_semantics_t semantics;
+ bool toggle;
+ bool waypoint;
+ bool block_deletion;
+ qd_router_forwarder_t *forwarder;
+
+ /**@name Statistics */
+ ///@{
+ uint64_t deliveries_ingress;
+ uint64_t deliveries_egress;
+ uint64_t deliveries_transit;
+ uint64_t deliveries_to_container;
+ uint64_t deliveries_from_container;
+ ///@}
+};
+
+ALLOC_DECLARE(qdr_address_t);
+DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
+
+qdr_address_t *qdr_address(qd_address_semantics_t semantics);
+qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics);
+
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
+
+void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+
+
struct qdr_core_t {
qd_log_source_t *log;
sys_cond_t *cond;
@@ -67,6 +188,19 @@ struct qdr_core_t {
qdr_mobile_added_t rt_mobile_added;
qdr_mobile_removed_t rt_mobile_removed;
qdr_link_lost_t rt_link_lost;
+
+ const char *router_area;
+ const char *router_id;
+
+ qdr_address_list_t addrs;
+ qd_hash_t *addr_hash;
+ qdr_address_t *router_addr;
+ qdr_address_t *routerma_addr;
+ qdr_address_t *hello_addr;
+
+ //qdr_link_list_t links;
+ qdr_node_list_t routers;
+ qdr_node_t **routers_by_mask_bit;
};
void *router_core_thread(void *arg);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org