You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2019/05/08 13:20:40 UTC
[qpid-dispatch] branch master updated: DISPATCH-1326 - Handling of
anonymous messages sent to edge routers. Added small message and large
message tests. This closes #502
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new fa1b167 DISPATCH-1326 - Handling of anonymous messages sent to edge routers. Added small message and large message tests. This closes #502
fa1b167 is described below
commit fa1b167dcf1e77a40f9fcb5bba3074a698b6a6ab
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon May 6 17:40:30 2019 -0400
DISPATCH-1326 - Handling of anonymous messages sent to edge routers. Added small message and large message tests. This closes #502
---
src/router_core/modules/edge_router/addr_proxy.c | 12 +
src/router_core/router_core_private.h | 4 +
src/router_core/transfer.c | 53 +++
src/router_node.c | 10 +
tests/system_tests_edge_router.py | 570 ++++++++++++++++++++++-
5 files changed, 645 insertions(+), 4 deletions(-)
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index 5cc54a7..8a7a0ee 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -432,6 +432,14 @@ static void on_transfer(void *link_context,
qdrc_endpoint_flow_CT(ap->core, ap->tracking_endpoint, 1, false);
}
+qdr_address_t *qcm_edge_conn_addr(void *link_context)  // Accessor: returns the edge_conn_addr held by the address proxy passed as link_context, or 0 if none.
+{
+ qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context;  // the opaque context is interpreted as the address-proxy object
+ if (!ap)
+ return 0;  // no proxy supplied, so there is no address to report
+ return ap->edge_conn_addr;
+}
+
static void on_cleanup(void *link_context)
{
@@ -477,10 +485,14 @@ qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core)
on_addr_event,
ap);
+ core->edge_conn_addr = qcm_edge_conn_addr;
+ core->edge_context = ap;
+
return ap;
}
+
void qcm_edge_addr_proxy_final(qcm_edge_addr_proxy_t *ap)
{
qdrc_event_unsubscribe_CT(ap->core, ap->event_sub);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e7161e5..9cdb25f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -678,6 +678,7 @@ void qdr_core_delete_auto_link (qdr_core_t *core, qdr_auto_link_t *al);
// Core timer related field/data structures
typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);
+typedef qdr_address_t * (*qdr_edge_conn_addr_t) (void *context);
typedef struct qdr_core_timer_t {
DEQ_LINKS(struct qdr_core_timer_t);
@@ -850,6 +851,9 @@ struct qdr_core_t {
uint64_t deliveries_ingress_route_container;
uint64_t deliveries_delayed_1sec;
uint64_t deliveries_delayed_10sec;
+
+ qdr_edge_conn_addr_t edge_conn_addr;
+ void *edge_context;
};
struct qdr_terminus_t {
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index cd6bc81..e327636 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -480,6 +480,18 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
return;
}
+ //
+ // If the anonymous delivery could not be sent anywhere (fanout = 0) and it is not multicasted, try sending it over
+ // the anonymous link.
+ //
+ if (fanout == 0 && !dlv->multicast && link->owning_addr == 0 && dlv->to_addr != 0) {
+ if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) {
+ qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
+ if (sender_address && sender_address != addr) {
+ fanout += qdr_forward_message_CT(core, sender_address, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
+ }
+ }
+ }
if (fanout == 0) {
//
@@ -616,12 +628,48 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
//
if (DEQ_IS_EMPTY(link->undelivered)) {
+ qdr_link_ref_t *temp_rlink = 0;
qdr_address_t *addr = link->owning_addr;
if (!addr && dlv->to_addr) {
qdr_connection_t *conn = link->conn;
if (conn && conn->tenant_space)
qd_iterator_annotate_space(dlv->to_addr, conn->tenant_space, conn->tenant_space_len);
qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
+
+ if (!addr) {
+ //
+ // This is an anonymous delivery but the address that it wants sent to is
+ // not in this router's address table. We will send this delivery up the
+ // anonymous link to the interior router (if this is an edge router).
+ // Only edge routers have a non-null core->edge_conn_addr.
+ //
+ if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) {
+ qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
+ if (sender_address) {
+ addr = sender_address;
+ }
+ }
+ }
+ else {
+ //
+ // core->edge_conn_addr is non-zero ONLY on edge routers. So there is no need to check if the
+ // core->router_mode is edge.
+ //
+ // The connection on which the delivery arrived should not be QDR_ROLE_EDGE_CONNECTION because
+ // we do not want to send it back over the same connection.
+ //
+ if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION && qdr_is_addr_treatment_multicast(addr)) {
+ qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
+ if (sender_address && sender_address != addr) {
+ qdr_link_ref_t *sender_rlink = DEQ_HEAD(sender_address->rlinks);
+ if (sender_rlink) {
+ temp_rlink = new_qdr_link_ref_t();
+ temp_rlink->link = sender_rlink->link;
+ DEQ_INSERT_TAIL(addr->rlinks, temp_rlink);
+ }
+ }
+ }
+ }
}
//
@@ -637,6 +685,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
//
qdr_link_forward_CT(core, link, dlv, addr, more);
}
+
+ if (addr && temp_rlink) {
+ DEQ_REMOVE(addr->rlinks, temp_rlink);
+ free_qdr_link_ref_t(temp_rlink);
+ }
} else {
//
// Take the action reference and use it for undelivered. Don't decref/incref.
diff --git a/src/router_node.c b/src/router_node.c
index 141ae63..959020c 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1383,6 +1383,16 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu
//
pn_link_open(qd_link_pn(qlink));
+ qd_connection_t *conn = qd_link_connection(qlink);
+ qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+ //
+ // All links on the inter router or edge connection have unbounded q2 limit
+ //
+ if (qdr_conn->role == QDR_ROLE_EDGE_CONNECTION || qdr_conn->role == QDR_ROLE_INTER_ROUTER) {
+ qd_link_set_q2_limit_unbounded(qlink, true);
+ }
+
+
//
// Mark the link as stalled and waiting for initial credit.
//
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 533eecc..113c790 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -328,7 +328,33 @@ class RouterTest(TestCase):
'test_38' : 0,
'test_39' : 0,
'test_40' : 0,
- 'test_41' : 0
+ 'test_41' : 0,
+ 'test_42' : 0,
+ 'test_43': 0,
+ 'test_44': 0,
+ 'test_45': 0,
+ 'test_46': 0,
+ 'test_47': 0,
+ 'test_48': 0,
+ 'test_49': 0,
+ 'test_50': 0,
+ 'test_51': 0,
+ 'test_52': 0,
+ 'test_53': 0,
+ 'test_54': 0,
+ 'test_55': 0,
+ 'test_56': 0,
+ 'test_57': 0,
+ 'test_58': 0,
+ 'test_59': 0,
+ 'test_60': 0,
+ 'test_61': 0,
+ 'test_62': 0,
+ 'test_63': 0,
+ 'test_64': 0,
+ 'test_65': 0,
+ 'test_66': 0,
+ 'test_67': 0
}
def test_01_connectivity_INTA_EA1(self):
@@ -837,6 +863,353 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_42_anon_sender_mobile_address_same_edge(self):
+ if self.skip [ 'test_42' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "test_42")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_43_anon_sender_mobile_address_interior_to_edge(self):
+ if self.skip [ 'test_43' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ "test_43")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_44_anon_sender_mobile_address_edge_to_interior(self):
+ if self.skip [ 'test_44' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "test_44")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_45_anon_sender_mobile_address_edge_to_edge_one_interior(self):
+ if self.skip [ 'test_45' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ "test_45")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_46_anon_sender_mobile_address_edge_to_edge_two_interior(self):
+ if self.skip [ 'test_46' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[4].addresses[0],
+ "test_46")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_47_anon_sender_mobile_address_large_msg_same_edge(self):
+ if self.skip [ 'test_47' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "test_47", True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_48_anon_sender_mobile_address_large_msg_interior_to_edge(self):
+ if self.skip [ 'test_48' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ "test_48", True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_49_anon_sender_mobile_address_large_msg_edge_to_interior(self):
+ if self.skip [ 'test_49' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "test_49", True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_50_anon_sender_mobile_address_large_msg_edge_to_edge_one_interior(self):
+ if self.skip [ 'test_50' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ "test_50", True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_51_anon_sender_mobile_address_large_msg_edge_to_edge_two_interior(self):
+ if self.skip [ 'test_51' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+ self.routers[4].addresses[0],
+ "test_51", True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # 1 Sender and 3 receivers all on the same edge
+ def test_52_anon_sender_multicast_mobile_address_same_edge(self):
+ if self.skip [ 'test_52' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.52", anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # 1 Sender and receiver on one edge and 2 receivers on another edge
+ # all in the same interior
+ def test_53_anon_sender_multicast_mobile_address_different_edges_same_interior(self):
+ if self.skip [ 'test_53' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[3].addresses[0],
+ "multicast.53",
+ self.routers[0].addresses[0],
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on each edge, one receiver on interior and sender
+ # on the edge
+ def test_54_anon_sender_multicast_mobile_address_edge_to_interior(self):
+ if self.skip [ 'test_54' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.54",
+ self.routers[0].addresses[0],
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on the interior
+ def test_55_anon_sender_multicast_mobile_address_interior_to_edge(self):
+ if self.skip [ 'test_55' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.55",
+ self.routers[0].addresses[0],
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on an interior that is not connected
+ # to the edges.
+ def test_56_anon_sender_multicast_mobile_address_other_interior_to_edge(self):
+ if self.skip [ 'test_56' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[1].addresses[0],
+ "multicast.56",
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Sender on an interior and 3 receivers connected to three different edges
+ def test_57_anon_sender_multicast_mobile_address_edge_to_edge_two_interiors(self):
+ if self.skip [ 'test_57' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.57",
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_58_anon_sender_multicast_mobile_address_all_edges(self):
+ if self.skip [ 'test_58' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ "multicast.58",
+ self.routers[0].addresses[0],
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ ######### Multicast Large message anon sender tests ####################
+
+ # 1 Sender and 3 receivers all on the same edge
+ def test_59_anon_sender__multicast_mobile_address_same_edge(self):
+ if self.skip [ 'test_59' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.59",
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # 1 Sender on one edge and 3 receivers on another edge all in the same
+ # interior
+ def test_60_anon_sender_multicast_mobile_address_different_edges_same_interior(self):
+ if self.skip [ 'test_60' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[3].addresses[0],
+ "multicast.60",
+ self.routers[0].addresses[0],
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on each edge, one receiver on interior and sender
+ # on the edge
+ def test_61_anon_sender_multicast_mobile_address_edge_to_interior(self):
+ if self.skip [ 'test_61' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.61",
+ self.routers[3].addresses[0],
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on the interior
+ def test_62_anon_sender_multicast_mobile_address_interior_to_edge(self):
+ if self.skip [ 'test_62' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.62",
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on an interior that is not connected
+ # to the edges.
+ def test_63_anon_sender_multicast_mobile_address_other_interior_to_edge(self):
+ if self.skip [ 'test_63' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[1].addresses[0],
+ "multicast.63",
+ self.routers[0].addresses[0],
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Sender on an interior and 3 receivers connected to three different edges
+ def test_64_anon_sender_multicast_mobile_address_edge_to_edge_two_interiors(self):
+ if self.skip [ 'test_64' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.64",
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_65_anon_sender_multicast_mobile_address_all_edges(self):
+ if self.skip [ 'test_65' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ "multicast.65",
+ self.routers[0].addresses[0],
+ large_msg=True,
+ anon_sender=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_66_anon_sender_drop_rx_client_multicast_large_message(self):
+ # test what happens if some multicast receivers close in the middle of
+ # a multiframe transfer. The sender is an anonymous sender.
+ if self.skip [ 'test_66' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddrMcastAnonSenderDroppedRxTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.66")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_67_drop_rx_client_multicast_small_message(self):
+ # test what happens if some multicast receivers close in the middle of
+ # a stream of small messages. The sender is an anonymous sender.
+ if self.skip [ 'test_67' ] :
+ self.skipTest ( "Test skipped during development." )
+
+ test = MobileAddrMcastAnonSenderDroppedRxTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.67",
+ large_msg=False)
+ test.run()
+ self.assertEqual(None, test.error)
+
class LinkRouteProxyTest(TestCase):
"""
@@ -1401,6 +1774,142 @@ class Logger(object):
return '\n'.join(self.msgs)
+class CustomTimeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ message = Message(body="Test Message")
+ message.address = self.parent.address
+ self.parent.sender.send(message)
+ self.parent.cancel_custom()
+
+
+class MobileAddressAnonymousTest(MessagingHandler):
+ """
+ Attach a receiver to the interior and an anonymous sender to the edge router
+ In a non-anonymous sender scenario, the sender will never be given credit
+ to send until a receiver on the same address shows up. Since this
+ is an anonymous sender, credit is given instantly and the sender starts
+ sending immediately.
+
+ This test will first send 3 messages with a one second interval to make
+ sure the receiver is available. Then it will fire off 300 messages.
+ After dispositions are received for the 300 messages, it will close the
+ receiver and send 50 more messages. These 50 messages should be released
+ or modified.
+ """
+ def __init__(self, receiver_host, sender_host, address, large_msg=False):
+ super(MobileAddressAnonymousTest, self).__init__()
+ self.receiver_host = receiver_host
+ self.sender_host = sender_host
+ self.receiver_conn = None
+ self.sender_conn = None
+ self.receiver = None
+ self.sender = None
+ self.error = None
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.address = address
+ self.ready = False
+ self.custom_timer = None
+ self.num_msgs = 300
+ self.extra_msgs = 50
+ self.n_accepted = 0
+ self.n_modified = 0
+ self.n_released = 0
+ self.error = None
+ self.max_attempts = 3
+ self.num_attempts = 0
+ self.test_started = False
+ self.large_msg = large_msg
+ if self.large_msg:
+ self.body = "0123456789101112131415" * 10000
+ self.properties = {'big field': 'X' * 32000}
+
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(15.0 if self.large_msg else 5.0, Timeout(self))
+ self.receiver_conn = event.container.connect(self.receiver_host)
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.receiver = event.container.create_receiver(self.receiver_conn, self.address)
+ # This is an anonymous sender.
+ self.sender = event.container.create_sender(self.sender_conn)
+
+ def cancel_custom(self):
+ self.custom_timer.cancel()
+
+ def timeout(self):
+ if self.ready:
+ self.error = "Timeout Expired - n_sent=%d n_accepted=%d n_modified=%d n_released=%d" % (
+ self.n_sent, self.n_accepted, self.n_modified, self.n_released)
+ else:
+ self.error = "Did not get a settlement from the receiver. The test cannot be started until " \
+ "a settlement to a test message is received"
+ self.receiver_conn.close()
+ self.sender_conn.close()
+
+ def on_sendable(self, event):
+ if not self.test_started:
+ message = Message(body="Test Message")
+ message.address = self.address
+ self.sender.send(message)
+ self.num_attempts += 1
+ self.test_started = True
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ if self.ready:
+ self.n_rcvd += 1
+
+ def on_link_closed(self, event):
+ # The receiver has closed. We will send messages again and
+ # make sure they are released.
+ if event.receiver == self.receiver:
+ for i in range(self.extra_msgs):
+ if self.large_msg:
+ message = Message(body=self.body, properties=self.properties)
+ else:
+ message = Message(body="Message %d" % self.n_sent)
+ message.address = self.address
+ self.sender.send(message)
+ self.n_sent += 1
+
+ def on_settled(self, event):
+ rdisp = str(event.delivery.remote_state)
+ if rdisp == "RELEASED" and not self.ready:
+ if self.num_attempts < self.max_attempts:
+ self.custom_timer = event.reactor.schedule(1, CustomTimeout(self))
+ self.num_attempts += 1
+ elif rdisp == "ACCEPTED" and not self.ready:
+ self.ready = True
+ for i in range(self.num_msgs):
+ if self.large_msg:
+ message = Message(body=self.body, properties=self.properties)
+ else:
+ message = Message(body="Message %d" % self.n_sent)
+ message.address = self.address
+ self.sender.send(message)
+ self.n_sent += 1
+ elif rdisp == "ACCEPTED" and self.ready:
+ self.n_accepted += 1
+ if self.n_accepted == self.num_msgs:
+ # Close the receiver after sending 300 messages
+ self.receiver.close()
+ elif rdisp == "RELEASED" and self.ready:
+ self.n_released += 1
+ elif rdisp == "MODIFIED" and self.ready:
+ self.n_modified += 1
+
+ if self.num_msgs == self.n_accepted and self.extra_msgs == self.n_released + self.n_modified:
+ self.receiver_conn.close()
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
class MobileAddressTest(MessagingHandler):
"""
From a single container create a sender and a receiver connection.
@@ -1494,7 +2003,7 @@ class MobileAddressTest(MessagingHandler):
(self.n_accepted, self.normal_count))
elif rdisp == "RELEASED" or rdisp == "MODIFIED":
self.n_rel_or_mod += 1
- self.logger.log("on_settled sender: %s %d (of %d)" %
+ self.logger.log("on_settled sender: %s %d (of %d)" %
(rdisp, self.n_rel_or_mod, self.extra_count))
else:
self.logger.log("on_settled sender: WARNING unexpected settlement: %s, n_accepted: %d, n_rel_or_mod: %d" %
@@ -1635,13 +2144,15 @@ class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
class MobileAddressMulticastTest(MessagingHandler):
def __init__(self, receiver1_host, receiver2_host, receiver3_host,
- sender_host, address, check_addr_host=None, large_msg=False):
+ sender_host, address, check_addr_host=None, large_msg=False,
+ anon_sender=False):
super(MobileAddressMulticastTest, self).__init__()
self.receiver1_host = receiver1_host
self.receiver2_host = receiver2_host
self.receiver3_host = receiver3_host
self.sender_host = sender_host
self.address = address
+ self.anon_sender = anon_sender
# One sender connection and two receiver connections
self.receiver1_conn = None
@@ -1708,7 +2219,10 @@ class MobileAddressMulticastTest(MessagingHandler):
def create_sndr(self):
self.sender_conn = self.container.connect(self.sender_host)
- self.sender = self.container.create_sender(self.sender_conn,
+ if self.anon_sender:
+ self.sender = self.container.create_sender(self.sender_conn)
+ else:
+ self.sender = self.container.create_sender(self.sender_conn,
self.address)
def check_address(self):
@@ -1765,6 +2279,8 @@ class MobileAddressMulticastTest(MessagingHandler):
msg = Message(body=self.body, properties=self.properties)
else:
msg = Message(body="Message %d" % self.n_sent)
+ if self.anon_sender:
+ msg.address = self.address
msg.correlation_id = self.n_sent
self.sender.send(msg)
self.n_sent += 1
@@ -1847,6 +2363,52 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
self.n_released += 1
self._check_done()
+
+class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddressMulticastTest):
+ # failure scenario - cause some receiving clients to close while a large
+ # message is in transit
+ def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+ sender_host, address, check_addr_host=None, large_msg=True, anon_sender=True):
+ super(MobileAddrMcastAnonSenderDroppedRxTest, self).__init__(receiver1_host,
+ receiver2_host,
+ receiver3_host,
+ sender_host,
+ address,
+ check_addr_host=check_addr_host,
+ large_msg=large_msg,
+ anon_sender=anon_sender)
+ self.n_accepted = 0
+ self.n_released = 0
+ self.recv1_closed = False
+ self.recv2_closed = False
+
+ def _check_done(self):
+ if self.n_accepted + self.n_released == self.count:
+ self.receiver3_conn.close()
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def on_message(self, event):
+ super(MobileAddrMcastAnonSenderDroppedRxTest, self).on_message(event)
+
+ # start closing receivers
+ if self.n_rcvd1 == 50:
+ if not self.recv1_closed:
+ self.receiver1_conn.close()
+ self.recv1_closed = True
+ if self.n_rcvd2 == 75:
+ if not self.recv2_closed:
+ self.recv2_closed = True
+ self.receiver2_conn.close()
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+ self._check_done()
+
+ def on_released(self, event):
+ self.n_released += 1
+ self._check_done()
+
class MobileAddressEventTest(MessagingHandler):
def __init__(self, receiver1_host, receiver2_host, receiver3_host,
sender_host, interior_host, address):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org