You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2019/05/08 13:20:40 UTC

[qpid-dispatch] branch master updated: DISPATCH-1326 - Handling of anonymous messages sent to edge routers. Added small message and large message tests. This closes #502

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new fa1b167  DISPATCH-1326 - Handling of anonymous messages sent to edge routers. Added small message and large message tests. This closes #502
fa1b167 is described below

commit fa1b167dcf1e77a40f9fcb5bba3074a698b6a6ab
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon May 6 17:40:30 2019 -0400

    DISPATCH-1326 - Handling of anonymous messages sent to edge routers. Added small message and large message tests. This closes #502
---
 src/router_core/modules/edge_router/addr_proxy.c |  12 +
 src/router_core/router_core_private.h            |   4 +
 src/router_core/transfer.c                       |  53 +++
 src/router_node.c                                |  10 +
 tests/system_tests_edge_router.py                | 570 ++++++++++++++++++++++-
 5 files changed, 645 insertions(+), 4 deletions(-)

diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index 5cc54a7..8a7a0ee 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -432,6 +432,14 @@ static void on_transfer(void           *link_context,
     qdrc_endpoint_flow_CT(ap->core, ap->tracking_endpoint, 1, false);
 }
 
+qdr_address_t *qcm_edge_conn_addr(void *link_context)
+{
+    qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) link_context;
+    if (!ap)
+        return 0;
+    return ap->edge_conn_addr;
+}
+
 
 static void on_cleanup(void *link_context)
 {
@@ -477,10 +485,14 @@ qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core)
                                             on_addr_event,
                                             ap);
 
+    core->edge_conn_addr = qcm_edge_conn_addr;
+    core->edge_context = ap;
+
     return ap;
 }
 
 
+
 void qcm_edge_addr_proxy_final(qcm_edge_addr_proxy_t *ap)
 {
     qdrc_event_unsubscribe_CT(ap->core, ap->event_sub);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e7161e5..9cdb25f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -678,6 +678,7 @@ void qdr_core_delete_auto_link (qdr_core_t *core,  qdr_auto_link_t *al);
 
 // Core timer related field/data structures
 typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);
+typedef qdr_address_t * (*qdr_edge_conn_addr_t) (void *context);
 
 typedef struct qdr_core_timer_t {
     DEQ_LINKS(struct qdr_core_timer_t);
@@ -850,6 +851,9 @@ struct qdr_core_t {
     uint64_t  deliveries_ingress_route_container;
     uint64_t  deliveries_delayed_1sec;
     uint64_t  deliveries_delayed_10sec;
+
+    qdr_edge_conn_addr_t          edge_conn_addr;
+    void                         *edge_context;
 };
 
 struct qdr_terminus_t {
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index cd6bc81..e327636 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -480,6 +480,18 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         return;
     }
 
+    //
+    // If the anonymous delivery could not be sent anywhere (fanout = 0) and it is not multicasted, try sending it over
+    // the anonymous link.
+    //
+    if (fanout == 0 && !dlv->multicast && link->owning_addr == 0 && dlv->to_addr != 0) {
+        if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) {
+            qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
+            if (sender_address && sender_address != addr) {
+                fanout += qdr_forward_message_CT(core, sender_address, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
+            }
+        }
+    }
 
     if (fanout == 0) {
         //
@@ -616,12 +628,48 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     //
 
     if (DEQ_IS_EMPTY(link->undelivered)) {
+        qdr_link_ref_t *temp_rlink = 0;
         qdr_address_t *addr = link->owning_addr;
         if (!addr && dlv->to_addr) {
             qdr_connection_t *conn = link->conn;
             if (conn && conn->tenant_space)
                 qd_iterator_annotate_space(dlv->to_addr, conn->tenant_space, conn->tenant_space_len);
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
+
+            if (!addr) {
+                //
+                // This is an anonymous delivery but the address that it wants sent to is
+                // not in this router's address table. We will send this delivery up the
+                // anonymous link to the interior router (if this is an edge router).
+                // Only edge routers have a non null core->edge_conn_addr
+                //
+                if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) {
+                    qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
+                    if (sender_address) {
+                        addr = sender_address;
+                    }
+                }
+            }
+            else {
+                //
+                // (core->edge_conn_addr is non-zero ONLY on edge routers. So there is no need to check if the
+                // core->router_mode is edge.
+                //
+                // The connection on which the delivery arrived should not be QDR_ROLE_EDGE_CONNECTION because
+                // we do not want to send it back over the same connections
+                //
+                if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION && qdr_is_addr_treatment_multicast(addr)) {
+                    qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
+                    if (sender_address && sender_address != addr) {
+                        qdr_link_ref_t *sender_rlink = DEQ_HEAD(sender_address->rlinks);
+                        if (sender_rlink) {
+                            temp_rlink = new_qdr_link_ref_t();
+                            temp_rlink->link = sender_rlink->link;
+                            DEQ_INSERT_TAIL(addr->rlinks, temp_rlink);
+                        }
+                    }
+                }
+            }
         }
 
         //
@@ -637,6 +685,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
             //
             qdr_link_forward_CT(core, link, dlv, addr, more);
         }
+
+        if (addr && temp_rlink) {
+            DEQ_REMOVE(addr->rlinks, temp_rlink);
+            free_qdr_link_ref_t(temp_rlink);
+        }
     } else {
         //
         // Take the action reference and use it for undelivered.  Don't decref/incref.
diff --git a/src/router_node.c b/src/router_node.c
index 141ae63..959020c 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1383,6 +1383,16 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu
     //
     pn_link_open(qd_link_pn(qlink));
 
+    qd_connection_t  *conn     = qd_link_connection(qlink);
+    qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+    //
+    // All links on the inter router or edge connection have unbounded q2 limit
+    //
+    if (qdr_conn->role == QDR_ROLE_EDGE_CONNECTION || qdr_conn->role == QDR_ROLE_INTER_ROUTER) {
+        qd_link_set_q2_limit_unbounded(qlink, true);
+    }
+
+
     //
     // Mark the link as stalled and waiting for initial credit.
     //
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index 533eecc..113c790 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -328,7 +328,33 @@ class RouterTest(TestCase):
                      'test_38' : 0,
                      'test_39' : 0,
                      'test_40' : 0,
-                     'test_41' : 0
+                     'test_41' : 0,
+                     'test_42' : 0,
+                     'test_43':  0,
+                     'test_44':  0,
+                     'test_45':  0,
+                     'test_46':  0,
+                     'test_47':  0,
+                     'test_48':  0,
+                     'test_49':  0,
+                     'test_50':  0,
+                     'test_51':  0,
+                     'test_52':  0,
+                     'test_53':  0,
+                     'test_54':  0,
+                     'test_55':  0,
+                     'test_56':  0,
+                     'test_57':  0,
+                     'test_58':  0,
+                     'test_59':  0,
+                     'test_60':  0,
+                     'test_61':  0,
+                     'test_62':  0,
+                     'test_63':  0,
+                     'test_64':  0,
+                     'test_65':  0,
+                     'test_66':  0,
+                     'test_67':  0
                    }
 
     def test_01_connectivity_INTA_EA1(self):
@@ -837,6 +863,353 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_42_anon_sender_mobile_address_same_edge(self):
+        if self.skip [ 'test_42' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "test_42")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_43_anon_sender_mobile_address_interior_to_edge(self):
+        if self.skip [ 'test_43' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "test_43")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_44_anon_sender_mobile_address_edge_to_interior(self):
+        if self.skip [ 'test_44' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[0].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "test_44")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_45_anon_sender_mobile_address_edge_to_edge_one_interior(self):
+        if self.skip [ 'test_45' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          "test_45")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_46_anon_sender_mobile_address_edge_to_edge_two_interior(self):
+        if self.skip [ 'test_46' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          "test_46")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_47_anon_sender_mobile_address_large_msg_same_edge(self):
+        if self.skip [ 'test_47' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "test_47", True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_48_anon_sender_mobile_address_large_msg_interior_to_edge(self):
+        if self.skip [ 'test_48' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "test_48", True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_49_anon_sender_mobile_address_large_msg_edge_to_interior(self):
+        if self.skip [ 'test_49' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[0].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "test_49", True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_50_anon_sender_mobile_address_large_msg_edge_to_edge_one_interior(self):
+        if self.skip [ 'test_50' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          "test_50", True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_51_anon_sender_mobile_address_large_msg_edge_to_edge_two_interior(self):
+        if self.skip [ 'test_51' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressAnonymousTest(self.routers[2].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          "test_51", True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # 1 Sender and 3 receivers all on the same edge
+    def test_52_anon_sender_multicast_mobile_address_same_edge(self):
+        if self.skip [ 'test_52' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.52", anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # 1 Sender and receiver on one edge and 2 receivers on another edge
+    # all in the same  interior
+    def test_53_anon_sender_multicast_mobile_address_different_edges_same_interior(self):
+        if self.skip [ 'test_53' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          "multicast.53",
+                                          self.routers[0].addresses[0],
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on each edge, one receiver on interior and sender
+    # on the edge
+    def test_54_anon_sender_multicast_mobile_address_edge_to_interior(self):
+        if self.skip [ 'test_54' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.54",
+                                          self.routers[0].addresses[0],
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on the interior
+    def test_55_anon_sender_multicast_mobile_address_interior_to_edge(self):
+        if self.skip [ 'test_55' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.55",
+                                          self.routers[0].addresses[0],
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on an interior that is not connected
+    # to the edges.
+    def test_56_anon_sender_multicast_mobile_address_other_interior_to_edge(self):
+        if self.skip [ 'test_56' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[1].addresses[0],
+                                          "multicast.56",
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Sender on an interior and 3 receivers connected to three different edges
+    def test_57_anon_sender_multicast_mobile_address_edge_to_edge_two_interiors(self):
+        if self.skip [ 'test_57' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.57",
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_58_anon_sender_multicast_mobile_address_all_edges(self):
+        if self.skip [ 'test_58' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[5].addresses[0],
+                                          "multicast.58",
+                                          self.routers[0].addresses[0],
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    ######### Multicast Large message anon sender tests ####################
+
+    # 1 Sender and 3 receivers all on the same edge
+    def test_59_anon_sender__multicast_mobile_address_same_edge(self):
+        if self.skip [ 'test_59' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.59",
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # 1 Sender on one edge and 3 receivers on another edge all in the same
+    # interior
+    def test_60_anon_sender_multicast_mobile_address_different_edges_same_interior(self):
+        if self.skip [ 'test_60' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          "multicast.60",
+                                          self.routers[0].addresses[0],
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on each edge, one receiver on interior and sender
+    # on the edge
+    def test_61_anon_sender_multicast_mobile_address_edge_to_interior(self):
+        if self.skip [ 'test_61' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.61",
+                                          self.routers[3].addresses[0],
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on the interior
+    def test_62_anon_sender_multicast_mobile_address_interior_to_edge(self):
+        if self.skip [ 'test_62' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.62",
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on an interior that is not connected
+    # to the edges.
+    def test_63_anon_sender_multicast_mobile_address_other_interior_to_edge(self):
+        if self.skip [ 'test_63' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[1].addresses[0],
+                                          "multicast.63",
+                                          self.routers[0].addresses[0],
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Sender on an interior and 3 receivers connected to three different edges
+    def test_64_anon_sender_multicast_mobile_address_edge_to_edge_two_interiors(self):
+        if self.skip [ 'test_64' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.64",
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_65_anon_sender_multicast_mobile_address_all_edges(self):
+        if self.skip [ 'test_65' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[5].addresses[0],
+                                          "multicast.65",
+                                          self.routers[0].addresses[0],
+                                          large_msg=True,
+                                          anon_sender=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_66_anon_sender_drop_rx_client_multicast_large_message(self):
+        # test what happens if some multicast receivers close in the middle of
+        # a multiframe transfer. The sender is an anonymous sender.
+        if self.skip [ 'test_66' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddrMcastAnonSenderDroppedRxTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      "multicast.66")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_67_drop_rx_client_multicast_small_message(self):
+        # test what happens if some multicast receivers close in the middle of
+        # a multiframe transfer. The sender is an anonymous sender.
+        if self.skip [ 'test_67' ] :
+            self.skipTest ( "Test skipped during development." )
+
+        test = MobileAddrMcastAnonSenderDroppedRxTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      "multicast.67",
+                                                      large_msg=False)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class LinkRouteProxyTest(TestCase):
     """
@@ -1401,6 +1774,142 @@ class Logger(object):
         return '\n'.join(self.msgs)
 
 
+class CustomTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        message = Message(body="Test Message")
+        message.address = self.parent.address
+        self.parent.sender.send(message)
+        self.parent.cancel_custom()
+
+
+class MobileAddressAnonymousTest(MessagingHandler):
+    """
+    Attach a receiver to the interior and an anonymous sender to the edge router
+    In a non-anonymous sender scenario, the sender will never be given credit
+    to send until a receiver on the same address shows up . Since this
+    is an anonymous sender, credit is given instatnly and the sender starts
+    sending immediately.
+
+    This test will first send 3 messages with a one second interval to make
+    sure receiver is available. Then it will fire off 300 messages
+    After dispositions are received for the 300 messages, it will close the
+    receiver and send 50 more messages. These 50 messages should be released
+    or modified.
+    """
+    def __init__(self, receiver_host, sender_host, address, large_msg=False):
+        super(MobileAddressAnonymousTest, self).__init__()
+        self.receiver_host = receiver_host
+        self.sender_host = sender_host
+        self.receiver_conn = None
+        self.sender_conn = None
+        self.receiver = None
+        self.sender = None
+        self.error = None
+        self.n_sent = 0
+        self.n_rcvd = 0
+        self.address = address
+        self.ready = False
+        self.custom_timer = None
+        self.num_msgs = 300
+        self.extra_msgs = 50
+        self.n_accepted = 0
+        self.n_modified = 0
+        self.n_released = 0
+        self.error = None
+        self.max_attempts = 3
+        self.num_attempts = 0
+        self.test_started = False
+        self.large_msg = large_msg
+        if self.large_msg:
+            self.body = "0123456789101112131415" * 10000
+            self.properties = {'big field': 'X' * 32000}
+
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(15.0 if self.large_msg else 5.0, Timeout(self))
+        self.receiver_conn = event.container.connect(self.receiver_host)
+        self.sender_conn   = event.container.connect(self.sender_host)
+        self.receiver      = event.container.create_receiver(self.receiver_conn, self.address)
+        # This is an anonymous sender.
+        self.sender        = event.container.create_sender(self.sender_conn)
+
+    def cancel_custom(self):
+        self.custom_timer.cancel()
+
+    def timeout(self):
+        if self.ready:
+            self.error = "Timeout Expired - n_sent=%d n_accepted=%d n_modified=%d n_released=%d" % (
+            self.n_sent,  self.n_accepted, self.n_modified, self.n_released)
+        else:
+            self.error = "Did not get a settlement from the receiver. The test cannot be started until " \
+                         "a settlement to a test message is received"
+        self.receiver_conn.close()
+        self.sender_conn.close()
+
+    def on_sendable(self, event):
+        if not self.test_started:
+            message = Message(body="Test Message")
+            message.address = self.address
+            self.sender.send(message)
+            self.num_attempts += 1
+            self.test_started = True
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if self.ready:
+                self.n_rcvd += 1
+
+    def on_link_closed(self, event):
+        # The receiver has closed. We will send messages again and
+        # make sure they are released.
+        if event.receiver == self.receiver:
+            for i in range(self.extra_msgs):
+                if self.large_msg:
+                    message = Message(body=self.body, properties=self.properties)
+                else:
+                    message = Message(body="Message %d" % self.n_sent)
+                message.address = self.address
+                self.sender.send(message)
+                self.n_sent += 1
+
+    def on_settled(self, event):
+        rdisp = str(event.delivery.remote_state)
+        if rdisp == "RELEASED" and not self.ready:
+            if self.num_attempts < self.max_attempts:
+                self.custom_timer = event.reactor.schedule(1, CustomTimeout(self))
+                self.num_attempts += 1
+        elif rdisp == "ACCEPTED" and not self.ready:
+            self.ready = True
+            for i in range(self.num_msgs):
+                if self.large_msg:
+                    message = Message(body=self.body, properties=self.properties)
+                else:
+                    message = Message(body="Message %d" % self.n_sent)
+                message.address = self.address
+                self.sender.send(message)
+                self.n_sent += 1
+        elif rdisp == "ACCEPTED" and self.ready:
+            self.n_accepted += 1
+            if self.n_accepted == self.num_msgs:
+                # Close the receiver after sending 300 messages
+                self.receiver.close()
+        elif rdisp == "RELEASED" and self.ready:
+            self.n_released += 1
+        elif rdisp == "MODIFIED" and self.ready:
+            self.n_modified += 1
+
+        if self.num_msgs == self.n_accepted and self.extra_msgs == self.n_released + self.n_modified:
+            self.receiver_conn.close()
+            self.sender_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
 class MobileAddressTest(MessagingHandler):
     """
     From a single container create a sender and a receiver connection.
@@ -1494,7 +2003,7 @@ class MobileAddressTest(MessagingHandler):
                                 (self.n_accepted, self.normal_count))
             elif rdisp == "RELEASED" or rdisp == "MODIFIED":
                 self.n_rel_or_mod += 1
-                self.logger.log("on_settled sender: %s %d (of %d)" % 
+                self.logger.log("on_settled sender: %s %d (of %d)" %
                                 (rdisp, self.n_rel_or_mod, self.extra_count))
             else:
                 self.logger.log("on_settled sender: WARNING unexpected settlement: %s, n_accepted: %d, n_rel_or_mod: %d" %
@@ -1635,13 +2144,15 @@ class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
 
 class MobileAddressMulticastTest(MessagingHandler):
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,
-                 sender_host, address, check_addr_host=None, large_msg=False):
+                 sender_host, address, check_addr_host=None, large_msg=False,
+                 anon_sender=False):
         super(MobileAddressMulticastTest, self).__init__()
         self.receiver1_host = receiver1_host
         self.receiver2_host = receiver2_host
         self.receiver3_host = receiver3_host
         self.sender_host = sender_host
         self.address = address
+        self.anon_sender = anon_sender
 
         # One sender connection and two receiver connections
         self.receiver1_conn = None
@@ -1708,7 +2219,10 @@ class MobileAddressMulticastTest(MessagingHandler):
 
     def create_sndr(self):
         self.sender_conn = self.container.connect(self.sender_host)
-        self.sender = self.container.create_sender(self.sender_conn,
+        if self.anon_sender:
+            self.sender = self.container.create_sender(self.sender_conn)
+        else:
+            self.sender = self.container.create_sender(self.sender_conn,
                                                    self.address)
 
     def check_address(self):
@@ -1765,6 +2279,8 @@ class MobileAddressMulticastTest(MessagingHandler):
                 msg = Message(body=self.body, properties=self.properties)
             else:
                 msg = Message(body="Message %d" % self.n_sent)
+            if self.anon_sender:
+                msg.address = self.address
             msg.correlation_id = self.n_sent
             self.sender.send(msg)
             self.n_sent += 1
@@ -1847,6 +2363,52 @@ class MobileAddrMcastDroppedRxTest(MobileAddressMulticastTest):
         self.n_released += 1
         self._check_done()
 
+
+class MobileAddrMcastAnonSenderDroppedRxTest(MobileAddressMulticastTest):
+    # failure scenario - cause some receiving clients to close while a large
+    # message is in transit
+    def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+                 sender_host, address, check_addr_host=None, large_msg=True, anon_sender=True):
+        super(MobileAddrMcastAnonSenderDroppedRxTest, self).__init__(receiver1_host,
+                                                                     receiver2_host,
+                                                                     receiver3_host,
+                                                                     sender_host,
+                                                                     address,
+                                                                     check_addr_host=check_addr_host,
+                                                                     large_msg=large_msg,
+                                                                     anon_sender=anon_sender)
+        self.n_accepted = 0
+        self.n_released = 0
+        self.recv1_closed = False
+        self.recv2_closed = False
+
+    def _check_done(self):
+        if self.n_accepted + self.n_released == self.count:
+            self.receiver3_conn.close()
+            self.sender_conn.close()
+            self.timer.cancel()
+
+    def on_message(self, event):
+        super(MobileAddrMcastAnonSenderDroppedRxTest, self).on_message(event)
+
+        # start closing receivers
+        if self.n_rcvd1 == 50:
+            if not self.recv1_closed:
+                self.receiver1_conn.close()
+                self.recv1_closed = True
+        if self.n_rcvd2 == 75:
+            if not self.recv2_closed:
+                self.recv2_closed = True
+                self.receiver2_conn.close()
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        self._check_done()
+
+    def on_released(self, event):
+        self.n_released += 1
+        self._check_done()
+
 class MobileAddressEventTest(MessagingHandler):
     def __init__(self, receiver1_host, receiver2_host, receiver3_host,
                  sender_host, interior_host, address):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org