You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/10/04 16:33:15 UTC

qpid-dispatch git commit: DISPATCH-1133 - Updated the attach/detach behavior of the core-endpoint API, added echo test for attach/detach.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 3b1d84c96 -> 41c582838


DISPATCH-1133 - Updated the attach/detach behavior of the core-endpoint API, added echo test for attach/detach.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/41c58283
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/41c58283
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/41c58283

Branch: refs/heads/master
Commit: 41c582838771317ed79cbcf68b022c6665aaf8f1
Parents: 3b1d84c
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Oct 4 12:32:04 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Oct 4 12:32:04 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c             |  28 ++---
 src/router_core/core_link_endpoint.c      |  62 ++++++-----
 src/router_core/core_link_endpoint.h      |  46 +++++---
 src/router_core/modules/core_test_hooks.c | 144 ++++++++++++++++++-------
 src/router_core/router_core_private.h     |   1 +
 tests/system_tests_core_endpoint.py       |  56 ++++++++++
 6 files changed, 239 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index ef4b8e4..2eda2d6 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -988,7 +988,7 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
 }
 
 
-static void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
+void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
 {
     qdr_connection_work_t *work = new_qdr_connection_work_t();
     ZERO(work);
@@ -1594,15 +1594,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable, &core_endpoint);
 
                 if (core_endpoint) {
-                    qdr_error_t *error = 0;
-                    if (qdrc_endpoint_do_bound_attach_CT(core, addr, link, &error)) {
-                        link->owning_addr = addr;
-                        qdr_link_outbound_second_attach_CT(core, link, source, target);
-                    } else {
-                        qdr_link_outbound_detach_CT(core, link, error, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
-                        qdr_terminus_free(source);
-                        qdr_terminus_free(target);
-                    }
+                    qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
                 }
 
                 else if (unavailable) {
@@ -1705,15 +1697,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
             qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable, &core_endpoint);
 
             if (core_endpoint) {
-                qdr_error_t *error = 0;
-                if (qdrc_endpoint_do_bound_attach_CT(core, addr, link, &error)) {
-                    link->owning_addr = addr;
-                    qdr_link_outbound_second_attach_CT(core, link, source, target);
-                } else {
-                    qdr_link_outbound_detach_CT(core, link, error, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
-                    qdr_terminus_free(source);
-                    qdr_terminus_free(target);
-                }
+                qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
             }
 
             else if (unavailable) {
@@ -1795,6 +1779,11 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
 
     link->oper_status = QDR_LINK_OPER_UP;
 
+    if (link->core_endpoint) {
+        qdrc_endpoint_do_second_attach_CT(core, link->core_endpoint, source, target);
+        return;
+    }
+
     //
     // Handle attach-routed links
     //
@@ -1901,6 +1890,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
 
     if (link->core_endpoint) {
         qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error);
+        return;
 
     } else {
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/core_link_endpoint.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 961c219..05e7fb8 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -79,7 +79,7 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t           *core,
 
 qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *ep)
 {
-    return ep->link->link_direction;
+    return !!ep ? (!!ep->link ? ep->link->link_direction : QD_INCOMING) : QD_INCOMING;
 }
 
 
@@ -89,6 +89,22 @@ qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *ep)
 }
 
 
+void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target)
+{
+    qdr_link_outbound_second_attach_CT(core, ep->link, source, target);
+}
+
+
+void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
+{
+    qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
+    if (ep->link->detach_count == 2) {
+        ep->link->core_endpoint = 0;
+        free_qdrc_endpoint_t(ep);
+    }
+}
+
+
 void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
 {
     qdr_link_issue_credit_CT(core, ep->link, credit, drain);
@@ -146,17 +162,7 @@ void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t dis
 }
 
 
-void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
-{
-    qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
-    if (ep->link->detach_count == 2) {
-        ep->link->core_endpoint = 0;
-        free_qdrc_endpoint_t(ep);
-    }
-}
-
-
-bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_error_t **error)
+void qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
 {
     qdrc_endpoint_t *ep = new_qdrc_endpoint_t();
     ZERO(ep);
@@ -164,37 +170,40 @@ bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr
     ep->link = link;
 
     link->core_endpoint = ep;
+    link->owning_addr   = addr;
 
-    *error = 0;
-    bool accept = !!ep->desc->on_first_attach ?
-        ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, error) : false;
+    ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, source, target);
+}
 
-    if (!accept) {
-        link->core_endpoint = 0;
-        free_qdrc_endpoint_t(ep);
-    }
 
-    return accept;
+void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target)
+{
+    if (!!ep->desc->on_second_attach)
+        ep->desc->on_second_attach(ep->link_context, source, target);
 }
 
 
-
 void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv)
 {
-    ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
+    if (!!ep->desc->on_transfer)
+        ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
 }
 
 
 void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
 {
-    ep->desc->on_flow(ep->link_context, credit, drain);
+    if (!!ep->desc->on_flow)
+        ep->desc->on_flow(ep->link_context, credit, drain);
 }
 
 
 void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
 {
-    ep->desc->on_detach(ep->link_context, error);
-    if (ep->link->detach_count == 2) {
+    if (ep->link->detach_count == 1) {
+        ep->desc->on_first_detach(ep->link_context, error);
+    } else {
+        if (!!ep->desc->on_second_detach)
+            ep->desc->on_second_detach(ep->link_context, error);
         ep->link->core_endpoint = 0;
         free_qdrc_endpoint_t(ep);
     }
@@ -203,7 +212,8 @@ void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error
 
 void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *ep)
 {
-    ep->desc->on_cleanup(ep->link_context);
+    if (!!ep->desc->on_cleanup)
+        ep->desc->on_cleanup(ep->link_context);
     free_qdrc_endpoint_t(ep);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/core_link_endpoint.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.h b/src/router_core/core_link_endpoint.h
index 4e9c2a0..ab4e54f 100644
--- a/src/router_core/core_link_endpoint.h
+++ b/src/router_core/core_link_endpoint.h
@@ -32,17 +32,22 @@ typedef struct qdrc_endpoint_t qdrc_endpoint_t;
 /**
  * Event - An attach for a new core-endpoint link has arrived
  *
+ * This handler must either invoke qdrc_endpoint_detach_CT or qdrc_endpoint_second_attach_CT
+ * to reject or accept the link.  If desired, this handler may schedule the detach/attach
+ * response to happen asynchronously at a later time.
+ *
  * @param bind_context The opaque context provided in the mobile address binding
  * @param endpoint A new endpoint object for the link.  If the link is accepted, this
  *                 object must be stored for later use.
  * @param link_context [out] Handler-provided opaque context to be associated with the endpoint
- * @param error [out] Error indication which may be supplied if the link is rejected
- * @return True if the link is to be accepted, False if the link should be rejected and detached
+ * @param repote_source Pointer to the remote source terminus of the link
+ * @param remote_target Pointer to the remote target terminus of the link
  */
-typedef bool (*qdrc_first_attach_t) (void             *bind_context,
+typedef void (*qdrc_first_attach_t) (void             *bind_context,
                                      qdrc_endpoint_t  *endpoint,
                                      void            **link_context,
-                                     qdr_error_t     **error);
+                                     qdr_terminus_t   *remote_source,
+                                     qdr_terminus_t   *remote_target);
 
 /**
  * Event - The attachment of a link initiated by the core-endpoint was completed
@@ -127,7 +132,8 @@ typedef struct qdrc_endpoint_desc_t {
     qdrc_flow_t           on_flow;
     qdrc_update_t         on_update;
     qdrc_transfer_t       on_transfer;
-    qdrc_detach_t         on_detach;
+    qdrc_detach_t         on_first_detach;
+    qdrc_detach_t         on_second_detach;
     qdrc_cleanup_t        on_cleanup;
 } qdrc_endpoint_desc_t;
 
@@ -188,6 +194,24 @@ qd_direction_t    qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *endpoint
 qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint);
 
 /**
+ * Detach a link attached to the core-endpoint
+ *
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param error An error indication or 0 for no error
+ */
+void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_terminus_t *source, qdr_terminus_t *target);
+
+/**
+ * Detach a link attached to the core-endpoint
+ *
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param error An error indication or 0 for no error
+ */
+void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
+
+/**
  * Issue credit and control drain for an incoming link
  *
  * @param core Pointer to the core object
@@ -228,21 +252,13 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end
  */
 void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition);
 
-/**
- * Detach a link attached to the core-endpoint
- *
- * @param core Pointer to the core object
- * @param endpoint Pointer to an endpoint object
- * @param error An error indication or 0 for no error
- */
-void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
-
 
 //=====================================================================================
 // Private functions, not part of the API
 //=====================================================================================
 
-bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_error_t **error);
+void qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
+void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_terminus_t *source, qdr_terminus_t *target);
 void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery);
 void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit, bool drain);
 void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/modules/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/core_test_hooks.c b/src/router_core/modules/core_test_hooks.c
index eaa4407..0b1f65a 100644
--- a/src/router_core/modules/core_test_hooks.c
+++ b/src/router_core/modules/core_test_hooks.c
@@ -39,12 +39,14 @@ typedef struct test_node_t   test_node_t;
 
 typedef struct test_endpoint_t {
     DEQ_LINKS(struct test_endpoint_t);
-    test_node_t         *node;
-    qdrc_endpoint_t     *ep;
-    qdr_delivery_list_t  deliveries;
-    int                  credit;
-    bool                 in_action_list;
-    bool                 detached;
+    test_node_t            *node;
+    qdrc_endpoint_t        *ep;
+    qdr_delivery_list_t     deliveries;
+    int                     credit;
+    bool                    in_action_list;
+    bool                    detached;
+    qd_direction_t          dir;
+    struct test_endpoint_t *peer;
 } test_endpoint_t;
 
 DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
@@ -118,7 +120,7 @@ static void free_endpoint(test_endpoint_t *ep)
 {
     test_node_t *node = ep->node;
 
-    if (qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING)
+    if (ep->dir == QD_INCOMING)
         DEQ_REMOVE(node->in_links, ep);
     else
         DEQ_REMOVE(node->out_links, ep);
@@ -158,19 +160,21 @@ static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard
 }
 
 
-static bool first_attach(void             *bind_context,
-                         qdrc_endpoint_t  *endpoint,
-                         void            **link_context,
-                         qdr_error_t     **error)
+static void on_first_attach(void             *bind_context,
+                            qdrc_endpoint_t  *endpoint,
+                            void            **link_context,
+                            qdr_terminus_t   *source,
+                            qdr_terminus_t   *target)
 {
     test_node_t     *node     = (test_node_t*) bind_context;
     test_endpoint_t *test_ep  = 0;
     bool             incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
+    qdr_error_t     *error    = 0;
 
     switch (node->behavior) {
     case TEST_NODE_DENY :
-        *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
-        return false;
+        error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
+        break;
 
     case TEST_NODE_ECHO :
         break;
@@ -179,16 +183,14 @@ static bool first_attach(void             *bind_context,
         if (incoming) {
             qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
         } else {
-            *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
-            return false;
+            error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
         }
         break;
 
     case TEST_NODE_SOURCE :
     case TEST_NODE_SOURCE_PS :
         if (incoming) {
-            *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
-            return false;
+            error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
         }
         break;
 
@@ -196,16 +198,23 @@ static bool first_attach(void             *bind_context,
         if (incoming) {
             qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
         } else {
-            *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
-            return false;
+            error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
         }
         break;
     }
 
+    if (!!error) {
+        qdrc_endpoint_detach_CT(node->core, endpoint, error);
+        qdr_terminus_free(source);
+        qdr_terminus_free(target);
+        return;
+    }
+
     test_ep = NEW(test_endpoint_t);
     ZERO(test_ep);
     test_ep->node = node;
     test_ep->ep   = endpoint;
+    test_ep->dir  = incoming ? QD_INCOMING : QD_OUTGOING;
     *link_context = test_ep;
 
     if (incoming)
@@ -213,23 +222,48 @@ static bool first_attach(void             *bind_context,
     else
         DEQ_INSERT_TAIL(node->out_links, test_ep);
 
-    return true;
+    if (node->behavior == TEST_NODE_ECHO) {
+        test_endpoint_t *peer = NEW(test_endpoint_t);
+        ZERO(peer);
+        peer->node = node;
+        peer->ep   = qdrc_endpoint_create_link_CT(node->core,
+                                                  qdrc_endpoint_get_connection_CT(endpoint),
+                                                  incoming ? QD_OUTGOING : QD_INCOMING,
+                                                  source,
+                                                  target,
+                                                  node->desc,
+                                                  peer);
+        test_ep->dir  = incoming ? QD_INCOMING : QD_OUTGOING;
+        test_ep->peer = peer;
+        peer->peer    = test_ep;
+
+        if (incoming)
+            DEQ_INSERT_TAIL(node->out_links, peer);
+        else
+            DEQ_INSERT_TAIL(node->in_links, peer);
+    } else
+        qdrc_endpoint_second_attach_CT(node->core, endpoint, source, target);
 }
 
 
-static void second_attach(void           *link_context,
-                          qdr_terminus_t *remote_source,
-                          qdr_terminus_t *remote_target)
+static void on_second_attach(void           *link_context,
+                             qdr_terminus_t *remote_source,
+                             qdr_terminus_t *remote_target)
 {
+    test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+    if (!!ep->peer) {
+        qdrc_endpoint_second_attach_CT(ep->node->core, ep->peer->ep, remote_source, remote_target);
+    }
 }
 
 
-static void flow(void *link_context,
-                 int   available_credit,
-                 bool  drain)
+static void on_flow(void *link_context,
+                    int   available_credit,
+                    bool  drain)
 {
     test_endpoint_t *ep = (test_endpoint_t*) link_context;
-    if (available_credit == 0)
+    if (!ep || available_credit == 0)
         return;
 
     ep->credit = available_credit;
@@ -254,17 +288,17 @@ static void flow(void *link_context,
 }
 
 
-static void update(void           *link_context,
-                   qdr_delivery_t *delivery,
-                   bool            settled,
-                   uint64_t        disposition)
+static void on_update(void           *link_context,
+                      qdr_delivery_t *delivery,
+                      bool            settled,
+                      uint64_t        disposition)
 {
 }
 
 
-static void transfer(void           *link_context,
-                     qdr_delivery_t *delivery,
-                     qd_message_t   *message)
+static void on_transfer(void           *link_context,
+                        qdr_delivery_t *delivery,
+                        qd_message_t   *message)
 {
     test_endpoint_t *ep = (test_endpoint_t*) link_context;
 
@@ -294,11 +328,20 @@ static void transfer(void           *link_context,
 }
 
 
-static void detach(void        *link_context,
-                   qdr_error_t *error)
+static void on_first_detach(void        *link_context,
+                            qdr_error_t *error)
 {
     test_endpoint_t *ep = (test_endpoint_t*) link_context;
 
+    if (ep->node->behavior == TEST_NODE_ECHO) {
+        if (!!ep->peer) {
+            qdrc_endpoint_detach_CT(ep->node->core, ep->peer->ep, error);
+            return;
+        }
+    }
+
+    qdrc_endpoint_detach_CT(ep->node->core, ep->ep, 0);
+
     if (ep->in_action_list) {
         ep->detached = true;
     } else {
@@ -307,12 +350,37 @@ static void detach(void        *link_context,
 }
 
 
-static void cleanup(void *link_context)
+static void on_second_detach(void        *link_context,
+                             qdr_error_t *error)
+{
+    test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+    if (!!ep) {
+        if (ep->node->behavior == TEST_NODE_ECHO) {
+            if (!!ep->peer) {
+                qdrc_endpoint_detach_CT(ep->node->core, ep->peer->ep, error);
+                if (ep->peer->in_action_list)
+                    ep->peer->detached = true;
+                else
+                    free_endpoint(ep->peer);
+            }
+        }
+
+        if (ep->in_action_list)
+            ep->detached = true;
+        else
+            free_endpoint(ep);
+    }
+}
+
+
+static void on_cleanup(void *link_context)
 {
 }
 
 
-static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
+static qdrc_endpoint_desc_t descriptor = {on_first_attach, on_second_attach, on_flow, on_update,
+                                          on_transfer, on_first_detach, on_second_detach, on_cleanup};
 
 
 static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0e7f968..450a7de 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -889,6 +889,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
                                qdr_terminus_t   *target);
 
 void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
+void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
 
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/tests/system_tests_core_endpoint.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_core_endpoint.py b/tests/system_tests_core_endpoint.py
index 14f4fbf..897ab9e 100644
--- a/tests/system_tests_core_endpoint.py
+++ b/tests/system_tests_core_endpoint.py
@@ -84,6 +84,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_05_echo_attach_detach(self):
+        test = EchoTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/echo")
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -227,5 +232,56 @@ class SourceTest(MessagingHandler):
         Container(self).run()
 
 
+class EchoTest(MessagingHandler):
+    def __init__(self, host, address):
+        super(EchoTest, self).__init__(prefetch = 0)
+        self.host      = host
+        self.address   = address
+
+        self.conn     = None
+        self.error    = None
+        self.action   = "Connecting to router"
+        self.receiver = None
+        self.sender   = None
+
+    def timeout(self):
+        self.error = "Timeout Expired while attempting action: %s" % self.action
+        self.conn.close()
+
+    def fail(self, error):
+        self.error = error
+        self.conn.close()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer    = event.reactor.schedule(5.0, Timeout(self))
+        self.conn     = event.container.connect(self.host)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
+
+    def on_link_opening(self, event):
+        if event.sender:
+            self.action = "Attaching incoming echoed link"
+            self.sender = event.sender
+            if event.sender.remote_source.address == self.address:
+                event.sender.source.address = self.address
+                event.sender.open()
+            else:
+                self.fail("Incorrect address on incoming sender: got %s, expected %s" %
+                          (event.sender.remote_source.address, self.address))
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.action = "Closing the echoed link"
+            self.receiver.close()
+
+    def on_link_closed(self, event):
+        if event.receiver == self.receiver:
+            self.conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org