You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/10/04 16:33:15 UTC
qpid-dispatch git commit: DISPATCH-1133 - Updated the attach/detach
behavior of the core-endpoint API, added echo test for attach/detach.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 3b1d84c96 -> 41c582838
DISPATCH-1133 - Updated the attach/detach behavior of the core-endpoint API, added echo test for attach/detach.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/41c58283
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/41c58283
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/41c58283
Branch: refs/heads/master
Commit: 41c582838771317ed79cbcf68b022c6665aaf8f1
Parents: 3b1d84c
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Oct 4 12:32:04 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Oct 4 12:32:04 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 28 ++---
src/router_core/core_link_endpoint.c | 62 ++++++-----
src/router_core/core_link_endpoint.h | 46 +++++---
src/router_core/modules/core_test_hooks.c | 144 ++++++++++++++++++-------
src/router_core/router_core_private.h | 1 +
tests/system_tests_core_endpoint.py | 56 ++++++++++
6 files changed, 239 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index ef4b8e4..2eda2d6 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -988,7 +988,7 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
}
-static void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
+void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
{
qdr_connection_work_t *work = new_qdr_connection_work_t();
ZERO(work);
@@ -1594,15 +1594,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable, &core_endpoint);
if (core_endpoint) {
- qdr_error_t *error = 0;
- if (qdrc_endpoint_do_bound_attach_CT(core, addr, link, &error)) {
- link->owning_addr = addr;
- qdr_link_outbound_second_attach_CT(core, link, source, target);
- } else {
- qdr_link_outbound_detach_CT(core, link, error, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
+ qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
}
else if (unavailable) {
@@ -1705,15 +1697,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable, &core_endpoint);
if (core_endpoint) {
- qdr_error_t *error = 0;
- if (qdrc_endpoint_do_bound_attach_CT(core, addr, link, &error)) {
- link->owning_addr = addr;
- qdr_link_outbound_second_attach_CT(core, link, source, target);
- } else {
- qdr_link_outbound_detach_CT(core, link, error, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
+ qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
}
else if (unavailable) {
@@ -1795,6 +1779,11 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
link->oper_status = QDR_LINK_OPER_UP;
+ if (link->core_endpoint) {
+ qdrc_endpoint_do_second_attach_CT(core, link->core_endpoint, source, target);
+ return;
+ }
+
//
// Handle attach-routed links
//
@@ -1901,6 +1890,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
if (link->core_endpoint) {
qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error);
+ return;
} else {
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/core_link_endpoint.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 961c219..05e7fb8 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -79,7 +79,7 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core,
qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *ep)
{
- return ep->link->link_direction;
+ return !!ep ? (!!ep->link ? ep->link->link_direction : QD_INCOMING) : QD_INCOMING;
}
@@ -89,6 +89,22 @@ qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *ep)
}
+void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target)
+{
+ qdr_link_outbound_second_attach_CT(core, ep->link, source, target);
+}
+
+
+void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
+{
+ qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
+ if (ep->link->detach_count == 2) {
+ ep->link->core_endpoint = 0;
+ free_qdrc_endpoint_t(ep);
+ }
+}
+
+
void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
{
qdr_link_issue_credit_CT(core, ep->link, credit, drain);
@@ -146,17 +162,7 @@ void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t dis
}
-void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
-{
- qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
- if (ep->link->detach_count == 2) {
- ep->link->core_endpoint = 0;
- free_qdrc_endpoint_t(ep);
- }
-}
-
-
-bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_error_t **error)
+void qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
{
qdrc_endpoint_t *ep = new_qdrc_endpoint_t();
ZERO(ep);
@@ -164,37 +170,40 @@ bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr
ep->link = link;
link->core_endpoint = ep;
+ link->owning_addr = addr;
- *error = 0;
- bool accept = !!ep->desc->on_first_attach ?
- ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, error) : false;
+ ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, source, target);
+}
- if (!accept) {
- link->core_endpoint = 0;
- free_qdrc_endpoint_t(ep);
- }
- return accept;
+void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target)
+{
+ if (!!ep->desc->on_second_attach)
+ ep->desc->on_second_attach(ep->link_context, source, target);
}
-
void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv)
{
- ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
+ if (!!ep->desc->on_transfer)
+ ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
}
void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
{
- ep->desc->on_flow(ep->link_context, credit, drain);
+ if (!!ep->desc->on_flow)
+ ep->desc->on_flow(ep->link_context, credit, drain);
}
void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
{
- ep->desc->on_detach(ep->link_context, error);
- if (ep->link->detach_count == 2) {
+ if (ep->link->detach_count == 1) {
+ ep->desc->on_first_detach(ep->link_context, error);
+ } else {
+ if (!!ep->desc->on_second_detach)
+ ep->desc->on_second_detach(ep->link_context, error);
ep->link->core_endpoint = 0;
free_qdrc_endpoint_t(ep);
}
@@ -203,7 +212,8 @@ void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error
void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *ep)
{
- ep->desc->on_cleanup(ep->link_context);
+ if (!!ep->desc->on_cleanup)
+ ep->desc->on_cleanup(ep->link_context);
free_qdrc_endpoint_t(ep);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/core_link_endpoint.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.h b/src/router_core/core_link_endpoint.h
index 4e9c2a0..ab4e54f 100644
--- a/src/router_core/core_link_endpoint.h
+++ b/src/router_core/core_link_endpoint.h
@@ -32,17 +32,22 @@ typedef struct qdrc_endpoint_t qdrc_endpoint_t;
/**
* Event - An attach for a new core-endpoint link has arrived
*
+ * This handler must either invoke qdrc_endpoint_detach_CT or qdrc_endpoint_second_attach_CT
+ * to reject or accept the link. If desired, this handler may schedule the detach/attach
+ * response to happen asynchronously at a later time.
+ *
* @param bind_context The opaque context provided in the mobile address binding
* @param endpoint A new endpoint object for the link. If the link is accepted, this
* object must be stored for later use.
* @param link_context [out] Handler-provided opaque context to be associated with the endpoint
- * @param error [out] Error indication which may be supplied if the link is rejected
- * @return True if the link is to be accepted, False if the link should be rejected and detached
+ * @param repote_source Pointer to the remote source terminus of the link
+ * @param remote_target Pointer to the remote target terminus of the link
*/
-typedef bool (*qdrc_first_attach_t) (void *bind_context,
+typedef void (*qdrc_first_attach_t) (void *bind_context,
qdrc_endpoint_t *endpoint,
void **link_context,
- qdr_error_t **error);
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target);
/**
* Event - The attachment of a link initiated by the core-endpoint was completed
@@ -127,7 +132,8 @@ typedef struct qdrc_endpoint_desc_t {
qdrc_flow_t on_flow;
qdrc_update_t on_update;
qdrc_transfer_t on_transfer;
- qdrc_detach_t on_detach;
+ qdrc_detach_t on_first_detach;
+ qdrc_detach_t on_second_detach;
qdrc_cleanup_t on_cleanup;
} qdrc_endpoint_desc_t;
@@ -188,6 +194,24 @@ qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *endpoint
qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint);
/**
+ * Detach a link attached to the core-endpoint
+ *
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param error An error indication or 0 for no error
+ */
+void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_terminus_t *source, qdr_terminus_t *target);
+
+/**
+ * Detach a link attached to the core-endpoint
+ *
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param error An error indication or 0 for no error
+ */
+void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
+
+/**
* Issue credit and control drain for an incoming link
*
* @param core Pointer to the core object
@@ -228,21 +252,13 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end
*/
void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition);
-/**
- * Detach a link attached to the core-endpoint
- *
- * @param core Pointer to the core object
- * @param endpoint Pointer to an endpoint object
- * @param error An error indication or 0 for no error
- */
-void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
-
//=====================================================================================
// Private functions, not part of the API
//=====================================================================================
-bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_error_t **error);
+void qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
+void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_terminus_t *source, qdr_terminus_t *target);
void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery);
void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit, bool drain);
void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/modules/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/core_test_hooks.c b/src/router_core/modules/core_test_hooks.c
index eaa4407..0b1f65a 100644
--- a/src/router_core/modules/core_test_hooks.c
+++ b/src/router_core/modules/core_test_hooks.c
@@ -39,12 +39,14 @@ typedef struct test_node_t test_node_t;
typedef struct test_endpoint_t {
DEQ_LINKS(struct test_endpoint_t);
- test_node_t *node;
- qdrc_endpoint_t *ep;
- qdr_delivery_list_t deliveries;
- int credit;
- bool in_action_list;
- bool detached;
+ test_node_t *node;
+ qdrc_endpoint_t *ep;
+ qdr_delivery_list_t deliveries;
+ int credit;
+ bool in_action_list;
+ bool detached;
+ qd_direction_t dir;
+ struct test_endpoint_t *peer;
} test_endpoint_t;
DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
@@ -118,7 +120,7 @@ static void free_endpoint(test_endpoint_t *ep)
{
test_node_t *node = ep->node;
- if (qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING)
+ if (ep->dir == QD_INCOMING)
DEQ_REMOVE(node->in_links, ep);
else
DEQ_REMOVE(node->out_links, ep);
@@ -158,19 +160,21 @@ static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard
}
-static bool first_attach(void *bind_context,
- qdrc_endpoint_t *endpoint,
- void **link_context,
- qdr_error_t **error)
+static void on_first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target)
{
test_node_t *node = (test_node_t*) bind_context;
test_endpoint_t *test_ep = 0;
bool incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
+ qdr_error_t *error = 0;
switch (node->behavior) {
case TEST_NODE_DENY :
- *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
- return false;
+ error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
+ break;
case TEST_NODE_ECHO :
break;
@@ -179,16 +183,14 @@ static bool first_attach(void *bind_context,
if (incoming) {
qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
} else {
- *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
- return false;
+ error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
}
break;
case TEST_NODE_SOURCE :
case TEST_NODE_SOURCE_PS :
if (incoming) {
- *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
- return false;
+ error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
}
break;
@@ -196,16 +198,23 @@ static bool first_attach(void *bind_context,
if (incoming) {
qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
} else {
- *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
- return false;
+ error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
}
break;
}
+ if (!!error) {
+ qdrc_endpoint_detach_CT(node->core, endpoint, error);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ return;
+ }
+
test_ep = NEW(test_endpoint_t);
ZERO(test_ep);
test_ep->node = node;
test_ep->ep = endpoint;
+ test_ep->dir = incoming ? QD_INCOMING : QD_OUTGOING;
*link_context = test_ep;
if (incoming)
@@ -213,23 +222,48 @@ static bool first_attach(void *bind_context,
else
DEQ_INSERT_TAIL(node->out_links, test_ep);
- return true;
+ if (node->behavior == TEST_NODE_ECHO) {
+ test_endpoint_t *peer = NEW(test_endpoint_t);
+ ZERO(peer);
+ peer->node = node;
+ peer->ep = qdrc_endpoint_create_link_CT(node->core,
+ qdrc_endpoint_get_connection_CT(endpoint),
+ incoming ? QD_OUTGOING : QD_INCOMING,
+ source,
+ target,
+ node->desc,
+ peer);
+ test_ep->dir = incoming ? QD_INCOMING : QD_OUTGOING;
+ test_ep->peer = peer;
+ peer->peer = test_ep;
+
+ if (incoming)
+ DEQ_INSERT_TAIL(node->out_links, peer);
+ else
+ DEQ_INSERT_TAIL(node->in_links, peer);
+ } else
+ qdrc_endpoint_second_attach_CT(node->core, endpoint, source, target);
}
-static void second_attach(void *link_context,
- qdr_terminus_t *remote_source,
- qdr_terminus_t *remote_target)
+static void on_second_attach(void *link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
{
+ test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+ if (!!ep->peer) {
+ qdrc_endpoint_second_attach_CT(ep->node->core, ep->peer->ep, remote_source, remote_target);
+ }
}
-static void flow(void *link_context,
- int available_credit,
- bool drain)
+static void on_flow(void *link_context,
+ int available_credit,
+ bool drain)
{
test_endpoint_t *ep = (test_endpoint_t*) link_context;
- if (available_credit == 0)
+ if (!ep || available_credit == 0)
return;
ep->credit = available_credit;
@@ -254,17 +288,17 @@ static void flow(void *link_context,
}
-static void update(void *link_context,
- qdr_delivery_t *delivery,
- bool settled,
- uint64_t disposition)
+static void on_update(void *link_context,
+ qdr_delivery_t *delivery,
+ bool settled,
+ uint64_t disposition)
{
}
-static void transfer(void *link_context,
- qdr_delivery_t *delivery,
- qd_message_t *message)
+static void on_transfer(void *link_context,
+ qdr_delivery_t *delivery,
+ qd_message_t *message)
{
test_endpoint_t *ep = (test_endpoint_t*) link_context;
@@ -294,11 +328,20 @@ static void transfer(void *link_context,
}
-static void detach(void *link_context,
- qdr_error_t *error)
+static void on_first_detach(void *link_context,
+ qdr_error_t *error)
{
test_endpoint_t *ep = (test_endpoint_t*) link_context;
+ if (ep->node->behavior == TEST_NODE_ECHO) {
+ if (!!ep->peer) {
+ qdrc_endpoint_detach_CT(ep->node->core, ep->peer->ep, error);
+ return;
+ }
+ }
+
+ qdrc_endpoint_detach_CT(ep->node->core, ep->ep, 0);
+
if (ep->in_action_list) {
ep->detached = true;
} else {
@@ -307,12 +350,37 @@ static void detach(void *link_context,
}
-static void cleanup(void *link_context)
+static void on_second_detach(void *link_context,
+ qdr_error_t *error)
+{
+ test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+ if (!!ep) {
+ if (ep->node->behavior == TEST_NODE_ECHO) {
+ if (!!ep->peer) {
+ qdrc_endpoint_detach_CT(ep->node->core, ep->peer->ep, error);
+ if (ep->peer->in_action_list)
+ ep->peer->detached = true;
+ else
+ free_endpoint(ep->peer);
+ }
+ }
+
+ if (ep->in_action_list)
+ ep->detached = true;
+ else
+ free_endpoint(ep);
+ }
+}
+
+
+static void on_cleanup(void *link_context)
{
}
-static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
+static qdrc_endpoint_desc_t descriptor = {on_first_attach, on_second_attach, on_flow, on_update,
+ on_transfer, on_first_detach, on_second_detach, on_cleanup};
static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0e7f968..450a7de 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -889,6 +889,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
qdr_terminus_t *target);
void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
+void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c58283/tests/system_tests_core_endpoint.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_core_endpoint.py b/tests/system_tests_core_endpoint.py
index 14f4fbf..897ab9e 100644
--- a/tests/system_tests_core_endpoint.py
+++ b/tests/system_tests_core_endpoint.py
@@ -84,6 +84,11 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_05_echo_attach_detach(self):
+ test = EchoTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/echo")
+ test.run()
+ self.assertEqual(None, test.error)
+
class Timeout(object):
def __init__(self, parent):
@@ -227,5 +232,56 @@ class SourceTest(MessagingHandler):
Container(self).run()
+class EchoTest(MessagingHandler):
+ def __init__(self, host, address):
+ super(EchoTest, self).__init__(prefetch = 0)
+ self.host = host
+ self.address = address
+
+ self.conn = None
+ self.error = None
+ self.action = "Connecting to router"
+ self.receiver = None
+ self.sender = None
+
+ def timeout(self):
+ self.error = "Timeout Expired while attempting action: %s" % self.action
+ self.conn.close()
+
+ def fail(self, error):
+ self.error = error
+ self.conn.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+ self.conn = event.container.connect(self.host)
+ self.receiver = event.container.create_receiver(self.conn, self.address)
+
+ def on_link_opening(self, event):
+ if event.sender:
+ self.action = "Attaching incoming echoed link"
+ self.sender = event.sender
+ if event.sender.remote_source.address == self.address:
+ event.sender.source.address = self.address
+ event.sender.open()
+ else:
+ self.fail("Incorrect address on incoming sender: got %s, expected %s" %
+ (event.sender.remote_source.address, self.address))
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.action = "Closing the echoed link"
+ self.receiver.close()
+
+ def on_link_closed(self, event):
+ if event.receiver == self.receiver:
+ self.conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org