You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/16 22:51:06 UTC
[3/3] qpid-dispatch git commit: DISPATCH-179 - WIP checkpoint. Ripped
out large sections of the old architecture. Added the general callback
functionality for non-connection-specific actions.
DISPATCH-179 - WIP checkpoint.
Ripped out large sections of the old architecture.
Added the general callback functionality for non-connection-specific actions.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ca24d67f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ca24d67f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ca24d67f
Branch: refs/heads/tross-DISPATCH-179-1
Commit: ca24d67f0a8394836417e988630bb9810420b965
Parents: fc2c243
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Dec 16 16:49:14 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Dec 16 16:49:14 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 26 +-
python/qpid_dispatch_internal/router/node.py | 5 +-
src/CMakeLists.txt | 1 +
src/lrp.c | 22 +-
src/router_core/connections.c | 58 +-
src/router_core/error.c | 97 +++
src/router_core/route_tables.c | 134 +++-
src/router_core/router_core.c | 63 ++
src/router_core/router_core_private.h | 56 +-
src/router_core/router_core_thread.c | 3 +-
src/router_node.c | 904 +---------------------
src/router_private.h | 4 -
src/router_pynode.c | 461 ++---------
src/waypoint.c | 2 +-
14 files changed, 479 insertions(+), 1357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index cf2c7a4..e42d818 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -37,6 +37,7 @@ typedef struct qdr_connection_t qdr_connection_t;
typedef struct qdr_link_t qdr_link_t;
typedef struct qdr_delivery_t qdr_delivery_t;
typedef struct qdr_terminus_t qdr_terminus_t;
+typedef struct qdr_error_t qdr_error_t;
/**
* Allocate and start an instance of the router core module.
@@ -60,11 +61,11 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit);
void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit);
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem);
-void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
+void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
-typedef void (*qdr_mobile_added_t) (void *context, const char *address);
-typedef void (*qdr_mobile_removed_t) (void *context, const char *address);
+typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash);
+typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash);
typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
void qdr_core_route_table_handlers(qdr_core_t *core,
@@ -277,6 +278,17 @@ qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term);
/**
******************************************************************************
+ * Error functions
+ ******************************************************************************
+ */
+
+qdr_error_t *qdr_error_from_pn(pn_condition_t *pn);
+qdr_error_t *qdr_error(const char *name, const char *description);
+void qdr_error_free(qdr_error_t *error);
+void qdr_error_copy(qdr_error_t *from, pn_condition_t *to);
+
+/**
+ ******************************************************************************
* Link functions
******************************************************************************
*/
@@ -347,9 +359,9 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin
* This function is invoked when a link detach arrives.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
- * @param condition The link condition from the detach frame.
+ * @param error The link error from the detach frame or 0 if none.
*/
-void qdr_link_detach(qdr_link_t *link, pn_condition_t *condition);
+void qdr_link_detach(qdr_link_t *link, qdr_error_t *error);
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg);
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr);
@@ -357,7 +369,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, q
typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
-typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, pn_condition_t *condition);
+typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error);
void qdr_connection_handlers(qdr_core_t *core,
void *context,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index fc40dd9..1fd4f27 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -486,10 +486,7 @@ class RouterNode(object):
def map_address(self, addr):
self.mobile_addresses.append(addr)
- phase = '0'
- if addr[0] == 'M':
- phase = addr[1]
- self.adapter.map_destination(phase, addr, self.maskbit)
+ self.adapter.map_destination(addr, self.maskbit)
self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 395614d..333b4f0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,7 @@ set(qpid_dispatch_SOURCES
router_core/agent_waypoint.c
router_core/agent_link.c
router_core/connections.c
+ router_core/error.c
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/lrp.c
----------------------------------------------------------------------
diff --git a/src/lrp.c b/src/lrp.c
index 0d13748..d3d0b47 100644
--- a/src/lrp.c
+++ b/src/lrp.c
@@ -28,6 +28,10 @@
static const char qd_link_route_addr_prefix_inbound = 'C';
static const char qd_link_route_addr_prefix_outbound = 'D';
+
+//
+// DEPRECATE
+//
static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
{
qd_lrp_container_t *lrpc = (qd_lrp_container_t*) context;
@@ -39,7 +43,7 @@ static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
while (lrp) {
qd_address_t *addr;
qd_field_iterator_t *iter;
- bool propagate;
+ //bool propagate;
char unused;
qd_log(router->log_source, QD_LOG_INFO, "Activating Prefix '%s' for routed links to '%s'",
@@ -73,14 +77,14 @@ static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
// across the network.
//
qd_router_add_lrp_ref_LH(&addr->lrps, lrp);
- propagate = DEQ_SIZE(addr->lrps) == 1;
+ //propagate = DEQ_SIZE(addr->lrps) == 1;
sys_mutex_unlock(router->lock);
//
// Propagate the address if appropriate
//
- if (propagate)
- qd_router_mobile_added(router, iter);
+ //if (propagate)
+ // qd_router_mobile_added(router, iter);
}
if (lrp->outbound) {
@@ -106,14 +110,14 @@ static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
// across the network.
//
qd_router_add_lrp_ref_LH(&addr->lrps, lrp);
- propagate = DEQ_SIZE(addr->lrps) == 1;
+ //propagate = DEQ_SIZE(addr->lrps) == 1;
sys_mutex_unlock(router->lock);
//
// Propagate the address if appropriate
//
- if (propagate)
- qd_router_mobile_added(router, iter);
+ //if (propagate)
+ // qd_router_mobile_added(router, iter);
}
qd_field_iterator_free(iter);
@@ -166,7 +170,7 @@ static void qd_lrpc_close_handler(void *context, qd_connection_t *conn)
//
if (propagate) {
char *text = (char*) qd_field_iterator_copy(iter);
- qd_router_mobile_removed(router, text);
+ //qd_router_mobile_removed(router, text);
free(text);
}
}
@@ -194,7 +198,7 @@ static void qd_lrpc_close_handler(void *context, qd_connection_t *conn)
//
if (propagate) {
char *text = (char*) qd_field_iterator_copy(iter);
- qd_router_mobile_removed(router, text);
+ //qd_router_mobile_removed(router, text);
free(text);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 884a298..26064de 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -130,7 +130,7 @@ void qdr_connection_process(qdr_connection_t *conn)
break;
case QDR_CONNECTION_WORK_DETACH :
- core->detach_handler(core->user_context, work->link, work->condition);
+ core->detach_handler(core->user_context, work->link, work->error);
break;
}
@@ -203,12 +203,12 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin
}
-void qdr_link_detach(qdr_link_t *link, pn_condition_t *condition)
+void qdr_link_detach(qdr_link_t *link, qdr_error_t *error)
{
qdr_action_t *action = qdr_action(qdr_link_detach_CT, "link_detach");
- action->args.connection.link = link;
- action->args.connection.condition = condition;
+ action->args.connection.link = link;
+ action->args.connection.error = error;
qdr_action_enqueue(link->core, action);
}
@@ -331,12 +331,47 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie
// Question: Should we use a new prefix for configuration? (No: allows the possibility of
// static routes; yes: prevents occlusion by mobile addresses with specified semantics)
//
- qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) addr);
+ qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
return addr ? addr->semantics : qdr_default_semantics;
}
/**
+ * Check an address to see if it no longer has any associated destinations.
+ * Depending on its policy, the address may be eligible for being closed out
+ * (i.e. Logging its terminal statistics and freeing its resources).
+ */
+/*static*/ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
+{
+ if (addr == 0)
+ return;
+
+ //
+ // If we have just removed a local linkage and it was the last local linkage,
+ // we need to notify the router module that there is no longer a local
+ // presence of this address.
+ //
+ if (was_local && DEQ_SIZE(addr->rlinks) == 0) {
+ const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ if (key && *key == 'M')
+ qdr_post_mobile_removed_CT(core, key);
+ }
+
+ //
+ // If the address has no in-process consumer or destinations, it should be
+ // deleted.
+ //
+ if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
+ !addr->waypoint && !addr->block_deletion) {
+ qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
+ DEQ_REMOVE(core->addrs, addr);
+ qd_hash_handle_free(addr->hash_handle);
+ free_qdr_address_t(addr);
+ }
+}
+
+
+/**
* qdr_lookup_terminus_address_CT
*
* Lookup a terminus address in the route table and possibly create a new address
@@ -420,6 +455,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
// a link-route destination for the address.
//
qd_field_iterator_t *iter = qdr_terminus_get_address(terminus);
+ qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_override_prefix(iter, qdr_prefix_for_dir(dir));
qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
if (addr) {
@@ -618,9 +654,11 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo
//
link->owning_addr = addr;
qdr_add_link_ref(&addr->rlinks, link);
- if (DEQ_SIZE(addr->rlinks) == 1)
- // TODO - notify the router module
- ;
+ if (DEQ_SIZE(addr->rlinks) == 1) {
+ const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ if (key && *key == 'M')
+ qdr_post_mobile_added_CT(core, key);
+ }
qdr_link_accept_CT(core, link);
}
break;
@@ -690,8 +728,8 @@ static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool disc
if (discard)
return;
- qdr_link_t *link = action->args.connection.link;
- //pn_condition_t *condition = action->args.connection.condition;
+ qdr_link_t *link = action->args.connection.link;
+ //qdr_error_t *error = action->args.connection.error;
switch (link->link_type) {
case QD_LINK_ENDPOINT:
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/error.c
----------------------------------------------------------------------
diff --git a/src/router_core/error.c b/src/router_core/error.c
new file mode 100644
index 0000000..7bc1cae
--- /dev/null
+++ b/src/router_core/error.c
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+struct qdr_error_t {
+ qdr_field_t *name;
+ qdr_field_t *description;
+ pn_data_t *info;
+};
+
+ALLOC_DECLARE(qdr_error_t);
+ALLOC_DEFINE(qdr_error_t);
+
+qdr_error_t *qdr_error_from_pn(pn_condition_t *pn)
+{
+ if (!pn)
+ return 0;
+
+ qdr_error_t *error = new_qdr_error_t();
+ ZERO(error);
+
+ const char *name = pn_condition_get_name(pn);
+ if (name && *name)
+ error->name = qdr_field(name);
+
+ const char *desc = pn_condition_get_description(pn);
+ if (desc && *desc)
+ error->description = qdr_field(desc);
+
+ error->info = pn_data(0);
+
+ return error;
+}
+
+
+qdr_error_t *qdr_error(const char *name, const char *description)
+{
+ qdr_error_t *error = new_qdr_error_t();
+
+ error->name = qdr_field(name);
+ error->description = qdr_field(description);
+ error->info = 0;
+
+ return error;
+}
+
+
+void qdr_error_free(qdr_error_t *error)
+{
+ if (error == 0)
+ return;
+
+ qdr_field_free(error->name);
+ qdr_field_free(error->description);
+ if (error->info)
+ pn_data_free(error->info);
+
+ free_qdr_error_t(error);
+}
+
+
+void qdr_error_copy(qdr_error_t *from, pn_condition_t *to)
+{
+ if (from->name) {
+ unsigned char *name = qd_field_iterator_copy(from->name->iterator);
+ pn_condition_set_name(to, (char*) name);
+ free(name);
+ }
+
+ if (from->description) {
+ unsigned char *desc = qd_field_iterator_copy(from->description->iterator);
+ pn_condition_set_description(to, (char*) desc);
+ free(desc);
+ }
+
+ if (from->info)
+ pn_data_copy(pn_condition_info(to), from->info);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 11e5bd3..5da26fa 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -20,16 +20,16 @@
#include "router_core_private.h"
#include <stdio.h>
-static void qdrh_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
@@ -40,7 +40,7 @@ static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS
void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_add_router_CT, "add_router");
+ qdr_action_t *action = qdr_action(qdr_add_router_CT, "add_router");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address);
qdr_action_enqueue(core, action);
@@ -49,7 +49,7 @@ void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskb
void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_del_router_CT, "del_router");
+ qdr_action_t *action = qdr_action(qdr_del_router_CT, "del_router");
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -57,7 +57,7 @@ void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_set_link_CT, "set_link");
+ qdr_action_t *action = qdr_action(qdr_set_link_CT, "set_link");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.link_maskbit = link_maskbit;
qdr_action_enqueue(core, action);
@@ -66,7 +66,7 @@ void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_remove_link_CT, "remove_link");
+ qdr_action_t *action = qdr_action(qdr_remove_link_CT, "remove_link");
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -74,7 +74,7 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_set_next_hop_CT, "set_next_hop");
+ qdr_action_t *action = qdr_action(qdr_set_next_hop_CT, "set_next_hop");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.nh_router_maskbit = nh_router_maskbit;
qdr_action_enqueue(core, action);
@@ -83,7 +83,7 @@ void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_m
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_remove_next_hop_CT, "remove_next_hop");
+ qdr_action_t *action = qdr_action(qdr_remove_next_hop_CT, "remove_next_hop");
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -91,32 +91,27 @@ void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers)
{
- qdr_action_t *action = qdr_action(qdrh_set_valid_origins_CT, "set_valid_origins");
+ qdr_action_t *action = qdr_action(qdr_set_valid_origins_CT, "set_valid_origins");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.router_set = routers;
qdr_action_enqueue(core, action);
}
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem)
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash)
{
- qdr_action_t *action = qdr_action(qdrh_map_destination_CT, "map_destination");
+ qdr_action_t *action = qdr_action(qdr_map_destination_CT, "map_destination");
action->args.route_table.router_maskbit = router_maskbit;
- action->args.route_table.address = qdr_field(address);
- action->args.route_table.address_phase = phase;
- action->args.route_table.address_class = aclass;
- action->args.route_table.semantics = sem;
+ action->args.route_table.address = qdr_field(address_hash);
qdr_action_enqueue(core, action);
}
-void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase)
+void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash)
{
- qdr_action_t *action = qdr_action(qdrh_unmap_destination_CT, "unmap_destination");
+ qdr_action_t *action = qdr_action(qdr_unmap_destination_CT, "unmap_destination");
action->args.route_table.router_maskbit = router_maskbit;
- action->args.route_table.address = qdr_field(address);
- action->args.route_table.address_phase = phase;
- action->args.route_table.address_class = aclass;
+ action->args.route_table.address = qdr_field(address_hash);
qdr_action_enqueue(core, action);
}
@@ -136,7 +131,7 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
qd_address_semantics_t sem, qdr_receive_t on_message, void *context)
{
- qdr_action_t *action = qdr_action(qdrh_subscribe_CT, "subscribe");
+ qdr_action_t *action = qdr_action(qdr_subscribe_CT, "subscribe");
action->args.subscribe.address = qdr_field(address);
action->args.subscribe.semantics = sem;
action->args.subscribe.address_class = aclass;
@@ -232,7 +227,7 @@ static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_lo
}
-static void qdrh_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -313,7 +308,7 @@ static void qdrh_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disc
}
-static void qdrh_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -362,7 +357,7 @@ static void qdrh_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disc
}
-static void qdrh_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
int link_maskbit = action->args.route_table.link_maskbit;
@@ -396,7 +391,7 @@ static void qdrh_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discar
}
-static void qdrh_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -416,7 +411,7 @@ static void qdrh_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool dis
}
-static void qdrh_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
int nh_router_maskbit = action->args.route_table.nh_router_maskbit;
@@ -448,7 +443,7 @@ static void qdrh_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool di
}
-static void qdrh_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -462,7 +457,7 @@ static void qdrh_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool
}
-static void qdrh_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qd_bitmask_t *valid_origins = action->args.route_table.router_set;
@@ -495,7 +490,7 @@ static void qdrh_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bo
}
-static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
//
// TODO - handle the class-prefix and phase explicitly
@@ -525,7 +520,7 @@ static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qdr_address(action->args.route_table.semantics);
+ addr = qdr_address(0); // FIXME - Semantics
qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(core->addrs, addr);
@@ -543,7 +538,7 @@ static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
}
-static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -588,7 +583,7 @@ static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bo
}
-static void qdrh_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
qdr_field_t *address = action->args.subscribe.address;
@@ -621,3 +616,62 @@ static void qdrh_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool disca
qdr_field_free(address);
}
+
+//==================================================================================
+// Call-back Functions
+//==================================================================================
+
+static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work)
+{
+ char *address_hash = qdr_field_copy(work->field);
+ if (address_hash) {
+ core->rt_mobile_added(core->rt_context, address_hash);
+ free(address_hash);
+ }
+
+ qdr_field_free(work->field);
+}
+
+
+static void qdr_do_mobile_removed(qdr_core_t *core, qdr_general_work_t *work)
+{
+ char *address_hash = qdr_field_copy(work->field);
+ if (address_hash) {
+ core->rt_mobile_removed(core->rt_context, address_hash);
+ free(address_hash);
+ }
+
+ qdr_field_free(work->field);
+}
+
+
+static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work)
+{
+ core->rt_link_lost(core->rt_context, work->maskbit);
+}
+
+
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added);
+ work->field = qdr_field(address_hash);
+ qdr_post_general_work_CT(core, work);
+}
+
+
+void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_removed);
+ work->field = qdr_field(address_hash);
+ qdr_post_general_work_CT(core, work);
+}
+
+
+void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_do_link_lost);
+ work->maskbit = link_maskbit;
+ qdr_post_general_work_CT(core, work);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index f86f921..fa82c06 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -26,7 +26,9 @@ ALLOC_DEFINE(qdr_node_t);
ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
+ALLOC_DEFINE(qdr_general_work_t);
+static void qdr_general_handler(void *context);
qdr_core_t *qdr_core(qd_dispatch_t *qd, const char *area, const char *id)
{
@@ -50,6 +52,10 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, const char *area, const char *id)
core->running = true;
DEQ_INIT(core->action_list);
+ core->work_lock = sys_mutex();
+ DEQ_INIT(core->work_list);
+ core->work_timer = qd_timer(core->qd, qdr_general_handler, core);
+
//
// Launch the core thread
//
@@ -74,6 +80,8 @@ void qdr_core_free(qdr_core_t *core)
sys_thread_free(core->thread);
sys_cond_free(core->action_cond);
sys_mutex_free(core->action_lock);
+ sys_mutex_free(core->work_lock);
+ qd_timer_free(core->work_timer);
free(core);
}
@@ -120,6 +128,15 @@ void qdr_field_free(qdr_field_t *field)
}
+char *qdr_field_copy(qdr_field_t *field)
+{
+ if (!field || !field->iterator)
+ return 0;
+
+ return (char*) qd_field_iterator_copy(field->iterator);
+}
+
+
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label)
{
qdr_action_t *action = new_qdr_action_t();
@@ -214,3 +231,49 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
ref = DEQ_NEXT(ref);
}
}
+
+
+static void qdr_general_handler(void *context)
+{
+ qdr_core_t *core = (qdr_core_t*) context;
+ qdr_general_work_list_t work_list;
+ qdr_general_work_t *work;
+
+ sys_mutex_lock(core->work_lock);
+ DEQ_MOVE(core->work_list, work_list);
+ sys_mutex_unlock(core->work_lock);
+
+ work = DEQ_HEAD(work_list);
+ while (work) {
+ DEQ_REMOVE_HEAD(work_list);
+ work->handler(core, work);
+ free_qdr_general_work_t(work);
+ work = DEQ_HEAD(work_list);
+ }
+}
+
+
+qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler)
+{
+ qdr_general_work_t *work = new_qdr_general_work_t();
+ ZERO(work);
+ work->handler = handler;
+ return work;
+}
+
+
+void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work)
+{
+ bool notify;
+
+ sys_mutex_lock(core->work_lock);
+ DEQ_ITEM_INIT(work);
+ DEQ_INSERT_TAIL(core->work_list, work);
+ notify = DEQ_SIZE(core->work_list) == 1;
+ sys_mutex_unlock(core->work_lock);
+
+ if (notify)
+ qd_timer_schedule(core->work_timer, 0);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index dfeefd6..db84c55 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -36,6 +36,7 @@ typedef struct {
qdr_field_t *qdr_field(const char *string);
void qdr_field_free(qdr_field_t *field);
+char *qdr_field_copy(qdr_field_t *field);
/**
* qdr_action_t - This type represents one work item to be performed by the router-core thread.
@@ -52,14 +53,11 @@ struct qdr_action_t {
// Arguments for router control-plane actions
//
struct {
- int link_maskbit;
- int router_maskbit;
- int nh_router_maskbit;
- qd_bitmask_t *router_set;
- qdr_field_t *address;
- char address_class;
- char address_phase;
- qd_address_semantics_t semantics;
+ int link_maskbit;
+ int router_maskbit;
+ int nh_router_maskbit;
+ qd_bitmask_t *router_set;
+ qdr_field_t *address;
} route_table;
//
@@ -73,7 +71,7 @@ struct qdr_action_t {
qd_direction_t dir;
qdr_terminus_t *source;
qdr_terminus_t *target;
- pn_condition_t *condition;
+ qdr_error_t *error;
} connection;
//
@@ -243,6 +241,33 @@ void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+//
+// General Work
+//
+// The following types are used to post work to the IO threads for
+// non-connection-specific action.
+//
+typedef struct qdr_general_work_t qdr_general_work_t;
+typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work);
+
+struct qdr_general_work_t {
+ DEQ_LINKS(qdr_general_work_t);
+ qdr_general_work_handler_t handler;
+ qdr_field_t *field;
+ int maskbit;
+};
+
+ALLOC_DECLARE(qdr_general_work_t);
+DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t);
+
+qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler);
+
+//
+// Connection Work
+//
+// The following types are used to post work to the IO threads for
+// connection-specific action.
+//
typedef enum {
QDR_CONNECTION_WORK_FIRST_ATTACH,
QDR_CONNECTION_WORK_SECOND_ATTACH,
@@ -255,12 +280,13 @@ typedef struct qdr_connection_work_t {
qdr_link_t *link;
qdr_terminus_t *source;
qdr_terminus_t *target;
- pn_condition_t *condition;
+ qdr_error_t *error;
} qdr_connection_work_t;
ALLOC_DECLARE(qdr_connection_work_t);
DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
+
struct qdr_connection_t {
DEQ_LINKS(qdr_connection_t);
qdr_core_t *core;
@@ -287,6 +313,10 @@ struct qdr_core_t {
sys_cond_t *action_cond;
sys_mutex_t *action_lock;
+ sys_mutex_t *work_lock;
+ qdr_general_work_list_t work_list;
+ qd_timer_t *work_timer;
+
qdr_connection_list_t open_connections;
//
@@ -343,6 +373,12 @@ qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label)
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
+
+void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
+
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
qd_router_entity_type_t type,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index ba8eab9..ce4b6a8 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -65,7 +65,8 @@ void *router_core_thread(void *arg)
action = DEQ_HEAD(action_list);
while (action) {
DEQ_REMOVE_HEAD(action_list);
- qd_log(core->log, QD_LOG_TRACE, "Core action '%s'%s", action->label, core->running ? "" : " (discard)");
+ if (action->label)
+ qd_log(core->log, QD_LOG_TRACE, "Core action '%s'%s", action->label, core->running ? "" : " (discard)");
action->action_handler(core, action, !core->running);
free_qdr_action_t(action);
action = DEQ_HEAD(action_list);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ba06a25..e9e8012 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -37,25 +37,9 @@ const char *CORE_AGENT_ADDRESS = "$management";
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
static char *direct_prefix;
static char *node_id;
-/*
- * Address Types and Processing:
- *
- * Address Hash Key onReceive
- * ===================================================================
- * _local/<local> L<local> handler
- * _topo/<area>/<router>/<local> A<area> forward
- * _topo/<my-area>/<router>/<local> R<router> forward
- * _topo/<my-area>/<my-router>/<local> L<local> handler
- * _topo/<area>/all/<local> A<area> forward
- * _topo/<my-area>/all/<local> L<local> forward handler
- * _topo/all/all/<local> L<local> forward handler
- * <mobile> M<mobile> forward handler
- */
-
ALLOC_DEFINE(qd_routed_event_t);
ALLOC_DEFINE(qd_router_link_t);
ALLOC_DEFINE(qd_router_node_t);
@@ -152,77 +136,6 @@ void qd_router_del_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp)
/**
- * Check an address to see if it no longer has any associated destinations.
- * Depending on its policy, the address may be eligible for being closed out
- * (i.e. Logging its terminal statistics and freeing its resources).
- */
-void qd_router_check_addr(qd_router_t *router, qd_address_t *addr, int was_local)
-{
- if (addr == 0)
- return;
-
- unsigned char *key = 0;
- int to_delete = 0;
- int no_more_locals = 0;
-
- sys_mutex_lock(router->lock);
-
- //
- // If the address has no in-process consumer or destinations, it should be
- // deleted.
- //
- if (addr->on_message == 0 &&
- DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
- !addr->waypoint && !addr->block_deletion)
- to_delete = 1;
-
- //
- // If we have just removed a local linkage and it was the last local linkage,
- // we need to notify the router module that there is no longer a local
- // presence of this address.
- //
- if (was_local && DEQ_SIZE(addr->rlinks) == 0)
- no_more_locals = 1;
-
- if (to_delete) {
- //
- // Delete the address but grab the hash key so we can use it outside the
- // critical section.
- //
- qd_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key);
- DEQ_REMOVE(router->addrs, addr);
- qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, addr);
- qd_hash_handle_free(addr->hash_handle);
- free_qd_address_t(addr);
- }
-
- //
- // If we're not deleting but there are no more locals, get a copy of the hash key.
- //
- if (!to_delete && no_more_locals) {
- const unsigned char *key_const = qd_hash_key_by_handle(addr->hash_handle);
- key = (unsigned char*) malloc(strlen((const char*) key_const) + 1);
- strcpy((char*) key, (const char*) key_const);
- }
-
- sys_mutex_unlock(router->lock);
-
- //
- // If the address is mobile-class and it was just removed from a local link,
- // tell the router module that it is no longer attached locally.
- //
- if (no_more_locals && key && key[0] == 'M')
- qd_router_mobile_removed(router, (const char*) key);
-
- //
- // Free the key that was not freed by the hash table.
- //
- if (key)
- free(key);
-}
-
-
-/**
* Determine the role of a connection
*/
static qdr_connection_role_t qd_router_connection_role(const qd_connection_t *conn)
@@ -239,173 +152,6 @@ static qdr_connection_role_t qd_router_connection_role(const qd_connection_t *co
}
-/**
- * Determine whether a connection is configured in the inter-router role.
- * DEPRECATE
- */
-static int qd_router_connection_is_inter_router(const qd_connection_t *conn)
-{
- if (!conn)
- return 0;
-
- const qd_server_config_t *cf = qd_connection_config(conn);
- if (cf && strcmp(cf->role, router_role) == 0)
- return 1;
-
- return 0;
-}
-
-
-/**
- * Determine whether a connection is configured in the on-demand role.
- * DEPRECATE
- */
-static int qd_router_connection_is_on_demand(const qd_connection_t *conn)
-{
- if (!conn)
- return 0;
-
- const qd_server_config_t *cf = qd_connection_config(conn);
- if (cf && strcmp(cf->role, on_demand_role) == 0)
- return 1;
-
- return 0;
-}
-
-
-/**
- * Determine whether a terminus has router capability
- */
-static int qd_router_terminus_is_router(pn_terminus_t *term)
-{
- pn_data_t *cap = pn_terminus_capabilities(term);
-
- pn_data_rewind(cap);
- pn_data_next(cap);
- if (cap && pn_data_type(cap) == PN_SYMBOL) {
- pn_bytes_t sym = pn_data_get_symbol(cap);
- if (sym.size == strlen(QD_CAPABILITY_ROUTER_CONTROL) &&
- strcmp(sym.start, QD_CAPABILITY_ROUTER_CONTROL) == 0)
- return 1;
- }
-
- return 0;
-}
-
-
-/**
- * If the terminus has a dynamic-node-property for a node address,
- * return an iterator for the content of that property.
- * DEPRECATE
- */
-static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
-{
- pn_data_t *props = pn_terminus_properties(term);
-
- if (!props)
- return 0;
-
- pn_data_rewind(props);
- if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props)) {
- pn_bytes_t sym = pn_data_get_symbol(props);
- if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start) == 0) {
- if (pn_data_next(props)) {
- pn_bytes_t val = pn_data_get_string(props);
- if (val.start && *val.start != '\0')
- return val.start;
- }
- }
- }
-
- return 0;
-}
-
-
-/**
- * Generate a temporary routable address for a destination connected to this
- * router node.
- * DEPRECATE
- */
-static void qd_router_generate_temp_addr(qd_router_t *router, char *buffer, size_t length)
-{
- static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
- char discriminator[11];
- long int rnd1 = random();
- long int rnd2 = random();
- int idx;
- int cursor = 0;
-
- for (idx = 0; idx < 5; idx++) {
- discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63];
- discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63];
- }
- discriminator[cursor] = '\0';
-
- snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
-}
-
-
-/**
- * Assign a link-mask-bit to a new link. Do this in such a way that all links on the same
- * connection share the same mask-bit value.
- *
- * DEPRECATE
- */
-static int qd_router_find_mask_bit_LH(qd_router_t *router, qd_link_t *link)
-{
- qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
- if (shared) {
- shared->ref_count++;
- return shared->mask_bit;
- }
-
- int mask_bit;
- if (qd_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
- qd_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
- } else {
- qd_log(router->log_source, QD_LOG_CRITICAL, "Exceeded maximum inter-router link count");
- return -1;
- }
-
- shared = new_qd_router_conn_t();
- shared->ref_count = 1;
- shared->mask_bit = mask_bit;
- qd_link_set_conn_context(link, shared);
- return mask_bit;
-}
-
-
-/**
- * DEPRECATE
- */
-static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr, qd_direction_t dir)
-{
- char addr_prefix = (dir == QD_INCOMING) ? 'C' : 'D';
- qd_field_iterator_t *iter;
- qd_address_t *addr;
-
- if (taddr == 0 || *taddr == '\0')
- return 0;
-
- //
- // Initialize the iterator with taddr so we can traverse through taddr by traversing the iterator.
- //
- iter = qd_address_iterator_string(taddr, ITER_VIEW_ADDRESS_HASH);
-
- //Set the iter->prefix_override to addr_prefix
- qd_address_iterator_override_prefix(iter, addr_prefix);
-
- //Populate addr with the appropriate qd_address_t if the string in the iterator generated a hash
- //that is present in the addr_hash
- qd_hash_retrieve_prefix(router->addr_hash, iter, (void*) &addr);
-
- qd_field_iterator_free(iter);
-
- return addr;
-}
-
-
-
void qd_router_link_free_LH(qd_router_link_t *rlink)
{
qd_link_t *link = rlink->link;
@@ -714,38 +460,6 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
/**
- * Handle the link-routing case, where links are pre-paired and there is no per-message
- * routing needed.
- *
- * Note that this function does not issue a replacement credit for the received message.
- * In link-routes, the flow commands must be propagated end-to-end. In other words, the
- * ultimate receiving endpoint will issue the replacement credits as it sees fit.
- *
- * Note also that this function does not perform any message validation. For link-routing,
- * there is no need to look into the transferred message.
- */
-static void router_link_route_delivery_LH(qd_router_link_t *peer_link, qd_router_delivery_t *delivery, qd_message_t *msg)
-{
- qd_routed_event_t *re = new_qd_routed_event_t();
-
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = msg;
- re->settle = false;
- re->disposition = 0;
- DEQ_INSERT_TAIL(peer_link->msg_fifo, re);
-
- //
- // Link the incoming delivery into the event for deferred processing
- //
- re->delivery = delivery;
- qd_router_delivery_fifo_enter_LH(delivery);
-
- qd_link_activate(peer_link->link);
-}
-
-
-/**
* Inbound Delivery Handler
*/
static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
@@ -788,7 +502,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
sys_mutex_lock(router->lock);
qd_router_link_t *clink = rlink->connected_link;
if (clink) {
- router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd), msg);
+ //router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd), msg);
sys_mutex_unlock(router->lock);
return;
}
@@ -1040,88 +754,6 @@ typedef enum {
} link_attach_result_t;
-static void qd_router_attach_routed_link(void *context, bool discard)
-{
- link_attach_t *la = (link_attach_t*) context;
-
- if (!discard) {
- qd_link_t *link = qd_link(la->router->node, la->conn, la->dir, la->link_name);
-
- qd_router_link_t *rlink = qd_router_link(link, QD_LINK_ENDPOINT, la->dir, 0, 0, 0);
-
- qd_link_set_context(link, rlink);
-
- sys_mutex_lock(la->router->lock);
- rlink->connected_link = la->peer_link;
- la->peer_link->connected_link = rlink;
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(la->router->links, rlink);
- sys_mutex_unlock(la->router->lock);
-
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(la->peer_qd_link));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
-
- pn_link_open(qd_link_pn(link));
- if (la->credit > 0)
- pn_link_flow(qd_link_pn(link), la->credit);
- }
-
- if (la->link_name)
- free(la->link_name);
- free_link_attach_t(la);
-}
-
-
-static void qd_router_detach_routed_link(void *context, bool discard)
-{
- link_detach_t *ld = (link_detach_t*) context;
-
- if (!discard) {
- qd_link_t *link = ld->rlink->link;
-
- if (ld->condition_name[0]) {
- pn_condition_t *cond = pn_link_condition(qd_link_pn(link));
- pn_condition_set_name(cond, ld->condition_name);
- pn_condition_set_description(cond, ld->condition_description);
- if (ld->condition_info)
- pn_data_copy(pn_condition_info(cond), ld->condition_info);
- }
-
- qd_link_close(link);
-
- sys_mutex_lock(ld->router->lock);
- qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, ld->rlink);
- DEQ_REMOVE(ld->router->links, ld->rlink);
- qd_router_link_free_LH(ld->rlink);
- sys_mutex_unlock(ld->router->lock);
- }
-
- if (ld->condition_info)
- pn_data_free(ld->condition_info);
- free_link_detach_t(ld);
-}
-
-
-static void qd_router_open_routed_link(void *context, bool discard)
-{
- link_event_t *le = (link_event_t*) context;
-
- if (!discard) {
- sys_mutex_lock(le->router->lock);
- qd_link_t *link = le->rlink->link;
- if (le->rlink->connected_link) {
- qd_link_t *peer = le->rlink->connected_link->link;
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(peer));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(peer));
- pn_link_open(qd_link_pn(link));
- }
- sys_mutex_unlock(le->router->lock);
- }
-
- free_link_event_t(le);
-}
-
-
static void qd_router_flow(void *context, bool discard)
{
link_event_t *le = (link_event_t*) context;
@@ -1140,71 +772,6 @@ static void qd_router_flow(void *context, bool discard)
}
-link_attach_result_t qd_router_link_route_LH(qd_router_t *router,
- qd_router_link_t *rlink,
- const char *term_addr,
- qd_direction_t dir)
-{
- //
- // Lookup the target address to see if we can link-route this attach.
- //
- qd_address_t *addr = router_lookup_terminus_LH(router, term_addr, dir);
- if (addr) {
- //
- // This is a link-attach routable target. Propagate the attach downrange.
- // Check first for a locally connected container.
- //
- qd_link_t *link = rlink->link;
- pn_link_t *pn_link = qd_link_pn(link);
- qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
- if (lrpref) {
- qd_connection_t *conn = lrpref->lrp->container->conn;
- if (conn) {
- link_attach_t *la = new_link_attach_t();
- la->router = router;
- la->peer_link = rlink;
- la->peer_qd_link = link;
- la->link_name = strdup(pn_link_name(pn_link));
- la->dir = dir;
- la->conn = conn;
- la->credit = pn_link_credit(pn_link);
- qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
- }
- } else if (DEQ_SIZE(addr->rnodes) > 0) {
- //
- // There are no locally connected containers for this link but there is at
- // least one on a remote router. Forward the attach toward the remote destination.
- //
- qd_router_node_t *remote_router = DEQ_HEAD(addr->rnodes)->router;
- qd_router_link_t *out_link = 0;
- if (remote_router)
- out_link = remote_router->peer_link;
- if (!out_link && remote_router && remote_router->next_hop)
- out_link = remote_router->next_hop->peer_link;
- if (out_link) {
- qd_connection_t *out_conn = qd_link_connection(out_link->link);
- if (out_conn) {
- link_attach_t *la = new_link_attach_t();
- la->router = router;
- la->peer_link = rlink;
- la->peer_qd_link = link;
- la->link_name = strdup(pn_link_name(pn_link));
- la->dir = dir;
- la->conn = out_conn;
- la->credit = pn_link_credit(pn_link);
- qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
- } else
- return LINK_ATTACH_NO_PATH;
- } else
- return LINK_ATTACH_NO_PATH;
- } else
- return LINK_ATTACH_NO_PATH;
- } else
- return LINK_ATTACH_NO_MATCH;
- return LINK_ATTACH_FORWARDED;
-}
-
-
qd_router_link_t* qd_router_link(qd_link_t *link, qd_link_type_t link_type, qd_direction_t direction, qd_address_t *owning_addr, qd_waypoint_t *wp, int mask_bit)
{
qd_router_link_t *rlink = new_qd_router_link_t();
@@ -1250,11 +817,6 @@ qd_router_link_t* qd_router_link(qd_link_t *link, qd_link_type_t link_type, qd_d
*/
static int router_incoming_link_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- pn_link_t *pn_link = qd_link_pn(link);
- int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
- const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
-
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
@@ -1263,72 +825,6 @@ static int router_incoming_link_handler(void* context, qd_link_t *link)
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
- //
- // DEPRECATE:
- //
-
- if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING,
- "Incoming link claims router capability but is not on an inter-router connection");
- pn_link_close(pn_link);
- return 0;
- }
-
- qd_router_link_t *rlink = qd_router_link(link, is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT, QD_INCOMING, 0, 0, 0);
-
- if (!is_router && r_tgt) {
- rlink->target = (char*) malloc(strlen(r_tgt) + 1);
- strcpy(rlink->target, r_tgt);
- }
-
- qd_link_set_context(link, rlink);
-
- sys_mutex_lock(router->lock);
- rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
- //
- // Attempt to link-route this attach
- //
- link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
- if (!is_router)
- la_result = qd_router_link_route_LH(router, rlink, r_tgt, QD_OUTGOING);
- sys_mutex_unlock(router->lock);
-
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
-
- switch (la_result) {
- case LINK_ATTACH_NO_MATCH:
- //
- // We didn't link-route this attach. It terminates here.
- // Open it in the reverse direction.
- //
- pn_link_flow(pn_link, 1000);
- pn_link_open(pn_link);
- break;
-
- case LINK_ATTACH_NO_PATH: {
- //
- // The link should be routable but there is no path to the
- // destination. Close the link.
- //
- pn_condition_t *cond = pn_link_condition(pn_link);
- pn_condition_set_name(cond, "qd:no-route-to-dest");
- pn_condition_set_description(cond, "No route to the destination node");
- pn_link_close(pn_link);
- break;
- }
-
- case LINK_ATTACH_FORWARDED:
- //
- // We routed the attach outbound. Don't open the link back until
- // the downstream link is opened.
- //
- break;
- }
-
return 0;
}
@@ -1338,18 +834,6 @@ static int router_incoming_link_handler(void* context, qd_link_t *link)
*/
static int router_outgoing_link_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- pn_link_t *pn_link = qd_link_pn(link);
- const char *r_src = pn_terminus_get_address(qd_link_remote_source(link));
- int is_dynamic = pn_terminus_is_dynamic(qd_link_remote_source(link));
- int is_router = qd_router_terminus_is_router(qd_link_remote_target(link));
- int propagate = 0;
- qd_field_iterator_t *iter = 0;
- char phase = '0';
- qd_address_semantics_t semantics;
- qd_address_t *addr = 0;
- link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
-
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
@@ -1358,165 +842,6 @@ static int router_outgoing_link_handler(void* context, qd_link_t *link)
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
- //
- // DEPRECATE:
- //
-
- if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING,
- "Outgoing link claims router capability but is not on an inter-router connection");
- pn_link_close(pn_link);
- return 0;
- }
-
- //
- // If this link is not a router link and it has no source address, we can't
- // accept it.
- //
- if (r_src == 0 && !is_router && !is_dynamic) {
- pn_link_close(pn_link);
- return 0;
- }
-
- //
- // If this is an endpoint link with a source address, make sure the address is
- // appropriate for endpoint links. If it is not mobile address, it cannot be
- // bound to an endpoint link.
- //
- if (r_src && !is_router && !is_dynamic) {
- iter = qd_address_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
- unsigned char prefix = qd_field_iterator_octet(iter);
- qd_field_iterator_reset(iter);
-
- if (prefix != 'M') {
- qd_field_iterator_free(iter);
- pn_link_close(pn_link);
- qd_log(router->log_source, QD_LOG_WARNING,
- "Rejected an outgoing endpoint link with a router address: %s", r_src);
- return 0;
- }
- }
-
- //
- // Create a router_link record for this link. Some of the fields will be
- // modified in the different cases below.
- //
- qd_router_link_t *rlink = qd_router_link(link, is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT, QD_OUTGOING, 0, 0, 0);
-
- qd_link_set_context(link, rlink);
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
-
- //
- // Determine the semantics for the address prior to taking out the lock.
- //
- if (is_dynamic || !iter)
- semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE;
- else {
- semantics = router_semantics_for_addr(router, iter, '\0', &phase);
- qd_address_iterator_set_phase(iter, phase);
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- }
-
- sys_mutex_lock(router->lock);
- rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
-
- if (is_router) {
- //
- // If this is a router link, put it in the hello_address link-list.
- //
- qd_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
- rlink->owning_addr = router->hello_addr;
- router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
-
- } else {
- //
- // If this is an endpoint link, check the source. If it is dynamic, we will
- // assign it an ephemeral and routable address. If it has a non-dynamic
- // address, that address needs to be set up in the address list.
- //
- char temp_addr[1000]; // TODO: Use pn_string or aprintf.
- const char *link_route_address = qd_router_terminus_dnp_address(qd_link_remote_source(link));
-
- if (link_route_address == 0)
- link_route_address = r_src;
- la_result = qd_router_link_route_LH(router, rlink, link_route_address, QD_INCOMING);
-
- if (la_result == LINK_ATTACH_NO_MATCH) {
- if (is_dynamic) {
- qd_router_generate_temp_addr(router, temp_addr, 1000);
- iter = qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
- pn_terminus_set_address(qd_link_source(link), temp_addr);
- qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
- } else
- qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
-
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- if (!addr) {
- addr = qd_address(semantics);
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(router->addrs, addr);
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- }
-
- rlink->owning_addr = addr;
- qd_router_add_link_ref_LH(&addr->rlinks, rlink);
-
- //
- // If this is not a dynamic address and it is the first local subscription
- // to the address, supply the address to the router module for propagation
- // to other nodes.
- //
- propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
- }
- }
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
- //
- // If an interesting change has occurred with this address and it has an associated waypoint,
- // notify the waypoint module so it can react appropriately.
- //
- if (propagate && addr->waypoint)
- qd_waypoint_address_updated_LH(router->qd, addr);
-
- sys_mutex_unlock(router->lock);
-
- if (propagate)
- qd_router_mobile_added(router, iter);
-
- if (iter)
- qd_field_iterator_free(iter);
-
- switch (la_result) {
- case LINK_ATTACH_NO_MATCH:
- //
- // We didn't link-route this attach. It terminates here.
- // Open it in the reverse direction.
- //
- pn_link_open(pn_link);
- break;
-
- case LINK_ATTACH_NO_PATH: {
- //
- // The link should be routable but there is no path to the
- // destination. Close the link.
- //
- pn_condition_t *cond = pn_link_condition(qd_link_pn(link));
- pn_condition_set_name(cond, "qd:no-route-to-dest");
- pn_condition_set_description(cond, "No route to the destination node");
- pn_link_close(pn_link);
- break;
- }
-
- case LINK_ATTACH_FORWARDED:
- //
- // We routed the attach outbound. Don't open the link back until
- // the downstream link is opened.
- //
- break;
- }
-
return 0;
}
@@ -1526,26 +851,9 @@ static int router_outgoing_link_handler(void* context, qd_link_t *link)
*/
static int router_link_attach_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
+ qdr_link_t *qlink = (qdr_link_t*) qd_link_get_context(link);
+ qdr_link_second_attach(qlink, qdr_terminus(qd_link_remote_source(link)), qdr_terminus(qd_link_remote_target(link)));
- if (!rlink)
- return 0;
-
- sys_mutex_lock(router->lock);
- qd_router_link_t *peer_rlink = rlink->connected_link;
- if (peer_rlink) {
- qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
- if (out_conn) {
- link_event_t *le = new_link_event_t();
- memset(le, 0, sizeof(link_event_t));
- le->router = router;
- le->rlink = peer_rlink;
- qd_connection_invoke_deferred(out_conn, qd_router_open_routed_link, le);
- }
- }
- sys_mutex_unlock(router->lock);
-
return 0;
}
@@ -1600,115 +908,16 @@ static int router_link_flow_handler(void* context, qd_link_t *link)
*/
static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
- qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
- qd_address_t *oaddr = 0;
- qd_waypoint_t *wp = 0;
- int lost_link_mask_bit = -1;
-
- if (!rlink)
- return 0;
-
- sys_mutex_lock(router->lock);
-
- //
- // Save this so we can check it after the lock is released.
- //
- wp = rlink->waypoint;
-
- if (rlink->connected_link) {
- qd_connection_t *out_conn = qd_link_connection(rlink->connected_link->link);
- if (out_conn) {
- link_detach_t *ld = new_link_detach_t();
- memset(ld, 0, sizeof(link_detach_t));
- ld->router = router;
- ld->rlink = rlink->connected_link;
- pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
- if (cond && pn_condition_is_set(cond)) {
- if (pn_condition_get_name(cond)) {
- strncpy(ld->condition_name, pn_condition_get_name(cond), COND_NAME_LEN);
- ld->condition_name[COND_NAME_LEN] = '\0';
- }
- if (pn_condition_get_description(cond)) {
- strncpy(ld->condition_description, pn_condition_get_description(cond), COND_DESCRIPTION_LEN);
- ld->condition_description[COND_DESCRIPTION_LEN] = '\0';
- }
- if (pn_condition_info(cond)) {
- ld->condition_info = pn_data(0);
- pn_data_copy(ld->condition_info, pn_condition_info(cond));
- }
- } else if (dt == QD_LOST) {
- strcpy(ld->condition_name, "qd:routed-link-lost");
- strcpy(ld->condition_description, "Connectivity to the peer container was lost");
- }
- rlink->connected_link->connected_link = 0;
- qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link, ld);
- }
- }
-
- //
- // If this link is part of an inter-router connection, drop the
- // reference count. If this is the last link on the connection,
- // free the mask-bit and the shared connection record.
- //
- if (shared) {
- shared->ref_count--;
- if (shared->ref_count == 0) {
- lost_link_mask_bit = rlink->mask_bit;
- qd_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
- qd_link_set_conn_context(link, 0);
- free_qd_router_conn_t(shared);
- }
- }
-
- //
- // If the link is outgoing, we must disassociate it from its address.
- //
- if (rlink->link_direction == QD_OUTGOING && rlink->owning_addr) {
- qd_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
- oaddr = rlink->owning_addr;
+ qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
+ pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
+
+ if (rlink) {
+ qdr_error_t *error = qdr_error_from_pn(cond);
+ if (!error && dt == QD_LOST)
+ error = qdr_error("qd:routed-link-lost", "Connectivity to the peer container was lost");
+ qdr_link_detach(rlink, error);
}
- //
- // If this is an outgoing inter-router link, we must remove the by-mask-bit
- // index reference to this link.
- //
- if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_OUTGOING) {
- if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink)
- router->out_links_by_mask_bit[rlink->mask_bit] = 0;
- else
- qd_log(router->log_source, QD_LOG_CRITICAL,
- "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
- }
-
- //
- // Remove the link from the master list-of-links and deallocate
- //
- DEQ_REMOVE(router->links, rlink);
- qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, rlink);
- qd_router_link_free_LH(rlink);
-
- sys_mutex_unlock(router->lock);
-
- //
- // If this was a waypoint link, notify the waypoint module.
- //
- if (wp)
- qd_waypoint_link_closed(router->qd, wp, link);
-
- //
- // Check to see if the owning address should be deleted
- //
- qd_router_check_addr(router, oaddr, 1);
-
- //
- // If we lost the link to a neighbor router, notify the route engine so it doesn't
- // have to wait for the HELLO timeout to expire.
- //
- if (lost_link_mask_bit >= 0)
- qd_router_link_lost(router, lost_link_mask_bit);
-
return 0;
}
@@ -1716,8 +925,8 @@ static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_
static void router_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
- qdr_connection_role_t role = qd_router_connection_role(conn);
- qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, true, role, 0); // TODO - get label
+ qdr_connection_role_t role = qd_router_connection_role(conn);
+ qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, true, role, 0); // TODO - get label
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
@@ -1727,92 +936,11 @@ static void router_inbound_opened_handler(void *type_context, qd_connection_t *c
static void router_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
- qdr_connection_role_t role = qd_router_connection_role(conn);
- qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, false, role, 0); // TODO - get label
+ qdr_connection_role_t role = qd_router_connection_role(conn);
+ qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, false, role, 0); // TODO - get label
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
-
- // DEPRECATE:
-
- //
- // If the connection is on-demand, visit all waypoints that are waiting for their
- // connection to arrive.
- //
- if (qd_router_connection_is_on_demand(conn)) {
- qd_waypoint_connection_opened(router->qd, (qd_config_connector_t*) context, conn);
- return;
- }
-
- //
- // If the connection isn't inter-router, ignore it.
- //
- if (!qd_router_connection_is_inter_router(conn))
- return;
-
- qd_link_t *sender;
- qd_link_t *receiver;
- qd_router_link_t *rlink;
- int mask_bit = 0;
- size_t clen = strlen(QD_CAPABILITY_ROUTER_CONTROL);
-
- //
- // Allocate a mask bit to designate the pair of links connected to the neighbor router
- //
- sys_mutex_lock(router->lock);
- if (qd_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
- qd_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
- } else {
- sys_mutex_unlock(router->lock);
- qd_log(router->log_source, QD_LOG_CRITICAL, "Exceeded maximum inter-router link count");
- return;
- }
-
- //
- // Create an incoming link with router source capability
- //
- receiver = qd_link(router->node, conn, QD_INCOMING, QD_INTERNODE_LINK_NAME_1);
- // TODO - We don't want to have to cast away the constness of the literal string here!
- // See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)),
- pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER_CONTROL));
-
- rlink = qd_router_link(receiver, QD_LINK_ROUTER, QD_INCOMING, 0, 0, mask_bit);
-
- qd_link_set_context(receiver, rlink);
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
- //
- // Create an outgoing link with router target capability
- //
- sender = qd_link(router->node, conn, QD_OUTGOING, QD_INTERNODE_LINK_NAME_2);
- // TODO - We don't want to have to cast away the constness of the literal string here!
- // See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)),
- pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER_CONTROL));
-
- rlink = qd_router_link(sender, QD_LINK_ROUTER, QD_OUTGOING, router->hello_addr, 0, mask_bit);
-
- //
- // Add the new outgoing link to the hello_address's list of links.
- //
- qd_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
-
- //
- // Index this link from the by-maskbit index so we can later find it quickly
- // when provided with the mask bit.
- //
- router->out_links_by_mask_bit[mask_bit] = rlink;
-
- qd_link_set_context(sender, rlink);
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
- sys_mutex_unlock(router->lock);
-
- pn_link_open(qd_link_pn(receiver));
- pn_link_open(qd_link_pn(sender));
- pn_link_flow(qd_link_pn(receiver), 1000);
}
@@ -1960,7 +1088,7 @@ static void qd_router_link_second_attach(void *context, qdr_link_t *link, qdr_te
}
-static void qd_router_link_detach(void *context, qdr_link_t *link, pn_condition_t *condition)
+static void qd_router_link_detach(void *context, qdr_link_t *link, qdr_error_t *error)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 50ab94e..4bd0467 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -277,10 +277,6 @@ void qd_router_del_node_ref_LH(qd_router_ref_list_t *ref_list, qd_router_node_t
void qd_router_add_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp);
void qd_router_del_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp);
-void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter);
-void qd_router_mobile_removed(qd_router_t *router, const char *addr);
-void qd_router_link_lost(qd_router_t *router, int link_mask_bit);
-
qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
char in_phase, char *out_phase);
qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org