You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/12/16 19:21:44 UTC

qpid-dispatch git commit: DISPATCH-529 - Added support for vhost/multi-tenancy - Modified iterator module to support a namespace-prefix - Added boolean flag to the listener configuration to enable multi-tenancy - Added logic to handle address-annotation where needed in all use cases - Added a suite of tests for the new feature

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 85372a6ae -> 21a32cad7


DISPATCH-529 - Added support for vhost/multi-tenancy
  - Modified iterator module to support a namespace-prefix
  - Added boolean flag to the listener configuration to enable multi-tenancy
  - Added logic to handle address-annotation where needed in all use cases
  - Added a suite of tests for the new feature


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/21a32cad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/21a32cad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/21a32cad

Branch: refs/heads/master
Commit: 21a32cad7f5c0d147733426c9573e42e36397e7e
Parents: 85372a6
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Dec 16 13:45:09 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Dec 16 13:54:27 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/iterator.h              |   9 +-
 include/qpid/dispatch/router_core.h           |  23 +-
 include/qpid/dispatch/server.h                |   6 +
 python/qpid_dispatch/management/qdrouter.json |   7 +
 src/connection_manager.c                      |   1 +
 src/iterator.c                                |  53 +-
 src/router_core/connections.c                 |  65 +-
 src/router_core/route_control.c               |   2 +-
 src/router_core/router_core_private.h         |   4 +-
 src/router_core/terminus.c                    |  40 +-
 src/router_core/transfer.c                    |   6 +-
 src/router_node.c                             |  46 +-
 tests/CMakeLists.txt                          |   3 +-
 tests/field_test.c                            |  82 ++-
 tests/system_tests_multi_tenancy.py           | 754 +++++++++++++++++++++
 15 files changed, 1061 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index ad7b7f9..a0ace4b 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -90,12 +90,19 @@ typedef struct qd_iterator_t qd_iterator_t;
  *      <my_area>/<router>
  *               R^^^^^^^^
  *
+ * ITER_VIEW_ADDRESS_WITH_SPACE
+ *    Same as ADDRESS_HASH but:
+ *      - Does not show the prefix/phase
+ *      - Does not hash-ize local and topological addresses
+ *      - Does not show namespace on local and topological addresses
+ *
  */
 typedef enum {
     ITER_VIEW_ALL,
     ITER_VIEW_ADDRESS_NO_HOST,
     ITER_VIEW_ADDRESS_HASH,
-    ITER_VIEW_NODE_HASH
+    ITER_VIEW_NODE_HASH,
+    ITER_VIEW_ADDRESS_WITH_SPACE
 } qd_iterator_view_t;
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index cb2d319..6a00925 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -157,6 +157,7 @@ typedef enum {
  * @param strip_annotations_in True if configured to remove annotations on inbound messages.
  * @param strip_annotations_out True if configured to remove annotations on outbound messages.
  * @param link_capacity The capacity, in deliveries, for links in this connection.
+ * @param vhost If non-null, this is the vhost of the connection to be used for multi-tenancy.
  * @return Pointer to a connection object that can be used to refer to this connection over its lifetime.
  */
 qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
@@ -168,7 +169,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
                                         const char            *remote_container_id,
                                         bool                   strip_annotations_in,
                                         bool                   strip_annotations_out,
-                                        int                    link_capacity);
+                                        int                    link_capacity,
+                                        const char            *vhost);
 
 /**
  * qdr_connection_closed
@@ -196,6 +198,14 @@ void qdr_connection_set_context(qdr_connection_t *conn, void *context);
 void *qdr_connection_get_context(const qdr_connection_t *conn);
 
 /**
+ * qdr_connection_get_tenant_space
+ *
+ * Retrieve the multi-tenant space for a connection and store its length in *len.
+ * Returns 0 (and sets *len to 0) if there is no connection to read it from.
+ */
+const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *len);
+
+/**
  * qdr_connection_process
  *
  * Allow the core to process work associated with this connection.
@@ -313,6 +323,7 @@ bool qdr_terminus_is_dynamic(qdr_terminus_t *term);
  * @param addr An AMQP address (null-terminated string)
  */
 void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr);
+void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr);
 
 /**
  * qdr_terminus_get_address
@@ -336,6 +347,16 @@ qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term);
  */
 qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term);
 
+/**
+ * qdr_terminus_set_dnp_address_iterator
+ *
+ * Overwrite the dynamic-node-properties.address in the terminus
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @param iter An iterator whose view shall be placed in the dnp.address
+ */
+void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *iter);
+
 
 /**
  ******************************************************************************

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 8544f81..0ca2bdd 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -405,6 +405,12 @@ typedef struct qd_server_config_t {
     bool allow_redirect;
 
     /**
+     * MultiTenancy support.  If true, the vhost is used to define the address space of
+     * addresses used over this connection.
+     */
+    bool multi_tenant;
+
+    /**
      * The specified role of the connection.  This can be used to control the behavior and
      * capabilities of the connections.
      */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e666455..584b9c4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -669,6 +669,13 @@
                     "required": false,
                     "description": "The capacity of links within this connection, in terms of message deliveries.  The capacity is the number of messages that can be in-flight concurrently for each link."
                 },
+                "multiTenant": {
+                    "type": "boolean",
+                    "create": true,
+                    "required": false,
+                    "default": false,
+                    "description": "If true, apply multi-tenancy to endpoints connected at this listener.  The address space is defined by the virtual host (hostname field in the Open)."
+                },
                 "addr": {
                     "description":"(DEPRECATED)IP address: ipv4 or ipv6 literal or a host name. This attribute has been deprecated. Use host instead",
                     "deprecated": true,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 8f5314c..606f731 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -251,6 +251,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     config->sasl_mechanisms      = qd_entity_opt_string(entity, "saslMechanisms", 0); CHECK();
     config->ssl_profile          = qd_entity_opt_string(entity, "sslProfile", 0);     CHECK();
     config->link_capacity        = qd_entity_opt_long(entity, "linkCapacity", 0);     CHECK();
+    config->multi_tenant         = qd_entity_opt_bool(entity, "multiTenant", false);  CHECK();
     set_config_host(config, entity);
 
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 5f2f30a..d922b29 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -70,6 +70,7 @@ struct qd_iterator_t {
     const char             *space;
     int                     space_length;
     int                     space_cursor;
+    bool                    view_space;
 };
 
 ALLOC_DECLARE(qd_iterator_t);
@@ -101,9 +102,17 @@ static void parse_address_view(qd_iterator_t *iter)
     // in order to aid the router in looking up addresses.
     //
 
+    pointer_t save_pointer = iter->view_pointer;
     iter->annotation_length = 1;
 
     if (iter->prefix_override == '\0' && qd_iterator_prefix(iter, "_")) {
+        if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+            iter->view_pointer      = save_pointer;
+            iter->view_space        = false;
+            iter->annotation_length = 0;
+            return;
+        }
+
         if (qd_iterator_prefix(iter, "local/")) {
             iter->prefix = 'L';
             iter->state  = STATE_AT_PREFIX;
@@ -137,10 +146,26 @@ static void parse_address_view(qd_iterator_t *iter)
 
     iter->prefix            = iter->prefix_override ? iter->prefix_override : 'M';
     iter->state             = STATE_AT_PREFIX;
+    iter->view_space        = true;
     iter->annotation_length = iter->space_length + (iter->prefix == 'M' ? 2 : 1);
 }
 
 
+static void adjust_address_with_space(qd_iterator_t *iter)
+{
+    //
+    // Convert an ADDRESS_HASH view to an ADDRESS_WITH_SPACE view
+    //
+    if (iter->view_space) {
+        iter->annotation_length -= iter->prefix == 'M' ? 2 : 1;
+        iter->state = iter->space ? STATE_IN_SPACE : STATE_IN_BODY;
+    } else {
+        iter->annotation_length = 0;
+        iter->state = STATE_IN_BODY;
+    }
+}
+
+
 static void parse_node_view(qd_iterator_t *iter)
 {
     //
@@ -192,6 +217,7 @@ static void view_initialize(qd_iterator_t *iter)
     iter->mode                 = MODE_TO_END;
     iter->annotation_length    = 0;
     iter->annotation_remaining = 0;
+    iter->view_space           = false;
 
     if (iter->view == ITER_VIEW_ALL)
         return;
@@ -270,9 +296,11 @@ static void view_initialize(qd_iterator_t *iter)
     if (iter->view == ITER_VIEW_ADDRESS_NO_HOST)
         return;
 
-    if (iter->view == ITER_VIEW_ADDRESS_HASH) {
-        qd_iterator_remove_trailing_separator(iter); // FIXME - need this?
+    if (iter->view == ITER_VIEW_ADDRESS_HASH || iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+        qd_iterator_remove_trailing_separator(iter);
         parse_address_view(iter);
+        if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE)
+            adjust_address_with_space(iter);
         return;
     }
 
@@ -405,8 +433,15 @@ void qd_iterator_reset(qd_iterator_t *iter)
 {
     if (iter) {
         iter->view_pointer         = iter->view_start_pointer;
-        iter->state                = iter->prefix ? STATE_AT_PREFIX : STATE_IN_BODY;
         iter->annotation_remaining = iter->annotation_length;
+
+        if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+            if (iter->space && iter->view_space) {
+                iter->state = STATE_IN_SPACE;
+                iter->space_cursor = 0;
+            }
+        } else
+            iter->state = iter->prefix ? STATE_AT_PREFIX : STATE_IN_BODY;
     }
 }
 
@@ -469,8 +504,12 @@ void qd_iterator_annotate_space(qd_iterator_t *iter, const char* space, int spac
     if (iter) {
         iter->space        = space;
         iter->space_length = space_length;
-        if (iter->view == ITER_VIEW_ADDRESS_HASH)
-            iter->annotation_length = space_length + (iter->prefix == 'M' ? 2 : 1);
+        if      (iter->view == ITER_VIEW_ADDRESS_HASH)
+            iter->annotation_length = (iter->view_space ? space_length : 0) + (iter->prefix == 'M' ? 2 : 1);
+        else if (iter->view == ITER_VIEW_ADDRESS_WITH_SPACE) {
+            if (iter->view_space)
+                iter->annotation_length = space_length;
+        }
     }
 }
 
@@ -481,14 +520,14 @@ unsigned char qd_iterator_octet(qd_iterator_t *iter)
         return 0;
 
     if (iter->state == STATE_AT_PREFIX) {
-        iter->state = iter->prefix == 'M' ? STATE_AT_PHASE : (iter->space ? STATE_IN_SPACE : STATE_IN_BODY);
+        iter->state = iter->prefix == 'M' ? STATE_AT_PHASE : (iter->view_space && iter->space) ? STATE_IN_SPACE : STATE_IN_BODY;
         iter->space_cursor = 0;
         iter->annotation_remaining--;
         return iter->prefix;
     }
 
     if (iter->state == STATE_AT_PHASE) {
-        iter->state = iter->space ? STATE_IN_SPACE : STATE_IN_BODY;
+        iter->state = (iter->view_space && iter->space) ? STATE_IN_SPACE : STATE_IN_BODY;
         iter->space_cursor = 0;
         iter->annotation_remaining--;
         return iter->phase;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index ddcde3c..acb23e8 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -64,7 +64,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
                                         const char            *remote_container_id,
                                         bool                   strip_annotations_in,
                                         bool                   strip_annotations_out,
-                                        int                    link_capacity)
+                                        int                    link_capacity,
+                                        const char            *vhost)
 {
     qdr_action_t     *action = qdr_action(qdr_connection_opened_CT, "connection_opened");
     qdr_connection_t *conn   = new_qdr_connection_t();
@@ -84,6 +85,13 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
     DEQ_INIT(conn->work_list);
     conn->work_lock = sys_mutex();
 
+    if (vhost) {
+        conn->tenant_space_len = strlen(vhost) + 1;
+        conn->tenant_space = (char*) malloc(conn->tenant_space_len + 1);
+        strcpy(conn->tenant_space, vhost);
+        strcat(conn->tenant_space, "/");
+    }
+
     action->args.connection.conn             = conn;
     action->args.connection.connection_label = qdr_field(label);
     action->args.connection.container_id     = qdr_field(remote_container_id);
@@ -114,6 +122,13 @@ void *qdr_connection_get_context(const qdr_connection_t *conn)
 }
 
 
+const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *len)
+{
+    *len = conn ? conn->tenant_space_len : 0;
+    return conn ? conn->tenant_space : 0;
+}
+
+
 int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
@@ -692,7 +707,7 @@ static char qdr_prefix_for_dir(qd_direction_t dir)
 }
 
 
-qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qd_iterator_t *iter, int *in_phase, int *out_phase)
+qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase)
 {
     qdr_address_config_t *addr = 0;
 
@@ -701,6 +716,8 @@ qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qd_iterato
     // specific match
     //
     qd_iterator_annotate_prefix(iter, 'Z');
+    if (conn && conn->tenant_space)
+        qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
     qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
     qd_iterator_annotate_prefix(iter, '\0');
     if (in_phase)  *in_phase  = addr ? addr->in_phase  : 0;
@@ -795,18 +812,20 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
  *
  * @param core Pointer to the core object
  * @param dir Direction of the link for the terminus
+ * @param conn The connection over which the terminus was attached
  * @param terminus The terminus containing the addressing information to be looked up
  * @param create_if_not_found Iff true, return a pointer to a newly created address record
  * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
  * @param [out] link_route True iff the lookup indicates that an attach should be routed
  * @return Pointer to an address record or 0 if none is found
  */
-static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t     *core,
-                                                     qd_direction_t  dir,
-                                                     qdr_terminus_t *terminus,
-                                                     bool            create_if_not_found,
-                                                     bool            accept_dynamic,
-                                                     bool           *link_route)
+static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
+                                                     qd_direction_t    dir,
+                                                     qdr_connection_t *conn,
+                                                     qdr_terminus_t   *terminus,
+                                                     bool              create_if_not_found,
+                                                     bool              accept_dynamic,
+                                                     bool             *link_route)
 {
     qdr_address_t *addr = 0;
 
@@ -825,7 +844,19 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t     *core,
         if (dnp_address) {
             qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_HASH);
             qd_iterator_annotate_prefix(dnp_address, qdr_prefix_for_dir(dir));
+            if (conn->tenant_space)
+                qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
             qd_hash_retrieve_prefix(core->addr_hash, dnp_address, (void**) &addr);
+
+            if (addr && conn->tenant_space) {
+                //
+                // If this link is in a tenant space, translate the dnp address to
+                // the fully-qualified view
+                //
+                qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
+                qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
+            }
+
             qd_iterator_free(dnp_address);
             *link_route = true;
             return addr;
@@ -875,9 +906,20 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t     *core,
     qd_iterator_t *iter = qdr_terminus_get_address(terminus);
     qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
     qd_iterator_annotate_prefix(iter, qdr_prefix_for_dir(dir));
+    if (conn->tenant_space)
+        qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
     qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
     if (addr) {
         *link_route = true;
+
+        //
+        // If this link is in a tenant space, translate the terminus address to
+        // the fully-qualified view
+        //
+        if (conn->tenant_space) {
+            qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+            qdr_terminus_set_address_iterator(terminus, iter);
+        }
         return addr;
     }
 
@@ -887,7 +929,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t     *core,
     int in_phase;
     int out_phase;
     int addr_phase;
-    qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, iter, &in_phase, &out_phase);
+    qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase);
 
     qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
     addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
@@ -1027,6 +1069,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
 
     DEQ_REMOVE(core->open_connections, conn);
     sys_mutex_free(conn->work_lock);
+    free(conn->tenant_space);
     free_qdr_connection_t(conn);
 }
 
@@ -1088,7 +1131,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 // This link has a target address
                 //
                 bool           link_route;
-                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, target, true, false, &link_route);
+                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, false, &link_route);
                 if (!addr) {
                     //
                     // No route to this destination, reject the link
@@ -1145,7 +1188,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
         switch (link->link_type) {
         case QD_LINK_ENDPOINT: {
             bool           link_route;
-            qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, source, true, true, &link_route);
+            qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route);
             if (!addr) {
                 //
                 // No route to this destination, reject the link

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 0d0311a..b109e03 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -279,7 +279,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t          *core,
 
     qd_hash_retrieve(core->addr_hash, iter, (void*) &al->addr);
     if (!al->addr) {
-        al->addr = qdr_address_CT(core, qdr_treatment_for_address_CT(core, iter, 0, 0));
+        al->addr = qdr_address_CT(core, qdr_treatment_for_address_CT(core, 0, iter, 0, 0));
         DEQ_INSERT_TAIL(core->addrs, al->addr);
         qd_hash_insert(core->addr_hash, iter, al->addr, &al->addr->hash_handle);
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5962a17..f3b7d87 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -460,6 +460,8 @@ struct qdr_connection_t {
     qdr_link_ref_list_t         links;
     qdr_link_ref_list_t         links_with_deliveries;
     qdr_link_ref_list_t         links_with_credit;
+    char                       *tenant_space;
+    int                         tenant_space_len;
 };
 
 ALLOC_DECLARE(qdr_connection_t);
@@ -629,7 +631,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
 void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
-qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qd_iterator_t *iter, int *in_phase, int *out_phase);
+qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);
 qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter);
 
 void qdr_connection_enqueue_work_CT(qdr_core_t            *core,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 114d736..1587047 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -146,6 +146,14 @@ void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr)
 }
 
 
+void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr)
+{
+    qdr_field_t *old = term->address;
+    term->address = qdr_field_from_iter(addr);
+    qdr_field_free(old);
+}
+
+
 qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
 {
     if (qdr_terminus_is_anonymous(term))
@@ -158,7 +166,6 @@ qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
 qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term)
 {
     pn_data_t *props = term->properties;
-
     if (!props)
         return 0;
 
@@ -178,3 +185,34 @@ qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term)
 }
 
 
+void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *iter)
+{
+    char       buffer[1000];
+    char      *text    = buffer;
+    bool       on_heap = false;
+    pn_data_t *old     = term->properties;
+
+    if (!old)
+        return;
+
+    if (qd_iterator_length(iter) < 1000)
+        qd_iterator_ncopy(iter, (unsigned char*) text, 1000);
+    else {
+        text    = (char*) qd_iterator_copy(iter);
+        on_heap = true;
+    }
+
+    pn_data_t *new = pn_data(pn_data_size(old));
+    pn_data_put_map(new);
+    pn_data_enter(new);
+    pn_data_put_symbol(new, pn_bytes(strlen(QD_DYNAMIC_NODE_PROPERTY_ADDRESS), QD_DYNAMIC_NODE_PROPERTY_ADDRESS));
+    pn_data_put_string(new, pn_bytes(strlen(text), text));
+    pn_data_exit(new);
+
+    term->properties = new;
+    pn_data_free(old);
+
+    if (on_heap)
+        free(text);
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 2595a70..1ffa539 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -591,8 +591,12 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
 
     if (DEQ_IS_EMPTY(link->undelivered)) {
         qdr_address_t *addr = link->owning_addr;
-        if (!addr && dlv->to_addr)
+        if (!addr && dlv->to_addr) {
+            qdr_connection_t *conn = link->conn;
+            if (conn && conn->tenant_space)
+                qd_iterator_annotate_space(dlv->to_addr, conn->tenant_space, conn->tenant_space_len);
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
+        }
 
         //
         // Give the action reference to the qdr_link_forward function.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index abf80f4..8a6ae19 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -45,6 +45,7 @@ static void qd_router_connection_get_config(const qd_connection_t  *conn,
                                             qdr_connection_role_t  *role,
                                             int                    *cost,
                                             const char            **name,
+                                            bool                   *multi_tenant,
                                             bool                   *strip_annotations_in,
                                             bool                   *strip_annotations_out,
                                             int                    *link_capacity)
@@ -73,6 +74,8 @@ static void qd_router_connection_get_config(const qd_connection_t  *conn,
                 strncmp("connector/", *name, 10) == 0)
                 *name = 0;
         }
+
+        *multi_tenant = cf ? cf->multi_tenant : false;
     }
 }
 
@@ -270,8 +273,11 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
     // If the user is not allowed to proxy the user_id then the message user_id
     // must be blank or it must be equal to the connection user name.
     //
-    bool             check_user = false;
-    qd_connection_t *conn       = qd_link_connection(link);
+    bool              check_user   = false;
+    qd_connection_t  *conn         = qd_link_connection(link);
+    qdr_connection_t *qdr_conn     = (qdr_connection_t*) qd_connection_get_context(conn);
+    int               tenant_space_len;
+    const char       *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len);
     if (conn->policy_settings) 
         check_user = !conn->policy_settings->allowUserIdProxy;
 
@@ -332,9 +338,22 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
             //
             // Still no destination address?  Use the TO field from the message properties.
             //
-            if (!addr_iter)
+            if (!addr_iter) {
                 addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
 
+                //
+                // If the address came from the TO field and we need to apply a tenant-space,
+                // set the to-override with the annotated address.
+                //
+                if (addr_iter && tenant_space) {
+                    qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+                    qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_len);
+                    qd_composed_field_t *to_override = qd_compose_subfield(0);
+                    qd_compose_insert_string_iterator(to_override, addr_iter);
+                    qd_message_set_to_override_annotation(msg, to_override);
+                }
+            }
+
             if (addr_iter) {
                 qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                 if (phase > 0)
@@ -343,13 +362,22 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
                                                link_exclusions);
             }
         } else {
+            //
+            // This is a targeted link, not anonymous.
+            //
             const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link));
             if (!term_addr)
                 term_addr = pn_terminus_get_address(qd_link_source(link));
 
             if (term_addr) {
                 qd_composed_field_t *to_override = qd_compose_subfield(0);
-                qd_compose_insert_string(to_override, term_addr);
+                if (tenant_space) {
+                    qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
+                    qd_iterator_annotate_space(aiter, tenant_space, tenant_space_len);
+                    qd_compose_insert_string_iterator(to_override, aiter);
+                    qd_iterator_free(aiter);
+                } else
+                    qd_compose_insert_string(to_override, term_addr);
                 qd_message_set_to_override_annotation(msg, to_override);
                 int phase = qdr_link_phase(rlink);
                 if (phase != 0)
@@ -557,10 +585,12 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
     bool                   strip_annotations_out = false;
     int                    link_capacity = 1;
     const char            *name = 0;
+    bool                   multi_tenant = false;
+    const char            *vhost = 0;
     uint64_t               connection_id = qd_connection_connection_id(conn);
     pn_connection_t       *pn_conn = qd_connection_pn(conn);
 
-    qd_router_connection_get_config(conn, &role, &cost, &name,
+    qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant,
                                     &strip_annotations_in, &strip_annotations_out, &link_capacity);
 
     if (role == QDR_ROLE_INTER_ROUTER) {
@@ -595,9 +625,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
             cost = remote_cost;
     }
 
+    if (multi_tenant)
+        vhost = pn_connection_remote_hostname(pn_conn);
+
     qdr_connection_t *qdrc = qdr_connection_opened(router->router_core, inbound, role, cost, connection_id, name,
                                                    pn_connection_remote_container(pn_conn),
-                                                   strip_annotations_in, strip_annotations_out, link_capacity);
+                                                   strip_annotations_in, strip_annotations_out, link_capacity,
+                                                   vhost);
 
     qd_connection_set_context(conn, qdrc);
     qdr_connection_set_context(qdrc, conn);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 95840f6..d2c6159 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -86,7 +86,8 @@ foreach(py_test_module
     system_tests_user_id_proxy
     system_tests_deprecated
     system_tests_two_routers
-    system_tests_three_routers)
+    system_tests_three_routers
+    system_tests_multi_tenancy)
 
   add_test(${py_test_module} ${TEST_WRAP} -m unittest -v ${py_test_module})
   list(APPEND SYSTEM_TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/${py_test_module}.py)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/tests/field_test.c
----------------------------------------------------------------------
diff --git a/tests/field_test.c b/tests/field_test.c
index 82e512b..508e357 100644
--- a/tests/field_test.c
+++ b/tests/field_test.c
@@ -281,6 +281,71 @@ static char* test_view_address_hash(void *context)
 }
 
 
+/**
+ * Check the ITER_VIEW_ADDRESS_WITH_SPACE view with "space/" annotated as the
+ * address space.
+ *
+ * Each case pairs an input address with the text the view must produce.  Per
+ * the table, addresses whose path begins with "_local" or "_topo" are expected
+ * to come back without the space, and every other address with "space/"
+ * prepended.  The cases are run once over a plain string and once over a
+ * buffer chain built from the same text.
+ *
+ * Returns 0 when every case passes, otherwise the non-null value returned by
+ * view_address_hash() for the first failing case.
+ */
+static char* test_view_address_with_space(void *context)
+{
+    struct {const char *addr; const char *view;} cases[] = {
+    {"amqp:/_local/my-addr/sub",                "_local/my-addr/sub"},
+    {"amqp:/_local/my-addr",                    "_local/my-addr"},
+    {"amqp:/_topo/area/router/local/sub",       "_topo/area/router/local/sub"},
+    {"amqp:/_topo/my-area/router/local/sub",    "_topo/my-area/router/local/sub"},
+    {"amqp:/_topo/my-area/my-router/local/sub", "_topo/my-area/my-router/local/sub"},
+    {"amqp:/_topo/area/all/local/sub",          "_topo/area/all/local/sub"},
+    {"amqp:/_topo/my-area/all/local/sub",       "_topo/my-area/all/local/sub"},
+    {"amqp:/_topo/all/all/local/sub",           "_topo/all/all/local/sub"},
+    {"amqp://host:port/_local/my-addr",         "_local/my-addr"},
+    {"_topo/area/router/my-addr",               "_topo/area/router/my-addr"},
+    {"_topo/my-area/router/my-addr",            "_topo/my-area/router/my-addr"},
+    {"_topo/my-area/my-router/my-addr",         "_topo/my-area/my-router/my-addr"},
+    {"_topo/my-area/router",                    "_topo/my-area/router"},
+    {"amqp:/mobile",                            "space/mobile"},
+    {"mobile",                                  "space/mobile"},
+    {"/mobile",                                 "space/mobile"},
+
+    // Re-run the non-mobile cases with a trailing dot, which must be dropped
+    // from the view.  The final case shows that a trailing ':' is kept.
+    {"amqp:/_local/my-addr/sub.",                "_local/my-addr/sub"},
+    {"amqp:/_local/my-addr.",                    "_local/my-addr"},
+    {"amqp:/_topo/area/router/local/sub.",       "_topo/area/router/local/sub"},
+    {"amqp:/_topo/my-area/router/local/sub.",    "_topo/my-area/router/local/sub"},
+    {"amqp:/_topo/my-area/my-router/local/sub.", "_topo/my-area/my-router/local/sub"},
+    {"amqp:/_topo/area/all/local/sub.",          "_topo/area/all/local/sub"},
+    {"amqp:/_topo/my-area/all/local/sub.",       "_topo/my-area/all/local/sub"},
+    {"amqp:/_topo/all/all/local/sub.",           "_topo/all/all/local/sub"},
+    {"amqp://host:port/_local/my-addr.",         "_local/my-addr"},
+    {"_topo/area/router/my-addr.",               "_topo/area/router/my-addr"},
+    {"_topo/my-area/router/my-addr.",            "_topo/my-area/router/my-addr"},
+    {"_topo/my-area/my-router/my-addr.",         "_topo/my-area/my-router/my-addr"},
+    {"_topo/my-area/router.",                    "_topo/my-area/router"},
+    {"_topo/my-area/router:",                    "_topo/my-area/router:"},
+
+    {0, 0}
+    };
+    int idx;
+
+    // Pass 1: iterate over the address held in a single string.
+    for (idx = 0; cases[idx].addr; idx++) {
+        qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_WITH_SPACE);
+        qd_iterator_annotate_space(iter, "space/", 6);
+        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        qd_iterator_free(iter);
+        if (ret) return ret;
+    }
+
+    // Pass 2: the same cases, with the address held in a buffer chain
+    // (built by build_buffer_chain with a size argument of 3).
+    for (idx = 0; cases[idx].addr; idx++) {
+        qd_buffer_list_t chain;
+        DEQ_INIT(chain);
+        build_buffer_chain(&chain, cases[idx].addr, 3);
+        qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(chain), 0,
+                                                 strlen(cases[idx].addr),
+                                                 ITER_VIEW_ADDRESS_WITH_SPACE);
+        qd_iterator_annotate_space(iter, "space/", 6);
+        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        // Free the iterator on every pass, exactly as the string loop does;
+        // without this one iterator is leaked per case.
+        qd_iterator_free(iter);
+        release_buffer_chain(&chain);
+        if (ret) return ret;
+    }
+
+    return 0;
+}
+
+
 static char* test_view_address_hash_override(void *context)
 {
     struct {const char *addr; const char *view;} cases[] = {
@@ -308,13 +373,15 @@ static char* test_view_address_hash_override(void *context)
 }
 
 
-static char* test_view_address_with_space(void *context)
+static char* test_view_address_hash_with_space(void *context)
 {
     struct {const char *addr; const char *view;} cases[] = {
     {"amqp:/link-target",                    "M0test.vhost.link-target"},
     {"amqp:/domain/link-target",             "M0test.vhost.domain/link-target"},
     {"domain/link-target",                   "M0test.vhost.domain/link-target"},
     {"bbc79fb3-e1fd-4a08-92b2-9a2de232b558", "M0test.vhost.bbc79fb3-e1fd-4a08-92b2-9a2de232b558"},
+    {"_topo/my-area/router/address",         "Rrouter"},
+    {"_topo/my-area/my-router/address",      "Laddress"},
     {0, 0}
     };
     int idx;
@@ -324,13 +391,9 @@ static char* test_view_address_with_space(void *context)
         qd_iterator_annotate_space(iter, "test.vhost.", 11);
         if (!qd_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
             char *got = (char*) qd_iterator_copy(iter);
-            snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s'",
-                     cases[idx].addr, cases[idx].view, got);
-            return fail_text;
-        }
-        if (qd_iterator_length(iter) != strlen(cases[idx].view)) {
-            snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Length %d, iter_length returned %d",
-                     cases[idx].addr, (int) strlen(cases[idx].view), (int) qd_iterator_length(iter));
+            snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s' (len: %d)",
+                     cases[idx].addr, cases[idx].view, got, qd_iterator_length(iter));
+            free(got);
             return fail_text;
         }
         qd_iterator_free(iter);
@@ -873,8 +936,9 @@ int field_tests(void)
     TEST_CASE(test_trim, 0);
     TEST_CASE(test_sub_iterator, 0);
     TEST_CASE(test_view_address_hash, 0);
-    TEST_CASE(test_view_address_hash_override, 0);
     TEST_CASE(test_view_address_with_space, 0);
+    TEST_CASE(test_view_address_hash_override, 0);
+    TEST_CASE(test_view_address_hash_with_space, 0);
     TEST_CASE(test_view_node_hash, 0);
     TEST_CASE(test_field_advance_string, 0);
     TEST_CASE(test_field_advance_buffer, 0);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21a32cad/tests/system_tests_multi_tenancy.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_multi_tenancy.py b/tests/system_tests_multi_tenancy.py
new file mode 100644
index 0000000..cd34fd0
--- /dev/null
+++ b/tests/system_tests_multi_tenancy.py
@@ -0,0 +1,754 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+import time
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+    
+class RouterTest(TestCase):
+    """
+    System tests for multi-tenant (vhost) addressing on a two-router network.
+
+    setUpClass starts routers 'A' and 'B'.  Each is configured with three
+    client listeners declared in this order: a plain listener, a listener
+    with 'multiTenant': 'yes', and a listener with role 'route-container'.
+    The tests reach them as addresses[0], addresses[1] and addresses[2]
+    (presumably in listener declaration order -- confirm against Qdrouterd).
+
+    Throughout the tests, an address used through the multi-tenant listener
+    is paired with the same address prefixed by '0.0.0.0/' on the other
+    listeners (presumably the hostname the test client connects with --
+    confirm).
+    """
+
+    # NOTE(review): setUpClass assigns a *local* variable of the same name;
+    # nothing in the visible code updates this class attribute.
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start routers 'A' and 'B' and wait until each sees the other."""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, connection):
+            # Build the configuration shared by both routers; 'connection' is
+            # the router-specific inter-router listener or connector entry.
+            
+            config = [
+                ('router', {'mode': 'interior', 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+                ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'in', 'containerId': 'LRC'}),
+                ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'out', 'containerId': 'LRC'}),
+                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'in'}),
+                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'out'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
+                connection
+            ]
+            
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+        
+        inter_router_port = cls.tester.get_port()
+        
+        # 'A' listens for the inter-router connection and 'B' connects to it.
+        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
+        router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostName': 'no'}))
+
+        cls.routers[0].wait_router_connected('B')
+        cls.routers[1].wait_router_connected('A')
+
+
+    # Tests 01-08: targeted (non-anonymous) sender.  MessageTransferTest
+    # arguments are: sender_host, receiver_host, sender_address,
+    # receiver_address, lookup_host, lookup_address[, anonymous].
+    # 01-04 keep sender and receiver on router A; 05-08 put the receiver on
+    # router B.
+    def test_01_one_router_targeted_sender_no_tenant(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[0].addresses[0],
+                                   "anything/addr_01",
+                                   "anything/addr_01",
+                                   self.routers[0].addresses[0],
+                                   "M0anything/addr_01")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_02_one_router_targeted_sender_tenant_on_sender(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[0].addresses[0],
+                                   "addr_02",
+                                   "0.0.0.0/addr_02",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_02")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_03_one_router_targeted_sender_tenant_on_receiver(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[0].addresses[1],
+                                   "0.0.0.0/addr_03",
+                                   "addr_03",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_03")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_04_one_router_targeted_sender_tenant_on_both(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[0].addresses[1],
+                                   "addr_04",
+                                   "addr_04",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_04")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_05_two_router_targeted_sender_no_tenant(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[1].addresses[0],
+                                   "0.0.0.0/addr_05",
+                                   "0.0.0.0/addr_05",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_05")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_06_two_router_targeted_sender_tenant_on_sender(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[1].addresses[0],
+                                   "addr_06",
+                                   "0.0.0.0/addr_06",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_06")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_07_two_router_targeted_sender_tenant_on_receiver(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[1].addresses[1],
+                                   "0.0.0.0/addr_07",
+                                   "addr_07",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_07")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_08_two_router_targeted_sender_tenant_on_both(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[1].addresses[1],
+                                   "addr_08",
+                                   "addr_08",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_08")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    # Tests 09-16: the same host/address combinations as 01-08, but with
+    # anonymous=True so the sender link has no target and each message
+    # carries the address itself.
+    def test_09_one_router_anonymous_sender_no_tenant(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[0].addresses[0],
+                                   "anything/addr_09",
+                                   "anything/addr_09",
+                                   self.routers[0].addresses[0],
+                                   "M0anything/addr_09",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_10_one_router_anonymous_sender_tenant_on_sender(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[0].addresses[0],
+                                   "addr_10",
+                                   "0.0.0.0/addr_10",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_10",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_11_one_router_anonymous_sender_tenant_on_receiver(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[0].addresses[1],
+                                   "0.0.0.0/addr_11",
+                                   "addr_11",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_11",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_12_one_router_anonymous_sender_tenant_on_both(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[0].addresses[1],
+                                   "addr_12",
+                                   "addr_12",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_12",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_13_two_router_anonymous_sender_no_tenant(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[1].addresses[0],
+                                   "anything/addr_13",
+                                   "anything/addr_13",
+                                   self.routers[0].addresses[0],
+                                   "M0anything/addr_13",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_14_two_router_anonymous_sender_tenant_on_sender(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[1].addresses[0],
+                                   "addr_14",
+                                   "0.0.0.0/addr_14",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_14",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_15_two_router_anonymous_sender_tenant_on_receiver(self):
+        test = MessageTransferTest(self.routers[0].addresses[0],
+                                   self.routers[1].addresses[1],
+                                   "0.0.0.0/addr_15",
+                                   "addr_15",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_15",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_16_two_router_anonymous_sender_tenant_on_both(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[1].addresses[1],
+                                   "addr_16",
+                                   "addr_16",
+                                   self.routers[0].addresses[0],
+                                   "M00.0.0.0/addr_16",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    # Tests 17-24: link routes.  LinkRouteTest arguments are: first_host,
+    # second_host, first_address, second_address, dynamic.  The second host
+    # is always a route-container listener (addresses[2]), on router A for
+    # 17-20 and on router B for 21-24.
+    def test_17_one_router_link_route_targeted(self):
+        test = LinkRouteTest(self.routers[0].addresses[1],
+                             self.routers[0].addresses[2],
+                             "link.addr_17",
+                             "0.0.0.0/link.addr_17",
+                             False)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_18_one_router_link_route_targeted_no_tenant(self):
+        test = LinkRouteTest(self.routers[0].addresses[0],
+                             self.routers[0].addresses[2],
+                             "0.0.0.0/link.addr_18",
+                             "0.0.0.0/link.addr_18",
+                             False)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_19_one_router_link_route_dynamic(self):
+        test = LinkRouteTest(self.routers[0].addresses[1],
+                             self.routers[0].addresses[2],
+                             "link.addr_19",
+                             "0.0.0.0/link.addr_19",
+                             True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_20_one_router_link_route_dynamic_no_tenant(self):
+        test = LinkRouteTest(self.routers[0].addresses[0],
+                             self.routers[0].addresses[2],
+                             "0.0.0.0/link.addr_20",
+                             "0.0.0.0/link.addr_20",
+                             True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_21_two_router_link_route_targeted(self):
+        test = LinkRouteTest(self.routers[0].addresses[1],
+                             self.routers[1].addresses[2],
+                             "link.addr_21",
+                             "0.0.0.0/link.addr_21",
+                             False)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_22_two_router_link_route_targeted_no_tenant(self):
+        test = LinkRouteTest(self.routers[0].addresses[0],
+                             self.routers[1].addresses[2],
+                             "0.0.0.0/link.addr_22",
+                             "0.0.0.0/link.addr_22",
+                             False)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_23_two_router_link_route_dynamic(self):
+        test = LinkRouteTest(self.routers[0].addresses[1],
+                             self.routers[1].addresses[2],
+                             "link.addr_23",
+                             "0.0.0.0/link.addr_23",
+                             True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_24_two_router_link_route_dynamic_no_tenant(self):
+        test = LinkRouteTest(self.routers[0].addresses[0],
+                             self.routers[1].addresses[2],
+                             "0.0.0.0/link.addr_24",
+                             "0.0.0.0/link.addr_24",
+                             True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    # Tests 25-28: '_local/...' and '_topo/...' addresses sent through the
+    # multi-tenant listener.  The expected lookup address ('Laddr_NN')
+    # carries no '0.0.0.0/' prefix.  For 27-28 the receiver and the lookup
+    # connection are on router B.
+    def test_25_one_router_anonymous_sender_non_mobile(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[0].addresses[0],
+                                   "_local/addr_25",
+                                   "_local/addr_25",
+                                   self.routers[0].addresses[0],
+                                   "Laddr_25",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_26_one_router_targeted_sender_non_mobile(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[0].addresses[0],
+                                   "_local/addr_26",
+                                   "_local/addr_26",
+                                   self.routers[0].addresses[0],
+                                   "Laddr_26",
+                                   False)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_27_two_router_anonymous_sender_non_mobile(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[1].addresses[0],
+                                   "_topo/0/B/addr_27",
+                                   "_local/addr_27",
+                                   self.routers[1].addresses[0],
+                                   "Laddr_27",
+                                   True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_28_two_router_targeted_sender_non_mobile(self):
+        test = MessageTransferTest(self.routers[0].addresses[1],
+                                   self.routers[1].addresses[0],
+                                   "_topo/0/B/addr_28",
+                                   "_local/addr_28",
+                                   self.routers[1].addresses[0],
+                                   "Laddr_28",
+                                   False)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    # Tests 29-32: waypoint address '0.0.0.0/queue.waypoint', reached with
+    # and without the multi-tenant listener.  The second host is the
+    # route-container listener on router A (29-30) or router B (31-32).
+    def test_29_one_router_waypoint_no_tenant(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[0].addresses[2],
+                            "0.0.0.0/queue.waypoint",
+                            "0.0.0.0/queue.waypoint")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_30_one_router_waypoint(self):
+        test = WaypointTest(self.routers[0].addresses[1],
+                            self.routers[0].addresses[2],
+                            "queue.waypoint",
+                            "0.0.0.0/queue.waypoint")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_31_two_router_waypoint_no_tenant(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[1].addresses[2],
+                            "0.0.0.0/queue.waypoint",
+                            "0.0.0.0/queue.waypoint")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_32_two_router_waypoint(self):
+        test = WaypointTest(self.routers[0].addresses[1],
+                            self.routers[1].addresses[2],
+                            "queue.waypoint",
+                            "0.0.0.0/queue.waypoint")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Entity(object):
+    def __init__(self, status_code, status_description, attrs):
+        self.status_code        = status_code
+        self.status_description = status_description
+        self.attrs              = attrs
+
+    def __getattr__(self, key):
+        return self.attrs[key]
+
+
+class RouterProxy(object):
+    def __init__(self, reply_addr):
+        self.reply_addr = reply_addr
+
+    def response(self, msg):
+        ap = msg.properties
+        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+    def read_address(self, name):
+        ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_addresses(self):
+        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class MessageTransferTest(MessagingHandler):
+    """
+    Send 'count' messages from a sender to a receiver, then ask the router's
+    management agent to READ 'lookup_address' and check the result.
+
+    sender_host / receiver_host / lookup_host are the connection addresses
+    for the sender, the receiver and the management lookup.  sender_address
+    and receiver_address are the addresses used on each side.  When
+    'anonymous' is true the sender link is created without a target and
+    sender_address is set on every message instead.
+
+    After run(), 'error' is None on success or a text describing the failure.
+    """
+    def __init__(self, sender_host, receiver_host, sender_address, receiver_address, lookup_host, lookup_address, anonymous=False):
+        super(MessageTransferTest, self).__init__()
+        self.sender_host      = sender_host
+        self.receiver_host    = receiver_host
+        self.sender_address   = sender_address
+        self.receiver_address = receiver_address
+        self.lookup_host      = lookup_host
+        self.lookup_address   = lookup_address
+        self.anonymous        = anonymous
+
+        self.sender_conn   = None
+        self.receiver_conn = None
+        self.lookup_conn   = None
+        self.error         = None
+        self.sender        = None
+        self.receiver      = None
+        self.proxy         = None
+
+        # Number of messages to transfer, and progress counters.
+        self.count      = 10
+        self.n_sent     = 0
+        self.n_rcvd     = 0
+        self.n_accepted = 0
+        self.n_released = 0
+
+    def timeout(self):
+        # Called through the Timeout handler scheduled in on_start: record
+        # the counters in the error text and close all three connections.
+        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_accepted=%d n_released=%d" % (self.n_sent, self.n_rcvd, self.n_accepted, self.n_released)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+        self.lookup_conn.close()
+
+    def on_start(self, event):
+        # Arm the timeout (5, presumably seconds -- confirm), open the three
+        # connections, and set up the management links on the lookup
+        # connection: a dynamic receiver for replies and a sender to
+        # "$management".  The test sender/receiver are created later, in
+        # on_link_opened.
+        self.timer          = event.reactor.schedule(5, Timeout(self))
+        self.sender_conn    = event.container.connect(self.sender_host)
+        self.receiver_conn  = event.container.connect(self.receiver_host)
+        self.lookup_conn    = event.container.connect(self.lookup_host)
+        self.reply_receiver = event.container.create_receiver(self.lookup_conn, dynamic=True)
+        self.agent_sender   = event.container.create_sender(self.lookup_conn, "$management")
+
+    def send(self):
+        # Send until either the link has no credit or 'count' messages are
+        # outstanding.  Anonymous senders address each message individually.
+        while self.sender.credit > 0 and self.n_sent < self.count:
+            self.n_sent += 1
+            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+            if self.anonymous:
+                m.address = self.sender_address
+            self.sender.send(m)
+
+    def on_released(self, event):
+        # A released message no longer counts as sent, so a later call to
+        # send() can send another one in its place.
+        self.n_sent     -= 1
+        self.n_released += 1
+
+    def on_link_opened(self, event):
+        # Once the reply receiver is open its remote source address is known;
+        # use it as the reply-to for management requests, then create the
+        # test sender (no target when anonymous) and the test receiver.
+        if event.receiver == self.reply_receiver:
+            self.proxy    = RouterProxy(self.reply_receiver.remote_source.address)
+            self.sender   = event.container.create_sender(self.sender_conn, None if self.anonymous else self.sender_address)
+            self.receiver = event.container.create_receiver(self.receiver_conn, self.receiver_address)
+
+    def on_sendable(self, event):
+        # Before the first anonymous send, pause briefly (presumably to give
+        # the receiver's address time to become known to the router --
+        # confirm).  NOTE(review): this blocks the handler for 0.3s.
+        if self.n_sent == 0 and self.anonymous:
+            time.sleep(0.3)
+        if event.sender == self.sender:
+            self.send()
+
+    def on_message(self, event):
+        # Messages on the test receiver are simply counted.  A message on the
+        # reply receiver is the management response that ends the test.
+        if event.receiver == self.receiver:
+            self.n_rcvd += 1
+        if event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            if response.status_code != 200:
+                self.error = "Unexpected error code from agent: %d - %s" % (response.status_code, response.status_description)
+            # NOTE(review): if both checks fail, the count error below
+            # replaces the status-code error set above.
+            if self.n_sent != self.count or self.n_rcvd != self.count:
+                self.error = "Unexpected counts: n_sent=%d n_rcvd=%d n_accepted=%d" % (self.n_sent, self.n_rcvd, self.n_accepted)
+            self.sender_conn.close()
+            self.receiver_conn.close()
+            self.lookup_conn.close()
+            self.timer.cancel()
+            
+    def on_accepted(self, event):
+        # When every message has been accepted, ask the management agent to
+        # READ the expected address; the reply is handled in on_message.
+        if event.sender == self.sender:
+            self.n_accepted += 1
+            if self.n_accepted == self.count:
+                request = self.proxy.read_address(self.lookup_address)
+                self.agent_sender.send(request)
+
+    def run(self):
+        # Run this handler in a proton Container; returns when run() returns.
+        Container(self).run()
+
+
+class LinkRouteTest(MessagingHandler):
+    def __init__(self, first_host, second_host, first_address, second_address, dynamic):
+        super(LinkRouteTest, self).__init__(prefetch=0)
+        self.first_host     = first_host
+        self.second_host    = second_host
+        self.first_address  = first_address
+        self.second_address = second_address
+        self.dynamic        = dynamic
+
+        self.first_conn      = None
+        self.second_conn     = None
+        self.error           = None
+        self.first_sender    = None
+        self.first_receiver  = None
+        self.second_sender   = None
+        self.second_receiver = None
+
+        self.count     = 10
+        self.n_sent    = 0
+        self.n_rcvd    = 0
+        self.n_settled = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" % (self.n_sent, self.n_rcvd, self.n_settled)
+        self.first_conn.close()
+        self.second_conn.close()
+
+    def fail(self, text):
+        self.error = text
+        self.second_conn.close()
+        self.first_conn.close()
+        self.timer.cancel()
+
+    def send(self):
+        while self.first_sender.credit > 0 and self.n_sent < self.count:
+            self.n_sent += 1
+            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+            self.first_sender.send(m)
+
+    def on_start(self, event):
+        self.timer       = event.reactor.schedule(5, Timeout(self))
+        self.second_conn = event.container.connect(self.second_host)
+
+    def on_connection_opened(self, event):
+        if event.connection == self.first_conn:
+            self.first_sender = event.container.create_sender(self.first_conn, self.first_address)
+            if self.dynamic:
+                self.first_receiver = event.container.create_receiver(self.first_conn,
+                                                                      dynamic=True,
+                                                                      options=DynamicNodeProperties({"x-opt-qd.address": unicode(self.first_address)}))
+            else:
+                self.first_receiver = event.container.create_receiver(self.first_conn, self.first_address)
+
+        if event.connection == self.second_conn:
+            time.sleep(1)
+            self.first_conn = event.container.connect(self.first_host)
+
+    def on_link_opening(self, event):
+        if event.sender:
+            self.second_sender = event.sender
+            if self.dynamic:
+                if event.sender.remote_source.dynamic:
+                    event.sender.source.address = self.second_address
+                    event.sender.open()
+                else:
+                    self.fail("Expected dynamic source on sender")
+            else:
+                if event.sender.remote_source.address == self.second_address:
+                    event.sender.source.address = self.second_address
+                    event.sender.open()
+                else:
+                    self.fail("Incorrect address on incoming sender: got %s, expected %s" %
+                              (event.sender.remote_source.address, self.second_address))
+
+        elif event.receiver:
+            self.second_receiver = event.receiver
+            if event.receiver.remote_target.address == self.second_address:
+                event.receiver.target.address = self.second_address
+                event.receiver.open()
+            else:
+                self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
+                          (event.receiver.remote_target.address, self.second_address))
+
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow(self.count)
+
+    def on_sendable(self, event):
+        if event.sender == self.first_sender:
+            self.send()
+
+    def on_message(self, event):
+        if event.receiver == self.first_receiver:
+            self.n_rcvd += 1
+            
+    def on_settled(self, event):
+        if event.sender == self.first_sender:
+            self.n_settled += 1
+            if self.n_settled == self.count:
+                self.fail(None)
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'LRC'
+        container.run()
+
+
+class WaypointTest(MessagingHandler):
+    def __init__(self, first_host, second_host, first_address, second_address):
+        super(WaypointTest, self).__init__()
+        self.first_host     = first_host
+        self.second_host    = second_host
+        self.first_address  = first_address
+        self.second_address = second_address
+
+        self.first_conn        = None
+        self.second_conn       = None
+        self.error             = None
+        self.first_sender      = None
+        self.first_receiver    = None
+        self.waypoint_sender   = None
+        self.waypoint_receiver = None
+        self.waypoint_queue    = []
+
+        self.count  = 10
+        self.n_sent = 0
+        self.n_rcvd = 0
+        self.n_thru = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_thru=%d" % (self.n_sent, self.n_rcvd, self.n_thru)
+        self.first_conn.close()
+        self.second_conn.close()
+
+    def fail(self, text):
+        self.error = text
+        self.second_conn.close()
+        self.first_conn.close()
+        self.timer.cancel()
+
+    def send_client(self):
+        while self.first_sender.credit > 0 and self.n_sent < self.count:
+            self.n_sent += 1
+            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+            self.first_sender.send(m)
+
+    def send_waypoint(self):
+        while self.waypoint_sender.credit > 0 and len(self.waypoint_queue) > 0:
+            self.n_thru += 1
+            m = self.waypoint_queue.pop()
+            self.waypoint_sender.send(m)
+
+    def on_start(self, event):
+        self.timer       = event.reactor.schedule(5, Timeout(self))
+        self.first_conn  = event.container.connect(self.first_host)
+        self.second_conn = event.container.connect(self.second_host)
+
+    def on_connection_opened(self, event):
+        if event.connection == self.first_conn:
+            self.first_sender   = event.container.create_sender(self.first_conn, self.first_address)
+            self.first_receiver = event.container.create_receiver(self.first_conn, self.first_address)
+
+    def on_link_opening(self, event):
+        if event.sender:
+            self.waypoint_sender = event.sender
+            if event.sender.remote_source.address == self.second_address:
+                event.sender.source.address = self.second_address
+                event.sender.open()
+            else:
+                self.fail("Incorrect address on incoming sender: got %s, expected %s" %
+                          (event.sender.remote_source.address, self.second_address))
+
+        elif event.receiver:
+            self.waypoint_receiver = event.receiver
+            if event.receiver.remote_target.address == self.second_address:
+                event.receiver.target.address = self.second_address
+                event.receiver.open()
+            else:
+                self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
+                          (event.receiver.remote_target.address, self.second_address))
+
+
+    def on_sendable(self, event):
+        if event.sender == self.first_sender:
+            self.send_client()
+        elif event.sender == self.waypoint_sender:
+            self.send_waypoint()
+
+    def on_message(self, event):
+        if event.receiver == self.first_receiver:
+            self.n_rcvd += 1
+            if self.n_rcvd == self.count and self.n_thru == self.count:
+                self.fail(None)
+        elif event.receiver == self.waypoint_receiver:
+            m = Message(body=event.message.body)
+            self.waypoint_queue.append(m)
+            self.send_waypoint()
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'ALC'
+        container.run()
+
+
+# Script entry point: when this file is executed directly, pass the result of
+# main_module() (defined outside this chunk) to unittest.main().
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org