You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/11/30 15:53:14 UTC
qpid-dispatch git commit: DISPATCH-581 - Fix the propagation of
detach vs. close in link routes.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 45150d2fd -> f65cc10b9
DISPATCH-581 - Fix the propagation of detach vs. close in link routes.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f65cc10b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f65cc10b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f65cc10b
Branch: refs/heads/master
Commit: f65cc10b9da9914928189b769d1e832cf756c5ee
Parents: 45150d2
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Nov 30 10:50:37 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Nov 30 10:50:37 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/container.h | 1 +
include/qpid/dispatch/router_core.h | 2 +-
src/container.c | 11 +++
src/router_core/connections.c | 27 +++---
src/router_core/route_control.c | 2 +-
src/router_core/router_core_private.h | 3 +-
src/router_node.c | 8 +-
tests/system_tests_link_routes.py | 137 +++++++++++++++++++++++++++++
8 files changed, 173 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index a59d6e7..3e175af 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -181,6 +181,7 @@ pn_terminus_t *qd_link_remote_source(qd_link_t *link);
pn_terminus_t *qd_link_remote_target(qd_link_t *link);
void qd_link_activate(qd_link_t *link);
void qd_link_close(qd_link_t *link);
+void qd_link_detach(qd_link_t *link);
bool qd_link_drain_changed(qd_link_t *link, bool *mode);
void qd_link_free(qd_link_t *link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 6755b35..cb2d319 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -524,7 +524,7 @@ typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
-typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first);
+typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close);
typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit);
typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count);
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 9c0680b..5430356 100644
--- a/src/container.c
+++ b/src/container.c
@@ -883,6 +883,17 @@ void qd_link_close(qd_link_t *link)
if (link->pn_link)
pn_link_close(link->pn_link);
+ if (link->close_sess_with_link && link->pn_sess &&
+ pn_link_state(link->pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
+ pn_session_close(link->pn_sess);
+ }
+}
+
+
+void qd_link_detach(qd_link_t *link)
+{
+ if (link->pn_link)
+ pn_link_detach(link->pn_link);
if (link->close_sess_with_link && link->pn_sess &&
pn_link_state(link->pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 231095a..21fc2e3 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -138,13 +138,13 @@ int qdr_connection_process(qdr_connection_t *conn)
break;
case QDR_CONNECTION_WORK_FIRST_DETACH :
- core->detach_handler(core->user_context, work->link, work->error, true);
+ core->detach_handler(core->user_context, work->link, work->error, true, work->close_link);
if (work->error)
qdr_error_free(work->error);
break;
case QDR_CONNECTION_WORK_SECOND_DETACH :
- core->detach_handler(core->user_context, work->link, work->error, false);
+ core->detach_handler(core->user_context, work->link, work->error, false, work->close_link);
if (work->error)
qdr_error_free(work->error);
free_qdr_link_t(work->link);
@@ -601,12 +601,13 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
}
-void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition)
+void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close)
{
qdr_connection_work_t *work = new_qdr_connection_work_t();
ZERO(work);
- work->work_type = ++link->detach_count == 1 ? QDR_CONNECTION_WORK_FIRST_DETACH : QDR_CONNECTION_WORK_SECOND_DETACH;
- work->link = link;
+ work->work_type = ++link->detach_count == 1 ? QDR_CONNECTION_WORK_FIRST_DETACH : QDR_CONNECTION_WORK_SECOND_DETACH;
+ work->link = link;
+ work->close_link = close;
if (error)
work->error = error;
@@ -1023,7 +1024,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// Reject any attaches of inter-router links that arrive on connections that are not inter-router.
//
if (((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
return;
@@ -1036,7 +1037,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT &&
core->control_links_by_mask_bit[conn->mask_bit] == 0) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
return;
@@ -1063,7 +1064,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// No route to this destination, reject the link
//
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
}
@@ -1074,7 +1075,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
success = qdr_forward_attach_CT(core, addr, link, source, target);
if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
}
@@ -1120,7 +1121,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// No route to this destination, reject the link
//
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
}
@@ -1131,7 +1132,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
bool success = qdr_forward_attach_CT(core, addr, link, source, target);
if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
}
@@ -1284,7 +1285,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
// For routed links, propagate the detach
//
if (link->connected_link) {
- qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE);
+ qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt != QD_DETACHED);
//
// If the link is completely detached, release its resources
@@ -1361,7 +1362,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
// If the detach occurred via protocol, send a detach back.
//
if (dt != QD_LOST)
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE);
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED);
} else {
qdr_link_cleanup_CT(core, conn, link);
free_qdr_link_t(link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index d077210..0d0311a 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -154,7 +154,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, q
qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn);
if (al->link) {
- qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_ROUTED_LINK_LOST);
+ qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_ROUTED_LINK_LOST, true);
al->link->auto_link = 0;
al->link = 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 61d86a2..cf94c75 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -432,6 +432,7 @@ typedef struct qdr_connection_work_t {
qdr_terminus_t *source;
qdr_terminus_t *target;
qdr_error_t *error;
+ bool close_link;
} qdr_connection_work_t;
ALLOC_DECLARE(qdr_connection_work_t);
@@ -642,7 +643,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
qdr_terminus_t *source,
qdr_terminus_t *target);
-void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition);
+void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index d35ea3f..abf80f4 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -774,7 +774,7 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu
}
-static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first)
+static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
if (!qlink)
@@ -788,7 +788,11 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
pn_condition_t *cond = pn_link_condition(pn_link);
qdr_error_copy(error, cond);
}
- qd_link_close(qlink);
+
+ if (close)
+ qd_link_close(qlink);
+ else
+ qd_link_detach(qlink);
//
// This is the last event for this link that we are going to send into Proton.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f65cc10b/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index fd7a673..3508457 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -509,6 +509,16 @@ class LinkRouteTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_detach_without_close(self):
+ test = DetachNoCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_detach_mixed_close(self):
+ test = DetachMixedCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+ test.run()
+ self.assertEqual(None, test.error)
+
class Timeout(object):
def __init__(self, parent):
@@ -696,6 +706,133 @@ class DynamicSourceTest(MessagingHandler):
def run(self):
Container(self).run()
+
+class DetachNoCloseTest(MessagingHandler):
+ ##
+ ## This test verifies that link-detach (not close) is propagated properly
+ ##
+ def __init__(self, normal_addr, route_addr):
+ super(DetachNoCloseTest, self).__init__(prefetch=0, auto_accept=False)
+ self.normal_addr = normal_addr
+ self.route_addr = route_addr
+ self.dest = "pulp.task.DetachNoClose"
+ self.error = None
+
+ def timeout(self):
+ self.error = "Timeout Expired - Check for cores"
+ self.conn_normal.close()
+ self.conn_route.close()
+
+ def stop(self):
+ self.conn_normal.close()
+ self.conn_route.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn_route = event.container.connect(self.route_addr)
+
+ def on_connection_opened(self, event):
+ if event.connection == self.conn_route:
+ self.conn_normal = event.container.connect(self.normal_addr)
+ elif event.connection == self.conn_normal:
+ self.receiver = event.container.create_receiver(self.conn_normal, self.dest)
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.receiver.detach()
+
+ def on_link_remote_detach(self, event):
+ if event.sender == self.sender:
+ self.sender.detach()
+ if event.receiver == self.receiver:
+ ##
+ ## Test passed, we expected a detach on the propagated sender and back
+ ##
+ self.stop()
+
+ def on_link_closing(self, event):
+ if event.sender == self.sender:
+ self.error = 'Propagated link was closed. Expected it to be detached'
+ self.stop()
+
+ if event.receiver == self.receiver:
+ self.error = 'Client link was closed. Expected it to be detached'
+ self.stop()
+
+ def on_link_opening(self, event):
+ if event.sender:
+ self.sender = event.sender
+ self.sender.source.address = self.sender.remote_source.address
+ self.sender.open()
+
+ def run(self):
+ Container(self).run()
+
+
+class DetachMixedCloseTest(MessagingHandler):
+ ##
+ ## This test verifies that link-detach (not close) is propagated properly
+ ##
+ def __init__(self, normal_addr, route_addr):
+ super(DetachMixedCloseTest, self).__init__(prefetch=0, auto_accept=False)
+ self.normal_addr = normal_addr
+ self.route_addr = route_addr
+ self.dest = "pulp.task.DetachMixedClose"
+ self.error = None
+
+ def timeout(self):
+ self.error = "Timeout Expired - Check for cores"
+ self.conn_normal.close()
+ self.conn_route.close()
+
+ def stop(self):
+ self.conn_normal.close()
+ self.conn_route.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn_route = event.container.connect(self.route_addr)
+
+ def on_connection_opened(self, event):
+ if event.connection == self.conn_route:
+ self.conn_normal = event.container.connect(self.normal_addr)
+ elif event.connection == self.conn_normal:
+ self.receiver = event.container.create_receiver(self.conn_normal, self.dest)
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.receiver.detach()
+
+ def on_link_remote_detach(self, event):
+ if event.sender == self.sender:
+ self.sender.close()
+ if event.receiver == self.receiver:
+ self.error = 'Client link was detached. Expected it to be closed'
+ self.stop()
+
+ def on_link_closing(self, event):
+ if event.sender == self.sender:
+ self.error = 'Propagated link was closed. Expected it to be detached'
+ self.stop()
+
+ if event.receiver == self.receiver:
+ ##
+ ## Test Passed
+ ##
+ self.stop()
+
+ def on_link_opening(self, event):
+ if event.sender:
+ self.sender = event.sender
+ self.sender.source.address = self.sender.remote_source.address
+ self.sender.open()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org