You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/19 00:06:29 UTC
[11/50] [abbrv] qpid-dispatch git commit: DISPATCH_179 - Added
activation and deactivation logic for link routes.
DISPATCH_179 - Added activation and deactivation logic for link routes.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/da00ebcd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/da00ebcd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/da00ebcd
Branch: refs/heads/master
Commit: da00ebcd00d94122d1c59ab95c2e24944b88ea50
Parents: d53f895
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Mar 4 10:17:04 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Mar 4 10:17:04 2016 -0500
----------------------------------------------------------------------
src/router_core/route_control.c | 61 +++++++++++++++++++++++++++---
src/router_core/router_core.c | 23 +++++++++++
src/router_core/router_core_private.h | 47 ++++++++++-------------
3 files changed, 99 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/da00ebcd/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index b72c535..2f66d14 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -132,13 +132,64 @@ static void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identi
}
-static void qdr_route_activate_CT(qdr_core_t *core, qdr_route_active_t *active)
+static void qdr_route_activate_CT(qdr_core_t *core, qdr_route_active_t *active, qdr_connection_t *conn)
{
+ qdr_route_config_t *route = active->config; // route configuration attached to this active record
+ const char *key;
+
+ if (route->treatment == QD_TREATMENT_LINK_BALANCED) { // any other treatment is left untouched by this function
+ //
+ // Activate the address(es) for link-routed destinations by recording conn on each
+ // configured address (out_addr and in_addr are handled independently).  If this is the
+ // first activation for an address (its conns list now holds exactly one reference),
+ // notify the router module of the added address.
+ //
+ if (route->out_addr) {
+ qdr_add_connection_ref(&route->out_addr->conns, conn); // appends a reference to conn; duplicates are not filtered
+ if (DEQ_SIZE(route->out_addr->conns) == 1) { // list just went from empty to one entry
+ key = (const char*) qd_hash_key_by_handle(route->out_addr->hash_handle);
+ if (key) // no key available: skip the notification
+ qdr_post_mobile_added_CT(core, key);
+ }
+ }
+
+ if (route->in_addr) {
+ qdr_add_connection_ref(&route->in_addr->conns, conn); // same bookkeeping as for out_addr
+ if (DEQ_SIZE(route->in_addr->conns) == 1) { // list just went from empty to one entry
+ key = (const char*) qd_hash_key_by_handle(route->in_addr->hash_handle);
+ if (key) // no key available: skip the notification
+ qdr_post_mobile_added_CT(core, key);
+ }
+ }
+ }
}
-static void qdr_route_deactivate_CT(qdr_core_t *core, qdr_route_active_t *active)
+static void qdr_route_deactivate_CT(qdr_core_t *core, qdr_route_active_t *active, qdr_connection_t *conn)
{
+ qdr_route_config_t *route = active->config; // route configuration attached to this active record
+ const char *key;
+
+ if (route->treatment == QD_TREATMENT_LINK_BALANCED) { // any other treatment is left untouched by this function
+ //
+ // Deactivate the address(es) for link-routed destinations by dropping conn's reference from
+ // each configured address; when an address has no references left, post a mobile-removed event.
+ //
+ if (route->out_addr) {
+ qdr_del_connection_ref(&route->out_addr->conns, conn); // drop conn's reference on the outbound address
+ if (DEQ_IS_EMPTY(route->out_addr->conns)) { // no connection references this address any more
+ key = (const char*) qd_hash_key_by_handle(route->out_addr->hash_handle);
+ if (key) // no key available: skip the notification
+ qdr_post_mobile_removed_CT(core, key);
+ }
+ }
+
+ if (route->in_addr) {
+ qdr_del_connection_ref(&route->in_addr->conns, conn); // drop conn's reference on the inbound address
+ if (DEQ_IS_EMPTY(route->in_addr->conns)) { // no connection references this address any more
+ key = (const char*) qd_hash_key_by_handle(route->in_addr->hash_handle);
+ if (key) // no key available: skip the notification
+ qdr_post_mobile_removed_CT(core, key);
+ }
+ }
+ }
}
@@ -235,7 +286,7 @@ void qdr_route_connection_add_CT(qdr_core_t *core,
// If the connection identifier represents an already open connection, activate the route.
//
if (cid->open_connection)
- qdr_route_activate_CT(core, active);
+ qdr_route_activate_CT(core, active, cid->open_connection);
}
@@ -274,7 +325,7 @@ void qdr_route_connection_opened_CT(qdr_core_t *core,
//
qdr_route_active_t *active = DEQ_HEAD(cid->active_refs);
while (active) {
- qdr_route_activate_CT(core, active);
+ qdr_route_activate_CT(core, active, conn);
active = DEQ_NEXT_N(REF, active);
}
}
@@ -292,7 +343,7 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
//
qdr_route_active_t *active = DEQ_HEAD(cid->active_refs);
while (active) {
- qdr_route_deactivate_CT(core, active);
+ qdr_route_deactivate_CT(core, active, conn);
active = DEQ_NEXT_N(REF, active);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/da00ebcd/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 4603555..020a91c 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -30,6 +30,7 @@ ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
ALLOC_DEFINE(qdr_general_work_t);
+ALLOC_DEFINE(qdr_connection_ref_t);
static void qdr_general_handler(void *context);
@@ -262,6 +263,28 @@ void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
}
+void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn)
+{ // Append a new reference to conn at the tail of ref_list; existing entries for conn are not checked.
+ qdr_connection_ref_t *ref = new_qdr_connection_ref_t(); // NOTE(review): result is used unchecked - confirm the allocator cannot return NULL
+ DEQ_ITEM_INIT(ref);
+ ref->conn = conn; // the entry only stores the pointer
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn)
+{
+    // Remove and free the first entry in ref_list that refers to conn; no-op if none does.
+    qdr_connection_ref_t *ref = DEQ_HEAD(*ref_list);
+    while (ref && ref->conn != conn)
+        ref = DEQ_NEXT(ref);  // step to the next entry; without this the scan never ends when the head does not match
+    if (ref) {
+        DEQ_REMOVE(*ref_list, ref);
+        free_qdr_connection_ref_t(ref);
+    }
+}
+
+
void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode)
{
qdr_router_ref_t *ref = new_qdr_router_ref_t();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/da00ebcd/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5caae66..0988ffa 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -30,12 +30,11 @@ typedef struct qdr_address_config_t qdr_address_config_t;
typedef struct qdr_node_t qdr_node_t;
typedef struct qdr_router_ref_t qdr_router_ref_t;
typedef struct qdr_link_ref_t qdr_link_ref_t;
-typedef struct qdr_lrp_t qdr_lrp_t;
-typedef struct qdr_lrp_ref_t qdr_lrp_ref_t;
typedef struct qdr_forwarder_t qdr_forwarder_t;
typedef struct qdr_route_config_t qdr_route_config_t;
typedef struct qdr_route_active_t qdr_route_active_t;
typedef struct qdr_conn_identifier_t qdr_conn_identifier_t;
+typedef struct qdr_connection_ref_t qdr_connection_ref_t;
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -254,23 +253,17 @@ void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
-struct qdr_lrp_t {
- DEQ_LINKS(qdr_lrp_t);
- char *prefix;
- bool inbound;
- bool outbound;
- qd_lrp_container_t *container;
+struct qdr_connection_ref_t { // one entry of a qdr_connection_ref_list_t
+ DEQ_LINKS(qdr_connection_ref_t);
+ qdr_connection_t *conn; ///< Connection this entry refers to
};
-DEQ_DECLARE(qdr_lrp_t, qdr_lrp_list_t);
+ALLOC_DECLARE(qdr_connection_ref_t);
+DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t);
-struct qdr_lrp_ref_t {
- DEQ_LINKS(qdr_lrp_ref_t);
- qdr_lrp_t *lrp;
-};
+void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
+void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
-ALLOC_DECLARE(qdr_lrp_ref_t);
-DEQ_DECLARE(qdr_lrp_ref_t, qdr_lrp_ref_list_t);
struct qdr_subscription_t {
DEQ_LINKS(qdr_subscription_t);
@@ -285,18 +278,18 @@ DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
struct qdr_address_t {
DEQ_LINKS(qdr_address_t);
- qdr_subscription_list_t subscriptions; ///< In-process message subscribers
- qdr_lrp_ref_list_t lrps; ///< Local link-route destinations
- qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers
- qdr_link_ref_list_t inlinks; ///< Locally-Connected Producers
- qd_bitmask_t *rnodes; ///< Bitmask of remote routers with connected consumers
- qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
- qd_address_treatment_t treatment;
- qdr_forwarder_t *forwarder;
- bool toggle;
- bool waypoint;
- bool block_deletion;
- bool local;
+ qdr_subscription_list_t subscriptions; ///< In-process message subscribers
+ qdr_connection_ref_list_t conns; ///< Local Connections for route-destinations
+ qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers
+ qdr_link_ref_list_t inlinks; ///< Locally-Connected Producers
+ qd_bitmask_t *rnodes; ///< Bitmask of remote routers with connected consumers
+ qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
+ qd_address_treatment_t treatment;
+ qdr_forwarder_t *forwarder;
+ bool toggle;
+ bool waypoint;
+ bool block_deletion;
+ bool local;
/**@name Statistics */
///@{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org