You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/02 18:20:42 UTC
[4/4] qpid-dispatch git commit: DISPATCH-179 - Updated agent to
handle updated router.route entity definition.
DISPATCH-179 - Updated agent to handle updated router.route entity definition.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/dc675a38
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/dc675a38
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/dc675a38
Branch: refs/heads/tross-DISPATCH-179-1
Commit: dc675a38f24907566e9c575756e6be56ec82e0ef
Parents: c72e179
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Mar 2 12:19:08 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Mar 2 12:19:08 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 1 -
src/CMakeLists.txt | 2 +-
src/router_core/DESIGN | 7 +-
src/router_core/agent.c | 17 +-
src/router_core/agent_address.c | 10 +-
src/router_core/agent_link.c | 2 +-
src/router_core/agent_route.c | 315 ++++++++++++++---------------
src/router_core/agent_route.h | 2 +-
src/router_core/agent_waypoint.c | 165 ---------------
src/router_core/agent_waypoint.h | 39 ----
src/router_core/connections.c | 4 +-
src/router_core/management_agent.c | 62 +++---
src/router_core/route_control.c | 168 +++++++++++++++
src/router_core/route_control.h | 50 +++++
src/router_core/route_tables.c | 3 +-
src/router_core/router_core.c | 1 -
src/router_core/router_core_private.h | 82 +++++---
17 files changed, 488 insertions(+), 442 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index b38022f..338c88e 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -546,7 +546,6 @@ typedef enum {
QD_ROUTER_CONNECTION,
QD_ROUTER_LINK,
QD_ROUTER_ADDRESS,
- QD_ROUTER_WAYPOINT,
QD_ROUTER_EXCHANGE,
QD_ROUTER_BINDING
} qd_router_entity_type_t;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f74babe..472090d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -66,12 +66,12 @@ set(qpid_dispatch_SOURCES
router_config.c
router_core/agent.c
router_core/agent_address.c
- router_core/agent_waypoint.c
router_core/agent_link.c
router_core/agent_route.c
router_core/connections.c
router_core/error.c
router_core/forwarder.c
+ router_core/route_control.c
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/DESIGN
----------------------------------------------------------------------
diff --git a/src/router_core/DESIGN b/src/router_core/DESIGN
index dde7cb4..8114370 100644
--- a/src/router_core/DESIGN
+++ b/src/router_core/DESIGN
@@ -184,7 +184,7 @@ Forwarding Treatment
Transition from fanout/bias to a one-dimensional list of treatment:
- MULTICAST_FLOOD
+ MULTICAST_FLOOD (not available to users)
Messages are delivered to all subscribers via all unique paths. If there is
redundancy in the router topology, multiple copies of each message will be
@@ -257,3 +257,8 @@ org
waypoint (proposed)
waypoint (deprecated)
+
+============================
+Route Control Data Structure
+============================
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index b0c5381..40db041 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -20,7 +20,6 @@
#include <qpid/dispatch/amqp.h>
#include "agent_route.h"
#include "agent_address.h"
-#include "agent_waypoint.h"
#include "agent_link.h"
#include "router_core_private.h"
#include <stdio.h>
@@ -48,7 +47,7 @@ static void qdr_agent_response_handler(void *context)
sys_mutex_unlock(core->query_lock);
if (query) {
- core->agent_response_handler(query->context, query->status, query->more);
+ core->agent_response_handler(query->context, &query->status, query->more);
if (!query->more) {
if (query->next_key)
qdr_field_free(query->next_key);
@@ -70,6 +69,7 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
qd_timer_schedule(core->agent_timer, 0);
}
+
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
qd_router_entity_type_t type,
@@ -79,14 +79,12 @@ qdr_query_t *qdr_query(qdr_core_t *core,
qdr_query_t *query = new_qdr_query_t();
DEQ_ITEM_INIT(query);
+ ZERO(query);
query->core = core;
query->entity_type = type;
query->context = context;
query->body = body;
- query->next_key = 0;
- query->next_offset = 0;
query->more = false;
- query->status = 0;
return query;
}
@@ -188,9 +186,6 @@ qdr_query_t *qdr_manage_query(qdr_core_t *core,
qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT);
break;
- case QD_ROUTER_WAYPOINT:
- break;
-
case QD_ROUTER_EXCHANGE:
break;
@@ -209,7 +204,6 @@ void qdr_query_add_attribute_names(qdr_query_t *query)
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break;
case QD_ROUTER_ADDRESS: qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
- case QD_ROUTER_WAYPOINT: break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
}
@@ -322,7 +316,6 @@ static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool disc
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: break;
case QD_ROUTER_ADDRESS: qdra_address_get_CT(core, name, identity, query, qdr_address_columns); break;
- case QD_ROUTER_WAYPOINT: break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
}
@@ -340,7 +333,6 @@ static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool di
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: break;
case QD_ROUTER_ADDRESS: break;
- case QD_ROUTER_WAYPOINT: qdra_waypoint_create_CT(core, name, query, in_body); break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
}
@@ -358,7 +350,6 @@ static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool di
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: break;
case QD_ROUTER_ADDRESS: qdra_address_delete_CT(core, name, identity, query); break;
- case QD_ROUTER_WAYPOINT: qdra_waypoint_delete_CT(core, name, identity, query); break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
}
@@ -378,7 +369,6 @@ static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: qdra_link_get_first_CT(core, query, offset); break;
case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break;
- case QD_ROUTER_WAYPOINT: break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
}
@@ -396,7 +386,6 @@ static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool
case QD_ROUTER_CONNECTION: break;
case QD_ROUTER_LINK: qdra_link_get_next_CT(core, query); break;
case QD_ROUTER_ADDRESS: qdra_address_get_next_CT(core, query); break;
- case QD_ROUTER_WAYPOINT: break;
case QD_ROUTER_EXCHANGE: break;
case QD_ROUTER_BINDING: break;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index bc008ff..1ed1614 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -185,14 +185,14 @@ void qdra_address_get_CT(qdr_core_t *core,
if (addr == 0) {
// Send back a 404
- query->status = &QD_AMQP_NOT_FOUND;
+ query->status = QD_AMQP_NOT_FOUND;
}
else {
//
// Write the columns of the address entity into the response body.
//
qdr_manage_write_address_map_CT(addr, query->body, qdr_address_columns);
- query->status = &QD_AMQP_OK;
+ query->status = QD_AMQP_OK;
}
//
@@ -208,7 +208,7 @@ void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
//
// Queries that get this far will always succeed.
//
- query->status = &QD_AMQP_OK;
+ query->status = QD_AMQP_OK;
//
// If the offset goes beyond the set of addresses, end the query now.
@@ -302,7 +302,7 @@ void qdra_address_delete_CT(qdr_core_t *core,
//TOOD - do something here
}
else {
- query->status = &QD_AMQP_BAD_REQUEST;
+ query->status = QD_AMQP_BAD_REQUEST;
success = false;
}
@@ -310,7 +310,7 @@ void qdra_address_delete_CT(qdr_core_t *core,
// TODO - Add more logic here.
if (success) {
// If the request was successful then the statusCode MUST be 204 (No Content).
- query->status = &QD_AMQP_NO_CONTENT;
+ query->status = QD_AMQP_NO_CONTENT;
}
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 997b9e6..9eee88f 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -153,7 +153,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
//
// Queries that get this far will always succeed.
//
- query->status = &QD_AMQP_OK;
+ query->status = QD_AMQP_OK;
//
// If the offset goes beyond the set of links, end the query now.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent_route.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_route.c b/src/router_core/agent_route.c
index af042cd..a1586c0 100644
--- a/src/router_core/agent_route.c
+++ b/src/router_core/agent_route.c
@@ -17,40 +17,37 @@
* under the License.
*/
-#include "agent_link.h"
+#include "agent_route.h"
+#include "route_control.h"
#include <stdio.h>
-#define QDR_ROUTE_NAME 0
-#define QDR_ROUTE_IDENTITY 1
-#define QDR_ROUTE_TYPE 2
-#define QDR_ROUTE_OBJECT_TYPE 3
-#define QDR_ROUTE_ADDRESS 4
-#define QDR_ROUTE_CONNECTOR 5
-#define QDR_ROUTE_DIRECTION 6
-#define QDR_ROUTE_TREATMENT 7
-#define QDR_ROUTE_INGRESS_ADDRESS 8
-#define QDR_ROUTE_EGRESS_ADDRESS 9
-#define QDR_ROUTE_INGRESS_TREATMENT 10
-#define QDR_ROUTE_EGRESS_TREATMENT 11
+#define QDR_ROUTE_NAME 0
+#define QDR_ROUTE_IDENTITY 1
+#define QDR_ROUTE_TYPE 2
+#define QDR_ROUTE_ADDRESS 3
+#define QDR_ROUTE_PATH 4
+#define QDR_ROUTE_TREATMENT 5
+#define QDR_ROUTE_CONNECTORS 6
+#define QDR_ROUTE_CONTAINERS 7
+#define QDR_ROUTE_ROUTE_ADDRESS 8
const char *qdr_route_columns[] =
{"name",
"identity",
"type",
- "objectType",
"address",
- "connector",
- "direction",
+ "path",
"treatment",
- "ingressAddress",
- "egressAddress",
- "ingressTreatment",
- "egressTreatment",
+ "connectors",
+ "containers",
+ "routeAddress",
0};
-static void qdr_route_insert_column_CT(qdr_route_t *route, int col, qd_composed_field_t *body, bool as_map)
+static void qdr_route_insert_column_CT(qdr_route_config_t *route, int col, qd_composed_field_t *body, bool as_map)
{
+ const char *text = 0;
+
if (as_map)
qd_compose_insert_string(body, qdr_route_columns[col]);
@@ -62,29 +59,55 @@ static void qdr_route_insert_column_CT(qdr_route_t *route, int col, qd_composed_
}
// else fall into IDENTITY
- case QDR_ROUTE_IDENTITY:
+ case QDR_ROUTE_IDENTITY: {
+ char id_str[100];
+ snprintf(id_str, 100, "%ld", route->identity);
+ qd_compose_insert_string(body, id_str);
+ break;
+ }
case QDR_ROUTE_TYPE:
qd_compose_insert_string(body, "org.apache.qpid.dispatch.router.route");
break;
- case QDR_ROUTE_OBJECT_TYPE:
case QDR_ROUTE_ADDRESS:
- case QDR_ROUTE_CONNECTOR:
- case QDR_ROUTE_DIRECTION:
+ if (route->addr_config)
+ qd_compose_insert_string(body, (const char*) qd_hash_key_by_handle(route->addr_config->hash_handle));
+ else
+ qd_compose_insert_null(body);
+ break;
+
+ case QDR_ROUTE_PATH:
+ switch (route->path) {
+ case QDR_ROUTE_PATH_DIRECT: text = "direct"; break;
+ case QDR_ROUTE_PATH_SOURCE: text = "source"; break;
+ case QDR_ROUTE_PATH_SINK: text = "sink"; break;
+ case QDR_ROUTE_PATH_WAYPOINT: text = "waypoint"; break;
+ }
+ qd_compose_insert_string(body, text);
+ break;
+
case QDR_ROUTE_TREATMENT:
- case QDR_ROUTE_INGRESS_ADDRESS:
- case QDR_ROUTE_EGRESS_ADDRESS:
- case QDR_ROUTE_INGRESS_TREATMENT:
- case QDR_ROUTE_EGRESS_TREATMENT:
- default:
+ switch (route->treatment) {
+ case QD_TREATMENT_MULTICAST_FLOOD:
+ case QD_TREATMENT_MULTICAST_ONCE: text = "multicast"; break;
+ case QD_TREATMENT_ANYCAST_CLOSEST: text = "closest"; break;
+ case QD_TREATMENT_ANYCAST_BALANCED: text = "balanced"; break;
+ case QD_TREATMENT_LINK_BALANCED: text = "linkBalanced"; break;
+ }
+ qd_compose_insert_string(body, text);
+ break;
+
+ case QDR_ROUTE_CONNECTORS:
+ case QDR_ROUTE_CONTAINERS:
+ case QDR_ROUTE_ROUTE_ADDRESS:
qd_compose_insert_null(body);
break;
}
}
-static void qdr_agent_write_route_CT(qdr_query_t *query, qdr_route_t *route)
+static void qdr_agent_write_route_CT(qdr_query_t *query, qdr_route_config_t *route)
{
qd_composed_field_t *body = query->body;
@@ -97,7 +120,7 @@ static void qdr_agent_write_route_CT(qdr_query_t *query, qdr_route_t *route)
qd_compose_end_list(body);
}
-static void qdr_manage_advance_route_CT(qdr_query_t *query, qdr_route_t *route)
+static void qdr_manage_advance_route_CT(qdr_query_t *query, qdr_route_config_t *route)
{
query->next_offset++;
route = DEQ_NEXT(route);
@@ -110,12 +133,12 @@ void qdra_route_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
//
// Queries that get this far will always succeed.
//
- query->status = &QD_AMQP_OK;
+ query->status = QD_AMQP_OK;
//
// If the offset goes beyond the set of objects, end the query now.
//
- if (offset >= DEQ_SIZE(core->routes)) {
+ if (offset >= DEQ_SIZE(core->route_config)) {
query->more = false;
qdr_agent_enqueue_response_CT(core, query);
return;
@@ -124,7 +147,7 @@ void qdra_route_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
//
// Run to the object at the offset.
//
- qdr_route_t *route = DEQ_HEAD(core->routes);
+ qdr_route_config_t *route = DEQ_HEAD(core->route_config);
for (int i = 0; i < offset && route; i++)
route = DEQ_NEXT(route);
assert(route);
@@ -149,13 +172,13 @@ void qdra_route_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
void qdra_route_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
- qdr_route_t *route = 0;
+ qdr_route_config_t *route = 0;
- if (query->next_offset < DEQ_SIZE(core->routes)) {
- route = DEQ_HEAD(core->routes);
- for (int i = 0; i < query->next_offset && route; i++)
- route = DEQ_NEXT(route);
- }
+ if (query->next_offset < DEQ_SIZE(core->route_config)) {
+ route = DEQ_HEAD(core->route_config);
+ for (int i = 0; i < query->next_offset && route; i++)
+ route = DEQ_NEXT(route);
+ }
if (route) {
//
@@ -181,129 +204,97 @@ static qd_address_treatment_t qdra_treatment(qd_parsed_field_t *field)
{
if (field) {
qd_field_iterator_t *iter = qd_parse_raw(field);
- if (qd_field_iterator_equal(iter, (unsigned char*) "multi")) return QD_TREATMENT_MULTICAST_ONCE;
- if (qd_field_iterator_equal(iter, (unsigned char*) "anyClosest")) return QD_TREATMENT_ANYCAST_CLOSEST;
- if (qd_field_iterator_equal(iter, (unsigned char*) "anyBalanced")) return QD_TREATMENT_ANYCAST_BALANCED;
+ if (qd_field_iterator_equal(iter, (unsigned char*) "multicast")) return QD_TREATMENT_MULTICAST_ONCE;
+ if (qd_field_iterator_equal(iter, (unsigned char*) "closest")) return QD_TREATMENT_ANYCAST_CLOSEST;
+ if (qd_field_iterator_equal(iter, (unsigned char*) "balanced")) return QD_TREATMENT_ANYCAST_BALANCED;
+ if (qd_field_iterator_equal(iter, (unsigned char*) "linkBalanced")) return QD_TREATMENT_LINK_BALANCED;
}
return QD_TREATMENT_ANYCAST_BALANCED;
}
-static qdr_address_config_t *qdra_configure_address_prefix_CT(qdr_core_t *core, qd_parsed_field_t *addr_field, char cls,
- qd_address_treatment_t treatment)
-{
- if (!addr_field)
- return 0;
-
- qd_field_iterator_t *iter = qd_parse_raw(addr_field);
- qd_address_iterator_override_prefix(iter, cls);
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-
- qdr_address_config_t *addr = 0;
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
- if (addr) {
- // Log error TODO
- return 0;
- }
-
- addr = new_qdr_address_config_t();
- DEQ_ITEM_INIT(addr);
- addr->treatment = treatment;
-
- if (!!addr) {
- qd_field_iterator_reset(iter);
- qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addr_config, addr);
- }
-
- return addr;
-}
-
-
-static qdr_address_t *qdra_configure_address_CT(qdr_core_t *core, qd_parsed_field_t *addr_field, char cls,
- qd_address_treatment_t treatment)
+void qdra_route_create_CT(qdr_core_t *core, qd_field_iterator_t *name,
+ qdr_query_t *query, qd_parsed_field_t *in_body)
{
- if (!addr_field)
- return 0;
-
- qd_field_iterator_t *iter = qd_parse_raw(addr_field);
- qd_address_iterator_override_prefix(iter, cls);
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-
- qdr_address_t *addr = 0;
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
- if (addr) {
- // Log error TODO
- return 0;
- }
+ // TODO - reject duplicate names
- addr = qdr_address_CT(core, treatment);
+ while (true) {
+ //
+ // Validation of the request occurs here. Make sure the body is a map.
+ //
+ if (!qd_parse_is_map(in_body)) {
+ query->status = QD_AMQP_BAD_REQUEST;
+ break;
+ }
- if (!!addr) {
- qd_field_iterator_reset(iter);
- qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addrs, addr);
- }
+ //
+ // Extract the fields from the request
+ //
+ qd_parsed_field_t *addr_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_ADDRESS]);
+ qd_parsed_field_t *path_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_PATH]);
+ qd_parsed_field_t *conn_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_CONNECTORS]);
+ qd_parsed_field_t *cont_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_CONTAINERS]);
+ qd_parsed_field_t *treatment_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_TREATMENT]);
+ qd_parsed_field_t *route_addr_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_ROUTE_ADDRESS]);
- return addr;
-}
+ //
+ // Determine the path, which defaults to Direct
+ //
+ qdr_route_path_t path = QDR_ROUTE_PATH_DIRECT;
+ if (path_field) {
+ qd_field_iterator_t *path_iter = qd_parse_raw(path_field);
+ if (qd_field_iterator_equal(path_iter, (unsigned char*) "direct"))
+ path = QDR_ROUTE_PATH_DIRECT;
+ else if (qd_field_iterator_equal(path_iter, (unsigned char*) "source"))
+ path = QDR_ROUTE_PATH_SOURCE;
+ else if (qd_field_iterator_equal(path_iter, (unsigned char*) "sink"))
+ path = QDR_ROUTE_PATH_SINK;
+ else if (qd_field_iterator_equal(path_iter, (unsigned char*) "waypoint"))
+ path = QDR_ROUTE_PATH_WAYPOINT;
+ else {
+ query->status = QD_AMQP_BAD_REQUEST;
+ break;
+ }
+ }
+ qd_address_treatment_t treatment = qdra_treatment(treatment_field);
-void qdra_route_create_CT(qdr_core_t *core, qd_field_iterator_t *name,
- qdr_query_t *query, qd_parsed_field_t *in_body)
-{
- // TODO - reject duplicate names
+ //
+ // Ask the route_control module to create the route object and put into effect any needed
+ // side effects.
+ //
+ qdr_route_config_t *route;
+ const char *error = qdr_route_create_CT(core, name, path, treatment, addr_field, route_addr_field, &route);
- if (qd_parse_is_map(in_body)) {
- qd_parsed_field_t *type_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_OBJECT_TYPE]);
- qd_parsed_field_t *addr_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_ADDRESS]);
- qd_parsed_field_t *conn_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_CONNECTOR]);
- qd_parsed_field_t *dir_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_DIRECTION]);
- qd_parsed_field_t *sem_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_TREATMENT]);
- //qd_parsed_field_t *in_addr_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_INGRESS_ADDRESS]);
- //qd_parsed_field_t *out_addr_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_EGRESS_ADDRESS]);
- //qd_parsed_field_t *in_sem_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_INGRESS_TREATMENT]);
- //qd_parsed_field_t *out_sem_field = qd_parse_value_by_key(in_body, qdr_route_columns[QDR_ROUTE_EGRESS_TREATMENT]);
-
- bool still_good = true;
- qdr_route_t *route = new_qdr_route_t();
- ZERO(route);
-
- route->identity = qdr_identifier(core);
- if (name)
- route->name = (char*) qd_field_iterator_copy(name);
-
- if (!type_field)
- route->object_type = QDR_ROUTE_TYPE_ADDRESS;
- else {
- qd_field_iterator_t *type_iter = qd_parse_raw(type_field);
- if (qd_field_iterator_equal(type_iter, (unsigned char*) "address"))
- route->object_type = QDR_ROUTE_TYPE_ADDRESS;
- else if (qd_field_iterator_equal(type_iter, (unsigned char*) "linkDestination"))
- route->object_type = QDR_ROUTE_TYPE_LINK_DEST;
- else if (qd_field_iterator_equal(type_iter, (unsigned char*) "waypoint"))
- route->object_type = QDR_ROUTE_TYPE_WAYPOINT;
- else
- still_good = false;
+ if (error) {
+ query->status.status = 400;
+ query->status.description = error;
+ break;
}
- route->treatment = qdra_treatment(sem_field);
-
- route->direction_in = true;
- route->direction_out = true;
- if (dir_field) {
- qd_field_iterator_t *dir_iter = qd_parse_raw(dir_field);
- if (qd_field_iterator_equal(dir_iter, (unsigned char*) "in"))
- route->direction_out = false;
- if (qd_field_iterator_equal(dir_iter, (unsigned char*) "out"))
- route->direction_in = false;
+ //
+ // Add the initial list of connection labels to the route
+ //
+ if (conn_field && qd_parse_is_list(conn_field)) {
+ uint32_t count = qd_parse_sub_count(conn_field);
+ for (uint32_t i = 0; i < count; i++) {
+ qd_parsed_field_t *conn_label = qd_parse_sub_value(conn_field, i);
+ qdr_route_connection_add_CT(route, conn_label, false);
+ }
}
- if (conn_field) {
- qd_field_iterator_t *conn_iter = qd_parse_raw(conn_field);
- route->connector_label = (char*) qd_field_iterator_copy(conn_iter);
+ //
+ // Add the initial list of container IDs to the route
+ //
+ if (cont_field && qd_parse_is_list(cont_field)) {
+ uint32_t count = qd_parse_sub_count(cont_field);
+ for (uint32_t i = 0; i < count; i++) {
+ qd_parsed_field_t *cont_id = qd_parse_sub_value(cont_field, i);
+ qdr_route_connection_add_CT(route, cont_id, true);
+ }
}
+ /*
switch (route->object_type) {
case QDR_ROUTE_TYPE_ADDRESS:
route->addr_config = qdra_configure_address_prefix_CT(core, addr_field, 'Z', route->treatment);
@@ -319,26 +310,30 @@ void qdra_route_create_CT(qdr_core_t *core, qd_field_iterator_t *name,
case QDR_ROUTE_TYPE_WAYPOINT:
break;
}
+ */
- if (still_good) {
- // TODO - write response map
- query->status = &QD_AMQP_CREATED;
- DEQ_INSERT_TAIL(core->routes, route);
- } else {
- query->status = &QD_AMQP_BAD_REQUEST;
- if (route->name)
- free(route->name);
- free_qdr_route_t(route);
+ //
+ // Compose the result map for the response.
+ //
+ if (query->body) {
+ qd_compose_start_map(query->body);
+ for (int col = 0; col < QDR_ROUTE_COLUMN_COUNT; col++)
+ qdr_route_insert_column_CT(route, col, query->body, true);
+ qd_compose_end_map(query->body);
}
+
+ query->status = QD_AMQP_CREATED;
+ break;
}
- else
- query->status = &QD_AMQP_BAD_REQUEST;
//
- // Enqueue the response.
+ // Enqueue the response if there is a body. If there is no body, this is a management
+ // operation created internally by the configuration file parser.
//
- if (query->body)
+ if (query->body) {
+ if (query->status.status / 100 > 2)
+ qd_compose_insert_null(query->body);
qdr_agent_enqueue_response_CT(core, query);
- else
+ } else
free_qdr_query_t(query);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent_route.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_route.h b/src/router_core/agent_route.h
index 86ec69c..ec8d6ba 100644
--- a/src/router_core/agent_route.h
+++ b/src/router_core/agent_route.h
@@ -28,7 +28,7 @@ void qdra_route_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_
void qdra_route_delete_CT(qdr_core_t *core, qdr_query_t *query, qd_field_iterator_t *name,
qd_field_iterator_t *identity);
-#define QDR_ROUTE_COLUMN_COUNT 12
+#define QDR_ROUTE_COLUMN_COUNT 9
const char *qdr_route_columns[QDR_ROUTE_COLUMN_COUNT + 1];
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent_waypoint.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_waypoint.c b/src/router_core/agent_waypoint.c
deleted file mode 100644
index aa5ec37..0000000
--- a/src/router_core/agent_waypoint.c
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "agent_waypoint.h"
-
-#define QDR_WAYPOINT_NAME 0
-#define QDR_WAYPOINT_ADDRESS 1
-#define QDR_WAYPOINT_CONNECTOR 2
-#define QDR_WAYPOINT_INPHASE 3
-#define QDR_WAYPOINT_OUTPHASE 4
-#define QDR_WAYPOINT_MODE 5
-
-#define QDR_WAYPOINT_COLUMN_COUNT 6
-
-static const char *qdr_waypoint_columns[] =
- {"name",
- "address",
- "connector",
- "inPhase",
- "outPhase",
- "mode",
- 0};
-
-static void qdr_insert_waypoint_columns_CT(qd_composed_field_t *body,
- int column_index)
-{
- // TODO replace nulls with actual values.
- switch(column_index) {
- case QDR_WAYPOINT_NAME:
- qd_compose_insert_null(body);
- break;
-
- case QDR_WAYPOINT_ADDRESS:
- qd_compose_insert_null(body);
- break;
-
- case QDR_WAYPOINT_CONNECTOR:
- qd_compose_insert_null(body);
- break;
-
- case QDR_WAYPOINT_INPHASE:
- qd_compose_insert_null(body);
- break;
-
- case QDR_WAYPOINT_OUTPHASE:
- qd_compose_insert_null(body);
- break;
-
- case QDR_WAYPOINT_MODE:
- qd_compose_insert_null(body); // TEMP
- break;
-
- default:
- qd_compose_insert_null(body);
- break;
- }
-
-}
-
-static void qdr_manage_write_response_map_CT(qd_composed_field_t *body)
-{
- qd_compose_start_map(body);
-
- for(int i = 0; i < QDR_WAYPOINT_COLUMN_COUNT; i++) {
- qd_compose_insert_string(body, qdr_waypoint_columns[i]);
- qdr_insert_waypoint_columns_CT(body, i);
- }
-
- qd_compose_end_map(body);
-}
-
-void qdra_waypoint_create_CT(qdr_core_t *core,
- qd_field_iterator_t *name,
- qdr_query_t *query,
- qd_parsed_field_t *in_body)
-{
- // Get the map fields from the body
- if (qd_parse_is_map(in_body)) {
- qd_parsed_field_t *address_field = qd_parse_value_by_key(in_body, qdr_waypoint_columns[1]);
- qd_parsed_field_t *connector_field = qd_parse_value_by_key(in_body, qdr_waypoint_columns[2]);
- qd_parsed_field_t *inPhase_field = qd_parse_value_by_key(in_body, qdr_waypoint_columns[3]);
- qd_parsed_field_t *outPhase_field = qd_parse_value_by_key(in_body, qdr_waypoint_columns[4]);
- qd_parsed_field_t *mode_field = qd_parse_value_by_key(in_body, qdr_waypoint_columns[5]);
-
- if ( address_field &&
- connector_field &&
- inPhase_field &&
- outPhase_field &&
- mode_field) {
- // TODO - Add code here that would actually create a waypoint.
- // If the request was successful then the statusCode MUST be 201 (Created) and the body of the message
- // MUST consist an amqp-value section that contains a Map containing the actual attributes of the entity created
- qdr_manage_write_response_map_CT(query->body);
- query->status = &QD_AMQP_CREATED;
- }
- else {
- query->status = &QD_AMQP_BAD_REQUEST;
- }
- }
- else {
- query->status = &QD_AMQP_BAD_REQUEST;
- }
-
- //
- // Enqueue the response.
- //
- qdr_agent_enqueue_response_CT(core, query);
-
-
-}
-
-void qdra_waypoint_delete_CT(qdr_core_t *core,
- qd_field_iterator_t *name,
- qd_field_iterator_t *identity,
- qdr_query_t *query)
-{
- bool success = true;
-
- if (identity) {//If there is identity, ignore the name
- //TOOD - do something here
- }
- else if (name) {
- //TOOD - do something here
- }
- else {
- query->status = &QD_AMQP_BAD_REQUEST;
- success = false;
- }
-
-
- // TODO - Add more logic here.
- if (success) {
- // If the request was successful then the statusCode MUST be 204 (No Content).
- query->status = &QD_AMQP_NO_CONTENT;
- }
-
- //
- // Enqueue the response.
- //
- qdr_agent_enqueue_response_CT(core, query);
-}
-
-
-void qdra_waypoint_update_CT(qdr_core_t *core, qd_field_iterator_t *name, qdr_query_t *query)
-{
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/agent_waypoint.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_waypoint.h b/src/router_core/agent_waypoint.h
deleted file mode 100644
index 7abc38f..0000000
--- a/src/router_core/agent_waypoint.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#ifndef qdr_agent_waypoint
-#define qdr_agent_waypoint 1
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "router_core_private.h"
-
-void qdra_waypoint_create_CT(qdr_core_t *core,
- qd_field_iterator_t *name,
- qdr_query_t *query,
- qd_parsed_field_t *in_body);
-
-void qdra_waypoint_delete_CT(qdr_core_t *core,
- qd_field_iterator_t *name,
- qd_field_iterator_t *identity,
- qdr_query_t *query);
-
-
-void qdra_waypoint_update_CT(qdr_core_t *core,
- qd_field_iterator_t *name,
- qdr_query_t *query);
-
-#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 40e2aaf..37876d4 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -76,7 +76,6 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
conn->user_context = 0;
conn->incoming = incoming;
conn->role = role;
- conn->label = label;
conn->strip_annotations_in = strip_annotations_in;
conn->strip_annotations_out = strip_annotations_out;
conn->mask_bit = -1;
@@ -84,7 +83,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
DEQ_INIT(conn->work_list);
conn->work_lock = sys_mutex();
- action->args.connection.conn = conn;
+ action->args.connection.conn = conn;
+ action->args.connection.label = label;
qdr_action_enqueue(core, action);
return conn;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index e8823b4..2ad0249 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -52,11 +52,14 @@ const char * const status_code = "statusCode";
const char * MANAGEMENT_INTERNAL = "_local/$_management_internal";
//TODO - Move these to amqp.h
-const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
-const unsigned char *MANAGEMENT_CREATE = (unsigned char*) "CREATE";
-const unsigned char *MANAGEMENT_READ = (unsigned char*) "READ";
-const unsigned char *MANAGEMENT_UPDATE = (unsigned char*) "UPDATE";
-const unsigned char *MANAGEMENT_DELETE = (unsigned char*) "DELETE";
+const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
+const unsigned char *MANAGEMENT_CREATE = (unsigned char*) "CREATE";
+const unsigned char *MANAGEMENT_READ = (unsigned char*) "READ";
+const unsigned char *MANAGEMENT_UPDATE = (unsigned char*) "UPDATE";
+const unsigned char *MANAGEMENT_DELETE = (unsigned char*) "DELETE";
+const unsigned char *MANAGEMENT_ADD_CONTAINER = (unsigned char*) "ADD-CONTAINER";
+const unsigned char *MANAGEMENT_REMOVE_CONTAINER_CLEAN = (unsigned char*) "REMOVE-CONTAINER-CLEAN";
+const unsigned char *MANAGEMENT_REMOVE_CONTAINER_HARD = (unsigned char*) "REMOVE-CONTAINER-HARD";
typedef enum {
@@ -65,6 +68,9 @@ typedef enum {
QD_ROUTER_OPERATION_READ,
QD_ROUTER_OPERATION_UPDATE,
QD_ROUTER_OPERATION_DELETE,
+ QD_ROUTER_OPERATION_ADD_CONTAINER,
+ QD_ROUTER_OPERATION_REMOVE_CONTAINER_CLEAN,
+ QD_ROUTER_OPERATION_REMOVE_CONTAINER_HARD
} qd_router_operation_type_t;
@@ -153,7 +159,7 @@ static void qd_manage_response_handler(void *context, const qd_amqp_error_t *sta
qd_management_context_t *ctx = (qd_management_context_t*) context;
if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) {
- if (status == &QD_AMQP_OK) { // There is no error, proceed to conditionally call get_next
+ if (status->status / 100 == 2) { // There is no error, proceed to conditionally call get_next
if (more) {
//If there are no more rows to process or the status returned is something other than
// QD_AMQP_OK, we will close the list, send the message and
@@ -356,8 +362,6 @@ static bool qd_can_handle_request(qd_field_iterator_t *props,
*entity_type = QD_ROUTER_ADDRESS;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), link_entity_type))
*entity_type = QD_ROUTER_LINK;
- else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), waypoint_entity_type))
- *entity_type = QD_ROUTER_WAYPOINT;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), route_entity_type))
*entity_type = QD_ROUTER_ROUTE;
else
@@ -379,6 +383,12 @@ static bool qd_can_handle_request(qd_field_iterator_t *props,
(*operation_type) = QD_ROUTER_OPERATION_UPDATE;
else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_DELETE))
(*operation_type) = QD_ROUTER_OPERATION_DELETE;
+ else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_ADD_CONTAINER) && *entity_type == QD_ROUTER_ROUTE)
+ (*operation_type) = QD_ROUTER_OPERATION_ADD_CONTAINER;
+ else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_REMOVE_CONTAINER_CLEAN) && *entity_type == QD_ROUTER_ROUTE)
+ (*operation_type) = QD_ROUTER_OPERATION_REMOVE_CONTAINER_CLEAN;
+ else if (qd_field_iterator_equal(qd_parse_raw(parsed_field), MANAGEMENT_REMOVE_CONTAINER_HARD) && *entity_type == QD_ROUTER_ROUTE)
+ (*operation_type) = QD_ROUTER_OPERATION_REMOVE_CONTAINER_HARD;
else
// This is an unknown operation type. cannot be handled, return false.
return false;
@@ -423,22 +433,26 @@ void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unuse
if (qd_can_handle_request(app_properties_iter, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) {
switch (operation_type) {
- case QD_ROUTER_OPERATION_QUERY:
- qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset);
- break;
- case QD_ROUTER_OPERATION_CREATE:
- qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter);
- break;
- case QD_ROUTER_OPERATION_READ:
- qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
- break;
- case QD_ROUTER_OPERATION_UPDATE:
- qd_core_agent_update_handler();
- break;
- case QD_ROUTER_OPERATION_DELETE:
- qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
- break;
- }
+ case QD_ROUTER_OPERATION_QUERY:
+ qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset);
+ break;
+ case QD_ROUTER_OPERATION_CREATE:
+ qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter);
+ break;
+ case QD_ROUTER_OPERATION_READ:
+ qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
+ break;
+ case QD_ROUTER_OPERATION_UPDATE:
+ qd_core_agent_update_handler();
+ break;
+ case QD_ROUTER_OPERATION_DELETE:
+ qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
+ break;
+ case QD_ROUTER_OPERATION_ADD_CONTAINER:
+ case QD_ROUTER_OPERATION_REMOVE_CONTAINER_CLEAN:
+ case QD_ROUTER_OPERATION_REMOVE_CONTAINER_HARD:
+ break;
+ }
} else {
//
// The C management agent is not going to handle this request. Forward it off to Python.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
new file mode 100644
index 0000000..634c1d2
--- /dev/null
+++ b/src/router_core/route_control.c
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "route_control.h"
+
+ALLOC_DEFINE(qdr_route_active_t);
+ALLOC_DEFINE(qdr_route_config_t);
+ALLOC_DEFINE(qdr_conn_identifier_t);
+
+
+
+static const char *qdr_configure_address_prefix_CT(qdr_core_t *core,
+ qd_parsed_field_t *addr_field,
+ char cls,
+ qd_address_treatment_t treatment,
+ qdr_address_config_t **_addr)
+{
+ if (!addr_field)
+ return "Missing address field";
+
+ qd_field_iterator_t *iter = qd_parse_raw(addr_field);
+ qd_address_iterator_override_prefix(iter, cls);
+ qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+
+ qdr_address_config_t *addr = 0;
+ qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+ if (addr)
+ return "Address prefix conflicts with existing prefix";
+
+ addr = new_qdr_address_config_t();
+ DEQ_ITEM_INIT(addr);
+ addr->treatment = treatment;
+
+ if (!!addr) {
+ qd_field_iterator_reset(iter);
+ qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addr_config, addr);
+ }
+
+ *_addr = addr;
+ return 0;
+}
+
+/*
+static const char *qdr_configure_address_CT(qdr_core_t *core,
+ qd_parsed_field_t *addr_field,
+ char cls,
+ qd_address_treatment_t treatment,
+ qdr_address_t **_addr)
+{
+ if (!addr_field)
+ return "Missing address field";
+
+ qd_field_iterator_t *iter = qd_parse_raw(addr_field);
+ qd_address_iterator_override_prefix(iter, cls);
+ qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+
+ qdr_address_t *addr = 0;
+ qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+ if (addr)
+ return "Address conflicts with existing address";
+
+ addr = qdr_address_CT(core, treatment);
+
+ if (!!addr) {
+ qd_field_iterator_reset(iter);
+ qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+ }
+
+ *_addr = addr;
+ return 0;
+}
+*/
+
+const char *qdr_route_create_CT(qdr_core_t *core,
+ qd_field_iterator_t *name,
+ qdr_route_path_t path,
+ qd_address_treatment_t treatment,
+ qd_parsed_field_t *addr_field,
+ qd_parsed_field_t *route_addr_field,
+ qdr_route_config_t **_route)
+{
+ const char *error = 0;
+
+ qdr_route_config_t *route = new_qdr_route_config_t();
+ ZERO(route);
+
+ if (name)
+ route->name = (char*) qd_field_iterator_copy(name);
+ route->identity = qdr_identifier(core);
+ route->path = path;
+ route->treatment = treatment;
+
+ switch (path) {
+ case QDR_ROUTE_PATH_DIRECT:
+ error = qdr_configure_address_prefix_CT(core, addr_field, 'Z', treatment, &route->addr_config);
+ break;
+
+ case QDR_ROUTE_PATH_SOURCE:
+ case QDR_ROUTE_PATH_SINK:
+ case QDR_ROUTE_PATH_WAYPOINT:
+ break;
+ }
+
+ if (error) {
+ if (route->name) free(route->name);
+ free_qdr_route_config_t(route);
+ } else {
+ DEQ_INSERT_TAIL(core->route_config, route);
+ *_route = route;
+ }
+
+ return error;
+}
+
+
+void qdr_route_delete_CT(qdr_route_config_t *route)
+{
+}
+
+
+void qdr_route_connection_add_CT(qdr_route_config_t *route,
+ qd_parsed_field_t *conn_id,
+ bool is_container)
+{
+}
+
+
+void qdr_route_connection_delete_CT(qdr_route_config_t *route,
+ qd_parsed_field_t *conn_id,
+ bool is_container)
+{
+}
+
+
+void qdr_route_connection_kill_CT(qdr_route_config_t *route,
+ qd_parsed_field_t *conn_id,
+ bool is_container)
+{
+}
+
+
+void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn)
+{
+}
+
+
+void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
+{
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/route_control.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h
new file mode 100644
index 0000000..7e86721
--- /dev/null
+++ b/src/router_core/route_control.h
@@ -0,0 +1,50 @@
+#ifndef qd_router_core_route_control
+#define qd_router_core_route_control 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+const char *qdr_route_create_CT(qdr_core_t *core,
+ qd_field_iterator_t *name,
+ qdr_route_path_t path,
+ qd_address_treatment_t treatment,
+ qd_parsed_field_t *addr_field,
+ qd_parsed_field_t *route_addr_field,
+ qdr_route_config_t **route);
+
+void qdr_route_delete_CT(qdr_route_config_t *route);
+
+void qdr_route_connection_add_CT(qdr_route_config_t *route,
+ qd_parsed_field_t *conn_id,
+ bool is_container);
+
+void qdr_route_connection_delete_CT(qdr_route_config_t *route,
+ qd_parsed_field_t *conn_id,
+ bool is_container);
+
+void qdr_route_connection_kill_CT(qdr_route_config_t *route,
+ qd_parsed_field_t *conn_id,
+ bool is_container);
+
+void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn);
+
+void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn);
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index db139a9..0529a84 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -171,7 +171,8 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
{
DEQ_INIT(core->addrs);
DEQ_INIT(core->routers);
- core->addr_hash = qd_hash(10, 32, 0);
+ core->addr_hash = qd_hash(12, 32, 0);
+ core->conn_id_hash = qd_hash(6, 4, 0);
if (core->router_mode == QD_ROUTER_MODE_INTERIOR) {
core->hello_addr = qdr_add_local_address_CT(core, 'L', "qdhello", QD_TREATMENT_MULTICAST_FLOOD);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index d4a937d..4603555 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -30,7 +30,6 @@ ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
ALLOC_DEFINE(qdr_general_work_t);
-ALLOC_DEFINE(qdr_route_t);
static void qdr_general_handler(void *context);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/dc675a38/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 8dc2fac..e58e31d 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -33,7 +33,9 @@ typedef struct qdr_link_ref_t qdr_link_ref_t;
typedef struct qdr_lrp_t qdr_lrp_t;
typedef struct qdr_lrp_ref_t qdr_lrp_ref_t;
typedef struct qdr_forwarder_t qdr_forwarder_t;
-typedef struct qdr_route_t qdr_route_t;
+typedef struct qdr_route_config_t qdr_route_config_t;
+typedef struct qdr_route_active_t qdr_route_active_t;
+typedef struct qdr_conn_identifier_t qdr_conn_identifier_t;
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -81,6 +83,7 @@ struct qdr_action_t {
//
struct {
qdr_connection_t *conn;
+ const char *label;
qdr_link_t *link;
qdr_delivery_t *delivery;
qd_message_t *msg;
@@ -147,7 +150,7 @@ struct qdr_query_t {
qdr_field_t *next_key;
int next_offset;
bool more;
- const qd_amqp_error_t *status;
+ qd_amqp_error_t status;
};
ALLOC_DECLARE(qdr_query_t);
@@ -383,7 +386,7 @@ struct qdr_connection_t {
void *user_context;
bool incoming;
qdr_connection_role_t role;
- const char *label;
+ qdr_conn_identifier_t *conn_id;
bool strip_annotations_in;
bool strip_annotations_out;
int mask_bit;
@@ -398,30 +401,55 @@ ALLOC_DECLARE(qdr_connection_t);
DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
typedef enum {
- QDR_ROUTE_TYPE_ADDRESS,
- QDR_ROUTE_TYPE_LINK_DEST,
- QDR_ROUTE_TYPE_WAYPOINT
-} qdr_route_type_t;
-
-struct qdr_route_t {
- DEQ_LINKS(qdr_route_t);
- char *name;
- uint64_t identity;
- qdr_route_type_t object_type;
- qdr_address_config_t *addr_config;
- qdr_address_t *addr;
- qdr_address_t *ingress_addr;
- qdr_address_t *egress_addr;
- bool direction_in;
- bool direction_out;
- qd_address_treatment_t treatment;
- qd_address_treatment_t ingress_treatment;
- qd_address_treatment_t egress_treatment;
- char *connector_label;
+ QDR_ROUTE_PATH_DIRECT,
+ QDR_ROUTE_PATH_SOURCE,
+ QDR_ROUTE_PATH_SINK,
+ QDR_ROUTE_PATH_WAYPOINT
+} qdr_route_path_t;
+
+typedef enum {
+ QDR_ROUTE_STATE_DOWN,
+ QDR_ROUTE_STATE_UP,
+ QDR_ROUTE_STATE_QUIESCING
+} qdr_route_state_t;
+
+struct qdr_route_active_t {
+ DEQ_LINKS(qdr_route_active_t);
+ DEQ_LINKS_N(REF, qdr_route_active_t);
+ qdr_route_config_t *config;
+ qdr_conn_identifier_t *conn_id;
+ qd_direction_t dir;
+ qdr_route_state_t state;
+ qdr_link_t *link;
};
-ALLOC_DECLARE(qdr_route_t);
-DEQ_DECLARE(qdr_route_t, qdr_route_list_t);
+ALLOC_DECLARE(qdr_route_active_t);
+DEQ_DECLARE(qdr_route_active_t, qdr_route_active_list_t);
+
+struct qdr_route_config_t {
+ DEQ_LINKS(qdr_route_config_t);
+ char *name;
+ uint64_t identity;
+ qdr_route_path_t path;
+ qdr_address_config_t *addr_config;
+ qdr_address_t *addr;
+ qdr_address_t *ingress_addr;
+ qdr_address_t *egress_addr;
+ qd_address_treatment_t treatment;
+
+ qdr_route_active_list_t active_list;
+};
+
+ALLOC_DECLARE(qdr_route_config_t);
+DEQ_DECLARE(qdr_route_config_t, qdr_route_config_list_t);
+
+struct qdr_conn_identifier_t {
+ qd_hash_handle_t *hash_handle;
+ qdr_connection_t *open_connection;
+ qdr_route_active_list_t active_refs;
+};
+
+ALLOC_DECLARE(qdr_conn_identifier_t);
struct qdr_core_t {
@@ -437,7 +465,7 @@ struct qdr_core_t {
qdr_general_work_list_t work_list;
qd_timer_t *work_timer;
- qdr_route_list_t routes;
+ qdr_route_config_list_t route_config;
qdr_connection_list_t open_connections;
qdr_link_list_t open_links;
@@ -498,6 +526,8 @@ struct qdr_core_t {
uint64_t next_identifier;
sys_mutex_t *id_lock;
+ qd_hash_t *conn_id_hash;
+
qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1];
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org