You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/10/26 20:07:12 UTC

[2/2] qpid-dispatch git commit: DISPATCH-1143: New feature: connection-scoped link routes

DISPATCH-1143: New feature: connection-scoped link routes

This closes #393


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fcf457c9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fcf457c9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fcf457c9

Branch: refs/heads/master
Commit: fcf457c981c262d5648d6c1a944059a3c3b47458
Parents: 8a93f7c
Author: Kenneth Giusti <kg...@apache.org>
Authored: Mon Oct 8 16:06:45 2018 -0400
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Fri Oct 26 15:41:19 2018 -0400

----------------------------------------------------------------------
 docs/man/qdrouterd.conf.5.py                  |   8 +
 include/qpid/dispatch/router_core.h           |  23 +-
 python/qpid_dispatch/management/qdrouter.json |  25 ++
 src/CMakeLists.txt                            |   1 +
 src/python_embedded.c                         |   3 +-
 src/router_config.c                           |   2 +-
 src/router_core/agent.c                       |  40 +-
 src/router_core/agent_config_link_route.c     |   2 +-
 src/router_core/agent_config_link_route.h     |   1 +
 src/router_core/agent_conn_link_route.c       | 350 ++++++++++++++++
 src/router_core/agent_conn_link_route.h       |  53 +++
 src/router_core/forwarder.c                   |   3 +-
 src/router_core/management_agent.c            |  41 +-
 src/router_core/route_control.c               | 113 +++++-
 src/router_core/route_control.h               |   9 +
 src/router_core/router_core.c                 |   2 +-
 src/router_core/router_core_private.h         |  13 +-
 tests/system_test.py                          | 289 +++++++++++---
 tests/system_tests_edge_router.py             |  41 +-
 tests/system_tests_link_routes.py             | 438 ++++++++++++++++++---
 tests/system_tests_qdmanage.py                |   9 +-
 tests/test_broker.py                          | 174 ++++++++
 22 files changed, 1455 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/docs/man/qdrouterd.conf.5.py
----------------------------------------------------------------------
diff --git a/docs/man/qdrouterd.conf.5.py b/docs/man/qdrouterd.conf.5.py
index e2bdd8e..6749dd2 100644
--- a/docs/man/qdrouterd.conf.5.py
+++ b/docs/man/qdrouterd.conf.5.py
@@ -34,6 +34,12 @@ from qpid_dispatch_internal.compat import dict_itervalues
 CONNECTOR = 'org.apache.qpid.dispatch.connector'
 LISTENER = 'org.apache.qpid.dispatch.listener'
 
+# avoid writing these config entity types to the man file, they are not allowed
+# in the configuration file and are only supported at run time via management
+#
+TYPE_FILTER = ['router.connection.linkRoute']
+
+
 class ManPageWriter(SchemaWriter):
 
     def __init__(self):
@@ -136,6 +142,8 @@ listener {
 
             config = self.schema.entity_type("configurationEntity")
             for entity_type in dict_itervalues(self.schema.entity_types):
+                if entity_type.short_name in TYPE_FILTER:
+                    continue
                 if config in entity_type.all_bases:
                     with self.section(entity_type.short_name):
                         if entity_type.description:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 6deee88..5d5ba5b 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -103,7 +103,8 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
  * In-process messaging functions
  ******************************************************************************
  */
-typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit, int inter_router_cost);
+typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit, int inter_router_cost,
+                               uint64_t conn_id);
 
 qdr_subscription_t *qdr_core_subscribe(qdr_core_t             *core,
                                        const char             *address,
@@ -705,7 +706,8 @@ typedef enum {
     QD_ROUTER_ADDRESS,
     QD_ROUTER_EXCHANGE,
     QD_ROUTER_BINDING,
-    QD_ROUTER_FORBIDDEN
+    QD_ROUTER_FORBIDDEN,
+    QD_ROUTER_CONN_LINK_ROUTE
 } qd_router_entity_type_t;
 
 typedef struct qdr_query_t qdr_query_t;
@@ -721,10 +723,11 @@ typedef struct qdr_query_t qdr_query_t;
  * @param name The name supplied for the entity
  * @param in_body The body of the request message
  * @param out_body A composed field for the body of the response message
+ * @param in_conn The identity of the connection over which the mgmt message arrived (0 if config file)
  */
 void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t type,
                        qd_iterator_t *name, qd_parsed_field_t *in_body, qd_composed_field_t *out_body,
-                       qd_buffer_list_t body_buffers);
+                       qd_buffer_list_t body_buffers, uint64_t in_conn);
 
 /**
  * qdr_manage_delete
@@ -736,9 +739,10 @@ void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t
  * @param type The entity type for the create request
  * @param name The name supplied with the request (or 0 if the identity was supplied)
  * @param identity The identity supplied with the request (or 0 if the name was supplied)
+ * @param in_conn The identity of the connection over which the mgmt message arrived (0 if config file)
  */
 void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type,
-                       qd_iterator_t *name, qd_iterator_t *identity);
+                       qd_iterator_t *name, qd_iterator_t *identity, uint64_t in_conn);
 
 /**
  * qdr_manage_read
@@ -751,9 +755,11 @@ void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t
  * @param name The name supplied with the request (or 0 if the identity was supplied)
  * @param identity The identity supplied with the request (or 0 if the name was supplied)
  * @param body A composed field for the body of the response message
+ * @param in_conn The identity of the connection over which the mgmt message arrived (0 if config file)
  */
 void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type,
-                     qd_iterator_t *name, qd_iterator_t *identity, qd_composed_field_t *body);
+                     qd_iterator_t *name, qd_iterator_t *identity, qd_composed_field_t *body,
+                     uint64_t in_conn);
 
 
 /**
@@ -768,10 +774,12 @@ void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t ty
  * @param identity The identity supplied with the request (or 0 if the name was supplied)
  * @param in_body The body of the request message
  * @param out_body A composed field for the body of the response message
+ * @param in_conn The identity of the connection over which the mgmt message arrived (0 if config file)
  */
 void qdr_manage_update(qdr_core_t *core, void *context, qd_router_entity_type_t type,
                        qd_iterator_t *name, qd_iterator_t *identity,
-                       qd_parsed_field_t *in_body, qd_composed_field_t *out_body);
+                       qd_parsed_field_t *in_body, qd_composed_field_t *out_body,
+                       uint64_t in_conn);
 
 /**
  * Sequence for running a query:
@@ -789,7 +797,8 @@ void qdr_manage_update(qdr_core_t *core, void *context, qd_router_entity_type_t
  */
 
 qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_type_t type,
-                              qd_parsed_field_t *attribute_names, qd_composed_field_t *body);
+                              qd_parsed_field_t *attribute_names, qd_composed_field_t *body,
+                              uint64_t in_conn);
 void qdr_query_add_attribute_names(qdr_query_t *query);
 void qdr_query_get_first(qdr_query_t *query, int offset);
 void qdr_query_get_next(qdr_query_t *query);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 5257576..1f111bf 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1925,6 +1925,31 @@
                 "num1": {"type": "integer", "create": true, "update": true},
                 "num2": {"type": "integer", "create": true, "update": true}
             }
+        },
+
+        "router.connection.linkRoute": {
+            "description": "[EXPERIMENTAL] A linkRoute that is scoped to the connection that created it. Connection linkRoutes only exist within the context of a connection and when the connection is closed, they vanish. This differs from configured linkRoutes (router.config.linkRoute) which remain configured should their associated connection restart. Connection linkRoutes may be used by a client to create link-routed message flows that are automatically removed when the client disconnects.  Note well that connection.linkRoute cannot be declared in a configuration file - they must be created at run time via management operations over the connection which they are to be used. Connection linkRoutes are only visible to management access that is via the containing connection.",
+            "extends": "configurationEntity",
+            "operations": ["CREATE", "DELETE"],
+            "attributes" : {
+                "pattern": {
+                    "description": "A wildcarded pattern for address matching. Incoming addresses are matched against this pattern. Matching addresses use the configured settings. The pattern consists of one or more tokens separated by a forward slash '/'. A token can be one of the following: a * character, a # character, or a sequence of characters that do not include /, *, or #.  The * token matches any single token.  The # token matches zero or more tokens. * has higher precedence than #, and exact match has the highest precedence.",
+                    "type": "string",
+                    "create": true,
+                    "required": true
+                },
+                "direction": {
+                    "description": "The permitted direction of links: 'in' means client senders; 'out' means client receivers",
+                    "type": ["in", "out"],
+                    "create": true,
+                    "required": true
+                },
+                "containerId": {
+                    "type": "string",
+                    "description": "Name of the container which has instantiated this linkRoute.",
+                    "create": false
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 3159db9..b8eb36f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -85,6 +85,7 @@ set(qpid_dispatch_SOURCES
   router_core/agent_config_link_route.c
   router_core/agent_link.c
   router_core/agent_router.c
+  router_core/agent_conn_link_route.c
   router_core/connections.c
   router_core/core_events.c
   router_core/core_link_endpoint.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/python_embedded.c
----------------------------------------------------------------------
diff --git a/src/python_embedded.c b/src/python_embedded.c
index e4b4073..243ab0a 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -512,7 +512,8 @@ static qd_error_t iter_to_py_attr(qd_iterator_t *iter,
     return qd_error_code();
 }
 
-static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id, int inter_router_cost)
+static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id, int inter_router_cost,
+                             uint64_t ignore)
 {
     IoAdapter *self = (IoAdapter*) context;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_config.c
----------------------------------------------------------------------
diff --git a/src/router_config.c b/src/router_config.c
index 837dde5..87df3ed 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -42,7 +42,7 @@ static void qdi_router_configure_body(qdr_core_t              *core,
     if (name)
         name_iter = qd_iterator_string(name, ITER_VIEW_ALL);
 
-    qdr_manage_create(core, 0, type, name_iter, in_body, 0, buffers);
+    qdr_manage_create(core, 0, type, name_iter, in_body, 0, buffers, 0);
 
     qd_iterator_free(name_iter);
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 76cdcb3..71b39f5 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -28,6 +28,8 @@
 #include "exchange_bindings.h"
 #include <stdio.h>
 #include "agent_router.h"
+#include "agent_conn_link_route.h"
+
 
 static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -80,7 +82,8 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,
                        qd_router_entity_type_t  type,
-                       qd_composed_field_t     *body)
+                       qd_composed_field_t     *body,
+                       uint64_t                 in_conn)
 {
     qdr_query_t *query = new_qdr_query_t();
 
@@ -90,6 +93,7 @@ qdr_query_t *qdr_query(qdr_core_t              *core,
     query->context     = context;
     query->body        = body;
     query->more        = false;
+    query->in_conn     = in_conn;
 
     return query;
 }
@@ -109,12 +113,13 @@ void qdr_manage_create(qdr_core_t              *core,
                        qd_iterator_t           *name,
                        qd_parsed_field_t       *in_body,
                        qd_composed_field_t     *out_body,
-                       qd_buffer_list_t         body_buffers)
+                       qd_buffer_list_t         body_buffers,
+                       uint64_t                 in_conn_id)
 {
     qdr_action_t *action = qdr_action(qdr_manage_create_CT, "manage_create");
 
     // Create a query object here
-    action->args.agent.query        = qdr_query(core, context, type, out_body);
+    action->args.agent.query        = qdr_query(core, context, type, out_body, in_conn_id);
     action->args.agent.name         = qdr_field_from_iter(name);
     action->args.agent.in_body      = in_body;
     action->args.agent.body_buffers = body_buffers;
@@ -127,12 +132,13 @@ void qdr_manage_delete(qdr_core_t *core,
                        void  *context,
                        qd_router_entity_type_t  type,
                        qd_iterator_t           *name,
-                       qd_iterator_t           *identity)
+                       qd_iterator_t           *identity,
+                       uint64_t                 in_conn_id)
 {
     qdr_action_t *action = qdr_action(qdr_manage_delete_CT, "manage_delete");
 
     // Create a query object here
-    action->args.agent.query = qdr_query(core, context, type, 0);
+    action->args.agent.query = qdr_query(core, context, type, 0, in_conn_id);
     action->args.agent.name = qdr_field_from_iter(name);
     action->args.agent.identity = qdr_field_from_iter(identity);
 
@@ -145,12 +151,13 @@ void qdr_manage_read(qdr_core_t              *core,
                      qd_router_entity_type_t  entity_type,
                      qd_iterator_t           *name,
                      qd_iterator_t           *identity,
-                     qd_composed_field_t     *body)
+                     qd_composed_field_t     *body,
+                     uint64_t                 in_conn_id)
 {
     qdr_action_t *action = qdr_action(qdr_manage_read_CT, "manage_read");
 
     // Create a query object here
-    action->args.agent.query = qdr_query(core, context, entity_type, body);
+    action->args.agent.query = qdr_query(core, context, entity_type, body, in_conn_id);
     action->args.agent.identity  = qdr_field_from_iter(identity);
     action->args.agent.name = qdr_field_from_iter(name);
 
@@ -164,10 +171,11 @@ void qdr_manage_update(qdr_core_t              *core,
                        qd_iterator_t           *name,
                        qd_iterator_t           *identity,
                        qd_parsed_field_t       *in_body,
-                       qd_composed_field_t     *out_body)
+                       qd_composed_field_t     *out_body,
+                       uint64_t                 in_conn_id)
 {
     qdr_action_t *action = qdr_action(qdr_manage_update_CT, "manage_update");
-    action->args.agent.query = qdr_query(core, context, type, out_body);
+    action->args.agent.query = qdr_query(core, context, type, out_body, in_conn_id);
     action->args.agent.name = qdr_field_from_iter(name);
     action->args.agent.identity = qdr_field_from_iter(identity);
     action->args.agent.in_body = in_body;
@@ -180,10 +188,11 @@ qdr_query_t *qdr_manage_query(qdr_core_t              *core,
                               void                    *context,
                               qd_router_entity_type_t  type,
                               qd_parsed_field_t       *attribute_names,
-                              qd_composed_field_t     *body)
+                              qd_composed_field_t     *body,
+                              uint64_t                 in_conn_id)
 {
 
-    qdr_query_t* query = qdr_query(core, context, type, body);
+    qdr_query_t* query = qdr_query(core, context, type, body, in_conn_id);
 
     switch (query->entity_type) {
     case QD_ROUTER_CONFIG_ADDRESS:    qdr_agent_set_columns(query, attribute_names, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT);  break;
@@ -196,6 +205,8 @@ qdr_query_t *qdr_manage_query(qdr_core_t              *core,
     case QD_ROUTER_FORBIDDEN:         break;
     case QD_ROUTER_EXCHANGE:          qdr_agent_set_columns(query, attribute_names, qdr_config_exchange_columns, QDR_CONFIG_EXCHANGE_COLUMN_COUNT); break;
     case QD_ROUTER_BINDING:           qdr_agent_set_columns(query, attribute_names, qdr_config_binding_columns, QDR_CONFIG_BINDING_COLUMN_COUNT); break;
+    case QD_ROUTER_CONN_LINK_ROUTE:   qdr_agent_set_columns(query, attribute_names, qdr_conn_link_route_columns,
+                                                            QDR_CONN_LINK_ROUTE_COLUMN_COUNT); break;
     }
 
     return query;
@@ -215,6 +226,7 @@ void qdr_query_add_attribute_names(qdr_query_t *query)
     case QD_ROUTER_FORBIDDEN:         qd_compose_empty_list(query->body); break;
     case QD_ROUTER_EXCHANGE:          qdr_agent_emit_columns(query, qdr_config_exchange_columns, QDR_CONFIG_EXCHANGE_COLUMN_COUNT); break;
     case QD_ROUTER_BINDING:           qdr_agent_emit_columns(query, qdr_config_binding_columns, QDR_CONFIG_BINDING_COLUMN_COUNT); break;
+    case QD_ROUTER_CONN_LINK_ROUTE:   qdr_agent_emit_columns(query, qdr_conn_link_route_columns, QDR_CONN_LINK_ROUTE_COLUMN_COUNT); break;
     }
 }
 
@@ -356,6 +368,7 @@ static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool disc
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
     case QD_ROUTER_EXCHANGE:          qdra_config_exchange_get_CT(core, name, identity, query, qdr_config_exchange_columns); break;
     case QD_ROUTER_BINDING:           qdra_config_binding_get_CT(core, name, identity, query, qdr_config_binding_columns); break;
+    case QD_ROUTER_CONN_LINK_ROUTE:   qdra_conn_link_route_get_CT(core, name, identity, query, qdr_conn_link_route_columns); break;
    }
 
     qdr_field_free(action->args.agent.name);
@@ -381,6 +394,7 @@ static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool di
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
     case QD_ROUTER_EXCHANGE:          qdra_config_exchange_create_CT(core, name, query, in_body); break;
     case QD_ROUTER_BINDING:           qdra_config_binding_create_CT(core, name, query, in_body); break;
+    case QD_ROUTER_CONN_LINK_ROUTE:   qdra_conn_link_route_create_CT(core, name, query, in_body); break;
 
    }
 
@@ -407,6 +421,7 @@ static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool di
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
     case QD_ROUTER_EXCHANGE:          qdra_config_exchange_delete_CT(core, query, name, identity); break;
     case QD_ROUTER_BINDING:           qdra_config_binding_delete_CT(core, query, name, identity); break;
+    case QD_ROUTER_CONN_LINK_ROUTE:   qdra_conn_link_route_delete_CT(core, query, name, identity); break;
    }
 
    qdr_field_free(action->args.agent.name);
@@ -431,6 +446,7 @@ static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool di
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
     case QD_ROUTER_EXCHANGE:          break;
     case QD_ROUTER_BINDING:           break;
+    case QD_ROUTER_CONN_LINK_ROUTE:   break;
    }
 
    qdr_field_free(action->args.agent.name);
@@ -458,6 +474,7 @@ static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool
         case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, true); break;
         case QD_ROUTER_EXCHANGE:          qdra_config_exchange_get_first_CT(core, query, offset); break;
         case QD_ROUTER_BINDING:           qdra_config_binding_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_CONN_LINK_ROUTE:   qdra_conn_link_route_get_first_CT(core, query, offset); break;
         }
     }
 }
@@ -479,6 +496,7 @@ static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool
         case QD_ROUTER_FORBIDDEN:         break;
         case QD_ROUTER_EXCHANGE:          qdra_config_exchange_get_next_CT(core, query); break;
         case QD_ROUTER_BINDING:           qdra_config_binding_get_next_CT(core, query); break;
+        case QD_ROUTER_CONN_LINK_ROUTE:   qdra_conn_link_route_get_next_CT(core, query); break;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/agent_config_link_route.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_link_route.c b/src/router_core/agent_config_link_route.c
index 2289f9f..c50131e 100644
--- a/src/router_core/agent_config_link_route.c
+++ b/src/router_core/agent_config_link_route.c
@@ -291,7 +291,7 @@ static const char *qdra_link_route_treatment_CT(qd_parsed_field_t *field, qd_add
 }
 
 
-static const char *qdra_link_route_direction_CT(qd_parsed_field_t *field, qd_direction_t *dir)
+const char *qdra_link_route_direction_CT(qd_parsed_field_t *field, qd_direction_t *dir)
 {
     if (field) {
         qd_iterator_t *iter = qd_parse_raw(field);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/agent_config_link_route.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_link_route.h b/src/router_core/agent_config_link_route.h
index 465fd22..b3b6a61 100644
--- a/src/router_core/agent_config_link_route.h
+++ b/src/router_core/agent_config_link_route.h
@@ -32,6 +32,7 @@ void qdra_config_link_route_get_CT(qdr_core_t    *core,
                                    qd_iterator_t *identity,
                                    qdr_query_t   *query,
                                    const char    *qdr_config_link_route_columns[]);
+const char *qdra_link_route_direction_CT(qd_parsed_field_t *field, qd_direction_t *dir);
 
 #define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 13
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/agent_conn_link_route.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_conn_link_route.c b/src/router_core/agent_conn_link_route.c
new file mode 100644
index 0000000..686bd07
--- /dev/null
+++ b/src/router_core/agent_conn_link_route.c
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "agent_conn_link_route.h"
+#include "agent_config_address.h"
+#include "agent_config_link_route.h"
+#include "route_control.h"
+
+#include <stdio.h>
+#include <inttypes.h>
+
+const char *qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_COLUMN_COUNT + 1] =
+    {"name",
+     "identity",
+     "type",
+     "pattern",
+     "direction",
+     "containerId",
+     0};
+
+const char *CONN_LINK_ROUTE_TYPE = "org.apache.qpid.dispatch.router.connection.linkRoute";
+
+// Append column 'col' of link route 'lr' to 'body'; when 'as_map' is set the column name is written first as the key.
+static void _insert_column_CT(qdr_link_route_t *lr, int col, qd_composed_field_t *body, bool as_map)
+{
+    // in map form every value is preceded by its column name
+    if (as_map)
+        qd_compose_insert_string(body, qdr_conn_link_route_columns[col]);
+
+    switch (col) {
+    case QDR_CONN_LINK_ROUTE_NAME:
+        // the name is optional: an unnamed link route is reported as null
+        if (!lr->name)
+            qd_compose_insert_null(body);
+        else
+            qd_compose_insert_string(body, lr->name);
+        break;
+
+    case QDR_CONN_LINK_ROUTE_IDENTITY: {
+        char identity_text[100];
+        snprintf(identity_text, sizeof(identity_text), "%"PRId64, lr->identity);
+        qd_compose_insert_string(body, identity_text);
+        break;
+    }
+
+    case QDR_CONN_LINK_ROUTE_TYPE:
+        qd_compose_insert_string(body, CONN_LINK_ROUTE_TYPE);
+        break;
+
+    case QDR_CONN_LINK_ROUTE_PATTERN:
+        qd_compose_insert_string(body, lr->pattern);
+        break;
+    case QDR_CONN_LINK_ROUTE_DIRECTION:
+        qd_compose_insert_string(body, (lr->dir == QD_INCOMING) ? "in" : "out");
+        break;
+    case QDR_CONN_LINK_ROUTE_CONTAINER_ID: {
+        // null unless the parent connection, its connection_info and the container are all set
+        const char *container = NULL;
+        if (lr->parent_conn && lr->parent_conn->connection_info)
+            container = lr->parent_conn->connection_info->container;
+        if (container)
+            qd_compose_insert_string(body, container);
+        else
+            qd_compose_insert_null(body);
+        break;
+    }
+    }
+}
+
+// Write 'lr' into the query body as a list holding only the columns selected in query->columns.
+static void _write_as_list_CT(qdr_query_t *query, qdr_link_route_t *lr)
+{
+    qd_composed_field_t *out = query->body;
+
+    qd_compose_start_list(out);
+
+    // the column selection ends at the first negative entry
+    for (int idx = 0; query->columns[idx] >= 0; idx++)
+        _insert_column_CT(lr, query->columns[idx], out, false);
+
+    qd_compose_end_list(out);
+}
+
+// Write every column of 'lr' into the query body as a map keyed by column name.
+static void _write_as_map_CT(qdr_query_t *query, qdr_link_route_t *lr)
+{
+    int column = 0;
+    qd_compose_start_map(query->body);
+    while (column < QDR_CONN_LINK_ROUTE_COLUMN_COUNT)
+        _insert_column_CT(lr, column++, query->body, true);
+    qd_compose_end_map(query->body);
+}
+
+
+
+// Return the open connection whose identity equals conn_id, or NULL when none matches.
+static qdr_connection_t *_find_conn_CT(qdr_core_t *core, uint64_t conn_id)
+{
+    qdr_connection_t *candidate = DEQ_HEAD(core->open_connections);
+
+    // advance until the list is exhausted or the identity matches
+    while (candidate && candidate->identity != conn_id)
+        candidate = DEQ_NEXT(candidate);
+
+    return candidate;
+}
+
+// Find a link route owned by 'conn', by identity or by name; returns NULL when nothing matches,
+// when the identity cannot be parsed, or when neither an identity nor a name is supplied.
+static qdr_link_route_t *_find_link_route_CT(qdr_connection_t *conn,
+                                             qd_iterator_t *name, qd_iterator_t *identity)
+{
+    qdr_link_route_t *lr = NULL;
+
+    // if both id and name provided, prefer id (the name is then never consulted)
+    // NOTE(review): assumes qd_parse() can decode the identity iterator as a number - confirm
+    if (identity) {
+        qd_parsed_field_t *identity_field = qd_parse(identity);
+        uint64_t id = qd_parse_as_ulong(identity_field);
+        if (qd_parse_ok(identity_field)) {
+            lr = DEQ_HEAD(conn->conn_link_routes);
+            while (lr) {
+                if (id == lr->identity)
+                    break;
+                lr = DEQ_NEXT(lr);
+            }
+        }
+        qd_parse_free(identity_field);
+    } else if (name) {
+        lr = DEQ_HEAD(conn->conn_link_routes);
+        while (lr) {
+            if (lr->name && qd_iterator_equal(name, (unsigned char *)lr->name))  // name is optional: never hand a NULL string to the comparison
+                break;
+            lr = DEQ_NEXT(lr);
+        }
+    }
+
+    return lr;
+}
+
+// CREATE handler: validates the request, adds a link route to the requesting connection and enqueues the response.
+void qdra_conn_link_route_create_CT(qdr_core_t         *core,
+                                        qd_iterator_t      *name,      // name supplied for the new link route
+                                        qdr_query_t        *query,     // carries in_conn, the status and the response body
+                                        qd_parsed_field_t  *in_body)   // request body; must be a map (keys read: "pattern", "direction")
+{
+    char *pattern = NULL;  // set by pattern validation; released at exit
+
+    query->status = QD_AMQP_BAD_REQUEST;  // default result until creation succeeds
+
+    // in_conn is 0 when the request did not arrive over a connection (configuration file): reject it
+    if (query->in_conn == 0) {
+        query->status.description = "Can only create via management CREATE";
+        goto exit;
+    }
+
+    // find the connection the request arrived on; it becomes the owner of the new link route
+    qdr_connection_t *conn = _find_conn_CT(core, query->in_conn);
+    if (!conn) {
+        query->status.description = "Parent connection no longer exists";
+        goto exit;
+    }
+
+    if (!qd_parse_is_map(in_body)) {
+        query->status.description = "Body of request must be a map";
+        goto exit;
+    }
+
+    //
+    // Extract the fields from the request
+    //
+    qd_parsed_field_t *pattern_field    = qd_parse_value_by_key(in_body, qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_PATTERN]);
+    qd_parsed_field_t *dir_field        = qd_parse_value_by_key(in_body, qdr_conn_link_route_columns[QDR_CONN_LINK_ROUTE_DIRECTION]);
+
+    if (!pattern_field) {
+        query->status.description = "Pattern field is required";
+        goto exit;
+    }
+
+    const char *error = NULL;
+    pattern = qdra_config_address_validate_pattern_CT(pattern_field, false, &error);
+    if (!pattern) {
+        query->status.description = error;  // the validator's message is used as the failure description
+        goto exit;
+    }
+
+    qd_direction_t dir;
+    error = qdra_link_route_direction_CT(dir_field, &dir);  // a non-NULL return is the failure description
+    if (error) {
+        query->status.description = error;
+        goto exit;
+    }
+
+    qdr_link_route_t *lr = qdr_route_add_conn_route_CT(core, conn, name, pattern, dir);
+    if (!lr) {
+        query->status.description = "creation failed";
+        goto exit;
+    }
+
+    query->status = QD_AMQP_CREATED;
+    _write_as_map_CT(query, lr);  // the response body is the full attribute map of the new entity
+
+exit:
+    free(pattern);  // pattern is still NULL on the early exits, which free() accepts
+    if (query->status.status != QD_AMQP_CREATED.status) {
+        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing CREATE of %s: %s",
+               CONN_LINK_ROUTE_TYPE, query->status.description);
+        qd_compose_insert_null(query->body);  // no body map; NOTE(review): body may be 0 for config-file requests - confirm this is safe
+    }
+    qdr_agent_enqueue_response_CT(core, query);  // a response is enqueued on every path
+}
+
+// DELETE handler: removes the link route matching 'name' or 'identity' from the connection identified by query->in_conn.
+void qdra_conn_link_route_delete_CT(qdr_core_t    *core,
+                                        qdr_query_t   *query,
+                                        qd_iterator_t *name,
+                                        qd_iterator_t *identity)
+{
+    query->status = QD_AMQP_BAD_REQUEST;  // default result, replaced below by each more specific outcome
+
+    if (!name && !identity) {
+        query->status.description = "No name or identity provided";
+        goto exit;
+    }
+
+    // Find the associated connection.  Closing a connection deletes all of
+    // its connection-scoped link routes (see qdr_route_connection_closed_CT),
+    // so a missing connection is reported as a successful delete, not an error.
+    qdr_connection_t *conn = _find_conn_CT(core, query->in_conn);
+    if (!conn) {
+        query->status = QD_AMQP_NO_CONTENT;
+        goto exit;
+    }
+
+    // find the targeted link route on that connection
+    qdr_link_route_t *lr = _find_link_route_CT(conn, name, identity);
+    if (!lr) {
+        query->status = QD_AMQP_NOT_FOUND;
+        goto exit;
+    }
+
+    qdr_route_del_conn_route_CT(core, lr);
+    query->status = QD_AMQP_NO_CONTENT;
+
+exit:  // common exit: log every outcome other than NO_CONTENT, then always enqueue the response
+    if (query->status.status != QD_AMQP_NO_CONTENT.status) {
+        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of %s: %s",
+               CONN_LINK_ROUTE_TYPE, query->status.description);
+    }
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+// READ handler: writes the link route matching 'name' or 'identity' on the connection identified by query->in_conn as a map.
+void qdra_conn_link_route_get_CT(qdr_core_t    *core,
+                                     qd_iterator_t *name,
+                                     qd_iterator_t *identity,
+                                     qdr_query_t   *query,
+                                     const char    *columns[])  // 'columns' is not referenced in this function
+{
+    query->status = QD_AMQP_BAD_REQUEST;  // default result, replaced below by NOT_FOUND or OK
+
+    if (!name && !identity) {
+        query->status.description = "No name or identity provided";
+        goto exit;
+    }
+
+    qdr_connection_t *conn = _find_conn_CT(core, query->in_conn);
+    qdr_link_route_t *lr = (conn) ? _find_link_route_CT(conn, name, identity) : NULL;
+
+    if (!lr) {
+        // Send back a 404 (covers both a missing connection and a missing link route)
+        query->status = QD_AMQP_NOT_FOUND;
+        goto exit;
+    }
+
+    //
+    // Write the columns of the linkRoute entity into the response body.
+    //
+    query->status = QD_AMQP_OK;
+    _write_as_map_CT(query, lr);
+
+exit:
+    //
+    // Enqueue the response.  On the error paths above nothing is written to the body here.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+// QUERY handler (first row): writes the link route at position 'offset' in the in_conn connection's list, if there is one.
+void qdra_conn_link_route_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    query->status = QD_AMQP_OK;  // an unknown connection or out-of-range offset is an empty result, not an error
+
+    qdr_connection_t *conn = _find_conn_CT(core, query->in_conn);
+    if (!conn || offset >= DEQ_SIZE(conn->conn_link_routes)) {
+        query->more = false;
+    } else {
+        // Find the lr at the offset by walking the list from its head.
+        //
+        qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes);
+        for (int i = 0; i < offset && lr; i++)
+            lr = DEQ_NEXT(lr);
+        assert(lr);  // expected to hold since offset < list size was checked above (assumes offset >= 0)
+        // write the lr into the response and advance to next
+        _write_as_list_CT(query, lr);
+        query->next_offset = offset + 1;
+        query->more = DEQ_NEXT(lr) != NULL;
+    }
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+// QUERY handler (next row): writes the link route at query->next_offset, if any; query->status is not modified here.
+void qdra_conn_link_route_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    qdr_connection_t *conn = _find_conn_CT(core, query->in_conn);
+    if (!conn || query->next_offset >= DEQ_SIZE(conn->conn_link_routes)) {
+        query->more = false;
+    } else {
+        // find the lr at the offset by walking the list again from its head
+        // (no list position is kept in the query between calls, only the offset)
+        qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes);
+        for (int i = 0; i < query->next_offset && lr; i++)
+            lr = DEQ_NEXT(lr);
+
+        if (lr) {
+            // write response and advance to next
+            _write_as_list_CT(query, lr);
+            ++query->next_offset;
+            query->more = DEQ_NEXT(lr) != NULL;
+        } else
+            query->more = false;  // walked off the end of the list: nothing left to report
+    }
+    qdr_agent_enqueue_response_CT(core, query);
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/agent_conn_link_route.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_conn_link_route.h b/src/router_core/agent_conn_link_route.h
new file mode 100644
index 0000000..d78e002
--- /dev/null
+++ b/src/router_core/agent_conn_link_route.h
@@ -0,0 +1,53 @@
+#ifndef qdr_agent_conn_link_route_h
+#define qdr_agent_conn_link_route_h 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+#define QDR_CONN_LINK_ROUTE_NAME         0  // these values index qdr_conn_link_route_columns[]
+#define QDR_CONN_LINK_ROUTE_IDENTITY     1
+#define QDR_CONN_LINK_ROUTE_TYPE         2
+#define QDR_CONN_LINK_ROUTE_PATTERN      3  // required attribute of a CREATE request body
+#define QDR_CONN_LINK_ROUTE_DIRECTION    4  // looked up in the CREATE request body alongside the pattern
+#define QDR_CONN_LINK_ROUTE_CONTAINER_ID 5
+#define QDR_CONN_LINK_ROUTE_COLUMN_COUNT 6  // presumably the number of entries in the column array - confirm at its definition
+
+
+extern const char *qdr_conn_link_route_columns[];  // attribute names, indexed by the QDR_CONN_LINK_ROUTE_* values
+extern const char *CONN_LINK_ROUTE_TYPE;           // entity type name, used in the agent's error log messages
+
+// Management operation handlers; each one enqueues its response with qdr_agent_enqueue_response_CT().
+void qdra_conn_link_route_create_CT(qdr_core_t         *core,
+                                    qd_iterator_t      *name,
+                                    qdr_query_t        *query,
+                                    qd_parsed_field_t  *in_body);  // CREATE: in_body must be a map with a pattern attribute
+void qdra_conn_link_route_delete_CT(qdr_core_t    *core,
+                                    qdr_query_t   *query,
+                                    qd_iterator_t *name,
+                                    qd_iterator_t *identity);  // DELETE: at least one of name/identity is required
+void qdra_conn_link_route_get_CT(qdr_core_t    *core,
+                                 qd_iterator_t *name,
+                                 qd_iterator_t *identity,
+                                 qdr_query_t   *query,
+                                 const char    *columns[]);  // READ: at least one of name/identity is required
+void qdra_conn_link_route_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset);  // QUERY: entry at 'offset', if any
+void qdra_conn_link_route_get_next_CT(qdr_core_t *core, qdr_query_t *query);  // QUERY: entry at query->next_offset, if any
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index ca3f5df..56d998e 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -257,7 +257,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
 
 void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work)
 {
-    work->on_message(work->on_message_context, work->msg, work->maskbit, work->inter_router_cost);
+    work->on_message(work->on_message_context, work->msg, work->maskbit, work->inter_router_cost, work->in_conn_id);
     qd_message_free(work->msg);
 }
 
@@ -270,6 +270,7 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li
     work->msg                = qd_message_copy(msg);
     work->maskbit            = link ? link->conn->mask_bit : 0;
     work->inter_router_cost  = link ? link->conn->inter_router_cost : 1;
+    work->in_conn_id         = link ? link->conn->identity : 0;
     qdr_post_general_work_CT(core, work);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index 82e56d0..8dbaf98 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -51,6 +51,7 @@ const unsigned char *router_entity_type          = (unsigned char*) "org.apache.
 const unsigned char *connection_entity_type      = (unsigned char*) "org.apache.qpid.dispatch.connection";
 const unsigned char *config_exchange_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.exchange";
 const unsigned char *config_binding_entity_type  = (unsigned char*) "org.apache.qpid.dispatch.router.config.binding";
+const unsigned char *conn_link_route_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.connection.linkRoute";
 
 const char * const status_description = "statusDescription";
 const char * const correlation_id = "correlation-id";
@@ -241,7 +242,8 @@ static void qd_core_agent_query_handler(qdr_core_t                 *core,
                                         qd_router_operation_type_t  operation_type,
                                         qd_message_t               *msg,
                                         int                        *count,
-                                        int                        *offset)
+                                        int                        *offset,
+                                        uint64_t                    in_conn)
 {
     //
     // Add the Body.
@@ -269,7 +271,7 @@ static void qd_core_agent_query_handler(qdr_core_t                 *core,
 
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
-    ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field);
+    ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field, in_conn);
 
     //Add the attribute names
     qdr_query_add_attribute_names(ctx->query); //this adds a list of attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",]
@@ -288,7 +290,8 @@ static void qd_core_agent_read_handler(qdr_core_t                 *core,
                                        qd_router_entity_type_t     entity_type,
                                        qd_router_operation_type_t  operation_type,
                                        qd_iterator_t              *identity_iter,
-                                       qd_iterator_t              *name_iter)
+                                       qd_iterator_t              *name_iter,
+                                       uint64_t                    in_conn)
 {
     //
     // Add the Body
@@ -302,7 +305,7 @@ static void qd_core_agent_read_handler(qdr_core_t                 *core,
     qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
 
     //Call the read API function
-    qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body);
+    qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body, in_conn);
 }
 
 
@@ -310,7 +313,8 @@ static void qd_core_agent_create_handler(qdr_core_t                 *core,
                                          qd_message_t               *msg,
                                          qd_router_entity_type_t     entity_type,
                                          qd_router_operation_type_t  operation_type,
-                                         qd_iterator_t              *name_iter)
+                                         qd_iterator_t              *name_iter,
+                                         uint64_t                    in_conn_id)
 {
     //
     // Add the Body
@@ -329,7 +333,7 @@ static void qd_core_agent_create_handler(qdr_core_t                 *core,
 
     qd_buffer_list_t empty_list;
     DEQ_INIT(empty_list);
-    qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body, empty_list);
+    qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body, empty_list, in_conn_id);
     qd_iterator_free(body_iter);
 }
 
@@ -339,7 +343,8 @@ static void qd_core_agent_update_handler(qdr_core_t                 *core,
                                          qd_router_entity_type_t     entity_type,
                                          qd_router_operation_type_t  operation_type,
                                          qd_iterator_t              *identity_iter,
-                                         qd_iterator_t              *name_iter)
+                                         qd_iterator_t              *name_iter,
+                                         uint64_t                    in_conn)
 {
     qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
 
@@ -352,7 +357,7 @@ static void qd_core_agent_update_handler(qdr_core_t                 *core,
     qd_parsed_field_t *in_body= qd_parse(iter);
     qd_iterator_free(iter);
 
-    qdr_manage_update(core, ctx, entity_type, name_iter, identity_iter, in_body, out_body);
+    qdr_manage_update(core, ctx, entity_type, name_iter, identity_iter, in_body, out_body, in_conn);
 
 }
 
@@ -362,7 +367,8 @@ static void qd_core_agent_delete_handler(qdr_core_t                 *core,
                                          qd_router_entity_type_t     entity_type,
                                          qd_router_operation_type_t  operation_type,
                                          qd_iterator_t              *identity_iter,
-                                         qd_iterator_t              *name_iter)
+                                         qd_iterator_t              *name_iter,
+                                         uint64_t                    in_conn)
 {
     //
     // Add the Body
@@ -375,7 +381,7 @@ static void qd_core_agent_delete_handler(qdr_core_t                 *core,
     // Call local function that creates and returns a qd_management_context_t containing the values passed in.
     qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
 
-    qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter);
+    qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter, in_conn);
 }
 
 
@@ -439,6 +445,8 @@ static bool qd_can_handle_request(qd_parsed_field_t           *properties_fld,
         *entity_type = QD_ROUTER_EXCHANGE;
     else if (qd_iterator_equal(qd_parse_raw(parsed_field), config_binding_entity_type))
         *entity_type = QD_ROUTER_BINDING;
+    else if (qd_iterator_equal(qd_parse_raw(parsed_field), conn_link_route_entity_type))
+        *entity_type = QD_ROUTER_CONN_LINK_ROUTE;
     else
         return false;
 
@@ -484,7 +492,8 @@ static bool qd_can_handle_request(qd_parsed_field_t           *properties_fld,
  * Handler for the management agent.
  *
  */
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost)
+void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost,
+                                     uint64_t in_conn_id)
 {
     qdr_core_t *core = (qdr_core_t*) context;
     qd_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
@@ -503,19 +512,19 @@ void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unuse
     if (qd_can_handle_request(properties_fld, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) {
         switch (operation_type) {
         case QD_ROUTER_OPERATION_QUERY:
-            qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset);
+            qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset, in_conn_id);
             break;
         case QD_ROUTER_OPERATION_CREATE:
-            qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter);
+            qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter, in_conn_id);
             break;
         case QD_ROUTER_OPERATION_READ:
-            qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
+            qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter, in_conn_id);
             break;
         case QD_ROUTER_OPERATION_UPDATE:
-            qd_core_agent_update_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
+            qd_core_agent_update_handler(core, msg, entity_type, operation_type, identity_iter, name_iter, in_conn_id);
             break;
         case QD_ROUTER_OPERATION_DELETE:
-            qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter);
+            qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter, in_conn_id);
             break;
         }
     } else {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 98ccf61..d0bdb4b 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -100,6 +100,8 @@ static void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identi
     if (DEQ_IS_EMPTY(cid->connection_refs) && DEQ_IS_EMPTY(cid->link_route_refs) && DEQ_IS_EMPTY(cid->auto_link_refs)) {
         qd_hash_remove_by_handle(core->conn_id_hash, cid->connection_hash_handle);
         qd_hash_remove_by_handle(core->conn_id_hash, cid->container_hash_handle);
+        qd_hash_handle_free(cid->connection_hash_handle);
+        qd_hash_handle_free(cid->container_hash_handle);
         free_qdr_conn_identifier_t(cid);
     }
 }
@@ -107,9 +109,21 @@ static void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identi
 
 static void qdr_route_log_CT(qdr_core_t *core, const char *text, const char *name, uint64_t id, qdr_connection_t *conn)
 {
-    const char *key = (const char*) qd_hash_key_by_handle(conn->conn_id->connection_hash_handle);
-    if (!key)
-        key = (const char*) qd_hash_key_by_handle(conn->conn_id->container_hash_handle);
+    const char *key = NULL;
+    const char *type = "<unknown>";
+    if (conn->conn_id) {
+        key = (const char*) qd_hash_key_by_handle(conn->conn_id->connection_hash_handle);
+        if (!key) {
+            key = (const char*) qd_hash_key_by_handle(conn->conn_id->container_hash_handle);
+        }
+        if (key)
+            type = (*key++ == 'L') ? "connection" : "container";
+    }
+    if (!key && conn->connection_info) {
+        type = "container";
+        key = conn->connection_info->container;
+    }
+
     char  id_string[64];
     const char *log_name = name ? name : id_string;
 
@@ -117,7 +131,7 @@ static void qdr_route_log_CT(qdr_core_t *core, const char *text, const char *nam
         snprintf(id_string, 64, "%"PRId64, id);
 
     qd_log(core->log, QD_LOG_INFO, "%s '%s' on %s %s",
-           text, log_name, key[0] == 'L' ? "connection" : "container", &key[1]);
+           text, log_name, type, key ? key : "<unknown>");
 }
 
 
@@ -281,6 +295,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, q
 }
 
 
+// router.config.linkRoute
 qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t             *core,
                                               qd_iterator_t          *name,
                                               const char             *addr_pattern,
@@ -407,6 +422,7 @@ void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link)
 }
 
 
+// router.config.linkRoute
 void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr)
 {
     //
@@ -435,6 +451,7 @@ void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr)
     //
     // Remove the link route from the core list.
     //
+    DEQ_REMOVE(core->link_routes, lr);
     qd_log(core->log, QD_LOG_TRACE, "Link route %spattern removed: pattern=%s name=%s",
            lr->is_prefix ? "prefix " : "", lr->pattern, lr->name);
     qdr_core_delete_link_route(core, lr);
@@ -580,6 +597,16 @@ void qdr_route_connection_opened_CT(qdr_core_t       *core,
 
 void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
 {
+    //
+    // release any connection-based link routes.  These can exist on
+    // QDR_ROLE_NORMAL connections.
+    //
+    while (DEQ_HEAD(conn->conn_link_routes)) {
+        qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes);
+        // removes the link route from conn->conn_link_routes
+        qdr_route_del_conn_route_CT(core, lr);
+    }
+
     if (conn->role != QDR_ROLE_ROUTE_CONTAINER)
         return;
 
@@ -662,3 +689,81 @@ void qdr_link_route_unmap_pattern_CT(qdr_core_t *core, qd_iterator_t *address)
     qd_iterator_free(iter);
     free(pattern);
 }
+
+// Create a link route scoped to 'conn' for 'addr_pattern'/'dir', append it to conn->conn_link_routes, activate and return it.
+qdr_link_route_t *qdr_route_add_conn_route_CT(qdr_core_t             *core,
+                                              qdr_connection_t       *conn,
+                                              qd_iterator_t          *name,
+                                              const char             *addr_pattern,
+                                              qd_direction_t          dir)
+{
+    //
+    // Set up the link_route structure ('name' is optional: lr->name stays NULL without it)
+    //
+    qdr_link_route_t *lr = new_qdr_link_route_t();
+    ZERO(lr);
+    lr->identity  = qdr_identifier(core);
+    lr->name      = name ? (char*) qd_iterator_copy(name) : 0;
+    lr->dir       = dir;
+    lr->treatment = QD_TREATMENT_LINK_BALANCED;
+    lr->is_prefix = false;
+    lr->pattern   = strdup(addr_pattern);
+    lr->parent_conn = conn;
+
+    //
+    // Add the address to the routing hash table and map it as a pattern in the
+    // wildcard pattern parse tree (only when the address does not exist yet)
+    //
+    {
+        char *addr_hash = qdr_link_route_pattern_to_address(lr->pattern, dir);
+        qd_iterator_t *a_iter = qd_iterator_string(addr_hash, ITER_VIEW_ALL);
+        qd_hash_retrieve(core->addr_hash, a_iter, (void*) &lr->addr);
+        if (!lr->addr) {
+            lr->addr = qdr_address_CT(core, lr->treatment);
+            DEQ_INSERT_TAIL(core->addrs, lr->addr);
+            qd_hash_insert(core->addr_hash, a_iter, lr->addr, &lr->addr->hash_handle);
+            qdr_link_route_map_pattern_CT(core, a_iter, lr->addr);
+        }
+
+        qd_iterator_free(a_iter);
+        free(addr_hash);
+    }
+    lr->addr->ref_count++;  // dropped again in qdr_route_del_conn_route_CT
+
+    // Add the link route to the parent connection's link route list
+    // and fire it up
+    DEQ_INSERT_TAIL(conn->conn_link_routes, lr);
+    qdr_link_route_activate_CT(core, lr, lr->parent_conn);
+
+    // connection_info and name may be NULL; never pass NULL to a %s conversion
+    qd_log(core->log, QD_LOG_TRACE,
+           "Connection based link route pattern added: conn=%s pattern=%s name=%s",
+           conn->connection_info ? conn->connection_info->container : "<unknown>",
+           lr->pattern, lr->name ? lr->name : "<unnamed>");
+    return lr;
+}
+
+// Deactivate and free the connection-scoped link route 'lr', detaching it from its address and its parent connection.
+void qdr_route_del_conn_route_CT(qdr_core_t       *core,
+                                 qdr_link_route_t *lr)
+{
+    qdr_connection_t *conn = lr->parent_conn;
+    qdr_link_route_deactivate_CT(core, lr, conn);
+
+    //
+    // Disassociate the link route from its address.  Check to see if the address
+    // (and its associated pattern) should be removed.
+    //
+    qdr_address_t *addr = lr->addr;
+    if (addr && --addr->ref_count == 0)
+        qdr_check_addr_CT(core, addr);
+
+    // Remove the link route from the parent's link route list
+    DEQ_REMOVE(conn->conn_link_routes, lr);
+    // connection_info and name may be NULL; never pass NULL to a %s conversion
+    qd_log(core->log, QD_LOG_TRACE,
+           "Connection based link route pattern removed: conn=%s pattern=%s name=%s",
+           conn->connection_info ? conn->connection_info->container : "<unknown>",
+           lr->pattern, lr->name ? lr->name : "<unnamed>");
+    qdr_core_delete_link_route(core, lr);
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/route_control.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h
index a4845aa..7d893a3 100644
--- a/src/router_core/route_control.h
+++ b/src/router_core/route_control.h
@@ -72,4 +72,13 @@ void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link);
  */
 void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link);
 
+// Connection scoped link routes:
+qdr_link_route_t *qdr_route_add_conn_route_CT(qdr_core_t       *core,
+                                              qdr_connection_t *conn,
+                                              qd_iterator_t    *name,
+                                              const char       *addr_pattern,
+                                              qd_direction_t    dir);
+
+void qdr_route_del_conn_route_CT(qdr_core_t       *core,
+                                 qdr_link_route_t *lr);
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 58135a3..119a5a1 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -131,6 +131,7 @@ void qdr_core_free(qdr_core_t *core)
 
     qdr_link_route_t *link_route = 0;
     while ( (link_route = DEQ_HEAD(core->link_routes))) {
+        DEQ_REMOVE_HEAD(core->link_routes);
         qdr_core_delete_link_route(core, link_route);
     }
 
@@ -372,7 +373,6 @@ bool qdr_is_addr_treatment_multicast(qdr_address_t *addr)
 
 void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr)
 {
-    DEQ_REMOVE(core->link_routes, lr);
     free(lr->add_prefix);
     free(lr->del_prefix);
     free(lr->name);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 352f568..6dfdca9 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -40,6 +40,7 @@ typedef struct qdr_connection_ref_t  qdr_connection_ref_t;
 typedef struct qdr_exchange          qdr_exchange_t;
 typedef struct qdr_edge_t            qdr_edge_t;
 
+
 #include "core_link_endpoint.h"
 #include "core_events.h"
 
@@ -191,6 +192,7 @@ struct qdr_general_work_t {
     qdr_receive_t               on_message;
     void                       *on_message_context;
     qd_message_t               *msg;
+    uint64_t                    in_conn_id;
 };
 
 ALLOC_DECLARE(qdr_general_work_t);
@@ -284,6 +286,7 @@ struct qdr_query_t {
     int                      next_offset;
     bool                     more;
     qd_amqp_error_t          status;
+    uint64_t                 in_conn;  // or perhaps a pointer???
 };
 
 DEQ_DECLARE(qdr_query_t, qdr_query_list_t); 
@@ -582,6 +585,8 @@ struct qdr_connection_info_t {
 
 ALLOC_DECLARE(qdr_connection_info_t);
 
+DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
+
 struct qdr_connection_t {
     DEQ_LINKS(qdr_connection_t);
     DEQ_LINKS_N(ACTIVATE, qdr_connection_t);
@@ -604,6 +609,7 @@ struct qdr_connection_t {
     int                         tenant_space_len;
     qdr_connection_info_t      *connection_info;
     void                       *user_context; /* Updated from IO thread, use work_lock */
+    qdr_link_route_list_t       conn_link_routes;  // connection scoped link routes
 };
 
 ALLOC_DECLARE(qdr_connection_t);
@@ -631,10 +637,10 @@ struct qdr_link_route_t {
     char                   *pattern;
     char                   *add_prefix;
     char                   *del_prefix;
+    qdr_connection_t       *parent_conn;
 };
 
 ALLOC_DECLARE(qdr_link_route_t);
-DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
 void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr);
 
 // Core timer related field/data structures
@@ -826,7 +832,7 @@ ALLOC_DECLARE(qdr_terminus_t);
 
 void *router_core_thread(void *arg);
 uint64_t qdr_identifier(qdr_core_t* core);
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost);
+void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id);
 void  qdr_route_table_setup_CT(qdr_core_t *core);
 void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);
@@ -909,7 +915,8 @@ void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,
                        qd_router_entity_type_t  type,
-                       qd_composed_field_t     *body);
+                       qd_composed_field_t     *body,
+                       uint64_t                 conn_id);
 
 /**
  * Create a new timer which will only be used inside the code thread.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index 7794d61..d268671 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -34,16 +34,21 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 import errno, os, time, socket, random, subprocess, shutil, unittest, __main__, re, sys
+from subprocess import PIPE, STDOUT
 from copy import copy
 try:
     import queue as Queue   # 3.x
 except ImportError:
     import Queue as Queue   # 2.7
 from threading import Thread
+from threading import Event
+import json
 
 import proton
 from proton import Message, Timeout
+from proton.handlers import MessagingHandler
 from proton.utils import BlockingConnection
+from proton.reactor import AtLeastOnce, Container
 from qpid_dispatch.management.client import Node
 from qpid_dispatch_internal.compat import dict_iteritems
 
@@ -706,65 +711,251 @@ def main_module():
     return os.path.splitext(os.path.basename(__main__.__file__))[0]
 
 
-class AsyncTestReceiver(object):
+class AsyncTestReceiver(MessagingHandler):
     """
     A simple receiver that runs in the background and queues any received
     messages.  Messages can be retrieved from this thread via the queue member
     """
     Empty = Queue.Empty
 
-    def __init__(self, address, source, credit=100, timeout=0.1,
-                 conn_args=None, link_args=None):
-        """
-        Runs a BlockingReceiver in a separate thread.
-
-        :param address: address of router (URL)
-        :param source: the node address to consume from
-        :param credit: max credit for receiver
-        :param timeout: receive poll frequency in seconds
-        :param conn_args: map of BlockingConnection arguments
-        :param link_args: map of BlockingReceiver arguments
-        """
+    def __init__(self, address, source, conn_args=None, container_id=None):
         super(AsyncTestReceiver, self).__init__()
+        self.address = address
+        self.source = source
+        self.conn_args = conn_args
         self.queue = Queue.Queue()
-        kwargs = {'url': address}
-        if conn_args:
-            kwargs.update(conn_args)
-        self._conn = BlockingConnection(**kwargs)
-        kwargs = {'address': source,
-                  'credit': credit}
-        if link_args:
-            kwargs.update(link_args)
-        self._rcvr = self._conn.create_receiver(**kwargs)
-        self._thread = Thread(target=self._poll)
-        self._run = True
-        self._timeout = timeout
+        self._conn = None
+        self._container = Container(self)
+        if container_id is not None:
+            self._container.container_id = container_id
+        self._ready = Event()
+        self._stop_thread = False
+        self._thread = Thread(target=self._main)
+        self._thread.daemon = True
+        self._thread.start()
+        if self._ready.wait(timeout=TIMEOUT) is False:
+            raise Exception("Timed out waiting for receiver start")
+
+    def _main(self):
+        self._container.start()
+        while self._container.process():
+            if self._stop_thread:
+                if self._conn:
+                    self._conn.close()
+                    self._conn = None
+
+    def stop(self, timeout=TIMEOUT):
+        self._stop_thread = True
+        self._container.wakeup()
+        self._thread.join(timeout=TIMEOUT)
+        if self._thread.is_alive():
+            raise Exception("AsyncTestReceiver did not exit")
+
+    def on_start(self, event):
+        kwargs = {'url': self.address}
+        if self.conn_args:
+            kwargs.update(self.conn_args)
+        self._conn = event.container.connect(**kwargs)
+
+    def on_connection_opened(self, event):
+        kwargs = {'source': self.source}
+        rcv = event.container.create_receiver(event.connection,
+                                              **kwargs)
+    def on_link_opened(self, event):
+        self._ready.set()
+
+    def on_message(self, event):
+        self.queue.put(event.message)
+
+
+class AsyncTestSender(MessagingHandler):
+    """
+    A simple sender that runs in the background and sends 'count' messages to a
+    given target.
+    """
+    def __init__(self, address, target, count=1, body=None, container_id=None):
+        super(AsyncTestSender, self).__init__()
+        self.address = address
+        self.target = target
+        self.count = count
+        self._unaccepted = count
+        self._body = body or "test"
+        self._container = Container(self)
+        if container_id is not None:
+            self._container.container_id = container_id
+        self._thread = Thread(target=self._main)
+        self._thread.daemon = True
         self._thread.start()
 
-    def _poll(self):
-        """
-        Thread main loop
-        """
+    def _main(self):
+        self._container.start()
+        while self._container.process():
+            pass
 
-        while self._run:
-            try:
-                msg = self._rcvr.receive(timeout=self._timeout)
-            except Timeout:
-                continue
-            try:
-                self._rcvr.accept()
-            except IndexError:
-                # PROTON-1743
-                pass
-            self.queue.put(msg)
-        self._rcvr.close()
-        self._conn.close()
-
-    def stop(self, timeout=10):
-        """
-        Called to terminate the AsyncTestReceiver
-        """
-        self._run = False
-        self._thread.join(timeout=timeout)
+    def wait(self):
+        # don't stop it - wait until everything is sent
+        self._thread.join(timeout=TIMEOUT)
+        assert not self._thread.is_alive(), "sender did not complete"
+
+    def on_start(self, event):
+        self._conn = self._container.connect(self.address)
+
+    def on_connection_opened(self, event):
+        self._sender = self._container.create_sender(self._conn,
+                                                     target=self.target,
+                                                     options=AtLeastOnce())
 
+    def on_sendable(self, event):
+        if self.count:
+            self._sender.send(Message(body=self._body))
+            self.count -= 1
+
+    def on_accepted(self, event):
+        self._unaccepted -= 1;
+        if self._unaccepted == 0:
+            self._conn.close()
+            self._conn = None
+
+
+class QdManager(object):
+    """
+    A means to invoke qdmanage during a testcase
+    """
+    def __init__(self, tester, address=None, timeout=TIMEOUT):
+        # 'tester' - can be 'self' when called in a test,
+        # or an instance of any class derived from Process (like Qdrouterd)
+        self._tester = tester
+        self._timeout = timeout
+        self._address = address
+
+    def __call__(self, cmd, address=None, input=None, expect=Process.EXIT_OK,
+                 timeout=None):
+        assert address or self._address, "address missing"
+        p = self._tester.popen(
+            ['qdmanage'] + cmd.split(' ')
+            + ['--bus', address or self._address,
+               '--indent=-1',
+               '--timeout', str(timeout or self._timeout)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def create(self, long_type, kwargs):
+        cmd = "CREATE --type=%s" % long_type
+        for k, v in kwargs.items():
+            cmd += " %s=%s" % (k, v)
+        return json.loads(self(cmd))
+
+    def delete(self, long_type, name=None, identity=None):
+        cmd = 'DELETE --type=%s' %  long_type
+        if identity is not None:
+            cmd += " --identity=%s" % identity
+        elif name is not None:
+            cmd += " --name=%s" % name
+        else:
+            assert False, "name or identity not supplied!"
+        self(cmd)
+
+    def query(self, long_type):
+        return json.loads(self('QUERY --type=%s' % long_type))
+
+
+class MgmtMsgProxy(object):
+    """
+    Utility for creating and inspecting management messages
+    """
+    class _Response(object):
+        def __init__(self, status_code, status_description, body):
+            self.status_code        = status_code
+            self.status_description = status_description
+            if body.__class__ == dict and len(body.keys()) == 2 and 'attributeNames' in body.keys() and 'results' in body.keys():
+                results = []
+                names   = body['attributeNames']
+                for result in body['results']:
+                    result_map = {}
+                    for i in range(len(names)):
+                        result_map[names[i]] = result[i]
+                    results.append(MgmtMsgProxy._Response(status_code, status_description, result_map))
+                self.attrs = {'results': results}
+            else:
+                self.attrs = body
+
+        def __getattr__(self, key):
+            return self.attrs[key]
+
+
+    def __init__(self, reply_addr):
+        self.reply_addr = reply_addr
+
+    def response(self, msg):
+        ap = msg.properties
+        return self._Response(ap['statusCode'], ap['statusDescription'], msg.body)
+
+    def query_connections(self):
+        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.connection'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_links(self):
+        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.link'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_link_routes(self):
+        ap = {'operation': 'QUERY',
+              'type': 'org.apache.qpid.dispatch.router.config.linkRoute'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def create_link_route(self, name, kwargs):
+        ap = {'operation': 'CREATE',
+              'type': 'org.apache.qpid.dispatch.router.config.linkRoute',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr,
+                       body=kwargs)
+
+    def delete_link_route(self, name):
+        ap = {'operation': 'DELETE',
+              'type': 'org.apache.qpid.dispatch.router.config.linkRoute',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def create_connector(self, name, kwargs):
+        ap = {'operation': 'CREATE',
+              'type': 'org.apache.qpid.dispatch.connector',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr,
+                       body=kwargs)
+
+    def delete_connector(self, name):
+        ap = {'operation': 'DELETE',
+              'type': 'org.apache.qpid.dispatch.connector',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_conn_link_routes(self):
+        ap = {'operation': 'QUERY',
+              'type': 'org.apache.qpid.dispatch.router.connection.linkRoute'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def create_conn_link_route(self, name, kwargs):
+        ap = {'operation': 'CREATE',
+              'type': 'org.apache.qpid.dispatch.router.connection.linkRoute',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr,
+                       body=kwargs)
+
+    def delete_conn_link_route(self, name):
+        ap = {'operation': 'DELETE',
+              'type': 'org.apache.qpid.dispatch.router.connection.linkRoute',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def read_conn_link_route(self, name):
+        ap = {'operation': 'READ',
+              'type': 'org.apache.qpid.dispatch.router.connection.linkRoute',
+              'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fcf457c9/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index b489e2b..215bc08 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -19,7 +19,7 @@
 
 import unittest2 as unittest
 from proton import Message, Timeout
-from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, DynamicNodeProperties
 
@@ -170,43 +170,6 @@ class RouterTest(TestCase):
         self.assertEqual(None, test.error)
 
 
-class Entity(object):
-    def __init__(self, status_code, status_description, body):
-        self.status_code        = status_code
-        self.status_description = status_description
-        if body.__class__ == dict and len(body.keys()) == 2 and 'attributeNames' in body.keys() and 'results' in body.keys():
-            results = []
-            names   = body['attributeNames']
-            for result in body['results']:
-                result_map = {}
-                for i in range(len(names)):
-                    result_map[names[i]] = result[i]
-                results.append(Entity(status_code, status_description, result_map))
-            self.attrs = {'results': results}
-        else:
-            self.attrs = body
-
-    def __getattr__(self, key):
-        return self.attrs[key]
-
-
-class RouterProxy(object):
-    def __init__(self, reply_addr):
-        self.reply_addr = reply_addr
-
-    def response(self, msg):
-        ap = msg.properties
-        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
-
-    def query_connections(self):
-        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.connection'}
-        return Message(properties=ap, reply_to=self.reply_addr)
-
-    def query_links(self):
-        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.link'}
-        return Message(properties=ap, reply_to=self.reply_addr)
-
-
 class Timeout(object):
     def __init__(self, parent):
         self.parent = parent
@@ -249,7 +212,7 @@ class ConnectivityTest(MessagingHandler):
 
     def on_link_opened(self, event):
         if event.receiver == self.reply_receiver:
-            self.proxy        = RouterProxy(self.reply_receiver.remote_source.address)
+            self.proxy        = MgmtMsgProxy(self.reply_receiver.remote_source.address)
             self.agent_sender = event.container.create_sender(self.interior_conn, "$management")
 
     def on_sendable(self, event):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org