You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/16 22:51:04 UTC
[1/3] qpid-dispatch git commit: DISPATCH-179 - Final changes to core
before ripping out the deprecated connection and link logic.
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 dbdff65b0 -> ca24d67f0
DISPATCH-179 - Final changes to core before ripping out the deprecated connection
and link logic.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fc2c243e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fc2c243e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fc2c243e
Branch: refs/heads/tross-DISPATCH-179-1
Commit: fc2c243e9c02104e5b6ae6f01ab715ad850b449a
Parents: dbdff65
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Dec 14 16:12:04 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Dec 14 16:12:04 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 +-
src/iterator.c | 6 ++++++
src/router_core/connections.c | 24 ++++++++++++++++++++-
src/router_core/router_core.c | 6 ++++--
src/router_core/router_core_thread.c | 2 +-
src/router_node.c | 35 +++++++++++++++++++++++++++----
6 files changed, 66 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fc2c243e/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index f83e4cc..cf2c7a4 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -41,7 +41,7 @@ typedef struct qdr_terminus_t qdr_terminus_t;
/**
* Allocate and start an instance of the router core module.
*/
-qdr_core_t *qdr_core(qd_dispatch_t *qd);
+qdr_core_t *qdr_core(qd_dispatch_t *qd, const char *area, const char *id);
/**
* Stop and deallocate an instance of the router core.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fc2c243e/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index da12347..7a8f0ab 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -376,6 +376,8 @@ qd_field_iterator_t* qd_address_iterator_binary(const char *text, int length, qd
iter->phase = '0';
iter->prefix_override = '\0';
+ DEQ_INIT(iter->hash_segments);
+
qd_address_iterator_reset_view(iter, view);
return iter;
@@ -394,6 +396,8 @@ qd_field_iterator_t *qd_address_iterator_buffer(qd_buffer_t *buffer, int offset,
iter->phase = '0';
iter->prefix_override = '\0';
+ DEQ_INIT(iter->hash_segments);
+
qd_address_iterator_reset_view(iter, view);
return iter;
@@ -492,6 +496,8 @@ qd_field_iterator_t *qd_field_iterator_sub(const qd_field_iterator_t *iter, uint
sub->prefix_override = '\0';
sub->phase = '0';
+ DEQ_INIT(sub->hash_segments);
+
return sub;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fc2c243e/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index e760cfb..884a298 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -174,6 +174,13 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, qd
link->core = conn->core;
link->conn = conn;
+ if (dir == QD_OUTGOING) {
+ if (qdr_terminus_has_capability(target, QD_CAPABILITY_ROUTER_CONTROL))
+ link->link_type = QD_LINK_CONTROL;
+ else if (qdr_terminus_has_capability(target, QD_CAPABILITY_ROUTER_DATA))
+ link->link_type = QD_LINK_ROUTER;
+ }
+
action->args.connection.conn = conn;
action->args.connection.link = link;
action->args.connection.dir = dir;
@@ -683,9 +690,24 @@ static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool disc
if (discard)
return;
- //qdr_link_t *link = action->args.connection.link;
+ qdr_link_t *link = action->args.connection.link;
//pn_condition_t *condition = action->args.connection.condition;
+ switch (link->link_type) {
+ case QD_LINK_ENDPOINT:
+ break;
+
+ case QD_LINK_WAYPOINT:
+ break;
+
+ case QD_LINK_CONTROL:
+ break;
+
+ case QD_LINK_ROUTER:
+ break;
+ }
+
+
//
// Cases to be handled:
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fc2c243e/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index e1c3fff..f86f921 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -28,12 +28,14 @@ ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
-qdr_core_t *qdr_core(qd_dispatch_t *qd)
+qdr_core_t *qdr_core(qd_dispatch_t *qd, const char *area, const char *id)
{
qdr_core_t *core = NEW(qdr_core_t);
ZERO(core);
- core->qd = qd;
+ core->qd = qd;
+ core->router_area = area;
+ core->router_id = id;
//
// Set up the logging source for the router core
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fc2c243e/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 9debd25..ba8eab9 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -39,7 +39,7 @@ void *router_core_thread(void *arg)
qdr_route_table_setup_CT(core);
qdr_agent_setup_CT(core);
- qd_log(core->log, QD_LOG_INFO, "Router Core thread running");
+ qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
while (core->running) {
//
// Use the lock only to protect the condition variable and the action list
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fc2c243e/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 1eb6bc6..ba06a25 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1255,6 +1255,18 @@ static int router_incoming_link_handler(void* context, qd_link_t *link)
int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
+ qd_connection_t *conn = qd_link_connection(link);
+ qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+ qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
+ qdr_terminus(qd_link_remote_source(link)),
+ qdr_terminus(qd_link_remote_target(link)));
+ qdr_link_set_context(qdr_link, link);
+ qd_link_set_context(link, qdr_link);
+
+ //
+ // DEPRECATE:
+ //
+
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING,
"Incoming link claims router capability but is not on an inter-router connection");
@@ -1338,6 +1350,18 @@ static int router_outgoing_link_handler(void* context, qd_link_t *link)
qd_address_t *addr = 0;
link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
+ qd_connection_t *conn = qd_link_connection(link);
+ qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+ qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
+ qdr_terminus(qd_link_remote_source(link)),
+ qdr_terminus(qd_link_remote_target(link)));
+ qdr_link_set_context(qdr_link, link);
+ qd_link_set_context(link, qdr_link);
+
+ //
+ // DEPRECATE:
+ //
+
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING,
"Outgoing link claims router capability but is not on an inter-router connection");
@@ -1918,12 +1942,15 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
static void qd_router_connection_activate(void *context, qdr_connection_t *conn)
{
- //qd_router_t *router = (qd_router_t*) context;
+ qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn));
}
-static void qd_router_link_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
- qdr_terminus_t *source, qdr_terminus_t *target)
+static void qd_router_link_first_attach(void *context,
+ qdr_connection_t *conn,
+ qdr_link_t *link,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target)
{
}
@@ -1940,7 +1967,7 @@ static void qd_router_link_detach(void *context, qdr_link_t *link, pn_condition_
void qd_router_setup_late(qd_dispatch_t *qd)
{
- qd->router->router_core = qdr_core(qd);
+ qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id);
qdr_connection_handlers(qd->router->router_core, (void*) qd->router,
qd_router_connection_activate,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-dispatch git commit: DISPATCH-179 - WIP checkpoint. Ripped
out large sections of the old architecture. Added the general callback
functionality for non-connection-specific actions.
Posted by tr...@apache.org.
DISPATCH-179 - WIP checkpoint.
Ripped out large sections of the old architecture.
Added the general callback functionality for non-connection-specific actions.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ca24d67f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ca24d67f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ca24d67f
Branch: refs/heads/tross-DISPATCH-179-1
Commit: ca24d67f0a8394836417e988630bb9810420b965
Parents: fc2c243
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Dec 16 16:49:14 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Dec 16 16:49:14 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 26 +-
python/qpid_dispatch_internal/router/node.py | 5 +-
src/CMakeLists.txt | 1 +
src/lrp.c | 22 +-
src/router_core/connections.c | 58 +-
src/router_core/error.c | 97 +++
src/router_core/route_tables.c | 134 +++-
src/router_core/router_core.c | 63 ++
src/router_core/router_core_private.h | 56 +-
src/router_core/router_core_thread.c | 3 +-
src/router_node.c | 904 +---------------------
src/router_private.h | 4 -
src/router_pynode.c | 461 ++---------
src/waypoint.c | 2 +-
14 files changed, 479 insertions(+), 1357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index cf2c7a4..e42d818 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -37,6 +37,7 @@ typedef struct qdr_connection_t qdr_connection_t;
typedef struct qdr_link_t qdr_link_t;
typedef struct qdr_delivery_t qdr_delivery_t;
typedef struct qdr_terminus_t qdr_terminus_t;
+typedef struct qdr_error_t qdr_error_t;
/**
* Allocate and start an instance of the router core module.
@@ -60,11 +61,11 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit);
void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit);
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem);
-void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
+void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
-typedef void (*qdr_mobile_added_t) (void *context, const char *address);
-typedef void (*qdr_mobile_removed_t) (void *context, const char *address);
+typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash);
+typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash);
typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
void qdr_core_route_table_handlers(qdr_core_t *core,
@@ -277,6 +278,17 @@ qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term);
/**
******************************************************************************
+ * Error functions
+ ******************************************************************************
+ */
+
+qdr_error_t *qdr_error_from_pn(pn_condition_t *pn);
+qdr_error_t *qdr_error(const char *name, const char *description);
+void qdr_error_free(qdr_error_t *error);
+void qdr_error_copy(qdr_error_t *from, pn_condition_t *to);
+
+/**
+ ******************************************************************************
* Link functions
******************************************************************************
*/
@@ -347,9 +359,9 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin
* This function is invoked when a link detach arrives.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
- * @param condition The link condition from the detach frame.
+ * @param error The link error from the detach frame or 0 if none.
*/
-void qdr_link_detach(qdr_link_t *link, pn_condition_t *condition);
+void qdr_link_detach(qdr_link_t *link, qdr_error_t *error);
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg);
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr);
@@ -357,7 +369,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, q
typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
-typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, pn_condition_t *condition);
+typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error);
void qdr_connection_handlers(qdr_core_t *core,
void *context,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index fc40dd9..1fd4f27 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -486,10 +486,7 @@ class RouterNode(object):
def map_address(self, addr):
self.mobile_addresses.append(addr)
- phase = '0'
- if addr[0] == 'M':
- phase = addr[1]
- self.adapter.map_destination(phase, addr, self.maskbit)
+ self.adapter.map_destination(addr, self.maskbit)
self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 395614d..333b4f0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,7 @@ set(qpid_dispatch_SOURCES
router_core/agent_waypoint.c
router_core/agent_link.c
router_core/connections.c
+ router_core/error.c
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/lrp.c
----------------------------------------------------------------------
diff --git a/src/lrp.c b/src/lrp.c
index 0d13748..d3d0b47 100644
--- a/src/lrp.c
+++ b/src/lrp.c
@@ -28,6 +28,10 @@
static const char qd_link_route_addr_prefix_inbound = 'C';
static const char qd_link_route_addr_prefix_outbound = 'D';
+
+//
+// DEPRECATE
+//
static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
{
qd_lrp_container_t *lrpc = (qd_lrp_container_t*) context;
@@ -39,7 +43,7 @@ static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
while (lrp) {
qd_address_t *addr;
qd_field_iterator_t *iter;
- bool propagate;
+ //bool propagate;
char unused;
qd_log(router->log_source, QD_LOG_INFO, "Activating Prefix '%s' for routed links to '%s'",
@@ -73,14 +77,14 @@ static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
// across the network.
//
qd_router_add_lrp_ref_LH(&addr->lrps, lrp);
- propagate = DEQ_SIZE(addr->lrps) == 1;
+ //propagate = DEQ_SIZE(addr->lrps) == 1;
sys_mutex_unlock(router->lock);
//
// Propagate the address if appropriate
//
- if (propagate)
- qd_router_mobile_added(router, iter);
+ //if (propagate)
+ // qd_router_mobile_added(router, iter);
}
if (lrp->outbound) {
@@ -106,14 +110,14 @@ static void qd_lrpc_open_handler(void *context, qd_connection_t *conn)
// across the network.
//
qd_router_add_lrp_ref_LH(&addr->lrps, lrp);
- propagate = DEQ_SIZE(addr->lrps) == 1;
+ //propagate = DEQ_SIZE(addr->lrps) == 1;
sys_mutex_unlock(router->lock);
//
// Propagate the address if appropriate
//
- if (propagate)
- qd_router_mobile_added(router, iter);
+ //if (propagate)
+ // qd_router_mobile_added(router, iter);
}
qd_field_iterator_free(iter);
@@ -166,7 +170,7 @@ static void qd_lrpc_close_handler(void *context, qd_connection_t *conn)
//
if (propagate) {
char *text = (char*) qd_field_iterator_copy(iter);
- qd_router_mobile_removed(router, text);
+ //qd_router_mobile_removed(router, text);
free(text);
}
}
@@ -194,7 +198,7 @@ static void qd_lrpc_close_handler(void *context, qd_connection_t *conn)
//
if (propagate) {
char *text = (char*) qd_field_iterator_copy(iter);
- qd_router_mobile_removed(router, text);
+ //qd_router_mobile_removed(router, text);
free(text);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 884a298..26064de 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -130,7 +130,7 @@ void qdr_connection_process(qdr_connection_t *conn)
break;
case QDR_CONNECTION_WORK_DETACH :
- core->detach_handler(core->user_context, work->link, work->condition);
+ core->detach_handler(core->user_context, work->link, work->error);
break;
}
@@ -203,12 +203,12 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin
}
-void qdr_link_detach(qdr_link_t *link, pn_condition_t *condition)
+void qdr_link_detach(qdr_link_t *link, qdr_error_t *error)
{
qdr_action_t *action = qdr_action(qdr_link_detach_CT, "link_detach");
- action->args.connection.link = link;
- action->args.connection.condition = condition;
+ action->args.connection.link = link;
+ action->args.connection.error = error;
qdr_action_enqueue(link->core, action);
}
@@ -331,12 +331,47 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie
// Question: Should we use a new prefix for configuration? (No: allows the possibility of
// static routes; yes: prevents occlusion by mobile addresses with specified semantics)
//
- qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) addr);
+ qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
return addr ? addr->semantics : qdr_default_semantics;
}
/**
+ * Check an address to see if it no longer has any associated destinations.
+ * Depending on its policy, the address may be eligible for being closed out
+ * (i.e. Logging its terminal statistics and freeing its resources).
+ */
+/*static*/ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
+{
+ if (addr == 0)
+ return;
+
+ //
+ // If we have just removed a local linkage and it was the last local linkage,
+ // we need to notify the router module that there is no longer a local
+ // presence of this address.
+ //
+ if (was_local && DEQ_SIZE(addr->rlinks) == 0) {
+ const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ if (key && *key == 'M')
+ qdr_post_mobile_removed_CT(core, key);
+ }
+
+ //
+ // If the address has no in-process consumer or destinations, it should be
+ // deleted.
+ //
+ if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
+ !addr->waypoint && !addr->block_deletion) {
+ qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
+ DEQ_REMOVE(core->addrs, addr);
+ qd_hash_handle_free(addr->hash_handle);
+ free_qdr_address_t(addr);
+ }
+}
+
+
+/**
* qdr_lookup_terminus_address_CT
*
* Lookup a terminus address in the route table and possibly create a new address
@@ -420,6 +455,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
// a link-route destination for the address.
//
qd_field_iterator_t *iter = qdr_terminus_get_address(terminus);
+ qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_override_prefix(iter, qdr_prefix_for_dir(dir));
qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
if (addr) {
@@ -618,9 +654,11 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo
//
link->owning_addr = addr;
qdr_add_link_ref(&addr->rlinks, link);
- if (DEQ_SIZE(addr->rlinks) == 1)
- // TODO - notify the router module
- ;
+ if (DEQ_SIZE(addr->rlinks) == 1) {
+ const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+ if (key && *key == 'M')
+ qdr_post_mobile_added_CT(core, key);
+ }
qdr_link_accept_CT(core, link);
}
break;
@@ -690,8 +728,8 @@ static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool disc
if (discard)
return;
- qdr_link_t *link = action->args.connection.link;
- //pn_condition_t *condition = action->args.connection.condition;
+ qdr_link_t *link = action->args.connection.link;
+ //qdr_error_t *error = action->args.connection.error;
switch (link->link_type) {
case QD_LINK_ENDPOINT:
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/error.c
----------------------------------------------------------------------
diff --git a/src/router_core/error.c b/src/router_core/error.c
new file mode 100644
index 0000000..7bc1cae
--- /dev/null
+++ b/src/router_core/error.c
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+struct qdr_error_t {
+ qdr_field_t *name;
+ qdr_field_t *description;
+ pn_data_t *info;
+};
+
+ALLOC_DECLARE(qdr_error_t);
+ALLOC_DEFINE(qdr_error_t);
+
+qdr_error_t *qdr_error_from_pn(pn_condition_t *pn)
+{
+ if (!pn)
+ return 0;
+
+ qdr_error_t *error = new_qdr_error_t();
+ ZERO(error);
+
+ const char *name = pn_condition_get_name(pn);
+ if (name && *name)
+ error->name = qdr_field(name);
+
+ const char *desc = pn_condition_get_description(pn);
+ if (desc && *desc)
+ error->description = qdr_field(desc);
+
+ error->info = pn_data(0);
+
+ return error;
+}
+
+
+qdr_error_t *qdr_error(const char *name, const char *description)
+{
+ qdr_error_t *error = new_qdr_error_t();
+
+ error->name = qdr_field(name);
+ error->description = qdr_field(description);
+ error->info = 0;
+
+ return error;
+}
+
+
+void qdr_error_free(qdr_error_t *error)
+{
+ if (error == 0)
+ return;
+
+ qdr_field_free(error->name);
+ qdr_field_free(error->description);
+ if (error->info)
+ pn_data_free(error->info);
+
+ free_qdr_error_t(error);
+}
+
+
+void qdr_error_copy(qdr_error_t *from, pn_condition_t *to)
+{
+ if (from->name) {
+ unsigned char *name = qd_field_iterator_copy(from->name->iterator);
+ pn_condition_set_name(to, (char*) name);
+ free(name);
+ }
+
+ if (from->description) {
+ unsigned char *desc = qd_field_iterator_copy(from->description->iterator);
+ pn_condition_set_description(to, (char*) desc);
+ free(desc);
+ }
+
+ if (from->info)
+ pn_data_copy(pn_condition_info(to), from->info);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 11e5bd3..5da26fa 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -20,16 +20,16 @@
#include "router_core_private.h"
#include <stdio.h>
-static void qdrh_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
@@ -40,7 +40,7 @@ static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS
void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_add_router_CT, "add_router");
+ qdr_action_t *action = qdr_action(qdr_add_router_CT, "add_router");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address);
qdr_action_enqueue(core, action);
@@ -49,7 +49,7 @@ void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskb
void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_del_router_CT, "del_router");
+ qdr_action_t *action = qdr_action(qdr_del_router_CT, "del_router");
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -57,7 +57,7 @@ void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_set_link_CT, "set_link");
+ qdr_action_t *action = qdr_action(qdr_set_link_CT, "set_link");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.link_maskbit = link_maskbit;
qdr_action_enqueue(core, action);
@@ -66,7 +66,7 @@ void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_remove_link_CT, "remove_link");
+ qdr_action_t *action = qdr_action(qdr_remove_link_CT, "remove_link");
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -74,7 +74,7 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_set_next_hop_CT, "set_next_hop");
+ qdr_action_t *action = qdr_action(qdr_set_next_hop_CT, "set_next_hop");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.nh_router_maskbit = nh_router_maskbit;
qdr_action_enqueue(core, action);
@@ -83,7 +83,7 @@ void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_m
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_remove_next_hop_CT, "remove_next_hop");
+ qdr_action_t *action = qdr_action(qdr_remove_next_hop_CT, "remove_next_hop");
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -91,32 +91,27 @@ void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers)
{
- qdr_action_t *action = qdr_action(qdrh_set_valid_origins_CT, "set_valid_origins");
+ qdr_action_t *action = qdr_action(qdr_set_valid_origins_CT, "set_valid_origins");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.router_set = routers;
qdr_action_enqueue(core, action);
}
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem)
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash)
{
- qdr_action_t *action = qdr_action(qdrh_map_destination_CT, "map_destination");
+ qdr_action_t *action = qdr_action(qdr_map_destination_CT, "map_destination");
action->args.route_table.router_maskbit = router_maskbit;
- action->args.route_table.address = qdr_field(address);
- action->args.route_table.address_phase = phase;
- action->args.route_table.address_class = aclass;
- action->args.route_table.semantics = sem;
+ action->args.route_table.address = qdr_field(address_hash);
qdr_action_enqueue(core, action);
}
-void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase)
+void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash)
{
- qdr_action_t *action = qdr_action(qdrh_unmap_destination_CT, "unmap_destination");
+ qdr_action_t *action = qdr_action(qdr_unmap_destination_CT, "unmap_destination");
action->args.route_table.router_maskbit = router_maskbit;
- action->args.route_table.address = qdr_field(address);
- action->args.route_table.address_phase = phase;
- action->args.route_table.address_class = aclass;
+ action->args.route_table.address = qdr_field(address_hash);
qdr_action_enqueue(core, action);
}
@@ -136,7 +131,7 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
qd_address_semantics_t sem, qdr_receive_t on_message, void *context)
{
- qdr_action_t *action = qdr_action(qdrh_subscribe_CT, "subscribe");
+ qdr_action_t *action = qdr_action(qdr_subscribe_CT, "subscribe");
action->args.subscribe.address = qdr_field(address);
action->args.subscribe.semantics = sem;
action->args.subscribe.address_class = aclass;
@@ -232,7 +227,7 @@ static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_lo
}
-static void qdrh_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -313,7 +308,7 @@ static void qdrh_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disc
}
-static void qdrh_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -362,7 +357,7 @@ static void qdrh_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disc
}
-static void qdrh_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
int link_maskbit = action->args.route_table.link_maskbit;
@@ -396,7 +391,7 @@ static void qdrh_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discar
}
-static void qdrh_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -416,7 +411,7 @@ static void qdrh_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool dis
}
-static void qdrh_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
int nh_router_maskbit = action->args.route_table.nh_router_maskbit;
@@ -448,7 +443,7 @@ static void qdrh_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool di
}
-static void qdrh_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -462,7 +457,7 @@ static void qdrh_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool
}
-static void qdrh_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qd_bitmask_t *valid_origins = action->args.route_table.router_set;
@@ -495,7 +490,7 @@ static void qdrh_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bo
}
-static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
//
// TODO - handle the class-prefix and phase explicitly
@@ -525,7 +520,7 @@ static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qdr_address(action->args.route_table.semantics);
+ addr = qdr_address(0); // FIXME - Semantics
qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(core->addrs, addr);
@@ -543,7 +538,7 @@ static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
}
-static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -588,7 +583,7 @@ static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bo
}
-static void qdrh_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
qdr_field_t *address = action->args.subscribe.address;
@@ -621,3 +616,62 @@ static void qdrh_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool disca
qdr_field_free(address);
}
+
+//==================================================================================
+// Call-back Functions
+//==================================================================================
+
+static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work)
+{
+ char *address_hash = qdr_field_copy(work->field);
+ if (address_hash) {
+ core->rt_mobile_added(core->rt_context, address_hash);
+ free(address_hash);
+ }
+
+ qdr_field_free(work->field);
+}
+
+
+static void qdr_do_mobile_removed(qdr_core_t *core, qdr_general_work_t *work)
+{
+ char *address_hash = qdr_field_copy(work->field);
+ if (address_hash) {
+ core->rt_mobile_removed(core->rt_context, address_hash);
+ free(address_hash);
+ }
+
+ qdr_field_free(work->field);
+}
+
+
+static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work)
+{
+ core->rt_link_lost(core->rt_context, work->maskbit);
+}
+
+
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added);
+ work->field = qdr_field(address_hash);
+ qdr_post_general_work_CT(core, work);
+}
+
+
+void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_removed);
+ work->field = qdr_field(address_hash);
+ qdr_post_general_work_CT(core, work);
+}
+
+
+void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit)
+{
+ qdr_general_work_t *work = qdr_general_work(qdr_do_link_lost);
+ work->maskbit = link_maskbit;
+ qdr_post_general_work_CT(core, work);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index f86f921..fa82c06 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -26,7 +26,9 @@ ALLOC_DEFINE(qdr_node_t);
ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
+ALLOC_DEFINE(qdr_general_work_t);
+static void qdr_general_handler(void *context);
qdr_core_t *qdr_core(qd_dispatch_t *qd, const char *area, const char *id)
{
@@ -50,6 +52,10 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, const char *area, const char *id)
core->running = true;
DEQ_INIT(core->action_list);
+ core->work_lock = sys_mutex();
+ DEQ_INIT(core->work_list);
+ core->work_timer = qd_timer(core->qd, qdr_general_handler, core);
+
//
// Launch the core thread
//
@@ -74,6 +80,8 @@ void qdr_core_free(qdr_core_t *core)
sys_thread_free(core->thread);
sys_cond_free(core->action_cond);
sys_mutex_free(core->action_lock);
+ sys_mutex_free(core->work_lock);
+ qd_timer_free(core->work_timer);
free(core);
}
@@ -120,6 +128,15 @@ void qdr_field_free(qdr_field_t *field)
}
+char *qdr_field_copy(qdr_field_t *field)
+{
+ if (!field || !field->iterator)
+ return 0;
+
+ return (char*) qd_field_iterator_copy(field->iterator);
+}
+
+
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label)
{
qdr_action_t *action = new_qdr_action_t();
@@ -214,3 +231,49 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
ref = DEQ_NEXT(ref);
}
}
+
+
+static void qdr_general_handler(void *context)
+{
+ qdr_core_t *core = (qdr_core_t*) context;
+ qdr_general_work_list_t work_list;
+ qdr_general_work_t *work;
+
+ sys_mutex_lock(core->work_lock);
+ DEQ_MOVE(core->work_list, work_list);
+ sys_mutex_unlock(core->work_lock);
+
+ work = DEQ_HEAD(work_list);
+ while (work) {
+ DEQ_REMOVE_HEAD(work_list);
+ work->handler(core, work);
+ free_qdr_general_work_t(work);
+ work = DEQ_HEAD(work_list);
+ }
+}
+
+
+qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler)
+{
+ qdr_general_work_t *work = new_qdr_general_work_t();
+ ZERO(work);
+ work->handler = handler;
+ return work;
+}
+
+
+void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work)
+{
+ bool notify;
+
+ sys_mutex_lock(core->work_lock);
+ DEQ_ITEM_INIT(work);
+ DEQ_INSERT_TAIL(core->work_list, work);
+ notify = DEQ_SIZE(core->work_list) == 1;
+ sys_mutex_unlock(core->work_lock);
+
+ if (notify)
+ qd_timer_schedule(core->work_timer, 0);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index dfeefd6..db84c55 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -36,6 +36,7 @@ typedef struct {
qdr_field_t *qdr_field(const char *string);
void qdr_field_free(qdr_field_t *field);
+char *qdr_field_copy(qdr_field_t *field);
/**
* qdr_action_t - This type represents one work item to be performed by the router-core thread.
@@ -52,14 +53,11 @@ struct qdr_action_t {
// Arguments for router control-plane actions
//
struct {
- int link_maskbit;
- int router_maskbit;
- int nh_router_maskbit;
- qd_bitmask_t *router_set;
- qdr_field_t *address;
- char address_class;
- char address_phase;
- qd_address_semantics_t semantics;
+ int link_maskbit;
+ int router_maskbit;
+ int nh_router_maskbit;
+ qd_bitmask_t *router_set;
+ qdr_field_t *address;
} route_table;
//
@@ -73,7 +71,7 @@ struct qdr_action_t {
qd_direction_t dir;
qdr_terminus_t *source;
qdr_terminus_t *target;
- pn_condition_t *condition;
+ qdr_error_t *error;
} connection;
//
@@ -243,6 +241,33 @@ void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+//
+// General Work
+//
+// The following types are used to post work to the IO threads for
+// non-connection-specific action.
+//
+typedef struct qdr_general_work_t qdr_general_work_t;
+typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work);
+
+struct qdr_general_work_t {
+ DEQ_LINKS(qdr_general_work_t);
+ qdr_general_work_handler_t handler;
+ qdr_field_t *field;
+ int maskbit;
+};
+
+ALLOC_DECLARE(qdr_general_work_t);
+DEQ_DECLARE(qdr_general_work_t, qdr_general_work_list_t);
+
+qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler);
+
+//
+// Connection Work
+//
+// The following types are used to post work to the IO threads for
+// connection-specific action.
+//
typedef enum {
QDR_CONNECTION_WORK_FIRST_ATTACH,
QDR_CONNECTION_WORK_SECOND_ATTACH,
@@ -255,12 +280,13 @@ typedef struct qdr_connection_work_t {
qdr_link_t *link;
qdr_terminus_t *source;
qdr_terminus_t *target;
- pn_condition_t *condition;
+ qdr_error_t *error;
} qdr_connection_work_t;
ALLOC_DECLARE(qdr_connection_work_t);
DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
+
struct qdr_connection_t {
DEQ_LINKS(qdr_connection_t);
qdr_core_t *core;
@@ -287,6 +313,10 @@ struct qdr_core_t {
sys_cond_t *action_cond;
sys_mutex_t *action_lock;
+ sys_mutex_t *work_lock;
+ qdr_general_work_list_t work_list;
+ qd_timer_t *work_timer;
+
qdr_connection_list_t open_connections;
//
@@ -343,6 +373,12 @@ qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label)
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
+
+void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
+
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
qd_router_entity_type_t type,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index ba8eab9..ce4b6a8 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -65,7 +65,8 @@ void *router_core_thread(void *arg)
action = DEQ_HEAD(action_list);
while (action) {
DEQ_REMOVE_HEAD(action_list);
- qd_log(core->log, QD_LOG_TRACE, "Core action '%s'%s", action->label, core->running ? "" : " (discard)");
+ if (action->label)
+ qd_log(core->log, QD_LOG_TRACE, "Core action '%s'%s", action->label, core->running ? "" : " (discard)");
action->action_handler(core, action, !core->running);
free_qdr_action_t(action);
action = DEQ_HEAD(action_list);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ba06a25..e9e8012 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -37,25 +37,9 @@ const char *CORE_AGENT_ADDRESS = "$management";
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
static char *direct_prefix;
static char *node_id;
-/*
- * Address Types and Processing:
- *
- * Address Hash Key onReceive
- * ===================================================================
- * _local/<local> L<local> handler
- * _topo/<area>/<router>/<local> A<area> forward
- * _topo/<my-area>/<router>/<local> R<router> forward
- * _topo/<my-area>/<my-router>/<local> L<local> handler
- * _topo/<area>/all/<local> A<area> forward
- * _topo/<my-area>/all/<local> L<local> forward handler
- * _topo/all/all/<local> L<local> forward handler
- * <mobile> M<mobile> forward handler
- */
-
ALLOC_DEFINE(qd_routed_event_t);
ALLOC_DEFINE(qd_router_link_t);
ALLOC_DEFINE(qd_router_node_t);
@@ -152,77 +136,6 @@ void qd_router_del_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp)
/**
- * Check an address to see if it no longer has any associated destinations.
- * Depending on its policy, the address may be eligible for being closed out
- * (i.e. Logging its terminal statistics and freeing its resources).
- */
-void qd_router_check_addr(qd_router_t *router, qd_address_t *addr, int was_local)
-{
- if (addr == 0)
- return;
-
- unsigned char *key = 0;
- int to_delete = 0;
- int no_more_locals = 0;
-
- sys_mutex_lock(router->lock);
-
- //
- // If the address has no in-process consumer or destinations, it should be
- // deleted.
- //
- if (addr->on_message == 0 &&
- DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
- !addr->waypoint && !addr->block_deletion)
- to_delete = 1;
-
- //
- // If we have just removed a local linkage and it was the last local linkage,
- // we need to notify the router module that there is no longer a local
- // presence of this address.
- //
- if (was_local && DEQ_SIZE(addr->rlinks) == 0)
- no_more_locals = 1;
-
- if (to_delete) {
- //
- // Delete the address but grab the hash key so we can use it outside the
- // critical section.
- //
- qd_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key);
- DEQ_REMOVE(router->addrs, addr);
- qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, addr);
- qd_hash_handle_free(addr->hash_handle);
- free_qd_address_t(addr);
- }
-
- //
- // If we're not deleting but there are no more locals, get a copy of the hash key.
- //
- if (!to_delete && no_more_locals) {
- const unsigned char *key_const = qd_hash_key_by_handle(addr->hash_handle);
- key = (unsigned char*) malloc(strlen((const char*) key_const) + 1);
- strcpy((char*) key, (const char*) key_const);
- }
-
- sys_mutex_unlock(router->lock);
-
- //
- // If the address is mobile-class and it was just removed from a local link,
- // tell the router module that it is no longer attached locally.
- //
- if (no_more_locals && key && key[0] == 'M')
- qd_router_mobile_removed(router, (const char*) key);
-
- //
- // Free the key that was not freed by the hash table.
- //
- if (key)
- free(key);
-}
-
-
-/**
* Determine the role of a connection
*/
static qdr_connection_role_t qd_router_connection_role(const qd_connection_t *conn)
@@ -239,173 +152,6 @@ static qdr_connection_role_t qd_router_connection_role(const qd_connection_t *co
}
-/**
- * Determine whether a connection is configured in the inter-router role.
- * DEPRECATE
- */
-static int qd_router_connection_is_inter_router(const qd_connection_t *conn)
-{
- if (!conn)
- return 0;
-
- const qd_server_config_t *cf = qd_connection_config(conn);
- if (cf && strcmp(cf->role, router_role) == 0)
- return 1;
-
- return 0;
-}
-
-
-/**
- * Determine whether a connection is configured in the on-demand role.
- * DEPRECATE
- */
-static int qd_router_connection_is_on_demand(const qd_connection_t *conn)
-{
- if (!conn)
- return 0;
-
- const qd_server_config_t *cf = qd_connection_config(conn);
- if (cf && strcmp(cf->role, on_demand_role) == 0)
- return 1;
-
- return 0;
-}
-
-
-/**
- * Determine whether a terminus has router capability
- */
-static int qd_router_terminus_is_router(pn_terminus_t *term)
-{
- pn_data_t *cap = pn_terminus_capabilities(term);
-
- pn_data_rewind(cap);
- pn_data_next(cap);
- if (cap && pn_data_type(cap) == PN_SYMBOL) {
- pn_bytes_t sym = pn_data_get_symbol(cap);
- if (sym.size == strlen(QD_CAPABILITY_ROUTER_CONTROL) &&
- strcmp(sym.start, QD_CAPABILITY_ROUTER_CONTROL) == 0)
- return 1;
- }
-
- return 0;
-}
-
-
-/**
- * If the terminus has a dynamic-node-property for a node address,
- * return an iterator for the content of that property.
- * DEPRECATE
- */
-static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
-{
- pn_data_t *props = pn_terminus_properties(term);
-
- if (!props)
- return 0;
-
- pn_data_rewind(props);
- if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props)) {
- pn_bytes_t sym = pn_data_get_symbol(props);
- if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start) == 0) {
- if (pn_data_next(props)) {
- pn_bytes_t val = pn_data_get_string(props);
- if (val.start && *val.start != '\0')
- return val.start;
- }
- }
- }
-
- return 0;
-}
-
-
-/**
- * Generate a temporary routable address for a destination connected to this
- * router node.
- * DEPRECATE
- */
-static void qd_router_generate_temp_addr(qd_router_t *router, char *buffer, size_t length)
-{
- static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
- char discriminator[11];
- long int rnd1 = random();
- long int rnd2 = random();
- int idx;
- int cursor = 0;
-
- for (idx = 0; idx < 5; idx++) {
- discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63];
- discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63];
- }
- discriminator[cursor] = '\0';
-
- snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
-}
-
-
-/**
- * Assign a link-mask-bit to a new link. Do this in such a way that all links on the same
- * connection share the same mask-bit value.
- *
- * DEPRECATE
- */
-static int qd_router_find_mask_bit_LH(qd_router_t *router, qd_link_t *link)
-{
- qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
- if (shared) {
- shared->ref_count++;
- return shared->mask_bit;
- }
-
- int mask_bit;
- if (qd_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
- qd_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
- } else {
- qd_log(router->log_source, QD_LOG_CRITICAL, "Exceeded maximum inter-router link count");
- return -1;
- }
-
- shared = new_qd_router_conn_t();
- shared->ref_count = 1;
- shared->mask_bit = mask_bit;
- qd_link_set_conn_context(link, shared);
- return mask_bit;
-}
-
-
-/**
- * DEPRECATE
- */
-static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr, qd_direction_t dir)
-{
- char addr_prefix = (dir == QD_INCOMING) ? 'C' : 'D';
- qd_field_iterator_t *iter;
- qd_address_t *addr;
-
- if (taddr == 0 || *taddr == '\0')
- return 0;
-
- //
- // Initialize the iterator with taddr so we can traverse through taddr by traversing the iterator.
- //
- iter = qd_address_iterator_string(taddr, ITER_VIEW_ADDRESS_HASH);
-
- //Set the iter->prefix_override to addr_prefix
- qd_address_iterator_override_prefix(iter, addr_prefix);
-
- //Populate addr with the appropriate qd_address_t if the string in the iterator generated a hash
- //that is present in the addr_hash
- qd_hash_retrieve_prefix(router->addr_hash, iter, (void*) &addr);
-
- qd_field_iterator_free(iter);
-
- return addr;
-}
-
-
-
void qd_router_link_free_LH(qd_router_link_t *rlink)
{
qd_link_t *link = rlink->link;
@@ -714,38 +460,6 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
/**
- * Handle the link-routing case, where links are pre-paired and there is no per-message
- * routing needed.
- *
- * Note that this function does not issue a replacement credit for the received message.
- * In link-routes, the flow commands must be propagated end-to-end. In other words, the
- * ultimate receiving endpoint will issue the replacement credits as it sees fit.
- *
- * Note also that this function does not perform any message validation. For link-routing,
- * there is no need to look into the transferred message.
- */
-static void router_link_route_delivery_LH(qd_router_link_t *peer_link, qd_router_delivery_t *delivery, qd_message_t *msg)
-{
- qd_routed_event_t *re = new_qd_routed_event_t();
-
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = msg;
- re->settle = false;
- re->disposition = 0;
- DEQ_INSERT_TAIL(peer_link->msg_fifo, re);
-
- //
- // Link the incoming delivery into the event for deferred processing
- //
- re->delivery = delivery;
- qd_router_delivery_fifo_enter_LH(delivery);
-
- qd_link_activate(peer_link->link);
-}
-
-
-/**
* Inbound Delivery Handler
*/
static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
@@ -788,7 +502,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
sys_mutex_lock(router->lock);
qd_router_link_t *clink = rlink->connected_link;
if (clink) {
- router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd), msg);
+ //router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd), msg);
sys_mutex_unlock(router->lock);
return;
}
@@ -1040,88 +754,6 @@ typedef enum {
} link_attach_result_t;
-static void qd_router_attach_routed_link(void *context, bool discard)
-{
- link_attach_t *la = (link_attach_t*) context;
-
- if (!discard) {
- qd_link_t *link = qd_link(la->router->node, la->conn, la->dir, la->link_name);
-
- qd_router_link_t *rlink = qd_router_link(link, QD_LINK_ENDPOINT, la->dir, 0, 0, 0);
-
- qd_link_set_context(link, rlink);
-
- sys_mutex_lock(la->router->lock);
- rlink->connected_link = la->peer_link;
- la->peer_link->connected_link = rlink;
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(la->router->links, rlink);
- sys_mutex_unlock(la->router->lock);
-
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(la->peer_qd_link));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
-
- pn_link_open(qd_link_pn(link));
- if (la->credit > 0)
- pn_link_flow(qd_link_pn(link), la->credit);
- }
-
- if (la->link_name)
- free(la->link_name);
- free_link_attach_t(la);
-}
-
-
-static void qd_router_detach_routed_link(void *context, bool discard)
-{
- link_detach_t *ld = (link_detach_t*) context;
-
- if (!discard) {
- qd_link_t *link = ld->rlink->link;
-
- if (ld->condition_name[0]) {
- pn_condition_t *cond = pn_link_condition(qd_link_pn(link));
- pn_condition_set_name(cond, ld->condition_name);
- pn_condition_set_description(cond, ld->condition_description);
- if (ld->condition_info)
- pn_data_copy(pn_condition_info(cond), ld->condition_info);
- }
-
- qd_link_close(link);
-
- sys_mutex_lock(ld->router->lock);
- qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, ld->rlink);
- DEQ_REMOVE(ld->router->links, ld->rlink);
- qd_router_link_free_LH(ld->rlink);
- sys_mutex_unlock(ld->router->lock);
- }
-
- if (ld->condition_info)
- pn_data_free(ld->condition_info);
- free_link_detach_t(ld);
-}
-
-
-static void qd_router_open_routed_link(void *context, bool discard)
-{
- link_event_t *le = (link_event_t*) context;
-
- if (!discard) {
- sys_mutex_lock(le->router->lock);
- qd_link_t *link = le->rlink->link;
- if (le->rlink->connected_link) {
- qd_link_t *peer = le->rlink->connected_link->link;
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(peer));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(peer));
- pn_link_open(qd_link_pn(link));
- }
- sys_mutex_unlock(le->router->lock);
- }
-
- free_link_event_t(le);
-}
-
-
static void qd_router_flow(void *context, bool discard)
{
link_event_t *le = (link_event_t*) context;
@@ -1140,71 +772,6 @@ static void qd_router_flow(void *context, bool discard)
}
-link_attach_result_t qd_router_link_route_LH(qd_router_t *router,
- qd_router_link_t *rlink,
- const char *term_addr,
- qd_direction_t dir)
-{
- //
- // Lookup the target address to see if we can link-route this attach.
- //
- qd_address_t *addr = router_lookup_terminus_LH(router, term_addr, dir);
- if (addr) {
- //
- // This is a link-attach routable target. Propagate the attach downrange.
- // Check first for a locally connected container.
- //
- qd_link_t *link = rlink->link;
- pn_link_t *pn_link = qd_link_pn(link);
- qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
- if (lrpref) {
- qd_connection_t *conn = lrpref->lrp->container->conn;
- if (conn) {
- link_attach_t *la = new_link_attach_t();
- la->router = router;
- la->peer_link = rlink;
- la->peer_qd_link = link;
- la->link_name = strdup(pn_link_name(pn_link));
- la->dir = dir;
- la->conn = conn;
- la->credit = pn_link_credit(pn_link);
- qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
- }
- } else if (DEQ_SIZE(addr->rnodes) > 0) {
- //
- // There are no locally connected containers for this link but there is at
- // least one on a remote router. Forward the attach toward the remote destination.
- //
- qd_router_node_t *remote_router = DEQ_HEAD(addr->rnodes)->router;
- qd_router_link_t *out_link = 0;
- if (remote_router)
- out_link = remote_router->peer_link;
- if (!out_link && remote_router && remote_router->next_hop)
- out_link = remote_router->next_hop->peer_link;
- if (out_link) {
- qd_connection_t *out_conn = qd_link_connection(out_link->link);
- if (out_conn) {
- link_attach_t *la = new_link_attach_t();
- la->router = router;
- la->peer_link = rlink;
- la->peer_qd_link = link;
- la->link_name = strdup(pn_link_name(pn_link));
- la->dir = dir;
- la->conn = out_conn;
- la->credit = pn_link_credit(pn_link);
- qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
- } else
- return LINK_ATTACH_NO_PATH;
- } else
- return LINK_ATTACH_NO_PATH;
- } else
- return LINK_ATTACH_NO_PATH;
- } else
- return LINK_ATTACH_NO_MATCH;
- return LINK_ATTACH_FORWARDED;
-}
-
-
qd_router_link_t* qd_router_link(qd_link_t *link, qd_link_type_t link_type, qd_direction_t direction, qd_address_t *owning_addr, qd_waypoint_t *wp, int mask_bit)
{
qd_router_link_t *rlink = new_qd_router_link_t();
@@ -1250,11 +817,6 @@ qd_router_link_t* qd_router_link(qd_link_t *link, qd_link_type_t link_type, qd_d
*/
static int router_incoming_link_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- pn_link_t *pn_link = qd_link_pn(link);
- int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
- const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
-
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
@@ -1263,72 +825,6 @@ static int router_incoming_link_handler(void* context, qd_link_t *link)
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
- //
- // DEPRECATE:
- //
-
- if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING,
- "Incoming link claims router capability but is not on an inter-router connection");
- pn_link_close(pn_link);
- return 0;
- }
-
- qd_router_link_t *rlink = qd_router_link(link, is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT, QD_INCOMING, 0, 0, 0);
-
- if (!is_router && r_tgt) {
- rlink->target = (char*) malloc(strlen(r_tgt) + 1);
- strcpy(rlink->target, r_tgt);
- }
-
- qd_link_set_context(link, rlink);
-
- sys_mutex_lock(router->lock);
- rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
- //
- // Attempt to link-route this attach
- //
- link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
- if (!is_router)
- la_result = qd_router_link_route_LH(router, rlink, r_tgt, QD_OUTGOING);
- sys_mutex_unlock(router->lock);
-
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
-
- switch (la_result) {
- case LINK_ATTACH_NO_MATCH:
- //
- // We didn't link-route this attach. It terminates here.
- // Open it in the reverse direction.
- //
- pn_link_flow(pn_link, 1000);
- pn_link_open(pn_link);
- break;
-
- case LINK_ATTACH_NO_PATH: {
- //
- // The link should be routable but there is no path to the
- // destination. Close the link.
- //
- pn_condition_t *cond = pn_link_condition(pn_link);
- pn_condition_set_name(cond, "qd:no-route-to-dest");
- pn_condition_set_description(cond, "No route to the destination node");
- pn_link_close(pn_link);
- break;
- }
-
- case LINK_ATTACH_FORWARDED:
- //
- // We routed the attach outbound. Don't open the link back until
- // the downstream link is opened.
- //
- break;
- }
-
return 0;
}
@@ -1338,18 +834,6 @@ static int router_incoming_link_handler(void* context, qd_link_t *link)
*/
static int router_outgoing_link_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- pn_link_t *pn_link = qd_link_pn(link);
- const char *r_src = pn_terminus_get_address(qd_link_remote_source(link));
- int is_dynamic = pn_terminus_is_dynamic(qd_link_remote_source(link));
- int is_router = qd_router_terminus_is_router(qd_link_remote_target(link));
- int propagate = 0;
- qd_field_iterator_t *iter = 0;
- char phase = '0';
- qd_address_semantics_t semantics;
- qd_address_t *addr = 0;
- link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
-
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
@@ -1358,165 +842,6 @@ static int router_outgoing_link_handler(void* context, qd_link_t *link)
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
- //
- // DEPRECATE:
- //
-
- if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING,
- "Outgoing link claims router capability but is not on an inter-router connection");
- pn_link_close(pn_link);
- return 0;
- }
-
- //
- // If this link is not a router link and it has no source address, we can't
- // accept it.
- //
- if (r_src == 0 && !is_router && !is_dynamic) {
- pn_link_close(pn_link);
- return 0;
- }
-
- //
- // If this is an endpoint link with a source address, make sure the address is
- // appropriate for endpoint links. If it is not mobile address, it cannot be
- // bound to an endpoint link.
- //
- if (r_src && !is_router && !is_dynamic) {
- iter = qd_address_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
- unsigned char prefix = qd_field_iterator_octet(iter);
- qd_field_iterator_reset(iter);
-
- if (prefix != 'M') {
- qd_field_iterator_free(iter);
- pn_link_close(pn_link);
- qd_log(router->log_source, QD_LOG_WARNING,
- "Rejected an outgoing endpoint link with a router address: %s", r_src);
- return 0;
- }
- }
-
- //
- // Create a router_link record for this link. Some of the fields will be
- // modified in the different cases below.
- //
- qd_router_link_t *rlink = qd_router_link(link, is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT, QD_OUTGOING, 0, 0, 0);
-
- qd_link_set_context(link, rlink);
- pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
- pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
-
- //
- // Determine the semantics for the address prior to taking out the lock.
- //
- if (is_dynamic || !iter)
- semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE;
- else {
- semantics = router_semantics_for_addr(router, iter, '\0', &phase);
- qd_address_iterator_set_phase(iter, phase);
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- }
-
- sys_mutex_lock(router->lock);
- rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
-
- if (is_router) {
- //
- // If this is a router link, put it in the hello_address link-list.
- //
- qd_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
- rlink->owning_addr = router->hello_addr;
- router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
-
- } else {
- //
- // If this is an endpoint link, check the source. If it is dynamic, we will
- // assign it an ephemeral and routable address. If it has a non-dynamic
- // address, that address needs to be set up in the address list.
- //
- char temp_addr[1000]; // TODO: Use pn_string or aprintf.
- const char *link_route_address = qd_router_terminus_dnp_address(qd_link_remote_source(link));
-
- if (link_route_address == 0)
- link_route_address = r_src;
- la_result = qd_router_link_route_LH(router, rlink, link_route_address, QD_INCOMING);
-
- if (la_result == LINK_ATTACH_NO_MATCH) {
- if (is_dynamic) {
- qd_router_generate_temp_addr(router, temp_addr, 1000);
- iter = qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
- pn_terminus_set_address(qd_link_source(link), temp_addr);
- qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
- } else
- qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
-
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- if (!addr) {
- addr = qd_address(semantics);
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(router->addrs, addr);
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- }
-
- rlink->owning_addr = addr;
- qd_router_add_link_ref_LH(&addr->rlinks, rlink);
-
- //
- // If this is not a dynamic address and it is the first local subscription
- // to the address, supply the address to the router module for propagation
- // to other nodes.
- //
- propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
- }
- }
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
- //
- // If an interesting change has occurred with this address and it has an associated waypoint,
- // notify the waypoint module so it can react appropriately.
- //
- if (propagate && addr->waypoint)
- qd_waypoint_address_updated_LH(router->qd, addr);
-
- sys_mutex_unlock(router->lock);
-
- if (propagate)
- qd_router_mobile_added(router, iter);
-
- if (iter)
- qd_field_iterator_free(iter);
-
- switch (la_result) {
- case LINK_ATTACH_NO_MATCH:
- //
- // We didn't link-route this attach. It terminates here.
- // Open it in the reverse direction.
- //
- pn_link_open(pn_link);
- break;
-
- case LINK_ATTACH_NO_PATH: {
- //
- // The link should be routable but there is no path to the
- // destination. Close the link.
- //
- pn_condition_t *cond = pn_link_condition(qd_link_pn(link));
- pn_condition_set_name(cond, "qd:no-route-to-dest");
- pn_condition_set_description(cond, "No route to the destination node");
- pn_link_close(pn_link);
- break;
- }
-
- case LINK_ATTACH_FORWARDED:
- //
- // We routed the attach outbound. Don't open the link back until
- // the downstream link is opened.
- //
- break;
- }
-
return 0;
}
@@ -1526,26 +851,9 @@ static int router_outgoing_link_handler(void* context, qd_link_t *link)
*/
static int router_link_attach_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
+ qdr_link_t *qlink = (qdr_link_t*) qd_link_get_context(link);
+ qdr_link_second_attach(qlink, qdr_terminus(qd_link_remote_source(link)), qdr_terminus(qd_link_remote_target(link)));
- if (!rlink)
- return 0;
-
- sys_mutex_lock(router->lock);
- qd_router_link_t *peer_rlink = rlink->connected_link;
- if (peer_rlink) {
- qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
- if (out_conn) {
- link_event_t *le = new_link_event_t();
- memset(le, 0, sizeof(link_event_t));
- le->router = router;
- le->rlink = peer_rlink;
- qd_connection_invoke_deferred(out_conn, qd_router_open_routed_link, le);
- }
- }
- sys_mutex_unlock(router->lock);
-
return 0;
}
@@ -1600,115 +908,16 @@ static int router_link_flow_handler(void* context, qd_link_t *link)
*/
static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
- qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
- qd_address_t *oaddr = 0;
- qd_waypoint_t *wp = 0;
- int lost_link_mask_bit = -1;
-
- if (!rlink)
- return 0;
-
- sys_mutex_lock(router->lock);
-
- //
- // Save this so we can check it after the lock is released.
- //
- wp = rlink->waypoint;
-
- if (rlink->connected_link) {
- qd_connection_t *out_conn = qd_link_connection(rlink->connected_link->link);
- if (out_conn) {
- link_detach_t *ld = new_link_detach_t();
- memset(ld, 0, sizeof(link_detach_t));
- ld->router = router;
- ld->rlink = rlink->connected_link;
- pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
- if (cond && pn_condition_is_set(cond)) {
- if (pn_condition_get_name(cond)) {
- strncpy(ld->condition_name, pn_condition_get_name(cond), COND_NAME_LEN);
- ld->condition_name[COND_NAME_LEN] = '\0';
- }
- if (pn_condition_get_description(cond)) {
- strncpy(ld->condition_description, pn_condition_get_description(cond), COND_DESCRIPTION_LEN);
- ld->condition_description[COND_DESCRIPTION_LEN] = '\0';
- }
- if (pn_condition_info(cond)) {
- ld->condition_info = pn_data(0);
- pn_data_copy(ld->condition_info, pn_condition_info(cond));
- }
- } else if (dt == QD_LOST) {
- strcpy(ld->condition_name, "qd:routed-link-lost");
- strcpy(ld->condition_description, "Connectivity to the peer container was lost");
- }
- rlink->connected_link->connected_link = 0;
- qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link, ld);
- }
- }
-
- //
- // If this link is part of an inter-router connection, drop the
- // reference count. If this is the last link on the connection,
- // free the mask-bit and the shared connection record.
- //
- if (shared) {
- shared->ref_count--;
- if (shared->ref_count == 0) {
- lost_link_mask_bit = rlink->mask_bit;
- qd_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
- qd_link_set_conn_context(link, 0);
- free_qd_router_conn_t(shared);
- }
- }
-
- //
- // If the link is outgoing, we must disassociate it from its address.
- //
- if (rlink->link_direction == QD_OUTGOING && rlink->owning_addr) {
- qd_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
- oaddr = rlink->owning_addr;
+ qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
+ pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
+
+ if (rlink) {
+ qdr_error_t *error = qdr_error_from_pn(cond);
+ if (!error && dt == QD_LOST)
+ error = qdr_error("qd:routed-link-lost", "Connectivity to the peer container was lost");
+ qdr_link_detach(rlink, error);
}
- //
- // If this is an outgoing inter-router link, we must remove the by-mask-bit
- // index reference to this link.
- //
- if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_OUTGOING) {
- if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink)
- router->out_links_by_mask_bit[rlink->mask_bit] = 0;
- else
- qd_log(router->log_source, QD_LOG_CRITICAL,
- "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
- }
-
- //
- // Remove the link from the master list-of-links and deallocate
- //
- DEQ_REMOVE(router->links, rlink);
- qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, rlink);
- qd_router_link_free_LH(rlink);
-
- sys_mutex_unlock(router->lock);
-
- //
- // If this was a waypoint link, notify the waypoint module.
- //
- if (wp)
- qd_waypoint_link_closed(router->qd, wp, link);
-
- //
- // Check to see if the owning address should be deleted
- //
- qd_router_check_addr(router, oaddr, 1);
-
- //
- // If we lost the link to a neighbor router, notify the route engine so it doesn't
- // have to wait for the HELLO timeout to expire.
- //
- if (lost_link_mask_bit >= 0)
- qd_router_link_lost(router, lost_link_mask_bit);
-
return 0;
}
@@ -1716,8 +925,8 @@ static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_
static void router_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
- qdr_connection_role_t role = qd_router_connection_role(conn);
- qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, true, role, 0); // TODO - get label
+ qdr_connection_role_t role = qd_router_connection_role(conn);
+ qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, true, role, 0); // TODO - get label
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
@@ -1727,92 +936,11 @@ static void router_inbound_opened_handler(void *type_context, qd_connection_t *c
static void router_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
- qdr_connection_role_t role = qd_router_connection_role(conn);
- qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, false, role, 0); // TODO - get label
+ qdr_connection_role_t role = qd_router_connection_role(conn);
+ qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, false, role, 0); // TODO - get label
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
-
- // DEPRECATE:
-
- //
- // If the connection is on-demand, visit all waypoints that are waiting for their
- // connection to arrive.
- //
- if (qd_router_connection_is_on_demand(conn)) {
- qd_waypoint_connection_opened(router->qd, (qd_config_connector_t*) context, conn);
- return;
- }
-
- //
- // If the connection isn't inter-router, ignore it.
- //
- if (!qd_router_connection_is_inter_router(conn))
- return;
-
- qd_link_t *sender;
- qd_link_t *receiver;
- qd_router_link_t *rlink;
- int mask_bit = 0;
- size_t clen = strlen(QD_CAPABILITY_ROUTER_CONTROL);
-
- //
- // Allocate a mask bit to designate the pair of links connected to the neighbor router
- //
- sys_mutex_lock(router->lock);
- if (qd_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
- qd_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
- } else {
- sys_mutex_unlock(router->lock);
- qd_log(router->log_source, QD_LOG_CRITICAL, "Exceeded maximum inter-router link count");
- return;
- }
-
- //
- // Create an incoming link with router source capability
- //
- receiver = qd_link(router->node, conn, QD_INCOMING, QD_INTERNODE_LINK_NAME_1);
- // TODO - We don't want to have to cast away the constness of the literal string here!
- // See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)),
- pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER_CONTROL));
-
- rlink = qd_router_link(receiver, QD_LINK_ROUTER, QD_INCOMING, 0, 0, mask_bit);
-
- qd_link_set_context(receiver, rlink);
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
- //
- // Create an outgoing link with router target capability
- //
- sender = qd_link(router->node, conn, QD_OUTGOING, QD_INTERNODE_LINK_NAME_2);
- // TODO - We don't want to have to cast away the constness of the literal string here!
- // See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)),
- pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER_CONTROL));
-
- rlink = qd_router_link(sender, QD_LINK_ROUTER, QD_OUTGOING, router->hello_addr, 0, mask_bit);
-
- //
- // Add the new outgoing link to the hello_address's list of links.
- //
- qd_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
-
- //
- // Index this link from the by-maskbit index so we can later find it quickly
- // when provided with the mask bit.
- //
- router->out_links_by_mask_bit[mask_bit] = rlink;
-
- qd_link_set_context(sender, rlink);
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
- sys_mutex_unlock(router->lock);
-
- pn_link_open(qd_link_pn(receiver));
- pn_link_open(qd_link_pn(sender));
- pn_link_flow(qd_link_pn(receiver), 1000);
}
@@ -1960,7 +1088,7 @@ static void qd_router_link_second_attach(void *context, qdr_link_t *link, qdr_te
}
-static void qd_router_link_detach(void *context, qdr_link_t *link, pn_condition_t *condition)
+static void qd_router_link_detach(void *context, qdr_link_t *link, qdr_error_t *error)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 50ab94e..4bd0467 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -277,10 +277,6 @@ void qd_router_del_node_ref_LH(qd_router_ref_list_t *ref_list, qd_router_node_t
void qd_router_add_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp);
void qd_router_del_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t *lrp);
-void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter);
-void qd_router_mobile_removed(qd_router_t *router, const char *addr);
-void qd_router_link_lost(qd_router_t *router, int link_mask_bit);
-
qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
char in_phase, char *out_phase);
qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-dispatch git commit: DISPATCH-179 - WIP checkpoint. Ripped
out large sections of the old architecture. Added the general callback
functionality for non-connection-specific actions.
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 77ac601..3cdc98f 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -28,8 +28,6 @@
#include "waypoint_private.h"
#include "entity_cache.h"
-static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
-
static qd_log_source_t *log_source = 0;
static PyObject *pyRouter = 0;
static PyObject *pyTick = 0;
@@ -49,84 +47,12 @@ static PyObject *qd_add_router(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
const char *address;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
return 0;
qdr_core_add_router(router->router_core, address, router_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] != 0) {
- sys_mutex_unlock(router->lock);
- error = "Adding router over already existing router";
- break;
- }
-
- //
- // Hash lookup the address to ensure there isn't an existing router address.
- //
- qd_field_iterator_t *iter = qd_address_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
- qd_address_t *addr;
-
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- assert(addr == 0);
-
- //
- // Create an address record for this router and insert it in the hash table.
- // This record will be found whenever a "foreign" topological address to this
- // remote router is looked up.
- //
- addr = qd_address(router_addr_semantics);
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(router->addrs, addr);
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- qd_field_iterator_free(iter);
-
- //
- // Create a router-node record to represent the remote router.
- //
- qd_router_node_t *rnode = new_qd_router_node_t();
- DEQ_ITEM_INIT(rnode);
- rnode->owning_addr = addr;
- rnode->mask_bit = router_maskbit;
- rnode->next_hop = 0;
- rnode->peer_link = 0;
- rnode->ref_count = 0;
- rnode->valid_origins = qd_bitmask(0);
-
- DEQ_INSERT_TAIL(router->routers, rnode);
-
- //
- // Link the router record to the address record.
- //
- qd_router_add_node_ref_LH(&addr->rnodes, rnode);
-
- //
- // Link the router record to the router address records.
- //
- qd_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
- qd_router_add_node_ref_LH(&router->routerma_addr->rnodes, rnode);
-
- //
- // Add the router record to the mask-bit index.
- //
- router->routers_by_mask_bit[router_maskbit] = rnode;
-
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -137,69 +63,12 @@ static PyObject* qd_del_router(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
qdr_core_del_router(router->router_core, router_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Deleting nonexistent router";
- break;
- }
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- qd_address_t *oaddr = rnode->owning_addr;
- assert(oaddr);
-
- qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, oaddr);
-
- //
- // Unlink the router node from the address record
- //
- qd_router_del_node_ref_LH(&oaddr->rnodes, rnode);
-
- //
- // While the router node has a non-zero reference count, look for addresses
- // to unlink the node from.
- //
- qd_address_t *addr = DEQ_HEAD(router->addrs);
- while (addr && rnode->ref_count > 0) {
- qd_router_del_node_ref_LH(&addr->rnodes, rnode);
- addr = DEQ_NEXT(addr);
- }
- assert(rnode->ref_count == 0);
-
- //
- // Free the router node and the owning address records.
- //
- qd_bitmask_free(rnode->valid_origins);
- DEQ_REMOVE(router->routers, rnode);
- free_qd_router_node_t(rnode);
-
- qd_hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
- DEQ_REMOVE(router->addrs, oaddr);
- qd_hash_handle_free(oaddr->hash_handle);
- router->routers_by_mask_bit[router_maskbit] = 0;
- free_qd_address_t(oaddr);
-
- sys_mutex_unlock(router->lock);
- } while(0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -211,40 +80,12 @@ static PyObject* qd_set_link(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
int router_maskbit;
int link_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &link_maskbit))
return 0;
qdr_core_set_link(router->router_core, router_maskbit, link_maskbit);
- do {
- if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
- error = "Link bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->out_links_by_mask_bit[link_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Adding neighbor router with invalid link reference";
- break;
- }
-
- //
- // Add the peer_link reference to the router record.
- //
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->peer_link = router->out_links_by_mask_bit[link_maskbit];
-
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -255,25 +96,12 @@ static PyObject* qd_remove_link(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
qdr_core_remove_link(router->router_core, router_maskbit);
- do {
- sys_mutex_lock(router->lock);
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->peer_link = 0;
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -285,49 +113,12 @@ static PyObject* qd_set_next_hop(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
int router_maskbit;
int next_hop_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
return 0;
qdr_core_set_next_hop(router->router_core, router_maskbit, next_hop_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- if (next_hop_maskbit >= qd_bitmask_width() || next_hop_maskbit < 0) {
- error = "Next Hop bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Router Not Found";
- break;
- }
-
- if (router->routers_by_mask_bit[next_hop_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Next Hop Not Found";
- break;
- }
-
- if (router_maskbit != next_hop_maskbit) {
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
- }
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -338,37 +129,12 @@ static PyObject* qd_remove_next_hop(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
qdr_core_remove_next_hop(router->router_core, router_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Router Not Found";
- break;
- }
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->next_hop = 0;
-
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -397,43 +163,26 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
break;
}
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Router Not Found";
- break;
- }
-
- Py_ssize_t origin_count = PyList_Size(origin_list);
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- qd_bitmask_t *core_bitmask = qd_bitmask(0);
- int maskbit;
+ Py_ssize_t origin_count = PyList_Size(origin_list);
+ qd_bitmask_t *core_bitmask = qd_bitmask(0);
+ int maskbit;
for (idx = 0; idx < origin_count; idx++) {
maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
-
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
error = "Origin bit mask out of range";
break;
}
-
- if (router->routers_by_mask_bit[maskbit] == 0) {
- error = "Origin router Not Found";
- break;
- }
}
if (error == 0) {
- qd_bitmask_clear_all(rnode->valid_origins);
- qd_bitmask_set_bit(rnode->valid_origins, 0); // This router is a valid origin for all destinations
+ qd_bitmask_set_bit(core_bitmask, 0); // This router is a valid origin for all destinations
for (idx = 0; idx < origin_count; idx++) {
maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
- qd_bitmask_set_bit(rnode->valid_origins, maskbit);
qd_bitmask_set_bit(core_bitmask, maskbit);
}
- }
-
- sys_mutex_unlock(router->lock);
+ } else
+ qd_bitmask_free(core_bitmask);
qdr_core_set_valid_origins(router->router_core, router_maskbit, core_bitmask);
} while (0);
@@ -450,16 +199,12 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
static PyObject* qd_map_destination(PyObject *self, PyObject *args)
{
- RouterAdapter *adapter = (RouterAdapter*) self;
- qd_router_t *router = adapter->router;
- char phase;
- char unused;
- const char *addr_string;
- int maskbit;
- qd_address_t *addr;
- qd_field_iterator_t *iter;
-
- if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ qd_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -467,34 +212,8 @@ static PyObject* qd_map_destination(PyObject *self, PyObject *args)
return 0;
}
- if (router->routers_by_mask_bit[maskbit] == 0) {
- PyErr_SetString(PyExc_Exception, "Router Not Found");
- return 0;
- }
-
- iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
-
- sys_mutex_lock(router->lock);
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- if (!addr) {
- addr = qd_address(router_semantics_for_addr(router, iter, phase, &unused));
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_ITEM_INIT(addr);
- DEQ_INSERT_TAIL(router->addrs, addr);
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- }
- qd_field_iterator_free(iter);
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
- qd_router_add_node_ref_LH(&addr->rnodes, rnode);
+ qdr_core_map_destination(router->router_core, maskbit, addr_string);
- //
- // If the address has an associated waypoint, notify the waypoint module of the changes.
- //
- if (addr->waypoint)
- qd_waypoint_address_updated_LH(router->qd, addr);
-
- sys_mutex_unlock(router->lock);
Py_INCREF(Py_None);
return Py_None;
}
@@ -506,7 +225,6 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
const char *addr_string;
int maskbit;
- qd_address_t *addr;
if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
@@ -516,35 +234,7 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
return 0;
}
- if (router->routers_by_mask_bit[maskbit] == 0) {
- PyErr_SetString(PyExc_Exception, "Router Not Found");
- return 0;
- }
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
- qd_field_iterator_t *iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
-
- sys_mutex_lock(router->lock);
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- qd_field_iterator_free(iter);
-
- if (!addr) {
- PyErr_SetString(PyExc_Exception, "Address Not Found");
- sys_mutex_unlock(router->lock);
- return 0;
- }
-
- qd_router_del_node_ref_LH(&addr->rnodes, rnode);
-
- //
- // If the address has an associated waypoint, notify the waypoint module of the changes.
- //
- if (addr->waypoint)
- qd_waypoint_address_updated_LH(router->qd, addr);
-
- sys_mutex_unlock(router->lock);
-
- qd_router_check_addr(router, addr, 0);
+ qdr_core_unmap_destination(router->router_core, maskbit, addr_string);
Py_INCREF(Py_None);
return Py_None;
@@ -625,11 +315,75 @@ static PyTypeObject RouterAdapterType = {
0 /* tp_version_tag */
};
+
+static void qd_router_mobile_added(void *context, const char *address_hash)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_python_lock_state_t lock_state = qd_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
+ pValue = PyObject_CallObject(pyAdded, pArgs);
+ qd_error_py();
+ Py_DECREF(pArgs);
+ Py_XDECREF(pValue);
+ qd_python_unlock(lock_state);
+ }
+}
+
+
+static void qd_router_mobile_removed(void *context, const char *address_hash)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_python_lock_state_t lock_state = qd_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
+ pValue = PyObject_CallObject(pyRemoved, pArgs);
+ qd_error_py();
+ Py_DECREF(pArgs);
+ Py_XDECREF(pValue);
+ qd_python_unlock(lock_state);
+ }
+}
+
+
+static void qd_router_link_lost(void *context, int link_mask_bit)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_python_lock_state_t lock_state = qd_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
+ pValue = PyObject_CallObject(pyLinkLost, pArgs);
+ qd_error_py();
+ Py_DECREF(pArgs);
+ Py_XDECREF(pValue);
+ qd_python_unlock(lock_state);
+ }
+}
+
+
qd_error_t qd_router_python_setup(qd_router_t *router)
{
qd_error_clear();
log_source = qd_log_source("ROUTER");
+ qdr_core_route_table_handlers(router->router_core,
+ router,
+ qd_router_mobile_added,
+ qd_router_mobile_removed,
+ qd_router_link_lost);
+
//
// If we are not operating as an interior router, don't start the
// router module.
@@ -726,62 +480,3 @@ qd_error_t qd_pyrouter_tick(qd_router_t *router)
return err;
}
-
-void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- char *address = (char*) qd_field_iterator_copy(iter);
-
- qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
- pValue = PyObject_CallObject(pyAdded, pArgs);
- qd_error_py();
- Py_DECREF(pArgs);
- Py_XDECREF(pValue);
- qd_python_unlock(lock_state);
-
- free(address);
- }
-}
-
-
-void qd_router_mobile_removed(qd_router_t *router, const char *address)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
- pValue = PyObject_CallObject(pyRemoved, pArgs);
- qd_error_py();
- Py_DECREF(pArgs);
- Py_XDECREF(pValue);
- qd_python_unlock(lock_state);
- }
-}
-
-
-void qd_router_link_lost(qd_router_t *router, int link_mask_bit)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
- pValue = PyObject_CallObject(pyLinkLost, pArgs);
- qd_error_py();
- Py_DECREF(pArgs);
- Py_XDECREF(pValue);
- qd_python_unlock(lock_state);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/waypoint.c
----------------------------------------------------------------------
diff --git a/src/waypoint.c b/src/waypoint.c
index dcf00b3..fb97311 100644
--- a/src/waypoint.c
+++ b/src/waypoint.c
@@ -95,7 +95,7 @@ static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
if (DEQ_SIZE(addr->rlinks) == 1) {
qd_field_iterator_t *iter = qd_address_iterator_string(wp->address, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_set_phase(iter, wp->in_phase);
- qd_router_mobile_added(router, iter);
+ //qd_router_mobile_added(router, iter);
qd_field_iterator_free(iter);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org