You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/12/16 19:21:44 UTC
qpid-dispatch git commit: DISPATCH-529 - Added support for
vhost/multi-tenancy - Modified iterator module to support a namespace-prefix
- Added boolean flag to the listener configuration to enable multi-tenancy -
Added logic to handle address-annotation where needed in all use cases - Added a suite of tests for the new feature
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 85372a6ae -> 21a32cad7
DISPATCH-529 - Added support for vhost/multi-tenancy
- Modified iterator module to support a namespace-prefix
- Added boolean flag to the listener configuration to enable multi-tenancy
- Added logic to handle address-annotation where needed in all use cases
- Added a suite of tests for the new feature
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/21a32cad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/21a32cad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/21a32cad
Branch: refs/heads/master
Commit: 21a32cad7f5c0d147733426c9573e42e36397e7e
Parents: 85372a6
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Dec 16 13:45:09 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Dec 16 13:54:27 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/iterator.h | 9 +-
include/qpid/dispatch/router_core.h | 23 +-
include/qpid/dispatch/server.h | 6 +
python/qpid_dispatch/management/qdrouter.json | 7 +
src/connection_manager.c | 1 +
src/iterator.c | 53 +-
src/router_core/connections.c | 65 +-
src/router_core/route_control.c | 2 +-
src/router_core/router_core_private.h | 4 +-
src/router_core/terminus.c | 40 +-
src/router_core/transfer.c | 6 +-
src/router_node.c | 46 +-
tests/CMakeLists.txt | 3 +-
tests/field_test.c | 82 ++-
tests/system_tests_multi_tenancy.py | 754 +++++++++++++++++++++
15 files changed, 1061 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index ad7b7f9..a0ace4b 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -90,12 +90,19 @@ typedef struct qd_iterator_t qd_iterator_t;
* <my_area>/<router>
* R^^^^^^^^
*
+ * ITER_VIEW_ADDRESS_WITH_SPACE
+ * Same as ADDRESS_HASH but:
+ * - Does not show the prefix/phase
+ * - Does not hash-ize local and topological addresses
+ * - Does not show namespace on local and topological addresses
+ *
*/
typedef enum {
ITER_VIEW_ALL,
ITER_VIEW_ADDRESS_NO_HOST,
ITER_VIEW_ADDRESS_HASH,
- ITER_VIEW_NODE_HASH
+ ITER_VIEW_NODE_HASH,
+ ITER_VIEW_ADDRESS_WITH_SPACE
} qd_iterator_view_t;
/** @} */
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index cb2d319..6a00925 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -157,6 +157,7 @@ typedef enum {
* @param strip_annotations_in True if configured to remove annotations on inbound messages.
* @param strip_annotations_out True if configured to remove annotations on outbound messages.
* @param link_capacity The capacity, in deliveries, for links in this connection.
+ * @param vhost If non-null, this is the vhost of the connection to be used for multi-tenancy.
* @return Pointer to a connection object that can be used to refer to this connection over its lifetime.
*/
qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
@@ -168,7 +169,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
const char *remote_container_id,
bool strip_annotations_in,
bool strip_annotations_out,
- int link_capacity);
+ int link_capacity,
+ const char *vhost);
/**
* qdr_connection_closed
@@ -196,6 +198,14 @@ void qdr_connection_set_context(qdr_connection_t *conn, void *context);
void *qdr_connection_get_context(const qdr_connection_t *conn);
/**
+ * qdr_connection_get_tenant_space
+ *
+ * Retrieve the multi-tenant space for a connection.  Returns 0 if there is
+ * no multi-tenancy on this connection.  The length of the tenant space is
+ * stored in *len (0 when conn is null); len must not be null.
+ */
+const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *len);
+
+/**
* qdr_connection_process
*
* Allow the core to process work associated with this connection.
@@ -313,6 +323,7 @@ bool qdr_terminus_is_dynamic(qdr_terminus_t *term);
* @param addr An AMQP address (null-terminated string)
*/
void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr);
+void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr);
/**
* qdr_terminus_get_address
@@ -336,6 +347,16 @@ qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term);
*/
qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term);
+/**
+ * qdr_terminus_set_dnp_address_iterator
+ *
+ * Overwrite the dynamic-node-properties.address in the terminus
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @param iter An iterator whose view shall be placed in the dnp.address
+ */
+void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *iter);
+
/**
******************************************************************************
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 8544f81..0ca2bdd 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -405,6 +405,12 @@ typedef struct qd_server_config_t {
bool allow_redirect;
/**
+ * MultiTenancy support. If true, the vhost is used to define the address space of
+ * addresses used over this connection.
+ */
+ bool multi_tenant;
+
+ /**
* The specified role of the connection. This can be used to control the behavior and
* capabilities of the connections.
*/
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e666455..584b9c4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -669,6 +669,13 @@
"required": false,
"description": "The capacity of links within this connection, in terms of message deliveries. The capacity is the number of messages that can be in-flight concurrently for each link."
},
+ "multiTenant": {
+ "type": "boolean",
+ "create": true,
+ "required": false,
+ "default": false,
+ "description": "If true, apply multi-tenancy to endpoints connected at this listener. The address space is defined by the virtual host (hostname field in the Open)."
+ },
"addr": {
"description":"(DEPRECATED)IP address: ipv4 or ipv6 literal or a host name. This attribute has been deprecated. Use host instead",
"deprecated": true,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 8f5314c..606f731 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -251,6 +251,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
config->sasl_mechanisms = qd_entity_opt_string(entity, "saslMechanisms", 0); CHECK();
config->ssl_profile = qd_entity_opt_string(entity, "sslProfile", 0); CHECK();
config->link_capacity = qd_entity_opt_long(entity, "linkCapacity", 0); CHECK();
+ config->multi_tenant = qd_entity_opt_bool(entity, "multiTenant", false); CHECK();
set_config_host(config, entity);
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 5f2f30a..d922b29 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -70,6 +70,7 @@ struct qd_iterator_t {
const char *space;
int space_length;
int space_cursor;
+ bool view_space;
};
ALLOC_DECLARE(qd_iterator_t);
@@ -101,9 +102,17 @@ static void parse_address_view(qd_iterator_t *iter)
// in order to aid the router in looking up addresses.
//
+ pointer_t save_pointer = iter->view_pointer;
iter->annotation_length = 1;
if (iter->prefix_override == '\0' && qd_iterator_prefix(iter, "_")) {
+ if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+ iter->view_pointer = save_pointer;
+ iter->view_space = false;
+ iter->annotation_length = 0;
+ return;
+ }
+
if (qd_iterator_prefix(iter, "local/")) {
iter->prefix = 'L';
iter->state = STATE_AT_PREFIX;
@@ -137,10 +146,26 @@ static void parse_address_view(qd_iterator_t *iter)
iter->prefix = iter->prefix_override ? iter->prefix_override : 'M';
iter->state = STATE_AT_PREFIX;
+ iter->view_space = true;
iter->annotation_length = iter->space_length + (iter->prefix == 'M' ? 2 : 1);
}
+//
+// Turn the annotation state of an ADDRESS_HASH-style view into the
+// ADDRESS_WITH_SPACE form.  view_initialize() calls this right after
+// parse_address_view() when the requested view is ADDRESS_WITH_SPACE.
+//
+static void adjust_address_with_space(qd_iterator_t *iter)
+{
+    if (!iter->view_space) {
+        //
+        // The namespace is not shown for this address, so the view carries
+        // no annotation and starts directly in the body.
+        //
+        iter->annotation_length = 0;
+        iter->state             = STATE_IN_BODY;
+        return;
+    }
+
+    //
+    // The annotation length was computed with 2 extra octets for an 'M'
+    // prefix and 1 extra octet otherwise; take those back out so that only
+    // the space remains counted.
+    //
+    int hidden_octets = (iter->prefix == 'M') ? 2 : 1;
+    iter->annotation_length -= hidden_octets;
+
+    if (iter->space)
+        iter->state = STATE_IN_SPACE;
+    else
+        iter->state = STATE_IN_BODY;
+}
+
+
static void parse_node_view(qd_iterator_t *iter)
{
//
@@ -192,6 +217,7 @@ static void view_initialize(qd_iterator_t *iter)
iter->mode = MODE_TO_END;
iter->annotation_length = 0;
iter->annotation_remaining = 0;
+ iter->view_space = false;
if (iter->view == ITER_VIEW_ALL)
return;
@@ -270,9 +296,11 @@ static void view_initialize(qd_iterator_t *iter)
if (iter->view == ITER_VIEW_ADDRESS_NO_HOST)
return;
- if (iter->view == ITER_VIEW_ADDRESS_HASH) {
- qd_iterator_remove_trailing_separator(iter); // FIXME - need this?
+ if (iter->view == ITER_VIEW_ADDRESS_HASH || iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+ qd_iterator_remove_trailing_separator(iter);
parse_address_view(iter);
+ if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE)
+ adjust_address_with_space(iter);
return;
}
@@ -405,8 +433,15 @@ void qd_iterator_reset(qd_iterator_t *iter)
{
if (iter) {
iter->view_pointer = iter->view_start_pointer;
- iter->state = iter->prefix ? STATE_AT_PREFIX : STATE_IN_BODY;
iter->annotation_remaining = iter->annotation_length;
+
+ if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+ if (iter->space && iter->view_space) {
+ iter->state = STATE_IN_SPACE;
+ iter->space_cursor = 0;
+ }
+ } else
+ iter->state = iter->prefix ? STATE_AT_PREFIX : STATE_IN_BODY;
}
}
@@ -469,8 +504,12 @@ void qd_iterator_annotate_space(qd_iterator_t *iter, const char* space, int spac
if (iter) {
iter->space = space;
iter->space_length = space_length;
- if (iter->view == ITER_VIEW_ADDRESS_HASH)
- iter->annotation_length = space_length + (iter->prefix == 'M' ? 2 : 1);
+ if (iter->view == ITER_VIEW_ADDRESS_HASH)
+ iter->annotation_length = (iter->view_space ? space_length : 0) + (iter->prefix == 'M' ? 2 : 1);
+ else if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+ if (iter->view_space)
+ iter->annotation_length = space_length;
+ }
}
}
@@ -481,14 +520,14 @@ unsigned char qd_iterator_octet(qd_iterator_t *iter)
return 0;
if (iter->state == STATE_AT_PREFIX) {
- iter->state = iter->prefix == 'M' ? STATE_AT_PHASE : (iter->space ? STATE_IN_SPACE : STATE_IN_BODY);
+ iter->state = iter->prefix == 'M' ? STATE_AT_PHASE : (iter->view_space && iter->space) ? STATE_IN_SPACE : STATE_IN_BODY;
iter->space_cursor = 0;
iter->annotation_remaining--;
return iter->prefix;
}
if (iter->state == STATE_AT_PHASE) {
- iter->state = iter->space ? STATE_IN_SPACE : STATE_IN_BODY;
+ iter->state = (iter->view_space && iter->space) ? STATE_IN_SPACE : STATE_IN_BODY;
iter->space_cursor = 0;
iter->annotation_remaining--;
return iter->phase;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index ddcde3c..acb23e8 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -64,7 +64,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
const char *remote_container_id,
bool strip_annotations_in,
bool strip_annotations_out,
- int link_capacity)
+ int link_capacity,
+ const char *vhost)
{
qdr_action_t *action = qdr_action(qdr_connection_opened_CT, "connection_opened");
qdr_connection_t *conn = new_qdr_connection_t();
@@ -84,6 +85,13 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
DEQ_INIT(conn->work_list);
conn->work_lock = sys_mutex();
+ if (vhost) {
+ conn->tenant_space_len = strlen(vhost) + 1;
+ conn->tenant_space = (char*) malloc(conn->tenant_space_len + 1);
+ strcpy(conn->tenant_space, vhost);
+ strcat(conn->tenant_space, "/");
+ }
+
action->args.connection.conn = conn;
action->args.connection.connection_label = qdr_field(label);
action->args.connection.container_id = qdr_field(remote_container_id);
@@ -114,6 +122,13 @@ void *qdr_connection_get_context(const qdr_connection_t *conn)
}
+//
+// Return the tenant space recorded on the connection and store its length
+// in *len.  A null connection yields a null pointer and a length of zero.
+//
+const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *len)
+{
+    if (!conn) {
+        *len = 0;
+        return 0;
+    }
+
+    *len = conn->tenant_space_len;
+    return conn->tenant_space;
+}
+
+
int qdr_connection_process(qdr_connection_t *conn)
{
qdr_connection_work_list_t work_list;
@@ -692,7 +707,7 @@ static char qdr_prefix_for_dir(qd_direction_t dir)
}
-qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qd_iterator_t *iter, int *in_phase, int *out_phase)
+qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase)
{
qdr_address_config_t *addr = 0;
@@ -701,6 +716,8 @@ qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qd_iterato
// specific match
//
qd_iterator_annotate_prefix(iter, 'Z');
+ if (conn && conn->tenant_space)
+ qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
qd_iterator_annotate_prefix(iter, '\0');
if (in_phase) *in_phase = addr ? addr->in_phase : 0;
@@ -795,18 +812,20 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
*
* @param core Pointer to the core object
* @param dir Direction of the link for the terminus
+ * @param conn The connection over which the terminus was attached
* @param terminus The terminus containing the addressing information to be looked up
* @param create_if_not_found Iff true, return a pointer to a newly created address record
* @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
* @param [out] link_route True iff the lookup indicates that an attach should be routed
* @return Pointer to an address record or 0 if none is found
*/
-static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
- qd_direction_t dir,
- qdr_terminus_t *terminus,
- bool create_if_not_found,
- bool accept_dynamic,
- bool *link_route)
+static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
+ qd_direction_t dir,
+ qdr_connection_t *conn,
+ qdr_terminus_t *terminus,
+ bool create_if_not_found,
+ bool accept_dynamic,
+ bool *link_route)
{
qdr_address_t *addr = 0;
@@ -825,7 +844,19 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
if (dnp_address) {
qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_HASH);
qd_iterator_annotate_prefix(dnp_address, qdr_prefix_for_dir(dir));
+ if (conn->tenant_space)
+ qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
qd_hash_retrieve_prefix(core->addr_hash, dnp_address, (void**) &addr);
+
+ if (addr && conn->tenant_space) {
+ //
+ // If this link is in a tenant space, translate the dnp address to
+ // the fully-qualified view
+ //
+ qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
+ qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
+ }
+
qd_iterator_free(dnp_address);
*link_route = true;
return addr;
@@ -875,9 +906,20 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
qd_iterator_t *iter = qdr_terminus_get_address(terminus);
qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
qd_iterator_annotate_prefix(iter, qdr_prefix_for_dir(dir));
+ if (conn->tenant_space)
+ qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
if (addr) {
*link_route = true;
+
+ //
+ // If this link is in a tenant space, translate the terminus address to
+ // the fully-qualified view
+ //
+ if (conn->tenant_space) {
+ qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+ qdr_terminus_set_address_iterator(terminus, iter);
+ }
return addr;
}
@@ -887,7 +929,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
int in_phase;
int out_phase;
int addr_phase;
- qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, iter, &in_phase, &out_phase);
+ qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase);
qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
@@ -1027,6 +1069,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
DEQ_REMOVE(core->open_connections, conn);
sys_mutex_free(conn->work_lock);
+ free(conn->tenant_space);
free_qdr_connection_t(conn);
}
@@ -1088,7 +1131,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// This link has a target address
//
bool link_route;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, target, true, false, &link_route);
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, false, &link_route);
if (!addr) {
//
// No route to this destination, reject the link
@@ -1145,7 +1188,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
switch (link->link_type) {
case QD_LINK_ENDPOINT: {
bool link_route;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, source, true, true, &link_route);
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route);
if (!addr) {
//
// No route to this destination, reject the link
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 0d0311a..b109e03 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -279,7 +279,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core,
qd_hash_retrieve(core->addr_hash, iter, (void*) &al->addr);
if (!al->addr) {
- al->addr = qdr_address_CT(core, qdr_treatment_for_address_CT(core, iter, 0, 0));
+ al->addr = qdr_address_CT(core, qdr_treatment_for_address_CT(core, 0, iter, 0, 0));
DEQ_INSERT_TAIL(core->addrs, al->addr);
qd_hash_insert(core->addr_hash, iter, al->addr, &al->addr->hash_handle);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5962a17..f3b7d87 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -460,6 +460,8 @@ struct qdr_connection_t {
qdr_link_ref_list_t links;
qdr_link_ref_list_t links_with_deliveries;
qdr_link_ref_list_t links_with_credit;
+ char *tenant_space;
+ int tenant_space_len;
};
ALLOC_DECLARE(qdr_connection_t);
@@ -629,7 +631,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
-qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qd_iterator_t *iter, int *in_phase, int *out_phase);
+qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);
qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter);
void qdr_connection_enqueue_work_CT(qdr_core_t *core,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 114d736..1587047 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -146,6 +146,14 @@ void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr)
}
+//
+// Replace the terminus address with a field built from the view of 'addr'.
+//
+void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr)
+{
+    //
+    // Build the replacement before releasing the current field.  Callers may
+    // pass an iterator obtained from qdr_terminus_get_address(term), which
+    // presumably still reads from the current field's storage.
+    //
+    qdr_field_t *replacement = qdr_field_from_iter(addr);
+
+    qdr_field_free(term->address);
+    term->address = replacement;
+}
+
+
qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
{
if (qdr_terminus_is_anonymous(term))
@@ -158,7 +166,6 @@ qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term)
{
pn_data_t *props = term->properties;
-
if (!props)
return 0;
@@ -178,3 +185,34 @@ qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term)
}
+//
+// Overwrite the terminus properties with a map holding a single entry: the
+// QD_DYNAMIC_NODE_PROPERTY_ADDRESS symbol mapped to the text of the
+// iterator's current view.  Any other entries in the previous properties are
+// not carried over.  Does nothing if the terminus has no properties.
+//
+void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *iter)
+{
+    char       buffer[1000];
+    char      *text    = buffer;
+    bool       on_heap = false;
+    pn_data_t *old     = term->properties;
+
+    if (!old)
+        return;
+
+    int length = qd_iterator_length(iter);
+
+    if (length < (int) sizeof(buffer)) {
+        //
+        // qd_iterator_ncopy fills the buffer with the raw octets of the view
+        // and nothing here relies on it writing a terminator, so add one
+        // explicitly: the strlen() below would otherwise run into
+        // uninitialized stack bytes.  length < sizeof(buffer) guarantees
+        // room for the terminator.
+        //
+        qd_iterator_ncopy(iter, (unsigned char*) text, (int) sizeof(buffer));
+        text[length < 0 ? 0 : length] = '\0';
+    } else {
+        //
+        // View too long for the stack buffer; take a heap copy that must be
+        // freed before returning.
+        //
+        text    = (char*) qd_iterator_copy(iter);
+        on_heap = true;
+    }
+
+    pn_data_t *new = pn_data(pn_data_size(old));
+    pn_data_put_map(new);
+    pn_data_enter(new);
+    pn_data_put_symbol(new, pn_bytes(strlen(QD_DYNAMIC_NODE_PROPERTY_ADDRESS), QD_DYNAMIC_NODE_PROPERTY_ADDRESS));
+    pn_data_put_string(new, pn_bytes(strlen(text), text));
+    pn_data_exit(new);
+
+    //
+    // Install the new properties before releasing the old ones.
+    //
+    term->properties = new;
+    pn_data_free(old);
+
+    if (on_heap)
+        free(text);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 2595a70..1ffa539 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -591,8 +591,12 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
if (DEQ_IS_EMPTY(link->undelivered)) {
qdr_address_t *addr = link->owning_addr;
- if (!addr && dlv->to_addr)
+ if (!addr && dlv->to_addr) {
+ qdr_connection_t *conn = link->conn;
+ if (conn && conn->tenant_space)
+ qd_iterator_annotate_space(dlv->to_addr, conn->tenant_space, conn->tenant_space_len);
qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
+ }
//
// Give the action reference to the qdr_link_forward function.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index abf80f4..8a6ae19 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -45,6 +45,7 @@ static void qd_router_connection_get_config(const qd_connection_t *conn,
qdr_connection_role_t *role,
int *cost,
const char **name,
+ bool *multi_tenant,
bool *strip_annotations_in,
bool *strip_annotations_out,
int *link_capacity)
@@ -73,6 +74,8 @@ static void qd_router_connection_get_config(const qd_connection_t *conn,
strncmp("connector/", *name, 10) == 0)
*name = 0;
}
+
+ *multi_tenant = cf ? cf->multi_tenant : false;
}
}
@@ -270,8 +273,11 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
// If the user is not allowed to proxy the user_id then the message user_id
// must be blank or it must be equal to the connection user name.
//
- bool check_user = false;
- qd_connection_t *conn = qd_link_connection(link);
+ bool check_user = false;
+ qd_connection_t *conn = qd_link_connection(link);
+ qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+ int tenant_space_len;
+ const char *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len);
if (conn->policy_settings)
check_user = !conn->policy_settings->allowUserIdProxy;
@@ -332,9 +338,22 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
//
// Still no destination address? Use the TO field from the message properties.
//
- if (!addr_iter)
+ if (!addr_iter) {
addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
+ //
+ // If the address came from the TO field and we need to apply a tenant-space,
+ // set the to-override with the annotated address.
+ //
+ if (addr_iter && tenant_space) {
+ qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+ qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_len);
+ qd_composed_field_t *to_override = qd_compose_subfield(0);
+ qd_compose_insert_string_iterator(to_override, addr_iter);
+ qd_message_set_to_override_annotation(msg, to_override);
+ }
+ }
+
if (addr_iter) {
qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
if (phase > 0)
@@ -343,13 +362,22 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
link_exclusions);
}
} else {
+ //
+ // This is a targeted link, not anonymous.
+ //
const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link));
if (!term_addr)
term_addr = pn_terminus_get_address(qd_link_source(link));
if (term_addr) {
qd_composed_field_t *to_override = qd_compose_subfield(0);
- qd_compose_insert_string(to_override, term_addr);
+ if (tenant_space) {
+ qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
+ qd_iterator_annotate_space(aiter, tenant_space, tenant_space_len);
+ qd_compose_insert_string_iterator(to_override, aiter);
+ qd_iterator_free(aiter);
+ } else
+ qd_compose_insert_string(to_override, term_addr);
qd_message_set_to_override_annotation(msg, to_override);
int phase = qdr_link_phase(rlink);
if (phase != 0)
@@ -557,10 +585,12 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
bool strip_annotations_out = false;
int link_capacity = 1;
const char *name = 0;
+ bool multi_tenant = false;
+ const char *vhost = 0;
uint64_t connection_id = qd_connection_connection_id(conn);
pn_connection_t *pn_conn = qd_connection_pn(conn);
- qd_router_connection_get_config(conn, &role, &cost, &name,
+ qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant,
&strip_annotations_in, &strip_annotations_out, &link_capacity);
if (role == QDR_ROLE_INTER_ROUTER) {
@@ -595,9 +625,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
cost = remote_cost;
}
+ if (multi_tenant)
+ vhost = pn_connection_remote_hostname(pn_conn);
+
qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, inbound, role, cost, connection_id, name,
pn_connection_remote_container(pn_conn),
- strip_annotations_in, strip_annotations_out, link_capacity);
+ strip_annotations_in, strip_annotations_out, link_capacity,
+ vhost);
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 95840f6..d2c6159 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -86,7 +86,8 @@ foreach(py_test_module
system_tests_user_id_proxy
system_tests_deprecated
system_tests_two_routers
- system_tests_three_routers)
+ system_tests_three_routers
+ system_tests_multi_tenancy)
add_test(${py_test_module} ${TEST_WRAP} -m unittest -v ${py_test_module})
list(APPEND SYSTEM_TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/${py_test_module}.py)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/tests/field_test.c
----------------------------------------------------------------------
diff --git a/tests/field_test.c b/tests/field_test.c
index 82e512b..508e357 100644
--- a/tests/field_test.c
+++ b/tests/field_test.c
@@ -281,6 +281,71 @@ static char* test_view_address_hash(void *context)
}
+//
+// Verify the ITER_VIEW_ADDRESS_WITH_SPACE view: with the space "space/"
+// annotated, "_local" and "_topo" addresses must be shown without the
+// space, and all other addresses must be shown with the space prepended.
+// Each case is checked against a string-backed iterator and then against
+// an iterator over a chain of small buffers.
+//
+// Returns 0 on success or a pointer to the failure text.
+//
+static char* test_view_address_with_space(void *context)
+{
+    struct {const char *addr; const char *view;} cases[] = {
+    {"amqp:/_local/my-addr/sub",                "_local/my-addr/sub"},
+    {"amqp:/_local/my-addr",                    "_local/my-addr"},
+    {"amqp:/_topo/area/router/local/sub",       "_topo/area/router/local/sub"},
+    {"amqp:/_topo/my-area/router/local/sub",    "_topo/my-area/router/local/sub"},
+    {"amqp:/_topo/my-area/my-router/local/sub", "_topo/my-area/my-router/local/sub"},
+    {"amqp:/_topo/area/all/local/sub",          "_topo/area/all/local/sub"},
+    {"amqp:/_topo/my-area/all/local/sub",       "_topo/my-area/all/local/sub"},
+    {"amqp:/_topo/all/all/local/sub",           "_topo/all/all/local/sub"},
+    {"amqp://host:port/_local/my-addr",         "_local/my-addr"},
+    {"_topo/area/router/my-addr",               "_topo/area/router/my-addr"},
+    {"_topo/my-area/router/my-addr",            "_topo/my-area/router/my-addr"},
+    {"_topo/my-area/my-router/my-addr",         "_topo/my-area/my-router/my-addr"},
+    {"_topo/my-area/router",                    "_topo/my-area/router"},
+    {"amqp:/mobile",                            "space/mobile"},
+    {"mobile",                                  "space/mobile"},
+    {"/mobile",                                 "space/mobile"},
+
+    // Re-run the above tests to make sure a trailing '.' is ignored
+    // (the final case shows that a trailing ':' is kept).
+    {"amqp:/_local/my-addr/sub.",                "_local/my-addr/sub"},
+    {"amqp:/_local/my-addr.",                    "_local/my-addr"},
+    {"amqp:/_topo/area/router/local/sub.",       "_topo/area/router/local/sub"},
+    {"amqp:/_topo/my-area/router/local/sub.",    "_topo/my-area/router/local/sub"},
+    {"amqp:/_topo/my-area/my-router/local/sub.", "_topo/my-area/my-router/local/sub"},
+    {"amqp:/_topo/area/all/local/sub.",          "_topo/area/all/local/sub"},
+    {"amqp:/_topo/my-area/all/local/sub.",       "_topo/my-area/all/local/sub"},
+    {"amqp:/_topo/all/all/local/sub.",           "_topo/all/all/local/sub"},
+    {"amqp://host:port/_local/my-addr.",         "_local/my-addr"},
+    {"_topo/area/router/my-addr.",               "_topo/area/router/my-addr"},
+    {"_topo/my-area/router/my-addr.",            "_topo/my-area/router/my-addr"},
+    {"_topo/my-area/my-router/my-addr.",         "_topo/my-area/my-router/my-addr"},
+    {"_topo/my-area/router.",                    "_topo/my-area/router"},
+    {"_topo/my-area/router:",                    "_topo/my-area/router:"},
+
+    {0, 0}
+    };
+    int idx;
+
+    //
+    // Pass 1: iterators over a contiguous string.
+    //
+    for (idx = 0; cases[idx].addr; idx++) {
+        qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_WITH_SPACE);
+        qd_iterator_annotate_space(iter, "space/", 6);
+        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        qd_iterator_free(iter);
+        if (ret) return ret;
+    }
+
+    //
+    // Pass 2: iterators over a buffer chain built with a segment size of 3.
+    //
+    for (idx = 0; cases[idx].addr; idx++) {
+        qd_buffer_list_t chain;
+        DEQ_INIT(chain);
+        build_buffer_chain(&chain, cases[idx].addr, 3);
+        qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(chain), 0,
+                                                 strlen(cases[idx].addr),
+                                                 ITER_VIEW_ADDRESS_WITH_SPACE);
+        qd_iterator_annotate_space(iter, "space/", 6);
+        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+
+        //
+        // Free the iterator, as pass 1 does, so it is not leaked on every
+        // case.  It is freed before the chain because it refers to the
+        // chain's buffers.
+        //
+        qd_iterator_free(iter);
+        release_buffer_chain(&chain);
+        if (ret) return ret;
+    }
+
+    return 0;
+}
+
+
static char* test_view_address_hash_override(void *context)
{
struct {const char *addr; const char *view;} cases[] = {
@@ -308,13 +373,15 @@ static char* test_view_address_hash_override(void *context)
}
-static char* test_view_address_with_space(void *context)
+static char* test_view_address_hash_with_space(void *context)
{
struct {const char *addr; const char *view;} cases[] = {
{"amqp:/link-target", "M0test.vhost.link-target"},
{"amqp:/domain/link-target", "M0test.vhost.domain/link-target"},
{"domain/link-target", "M0test.vhost.domain/link-target"},
{"bbc79fb3-e1fd-4a08-92b2-9a2de232b558", "M0test.vhost.bbc79fb3-e1fd-4a08-92b2-9a2de232b558"},
+ {"_topo/my-area/router/address", "Rrouter"},
+ {"_topo/my-area/my-router/address", "Laddress"},
{0, 0}
};
int idx;
@@ -324,13 +391,9 @@ static char* test_view_address_with_space(void *context)
qd_iterator_annotate_space(iter, "test.vhost.", 11);
if (!qd_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
char *got = (char*) qd_iterator_copy(iter);
- snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed. Expected '%s', got '%s'",
- cases[idx].addr, cases[idx].view, got);
- return fail_text;
- }
- if (qd_iterator_length(iter) != strlen(cases[idx].view)) {
- snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed. Length %d, iter_length returned %d",
- cases[idx].addr, (int) strlen(cases[idx].view), (int) qd_iterator_length(iter));
+ snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed. Expected '%s', got '%s' (len: %d)",
+ cases[idx].addr, cases[idx].view, got, qd_iterator_length(iter));
+ free(got);
return fail_text;
}
qd_iterator_free(iter);
@@ -873,8 +936,9 @@ int field_tests(void)
TEST_CASE(test_trim, 0);
TEST_CASE(test_sub_iterator, 0);
TEST_CASE(test_view_address_hash, 0);
- TEST_CASE(test_view_address_hash_override, 0);
TEST_CASE(test_view_address_with_space, 0);
+ TEST_CASE(test_view_address_hash_override, 0);
+ TEST_CASE(test_view_address_hash_with_space, 0);
TEST_CASE(test_view_node_hash, 0);
TEST_CASE(test_field_advance_string, 0);
TEST_CASE(test_field_advance_buffer, 0);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/tests/system_tests_multi_tenancy.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_multi_tenancy.py b/tests/system_tests_multi_tenancy.py
new file mode 100644
index 0000000..cd34fd0
--- /dev/null
+++ b/tests/system_tests_multi_tenancy.py
@@ -0,0 +1,754 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+import time
+
+# PROTON-828:
+try:
+ from proton import MODIFIED
+except ImportError:
+ from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+class RouterTest(TestCase):
+
+ inter_router_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(RouterTest, cls).setUpClass()
+
+ def router(name, connection):
+
+ config = [
+ ('router', {'mode': 'interior', 'id': name}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+ ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'in', 'containerId': 'LRC'}),
+ ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'out', 'containerId': 'LRC'}),
+ ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'in'}),
+ ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'out'}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
+ connection
+ ]
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+
+ router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
+ router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostName': 'no'}))
+
+ cls.routers[0].wait_router_connected('B')
+ cls.routers[1].wait_router_connected('A')
+
+
+ def test_01_one_router_targeted_sender_no_tenant(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ "anything/addr_01",
+ "anything/addr_01",
+ self.routers[0].addresses[0],
+ "M0anything/addr_01")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_02_one_router_targeted_sender_tenant_on_sender(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[0],
+ "addr_02",
+ "0.0.0.0/addr_02",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_02")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_03_one_router_targeted_sender_tenant_on_receiver(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[1],
+ "0.0.0.0/addr_03",
+ "addr_03",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_03")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_04_one_router_targeted_sender_tenant_on_both(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[1],
+ "addr_04",
+ "addr_04",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_04")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_05_two_router_targeted_sender_no_tenant(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ "0.0.0.0/addr_05",
+ "0.0.0.0/addr_05",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_05")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_06_two_router_targeted_sender_tenant_on_sender(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[0],
+ "addr_06",
+ "0.0.0.0/addr_06",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_06")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_07_two_router_targeted_sender_tenant_on_receiver(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[1],
+ "0.0.0.0/addr_07",
+ "addr_07",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_07")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_08_two_router_targeted_sender_tenant_on_both(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[1],
+ "addr_08",
+ "addr_08",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_08")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_09_one_router_anonymous_sender_no_tenant(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ "anything/addr_09",
+ "anything/addr_09",
+ self.routers[0].addresses[0],
+ "M0anything/addr_09",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_10_one_router_anonymous_sender_tenant_on_sender(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[0],
+ "addr_10",
+ "0.0.0.0/addr_10",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_10",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_11_one_router_anonymous_sender_tenant_on_receiver(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[1],
+ "0.0.0.0/addr_11",
+ "addr_11",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_11",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_12_one_router_anonymous_sender_tenant_on_both(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[1],
+ "addr_12",
+ "addr_12",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_12",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_13_two_router_anonymous_sender_no_tenant(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ "anything/addr_13",
+ "anything/addr_13",
+ self.routers[0].addresses[0],
+ "M0anything/addr_13",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_14_two_router_anonymous_sender_tenant_on_sender(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[0],
+ "addr_14",
+ "0.0.0.0/addr_14",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_14",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_15_two_router_anonymous_sender_tenant_on_receiver(self):
+ test = MessageTransferTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[1],
+ "0.0.0.0/addr_15",
+ "addr_15",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_15",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_16_two_router_anonymous_sender_tenant_on_both(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[1],
+ "addr_16",
+ "addr_16",
+ self.routers[0].addresses[0],
+ "M00.0.0.0/addr_16",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_17_one_router_link_route_targeted(self):
+ test = LinkRouteTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[2],
+ "link.addr_17",
+ "0.0.0.0/link.addr_17",
+ False)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_18_one_router_link_route_targeted_no_tenant(self):
+ test = LinkRouteTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[2],
+ "0.0.0.0/link.addr_18",
+ "0.0.0.0/link.addr_18",
+ False)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_19_one_router_link_route_dynamic(self):
+ test = LinkRouteTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[2],
+ "link.addr_19",
+ "0.0.0.0/link.addr_19",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_20_one_router_link_route_dynamic_no_tenant(self):
+ test = LinkRouteTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[2],
+ "0.0.0.0/link.addr_20",
+ "0.0.0.0/link.addr_20",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_21_two_router_link_route_targeted(self):
+ test = LinkRouteTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[2],
+ "link.addr_21",
+ "0.0.0.0/link.addr_21",
+ False)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_22_two_router_link_route_targeted_no_tenant(self):
+ test = LinkRouteTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[2],
+ "0.0.0.0/link.addr_22",
+ "0.0.0.0/link.addr_22",
+ False)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_23_two_router_link_route_dynamic(self):
+ test = LinkRouteTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[2],
+ "link.addr_23",
+ "0.0.0.0/link.addr_23",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_24_two_router_link_route_dynamic_no_tenant(self):
+ test = LinkRouteTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[2],
+ "0.0.0.0/link.addr_24",
+ "0.0.0.0/link.addr_24",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_25_one_router_anonymous_sender_non_mobile(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[0],
+ "_local/addr_25",
+ "_local/addr_25",
+ self.routers[0].addresses[0],
+ "Laddr_25",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_26_one_router_targeted_sender_non_mobile(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[0],
+ "_local/addr_26",
+ "_local/addr_26",
+ self.routers[0].addresses[0],
+ "Laddr_26",
+ False)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_27_two_router_anonymous_sender_non_mobile(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[0],
+ "_topo/0/B/addr_27",
+ "_local/addr_27",
+ self.routers[1].addresses[0],
+ "Laddr_27",
+ True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_28_two_router_targeted_sender_non_mobile(self):
+ test = MessageTransferTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[0],
+ "_topo/0/B/addr_28",
+ "_local/addr_28",
+ self.routers[1].addresses[0],
+ "Laddr_28",
+ False)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_29_one_router_waypoint_no_tenant(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[2],
+ "0.0.0.0/queue.waypoint",
+ "0.0.0.0/queue.waypoint")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_30_one_router_waypoint(self):
+ test = WaypointTest(self.routers[0].addresses[1],
+ self.routers[0].addresses[2],
+ "queue.waypoint",
+ "0.0.0.0/queue.waypoint")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_31_two_router_waypoint_no_tenant(self):
+ test = WaypointTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[2],
+ "0.0.0.0/queue.waypoint",
+ "0.0.0.0/queue.waypoint")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_32_two_router_waypoint(self):
+ test = WaypointTest(self.routers[0].addresses[1],
+ self.routers[1].addresses[2],
+ "queue.waypoint",
+ "0.0.0.0/queue.waypoint")
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+class Entity(object):
+ def __init__(self, status_code, status_description, attrs):
+ self.status_code = status_code
+ self.status_description = status_description
+ self.attrs = attrs
+
+ def __getattr__(self, key):
+ return self.attrs[key]
+
+
+class RouterProxy(object):
+ def __init__(self, reply_addr):
+ self.reply_addr = reply_addr
+
+ def response(self, msg):
+ ap = msg.properties
+ return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+ def read_address(self, name):
+ ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
+ return Message(properties=ap, reply_to=self.reply_addr)
+
+ def query_addresses(self):
+ ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
+ return Message(properties=ap, reply_to=self.reply_addr)
+
+
+class Timeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
+class MessageTransferTest(MessagingHandler):
+ def __init__(self, sender_host, receiver_host, sender_address, receiver_address, lookup_host, lookup_address, anonymous=False):
+ super(MessageTransferTest, self).__init__()
+ self.sender_host = sender_host
+ self.receiver_host = receiver_host
+ self.sender_address = sender_address
+ self.receiver_address = receiver_address
+ self.lookup_host = lookup_host
+ self.lookup_address = lookup_address
+ self.anonymous = anonymous
+
+ self.sender_conn = None
+ self.receiver_conn = None
+ self.lookup_conn = None
+ self.error = None
+ self.sender = None
+ self.receiver = None
+ self.proxy = None
+
+ self.count = 10
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.n_accepted = 0
+ self.n_released = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_accepted=%d n_released=%d" % (self.n_sent, self.n_rcvd, self.n_accepted, self.n_released)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.lookup_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.receiver_conn = event.container.connect(self.receiver_host)
+ self.lookup_conn = event.container.connect(self.lookup_host)
+ self.reply_receiver = event.container.create_receiver(self.lookup_conn, dynamic=True)
+ self.agent_sender = event.container.create_sender(self.lookup_conn, "$management")
+
+ def send(self):
+ while self.sender.credit > 0 and self.n_sent < self.count:
+ self.n_sent += 1
+ m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+ if self.anonymous:
+ m.address = self.sender_address
+ self.sender.send(m)
+
+ def on_released(self, event):
+ self.n_sent -= 1
+ self.n_released += 1
+
+ def on_link_opened(self, event):
+ if event.receiver == self.reply_receiver:
+ self.proxy = RouterProxy(self.reply_receiver.remote_source.address)
+ self.sender = event.container.create_sender(self.sender_conn, None if self.anonymous else self.sender_address)
+ self.receiver = event.container.create_receiver(self.receiver_conn, self.receiver_address)
+
+ def on_sendable(self, event):
+ if self.n_sent == 0 and self.anonymous:
+ time.sleep(0.3)
+ if event.sender == self.sender:
+ self.send()
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ self.n_rcvd += 1
+ if event.receiver == self.reply_receiver:
+ response = self.proxy.response(event.message)
+ if response.status_code != 200:
+ self.error = "Unexpected error code from agent: %d - %s" % (response.status_code, response.status_description)
+ if self.n_sent != self.count or self.n_rcvd != self.count:
+ self.error = "Unexpected counts: n_sent=%d n_rcvd=%d n_accepted=%d" % (self.n_sent, self.n_rcvd, self.n_accepted)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.lookup_conn.close()
+ self.timer.cancel()
+
+ def on_accepted(self, event):
+ if event.sender == self.sender:
+ self.n_accepted += 1
+ if self.n_accepted == self.count:
+ request = self.proxy.read_address(self.lookup_address)
+ self.agent_sender.send(request)
+
+ def run(self):
+ Container(self).run()
+
+
+class LinkRouteTest(MessagingHandler):
+ def __init__(self, first_host, second_host, first_address, second_address, dynamic):
+ super(LinkRouteTest, self).__init__(prefetch=0)
+ self.first_host = first_host
+ self.second_host = second_host
+ self.first_address = first_address
+ self.second_address = second_address
+ self.dynamic = dynamic
+
+ self.first_conn = None
+ self.second_conn = None
+ self.error = None
+ self.first_sender = None
+ self.first_receiver = None
+ self.second_sender = None
+ self.second_receiver = None
+
+ self.count = 10
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.n_settled = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" % (self.n_sent, self.n_rcvd, self.n_settled)
+ self.first_conn.close()
+ self.second_conn.close()
+
+ def fail(self, text):
+ self.error = text
+ self.second_conn.close()
+ self.first_conn.close()
+ self.timer.cancel()
+
+ def send(self):
+ while self.first_sender.credit > 0 and self.n_sent < self.count:
+ self.n_sent += 1
+ m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+ self.first_sender.send(m)
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.second_conn = event.container.connect(self.second_host)
+
+ def on_connection_opened(self, event):
+ if event.connection == self.first_conn:
+ self.first_sender = event.container.create_sender(self.first_conn, self.first_address)
+ if self.dynamic:
+ self.first_receiver = event.container.create_receiver(self.first_conn,
+ dynamic=True,
+ options=DynamicNodeProperties({"x-opt-qd.address": unicode(self.first_address)}))
+ else:
+ self.first_receiver = event.container.create_receiver(self.first_conn, self.first_address)
+
+ if event.connection == self.second_conn:
+ time.sleep(1)
+ self.first_conn = event.container.connect(self.first_host)
+
+ def on_link_opening(self, event):
+ if event.sender:
+ self.second_sender = event.sender
+ if self.dynamic:
+ if event.sender.remote_source.dynamic:
+ event.sender.source.address = self.second_address
+ event.sender.open()
+ else:
+ self.fail("Expected dynamic source on sender")
+ else:
+ if event.sender.remote_source.address == self.second_address:
+ event.sender.source.address = self.second_address
+ event.sender.open()
+ else:
+ self.fail("Incorrect address on incoming sender: got %s, expected %s" %
+ (event.sender.remote_source.address, self.second_address))
+
+ elif event.receiver:
+ self.second_receiver = event.receiver
+ if event.receiver.remote_target.address == self.second_address:
+ event.receiver.target.address = self.second_address
+ event.receiver.open()
+ else:
+ self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
+ (event.receiver.remote_target.address, self.second_address))
+
+
+ def on_link_opened(self, event):
+ if event.receiver:
+ event.receiver.flow(self.count)
+
+ def on_sendable(self, event):
+ if event.sender == self.first_sender:
+ self.send()
+
+ def on_message(self, event):
+ if event.receiver == self.first_receiver:
+ self.n_rcvd += 1
+
+ def on_settled(self, event):
+ if event.sender == self.first_sender:
+ self.n_settled += 1
+ if self.n_settled == self.count:
+ self.fail(None)
+
+ def run(self):
+ container = Container(self)
+ container.container_id = 'LRC'
+ container.run()
+
+
+class WaypointTest(MessagingHandler):
+ def __init__(self, first_host, second_host, first_address, second_address):
+ super(WaypointTest, self).__init__()
+ self.first_host = first_host
+ self.second_host = second_host
+ self.first_address = first_address
+ self.second_address = second_address
+
+ self.first_conn = None
+ self.second_conn = None
+ self.error = None
+ self.first_sender = None
+ self.first_receiver = None
+ self.waypoint_sender = None
+ self.waypoint_receiver = None
+ self.waypoint_queue = []
+
+ self.count = 10
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.n_thru = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_thru=%d" % (self.n_sent, self.n_rcvd, self.n_thru)
+ self.first_conn.close()
+ self.second_conn.close()
+
+ def fail(self, text):
+ self.error = text
+ self.second_conn.close()
+ self.first_conn.close()
+ self.timer.cancel()
+
+ def send_client(self):
+ while self.first_sender.credit > 0 and self.n_sent < self.count:
+ self.n_sent += 1
+ m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+ self.first_sender.send(m)
+
+ def send_waypoint(self):
+ while self.waypoint_sender.credit > 0 and len(self.waypoint_queue) > 0:
+ self.n_thru += 1
+ m = self.waypoint_queue.pop()
+ self.waypoint_sender.send(m)
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.first_conn = event.container.connect(self.first_host)
+ self.second_conn = event.container.connect(self.second_host)
+
+ def on_connection_opened(self, event):
+ if event.connection == self.first_conn:
+ self.first_sender = event.container.create_sender(self.first_conn, self.first_address)
+ self.first_receiver = event.container.create_receiver(self.first_conn, self.first_address)
+
+ def on_link_opening(self, event):
+ if event.sender:
+ self.waypoint_sender = event.sender
+ if event.sender.remote_source.address == self.second_address:
+ event.sender.source.address = self.second_address
+ event.sender.open()
+ else:
+ self.fail("Incorrect address on incoming sender: got %s, expected %s" %
+ (event.sender.remote_source.address, self.second_address))
+
+ elif event.receiver:
+ self.waypoint_receiver = event.receiver
+ if event.receiver.remote_target.address == self.second_address:
+ event.receiver.target.address = self.second_address
+ event.receiver.open()
+ else:
+ self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
+ (event.receiver.remote_target.address, self.second_address))
+
+
+ def on_sendable(self, event):
+ if event.sender == self.first_sender:
+ self.send_client()
+ elif event.sender == self.waypoint_sender:
+ self.send_waypoint()
+
+ def on_message(self, event):
+ if event.receiver == self.first_receiver:
+ self.n_rcvd += 1
+ if self.n_rcvd == self.count and self.n_thru == self.count:
+ self.fail(None)
+ elif event.receiver == self.waypoint_receiver:
+ m = Message(body=event.message.body)
+ self.waypoint_queue.append(m)
+ self.send_waypoint()
+
+ def run(self):
+ container = Container(self)
+ container.container_id = 'ALC'
+ container.run()
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org