You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/11/30 15:53:14 UTC

qpid-dispatch git commit: DISPATCH-581 - Fix the propagation of detach vs. close in link routes.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 45150d2fd -> f65cc10b9


DISPATCH-581 - Fix the propagation of detach vs. close in link routes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f65cc10b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f65cc10b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f65cc10b

Branch: refs/heads/master
Commit: f65cc10b9da9914928189b769d1e832cf756c5ee
Parents: 45150d2
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Nov 30 10:50:37 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Nov 30 10:50:37 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/container.h     |   1 +
 include/qpid/dispatch/router_core.h   |   2 +-
 src/container.c                       |  11 +++
 src/router_core/connections.c         |  27 +++---
 src/router_core/route_control.c       |   2 +-
 src/router_core/router_core_private.h |   3 +-
 src/router_node.c                     |   8 +-
 tests/system_tests_link_routes.py     | 137 +++++++++++++++++++++++++++++
 8 files changed, 173 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index a59d6e7..3e175af 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -181,6 +181,7 @@ pn_terminus_t *qd_link_remote_source(qd_link_t *link);
 pn_terminus_t *qd_link_remote_target(qd_link_t *link);
 void qd_link_activate(qd_link_t *link);
 void qd_link_close(qd_link_t *link);
+void qd_link_detach(qd_link_t *link);
 bool qd_link_drain_changed(qd_link_t *link, bool *mode);
 void qd_link_free(qd_link_t *link);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 6755b35..cb2d319 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -524,7 +524,7 @@ typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn,
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
                                           qdr_terminus_t *source, qdr_terminus_t *target);
-typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_error_t *error, bool first);
+typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close);
 typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link, int credit);
 typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int delivery_count);
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 9c0680b..5430356 100644
--- a/src/container.c
+++ b/src/container.c
@@ -883,6 +883,17 @@ void qd_link_close(qd_link_t *link)
     if (link->pn_link)
         pn_link_close(link->pn_link);
 
+    if (link->close_sess_with_link && link->pn_sess &&
+        pn_link_state(link->pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
+        pn_session_close(link->pn_sess);
+    }
+}
+
+
+void qd_link_detach(qd_link_t *link)
+{
+    if (link->pn_link)
+        pn_link_detach(link->pn_link);
 
     if (link->close_sess_with_link && link->pn_sess &&
         pn_link_state(link->pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 231095a..21fc2e3 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -138,13 +138,13 @@ int qdr_connection_process(qdr_connection_t *conn)
             break;
 
         case QDR_CONNECTION_WORK_FIRST_DETACH :
-            core->detach_handler(core->user_context, work->link, work->error, true);
+            core->detach_handler(core->user_context, work->link, work->error, true, work->close_link);
             if (work->error)
                 qdr_error_free(work->error);
             break;
 
         case QDR_CONNECTION_WORK_SECOND_DETACH :
-            core->detach_handler(core->user_context, work->link, work->error, false);
+            core->detach_handler(core->user_context, work->link, work->error, false, work->close_link);
             if (work->error)
                 qdr_error_free(work->error);
             free_qdr_link_t(work->link);
@@ -601,12 +601,13 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
 }
 
 
-void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition)
+void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close)
 {
     qdr_connection_work_t *work = new_qdr_connection_work_t();
     ZERO(work);
-    work->work_type = ++link->detach_count == 1 ? QDR_CONNECTION_WORK_FIRST_DETACH : QDR_CONNECTION_WORK_SECOND_DETACH;
-    work->link      = link;
+    work->work_type  = ++link->detach_count == 1 ? QDR_CONNECTION_WORK_FIRST_DETACH : QDR_CONNECTION_WORK_SECOND_DETACH;
+    work->link       = link;
+    work->close_link = close;
 
     if (error)
         work->error = error;
@@ -1023,7 +1024,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
     // Reject any attaches of inter-router links that arrive on connections that are not inter-router.
     //
     if (((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)) {
-        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN);
+        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true);
         qdr_terminus_free(source);
         qdr_terminus_free(target);
         return;
@@ -1036,7 +1037,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
     //
     if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT &&
         core->control_links_by_mask_bit[conn->mask_bit] == 0) {
-        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE);
+        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true);
         qdr_terminus_free(source);
         qdr_terminus_free(target);
         return;
@@ -1063,7 +1064,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     //
                     // No route to this destination, reject the link
                     //
-                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                     qdr_terminus_free(source);
                     qdr_terminus_free(target);
                 }
@@ -1074,7 +1075,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     //
                     success = qdr_forward_attach_CT(core, addr, link, source, target);
                     if (!success) {
-                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                         qdr_terminus_free(source);
                         qdr_terminus_free(target);
                     }
@@ -1120,7 +1121,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 // No route to this destination, reject the link
                 //
-                qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                 qdr_terminus_free(source);
                 qdr_terminus_free(target);
             }
@@ -1131,7 +1132,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 bool success = qdr_forward_attach_CT(core, addr, link, source, target);
                 if (!success) {
-                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                     qdr_terminus_free(source);
                     qdr_terminus_free(target);
                 }
@@ -1284,7 +1285,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
     // For routed links, propagate the detach
     //
     if (link->connected_link) {
-        qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE);
+        qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt != QD_DETACHED);
 
         //
         // If the link is completely detached, release its resources
@@ -1361,7 +1362,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         // If the detach occurred via protocol, send a detach back.
         //
         if (dt != QD_LOST)
-            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE);
+            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED);
     } else {
         qdr_link_cleanup_CT(core, conn, link);
         free_qdr_link_t(link);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index d077210..0d0311a 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -154,7 +154,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, q
     qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn);
 
     if (al->link) {
-        qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_ROUTED_LINK_LOST);
+        qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_ROUTED_LINK_LOST, true);
         al->link->auto_link = 0;
         al->link            = 0;
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 61d86a2..cf94c75 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -432,6 +432,7 @@ typedef struct qdr_connection_work_t {
     qdr_terminus_t             *source;
     qdr_terminus_t             *target;
     qdr_error_t                *error;
+    bool                        close_link;
 } qdr_connection_work_t;
 
 ALLOC_DECLARE(qdr_connection_work_t);
@@ -642,7 +643,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
                                qdr_terminus_t   *source,
                                qdr_terminus_t   *target);
 
-void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition);
+void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
 
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index d35ea3f..abf80f4 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -774,7 +774,7 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu
 }
 
 
-static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first)
+static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
 {
     qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
     if (!qlink)
@@ -788,7 +788,11 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
         pn_condition_t *cond = pn_link_condition(pn_link);
         qdr_error_copy(error, cond);
     }
-    qd_link_close(qlink);
+
+    if (close)
+        qd_link_close(qlink);
+    else
+        qd_link_detach(qlink);
 
     //
     // This is the last event for this link that we are going to send into Proton.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index fd7a673..3508457 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -509,6 +509,16 @@ class LinkRouteTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_detach_without_close(self):
+        test = DetachNoCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_detach_mixed_close(self):
+        test = DetachMixedCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -696,6 +706,133 @@ class DynamicSourceTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
+class DetachNoCloseTest(MessagingHandler):
+    ##
+    ## This test verifies that link-detach (not close) is propagated properly
+    ##
+    def __init__(self, normal_addr, route_addr):
+        super(DetachNoCloseTest, self).__init__(prefetch=0, auto_accept=False)
+        self.normal_addr = normal_addr
+        self.route_addr  = route_addr
+        self.dest = "pulp.task.DetachNoClose"
+        self.error = None
+
+    def timeout(self):
+        self.error = "Timeout Expired - Check for cores"
+        self.conn_normal.close()
+        self.conn_route.close()
+
+    def stop(self):
+        self.conn_normal.close()
+        self.conn_route.close()
+        self.timer.cancel()
+        
+    def on_start(self, event):
+        self.timer      = event.reactor.schedule(5, Timeout(self))
+        self.conn_route = event.container.connect(self.route_addr)
+
+    def on_connection_opened(self, event):
+        if event.connection == self.conn_route:
+            self.conn_normal = event.container.connect(self.normal_addr)
+        elif event.connection == self.conn_normal:
+            self.receiver = event.container.create_receiver(self.conn_normal, self.dest)
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.receiver.detach()
+
+    def on_link_remote_detach(self, event):
+        if event.sender == self.sender:
+            self.sender.detach()
+        if event.receiver == self.receiver:
+            ##
+            ## Test passed, we expected a detach on the propagated sender and back
+            ##
+            self.stop()
+
+    def on_link_closing(self, event):
+        if event.sender == self.sender:
+            self.error = 'Propagated link was closed.  Expected it to be detached'
+            self.stop()
+
+        if event.receiver == self.receiver:
+            self.error = 'Client link was closed.  Expected it to be detached'
+            self.stop()
+
+    def on_link_opening(self, event):
+        if event.sender:
+            self.sender = event.sender
+            self.sender.source.address = self.sender.remote_source.address
+            self.sender.open()
+
+    def run(self):
+        Container(self).run()
+
+
+class DetachMixedCloseTest(MessagingHandler):
+    ##
+    ## This test verifies that a detach is propagated as a detach and that the close sent in reply is propagated back as a close
+    ##
+    def __init__(self, normal_addr, route_addr):
+        super(DetachMixedCloseTest, self).__init__(prefetch=0, auto_accept=False)
+        self.normal_addr = normal_addr
+        self.route_addr  = route_addr
+        self.dest = "pulp.task.DetachMixedClose"
+        self.error = None
+
+    def timeout(self):
+        self.error = "Timeout Expired - Check for cores"
+        self.conn_normal.close()
+        self.conn_route.close()
+
+    def stop(self):
+        self.conn_normal.close()
+        self.conn_route.close()
+        self.timer.cancel()
+        
+    def on_start(self, event):
+        self.timer      = event.reactor.schedule(5, Timeout(self))
+        self.conn_route = event.container.connect(self.route_addr)
+
+    def on_connection_opened(self, event):
+        if event.connection == self.conn_route:
+            self.conn_normal = event.container.connect(self.normal_addr)
+        elif event.connection == self.conn_normal:
+            self.receiver = event.container.create_receiver(self.conn_normal, self.dest)
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.receiver.detach()
+
+    def on_link_remote_detach(self, event):
+        if event.sender == self.sender:
+            self.sender.close()
+        if event.receiver == self.receiver:
+            self.error = 'Client link was detached.  Expected it to be closed'
+            self.stop()
+
+    def on_link_closing(self, event):
+        if event.sender == self.sender:
+            self.error = 'Propagated link was closed.  Expected it to be detached'
+            self.stop()
+
+        if event.receiver == self.receiver:
+            ##
+            ## Test Passed
+            ##
+            self.stop()
+
+    def on_link_opening(self, event):
+        if event.sender:
+            self.sender = event.sender
+            self.sender.source.address = self.sender.remote_source.address
+            self.sender.open()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org