You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/11/01 16:24:39 UTC

qpid-dispatch git commit: DISPATCH-1160 - Added edge address tracking module to interior router This closes #410

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 9af66023d -> 6cf2f20ab


DISPATCH-1160 - Added edge address tracking module to interior router
This closes #410


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6cf2f20a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6cf2f20a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6cf2f20a

Branch: refs/heads/master
Commit: 6cf2f20ab297d963330e635c53393cba9dc7e8a4
Parents: 9af6602
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Thu Oct 18 11:46:42 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Nov 1 11:54:15 2018 -0400

----------------------------------------------------------------------
 src/CMakeLists.txt                              |   1 +
 .../edge_addr_tracking/edge_addr_tracking.c     | 373 ++++++++++
 src/router_core/router_core.c                   |  13 +
 src/router_core/router_core_private.h           |   7 +
 tests/system_tests_edge_router.py               | 709 ++++++++++++++++++-
 5 files changed, 1098 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b8eb36f..d7f3405 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -105,6 +105,7 @@ set(qpid_dispatch_SOURCES
   router_core/modules/edge_router/addr_proxy.c
   router_core/modules/edge_router/connection_manager.c
   router_core/modules/test_hooks/core_test_hooks.c
+  router_core/modules/edge_addr_tracking/edge_addr_tracking.c
   router_node.c
   router_pynode.c
   schema_enum.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
new file mode 100644
index 0000000..43244e9
--- /dev/null
+++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+#include <stdio.h>
+
+typedef struct qdr_addr_tracking_module_context_t     qdr_addr_tracking_module_context_t;
+typedef struct qdr_addr_endpoint_state_t              qdr_addr_endpoint_state_t;
+
+struct qdr_addr_endpoint_state_t {
+    DEQ_LINKS(qdr_addr_endpoint_state_t);
+    qdrc_endpoint_t                    *endpoint;
+    qdr_connection_t                   *conn;    // The connection associated with the endpoint.
+    qdr_addr_tracking_module_context_t *mc;
+};
+
+DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
+ALLOC_DECLARE(qdr_addr_endpoint_state_t);
+ALLOC_DEFINE(qdr_addr_endpoint_state_t);
+
+struct  qdr_addr_tracking_module_context_t {
+    qdr_core_t                     *core;
+    qdr_addr_endpoint_state_list_t  endpoint_state_list;
+    qdrc_event_subscription_t      *event_sub;
+    qdrc_endpoint_desc_t           addr_tracking_endpoint;
+};
+
+
+static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t   *addr, bool insert_addr)
+{
+    qd_message_t *msg = qd_message();
+
+    //
+    // Start header
+    //
+    qd_composed_field_t *fld   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(fld);
+    qd_compose_insert_bool(fld, 0);     // durable
+    qd_compose_end_list(fld);
+
+    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+    qd_compose_start_list(body);
+
+    const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
+
+    qd_compose_insert_string(body, addr_str);
+    qd_compose_insert_bool(body, insert_addr);
+    qd_compose_end_list(body);
+
+    // Finally, compose and retuen the message so it can be sent out.
+    qd_message_compose_3(msg, fld, body);
+
+    return msg;
+}
+
+static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t  endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
+{
+    qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
+    while(endpoint_state) {
+        if (endpoint_state->conn == conn) {
+            return endpoint_state;
+        }
+        endpoint_state = DEQ_NEXT(endpoint_state);
+    }
+    return 0;
+}
+
+
+static void qdrc_address_endpoint_first_attach(void              *bind_context,
+                                               qdrc_endpoint_t   *endpoint,
+                                               void             **link_context,
+                                               qdr_terminus_t   *remote_source,
+                                               qdr_terminus_t   *remote_target)
+{
+    qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
+
+    qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
+
+    ZERO(endpoint_state);
+    endpoint_state->endpoint = endpoint;
+    endpoint_state->mc       = bc;
+    endpoint_state->conn     = qdrc_endpoint_get_connection_CT(endpoint);
+
+    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
+
+
+    //
+    // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link
+    // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
+    //
+    if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
+        *link_context = endpoint_state;
+        qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
+   }
+    else {
+        //
+        // We simply detach any links that dont match the above condition.
+        //
+        *link_context = 0;
+        qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
+    }
+}
+
+
+static void qdrc_address_endpoint_on_first_detach(void *link_context,
+                                              qdr_error_t *error)
+{
+    qdr_addr_endpoint_state_t *endpoint_state  = (qdr_addr_endpoint_state_t *)link_context;
+    qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0);
+    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+    free_qdr_addr_endpoint_state_t(endpoint_state);
+}
+
+
+static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
+{
+    if (!addr)
+        return false;
+
+    bool can_send = false;
+    if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) {
+        // There is at least one receiver for this address somewhere in the router network
+        can_send = true;
+    }
+    if (!can_send) {
+        if (DEQ_SIZE(addr->rlinks) == 1) {
+            qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+            if (link_ref->link->conn != conn)
+                can_send=true;
+        }
+    }
+    return can_send;
+}
+
+
+static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr)
+{
+    if (!addr)
+        return;
+
+    if (!endpoint)
+        return;
+
+    qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
+    qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
+
+    qdrc_endpoint_send_CT(core, endpoint, dlv, true);
+}
+
+static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
+{
+    // We only care about mobile addresses.
+    if(!qdr_address_is_mobile_CT(addr))
+        return;
+
+    qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context;
+    switch (event) {
+        case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
+            //
+            // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything
+            //
+            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+                //
+                // Every inlink that has an edge context must be informed of the appearence of this address.
+                //
+                while (inlink) {
+                    if(inlink->link->edge_context != 0) {
+                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+                        if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+                            qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+                            qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+                        }
+                    }
+                    inlink = DEQ_NEXT(inlink);
+                }
+            }
+            break;
+        }
+        case QDRC_EVENT_ADDR_BECAME_DEST : {
+            //
+            // This address transitioned from zero to one destination. If this address already had local destinations
+            //
+            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+            //
+            // Every inlink that has an edge context must be informed of the appearence of this address.
+            //
+            while (inlink) {
+                if(inlink->link->edge_context != 0) {
+                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+                    if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+                        qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+                    }
+                }
+                inlink = DEQ_NEXT(inlink);
+            }
+        }
+        break;
+
+        case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
+            // The address no longer has any local destinations.
+            // If there are no remote destinations either, we have to tell the edge routers to delete their sender links
+            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+                //
+                // Every inlink that has an edge context must be informed of the disappearence of this address.
+                //
+                while (inlink) {
+                    if(inlink->link->edge_context != 0) {
+                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+                        qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+                    }
+                    inlink = DEQ_NEXT(inlink);
+                }
+            }
+
+            break;
+        }
+        case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
+            //
+            // This address transitioned from N destinations to one local dest
+            // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it
+            //
+            assert(DEQ_SIZE(addr->rlinks) == 1);
+            //
+            // There should be only one rlink in the rlinks list
+            //
+            qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+            qdr_link_t *link = rlink_ref->link;
+
+            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+            while (inlink) {
+                if(inlink->link->edge_context != 0) {
+                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+                    qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+                    if (endpoint_state->conn == link->conn) {
+                        qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+                        break;
+                    }
+                }
+                inlink = DEQ_NEXT(inlink);
+            }
+        }
+        break;
+        case QDRC_EVENT_ADDR_TWO_DEST: {
+            //
+            // The address transitioned from one local dest to two destinations, The second destination might be local or remote.
+            //
+            qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+            qdr_link_t *link = rlink_ref->link;
+
+            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+            while (inlink) {
+                if(inlink->link->edge_context != 0) {
+                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+                    qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+                    if (link->conn == endpoint_state->conn) {
+                        qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+                        break;
+                    }
+                }
+                inlink = DEQ_NEXT(inlink);
+            }
+            break;
+        }
+
+        default:
+            break;
+    }
+}
+
+static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link)
+{
+    switch (event) {
+        case QDRC_EVENT_LINK_EDGE_DATA_ATTACHED :
+        {
+            qdr_addr_tracking_module_context_t *mc = (qdr_addr_tracking_module_context_t *) context;
+            qdr_address_t *addr = link->owning_addr;
+            if (addr && qdr_address_is_mobile_CT(addr)) {
+                qdr_addr_endpoint_state_t *endpoint_state = qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list, link->conn, link);
+                link->edge_context = endpoint_state;
+
+                if (qdrc_can_send_address(addr, link->conn) && endpoint_state) {
+                    qdrc_send_message(mc->core, addr, endpoint_state->endpoint, true);
+                }
+            }
+            break;
+        }
+        default:
+            break;
+    }
+}
+
+
+static void qdrc_edge_address_tracking_module_init_CT(qdr_core_t *core, void **module_context)
+{
+    //
+    // Address tracking is ennabled only on interior routers
+    //
+    if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
+        return;
+
+    qdr_addr_tracking_module_context_t *context = NEW(qdr_addr_tracking_module_context_t);
+    ZERO(context);
+    context->core = core;
+    *module_context = context;
+
+
+    //
+    // Bind to the static address QD_TERMINUS_EDGE_ADDRESS_TRACKING
+    //
+    context->addr_tracking_endpoint.label = "qdrc_edge_address_tracking_module_init_CT";
+    context->addr_tracking_endpoint.on_first_attach  = qdrc_address_endpoint_first_attach;
+    context->addr_tracking_endpoint.on_first_detach  = qdrc_address_endpoint_on_first_detach;
+    qdrc_endpoint_bind_mobile_address_CT(core, QD_TERMINUS_EDGE_ADDRESS_TRACKING, '0', &context->addr_tracking_endpoint, context);
+
+    //
+    // Subscribe to address and link events.
+    //
+    context->event_sub = qdrc_event_subscribe_CT(core,
+            QDRC_EVENT_ADDR_BECAME_LOCAL_DEST | QDRC_EVENT_ADDR_ONE_LOCAL_DEST |
+            QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST | QDRC_EVENT_ADDR_BECAME_DEST | QDRC_EVENT_ADDR_TWO_DEST |
+            QDRC_EVENT_LINK_EDGE_DATA_ATTACHED,
+            0,
+            on_link_event,
+            on_addr_event,
+            context);
+}
+
+
+static void qdrc_edge_address_tracking_module_final_CT(void *module_context)
+{
+    if (module_context == 0)
+        return;
+
+    qdr_addr_tracking_module_context_t *mc = ( qdr_addr_tracking_module_context_t *)module_context;
+
+    // If there are any endpoint states still hanging around, clean it up.
+    qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(mc->endpoint_state_list);
+    while (endpoint_state) {
+        DEQ_REMOVE_HEAD(mc->endpoint_state_list);
+        free_qdr_addr_endpoint_state_t(endpoint_state);
+        endpoint_state = DEQ_HEAD(mc->endpoint_state_list);
+    }
+    qdrc_event_unsubscribe_CT(mc->core, mc->event_sub);
+    free(mc);
+}
+
+
+QDR_CORE_MODULE_DECLARE("edge_addr_tracking", qdrc_edge_address_tracking_module_init_CT, qdrc_edge_address_tracking_module_final_CT)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 119a5a1..c990f7b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -362,6 +362,19 @@ qdr_address_t *qdr_add_mobile_address_CT(qdr_core_t *core, const char *prefix, c
 }
 
 
+bool qdr_address_is_mobile_CT(qdr_address_t *addr)
+{
+    if (!addr)
+        return false;
+
+    const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
+
+    if (addr_str && addr_str[0] == QD_ITER_HASH_PREFIX_MOBILE)
+        return true;
+
+    return false;
+}
+
 bool qdr_is_addr_treatment_multicast(qdr_address_t *addr)
 {
     if (addr) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index d87aec6..ec51fd7 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -841,6 +841,13 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain);
 void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr);
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
+
+/**
+ * Returns true if the passed in address is a mobile address, false otherwise
+ * If the first character of the address_key (obtained using its hash_handle) is M, the address is mobile.
+ */
+bool qdr_address_is_mobile_CT(qdr_address_t *addr);
+
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index d7d16ae..1012dd3 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -22,6 +22,16 @@ from proton import Message, Timeout
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, DynamicNodeProperties
+from qpid_dispatch.management.client import Node
+
+
+class AddrTimer(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+            self.parent.check_address()
+
 
 class RouterTest(TestCase):
 
@@ -146,7 +156,6 @@ class RouterTest(TestCase):
         self.assertEqual(None, test.error)
 
     def test_12_mobile_address_edge_to_interior(self):
-        self.skipTest("Temporarily disabled")
         test = MobileAddressTest(self.routers[0].addresses[0],
                                  self.routers[2].addresses[0],
                                  "test_12")
@@ -154,7 +163,6 @@ class RouterTest(TestCase):
         self.assertEqual(None, test.error)
 
     def test_13_mobile_address_edge_to_edge_one_interior(self):
-        self.skipTest("Temporarily disabled")
         test = MobileAddressTest(self.routers[2].addresses[0],
                                  self.routers[3].addresses[0],
                                  "test_13")
@@ -162,13 +170,279 @@ class RouterTest(TestCase):
         self.assertEqual(None, test.error)
 
     def test_14_mobile_address_edge_to_edge_two_interior(self):
-        self.skipTest("Temporarily disabled")
         test = MobileAddressTest(self.routers[2].addresses[0],
                                  self.routers[4].addresses[0],
                                  "test_14")
         test.run()
         self.assertEqual(None, test.error)
 
+    # One sender two receiver tests.
+    # One sender and two receivers on the same edge
+    def test_15_mobile_address_same_edge(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      "test_15")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # One sender and two receivers on the different edges. The edges are
+    #  hanging off the  same interior router.
+    def test_16_mobile_address_edge_to_another_edge_same_interior(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[3].addresses[0],
+                                                      "test_16")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on the interior and sender on the edge
+    def test_17_mobile_address_edge_to_interior(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[0].addresses[0],
+                                                      self.routers[0].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      "test_17")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on the edge and the sender on the interior
+    def test_18_mobile_address_interior_to_edge(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[0].addresses[0],
+                                                      "test_18")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on the edge and the sender on the 'other' interior
+    def test_19_mobile_address_other_interior_to_edge(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[1].addresses[0],
+                                                      "test_19")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on the edge and the sender on the edge of
+    # the 'other' interior
+    def test_20_mobile_address_edge_to_edge_two_interiors(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      self.routers[5].addresses[0],
+                                                      "test_20")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # One receiver in an edge, another one in interior and the sender
+    # is on the edge of another interior
+    def test_21_mobile_address_edge_interior_receivers(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[4].addresses[0],
+                                                      self.routers[1].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      "test_21")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers one on each interior router and and an edge sender
+    # connectoed to the first interior
+    def test_22_mobile_address_edge_sender_two_interior_receivers(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[0].addresses[0],
+                                                      self.routers[1].addresses[0],
+                                                      self.routers[3].addresses[0],
+                                                      "test_22")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_23_mobile_address_edge_sender_two_edge_receivers(self):
+        test = MobileAddressOneSenderTwoReceiversTest(self.routers[4].addresses[0],
+                                                      self.routers[5].addresses[0],
+                                                      self.routers[2].addresses[0],
+                                                      "test_23")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # 1 Sender and 3 receivers all on the same edge
+    def test_24_multicast_mobile_address_same_edge(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.24")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # 1 Sender and receiver on one edge and 2 receivers on another edge
+    # all in the same  interior
+    def test_25_multicast_mobile_address_different_edges_same_interior(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          "multicast.25",
+                                          self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on each edge, one receiver on interior and sender
+    # on the edge
+    def test_26_multicast_mobile_address_edge_to_interior(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.26",
+                                          self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on the interior
+    def test_27_multicast_mobile_address_interior_to_edge(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.27",
+                                          self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on an interior that is not connected
+    # to the edges.
+    def test_28_multicast_mobile_address_other_interior_to_edge(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[1].addresses[0],
+                                          "multicast.28")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Sender on an interior and 3 receivers connected to three different edges
+    def test_29_multicast_mobile_address_edge_to_edge_two_interiors(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.29")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_30_multicast_mobile_address_all_edges(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[5].addresses[0],
+                                          "multicast.30",
+                                          self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+
+    ######### Multicast Large message tests ######################
+
+    # 1 Sender and 3 receivers all on the same edge
+    def test_31_multicast_mobile_address_same_edge(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.31", large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # 1 Sender on one edge and 3 receivers on another edge all in the same
+    # interior
+    def test_32_multicast_mobile_address_different_edges_same_interior(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          "multicast.32",
+                                          self.routers[0].addresses[0],
+                                          large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Two receivers on each edge, one receiver on interior and sender
+    # on the edge
+    def test_33_multicast_mobile_address_edge_to_interior(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          "multicast.33", large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on the interior
+    def test_34_multicast_mobile_address_interior_to_edge(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.34", large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Receivers on the edge and sender on an interior that is not connected
+    # to the edges.
+    def test_35_multicast_mobile_address_other_interior_to_edge(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[1].addresses[0],
+                                          "multicast.35",
+                                          self.routers[0].addresses[0],
+                                          large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    # Sender on an interior and 3 receivers connected to three different edges
+    def test_36_multicast_mobile_address_edge_to_edge_two_interiors(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[0].addresses[0],
+                                          "multicast.36", large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_37_multicast_mobile_address_all_edges(self):
+        test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+                                          self.routers[3].addresses[0],
+                                          self.routers[4].addresses[0],
+                                          self.routers[5].addresses[0],
+                                          "multicast.37",
+                                          self.routers[0].addresses[0],
+                                          large_msg=True)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_38_mobile_addr_event_three_receivers_same_interior(self):
+        test = MobileAddressEventTest(self.routers[2].addresses[0],
+                                      self.routers[3].addresses[0],
+                                      self.routers[3].addresses[0],
+                                      self.routers[2].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      "test_38")
+
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_39_mobile_addr_event_three_receivers_diff_interior(self):
+        # This will test the QDRC_EVENT_ADDR_TWO_DEST event
+        test = MobileAddressEventTest(self.routers[2].addresses[0],
+                                      self.routers[4].addresses[0],
+                                      self.routers[5].addresses[0],
+                                      self.routers[2].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      "test_39")
+
+        test.run()
+        self.assertEqual(None, test.error)
+
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -296,7 +570,10 @@ class MobileAddressTest(MessagingHandler):
 
         self.receiver_conn = None
         self.sender_conn   = None
+
         self.receiver      = None
+        self.sender        = None
+
         self.count         = 300
         self.rel_count     = 50
         self.n_rcvd        = 0
@@ -320,7 +597,8 @@ class MobileAddressTest(MessagingHandler):
 
     def on_sendable(self, event):
         while self.n_sent < self.count:
-            self.sender.send(Message(body="Message %d" % self.n_sent))
+            message = Message(body="Message %d" % self.n_sent)
+            self.sender.send(message)
             self.n_sent += 1
 
     def on_message(self, event):
@@ -345,5 +623,426 @@ class MobileAddressTest(MessagingHandler):
         Container(self).run()
 
 
-if __name__ == '__main__':
+class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
+    def __init__(self, receiver1_host, receiver2_host, sender_host, address):
+        super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
+        self.receiver1_host = receiver1_host
+        self.receiver2_host = receiver2_host
+        self.sender_host = sender_host
+        self.address = address
+
+        # One sender connection and two receiver connections
+        self.receiver1_conn = None
+        self.receiver2_conn = None
+        self.sender_conn   = None
+
+        self.receiver1 = None
+        self.receiver2 = None
+        self.sender = None
+
+        self.count = 300
+        self.rel_count = 50
+        self.n_rcvd1 = 0
+        self.n_rcvd2 = 0
+        self.n_sent = 0
+        self.n_settled = 0
+        self.n_released = 0
+        self.error = None
+        self.timer = None
+        self.all_msgs_received = False
+        self.recvd_msg_bodies = dict()
+        self.dup_msg = None
+
+    def timeout(self):
+        if self.dup_msg:
+            self.error = "Duplicate message %s received " % self.dup_msg
+        else:
+            self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
+                         (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address)
+
+        self.receiver1_conn.close()
+        self.receiver2_conn.close()
+        self.sender_conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5.0, Timeout(self))
+
+        # Create two receivers
+        self.receiver1_conn = event.container.connect(self.receiver1_host)
+        self.receiver2_conn = event.container.connect(self.receiver2_host)
+        self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+                                                         self.address)
+        self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+                                                         self.address)
+
+        # Create one sender
+        self.sender_conn = event.container.connect(self.sender_host)
+        self.sender = event.container.create_sender(self.sender_conn,
+                                                    self.address)
+
+    def on_sendable(self, event):
+        while self.n_sent < self.count:
+            self.sender.send(Message(body="Message %d" % self.n_sent))
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if self.recvd_msg_bodies.get(event.message.body):
+            self.dup_msg = event.message.body
+            self.timeout()
+        else:
+            self.recvd_msg_bodies[event.message.body] = event.message.body
+
+        if event.receiver == self.receiver1:
+            self.n_rcvd1 += 1
+        if event.receiver == self.receiver2:
+            self.n_rcvd2 += 1
+
+        if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
+            self.all_msgs_received = True
+
+    def on_settled(self, event):
+        self.n_settled += 1
+        if self.n_settled == self.count:
+            self.receiver1.close()
+            self.receiver2.close()
+            for i in range(self.rel_count):
+                self.sender.send(Message(body="Message %d" % self.n_sent))
+                self.n_sent += 1
+
+    def on_released(self, event):
+        self.n_released += 1
+        if self.n_released == self.rel_count and self.all_msgs_received:
+            self.receiver1_conn.close()
+            self.receiver2_conn.close()
+            self.sender_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+class MobileAddressMulticastTest(MessagingHandler):
+    def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+                 sender_host, address, check_addr_host=None, large_msg=False):
+        super(MobileAddressMulticastTest, self).__init__()
+        self.receiver1_host = receiver1_host
+        self.receiver2_host = receiver2_host
+        self.receiver3_host = receiver3_host
+        self.sender_host = sender_host
+        self.address = address
+
+        # One sender connection and two receiver connections
+        self.receiver1_conn = None
+        self.receiver2_conn = None
+        self.receiver3_conn = None
+        self.sender_conn = None
+
+        self.receiver1 = None
+        self.receiver2 = None
+        self.receiver3 = None
+        self.sender = None
+
+        self.count = 200
+        self.n_rcvd1 = 0
+        self.n_rcvd2 = 0
+        self.n_rcvd3 = 0
+        self.n_sent = 0
+        self.n_settled = 0
+        self.n_released = 0
+        self.error = None
+        self.timer = None
+        self.all_msgs_received = False
+        self.recvd1_msgs = dict()
+        self.recvd2_msgs = dict()
+        self.recvd3_msgs = dict()
+        self.dup_msg_rcvd = False
+        self.dup_msg = None
+        self.receiver_name = None
+        self.large_msg = large_msg
+        self.body = ""
+        self.r_attaches = 0
+        self.reactor = None
+        self.addr_timer = None
+
+        # The maximum number of times we are going to try to check if the
+        # address  has propagated.
+        self.max_attempts = 5
+        self.num_attempts = 0
+        self.num_attempts = 0
+        self.container = None
+        self.check_addr_host = check_addr_host
+        if not self.check_addr_host:
+            self.check_addr_host = self.sender_host
+
+        if self.large_msg:
+            for i in range(10000):
+                self.body += "0123456789101112131415"
+
+    def timeout(self):
+        if self.dup_msg:
+            self.error = "%s received  duplicate message %s" % \
+                         (self.receiver_name, self.dup_msg)
+        else:
+            if not self.error:
+                self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+                             "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+                             (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+                              self.n_rcvd3, self.address)
+        self.receiver1_conn.close()
+        self.receiver2_conn.close()
+        self.receiver3_conn.close()
+        if self.sender_conn:
+            self.sender_conn.close()
+
+    def create_sndr(self):
+        self.sender_conn = self.container.connect(self.sender_host)
+        self.sender = self.container.create_sender(self.sender_conn,
+                                                   self.address)
+
+    def check_address(self):
+        local_node = Node.connect(self.check_addr_host, timeout=TIMEOUT)
+        outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
+        found = False
+        self.num_attempts += 1
+        for result in outs.results:
+            if self.address in result[0]:
+                found = True
+                self.create_sndr()
+                local_node.close()
+                self.addr_timer.cancel()
+                break
+
+        if not found:
+
+            if self.num_attempts < self.max_attempts:
+                self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
+            else:
+                self.error = "Unable to create sender because of " \
+                             "absence of address in the address table"
+                self.timeout()
+                local_node.close()
+
+    def on_start(self, event):
+        if self.large_msg:
+            self.timer = event.reactor.schedule(10.0, Timeout(self))
+        else:
+            self.timer = event.reactor.schedule(20.0, Timeout(self))
+
+        # Create two receivers
+        self.receiver1_conn = event.container.connect(self.receiver1_host)
+        self.receiver2_conn = event.container.connect(self.receiver2_host)
+        self.receiver3_conn = event.container.connect(self.receiver3_host)
+        self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+                                                         self.address)
+        self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+                                                         self.address)
+        self.receiver3 = event.container.create_receiver(self.receiver3_conn,
+                                                         self.address)
+        self.container = event.container
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver1 or \
+                event.receiver == self.receiver2 or \
+                event.receiver == self.receiver3:
+            self.r_attaches += 1
+            if self.r_attaches == 3:
+                self.reactor = event.reactor
+                self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
+
+    def on_sendable(self, event):
+        while self.n_sent < self.count:
+            msg = None
+            if self.large_msg:
+                msg = Message(body=self.body)
+            else:
+                msg = Message(body="Message %d" % self.n_sent)
+            msg.correlation_id = self.n_sent
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver1:
+            if self.recvd1_msgs.get(event.message.correlation_id):
+                self.dup_msg = event.message.correlation_id
+                self.receiver_name = "Receiver 1"
+                self.timeout()
+            self.n_rcvd1 += 1
+            self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id
+        if event.receiver == self.receiver2:
+            if self.recvd2_msgs.get(event.message.correlation_id):
+                self.dup_msg = event.message.correlation_id
+                self.receiver_name = "Receiver 2"
+                self.timeout()
+            self.n_rcvd2 += 1
+            self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id
+        if event.receiver == self.receiver3:
+            if self.recvd3_msgs.get(event.message.correlation_id):
+                self.dup_msg = event.message.correlation_id
+                self.receiver_name = "Receiver 3"
+                self.timeout()
+            self.n_rcvd3 += 1
+            self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id
+
+        if self.n_rcvd1 == self.count and self.n_rcvd2 == self.count and \
+                self.n_rcvd3 == self.count:
+            self.timer.cancel()
+            self.receiver1_conn.close()
+            self.receiver2_conn.close()
+            self.receiver3_conn.close()
+            self.sender_conn.close()
+
+    def run(self):
+        Container(self).run()
+
+
+class MobileAddressEventTest(MessagingHandler):
+    def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+                 sender_host, interior_host, address):
+        super(MobileAddressEventTest, self).__init__(auto_accept=False)
+        self.receiver1_host = receiver1_host
+        self.receiver2_host = receiver2_host
+        self.receiver3_host = receiver3_host
+        self.sender_host = sender_host
+        self.address = address
+        self.receiver1_conn = None
+        self.receiver2_conn = None
+        self.receiver3_conn = None
+        self.sender_conn = None
+        self.recvd1_msgs = dict()
+        self.recvd2_msgs = dict()
+        self.recvd3_msgs = dict()
+        self.n_rcvd1 = 0
+        self.n_rcvd2 = 0
+        self.n_rcvd3 = 0
+        self.timer = None
+        self.receiver1 = None
+        self.receiver2 = None
+        self.receiver3 = None
+        self.sender = None
+        self.interior_host = interior_host
+        self.container = None
+        self.count = 600
+        self.dup_msg = None
+        self.receiver_name = None
+        self.n_sent = 0
+        self.error = None
+        self.r_attaches = 0
+        self.n_released = 0
+        self.n_settled = 0
+        self.addr_timer = None
+        self.container = None
+
+    def timeout(self):
+        if self.dup_msg:
+            self.error = "%s received  duplicate message %s" % \
+                         (self.receiver_name, self.dup_msg)
+        else:
+            if not self.error:
+                self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+                             "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+                             (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+                              self.n_rcvd3, self.address)
+        self.receiver1_conn.close()
+        self.receiver2_conn.close()
+        self.receiver3_conn.close()
+        if self.sender_conn:
+            self.sender_conn.close()
+
+    def check_address(self):
+        local_node = Node.connect(self.interior_host, timeout=TIMEOUT)
+        outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
+        remote_count = outs.attribute_names.index("remoteCount")
+        found = False
+        for result in outs.results:
+
+            if self.address in result[0]:
+                found = True
+                self.sender_conn = self.container.connect(self.sender_host)
+                self.sender = self.container.create_sender(self.sender_conn,
+                                                           self.address)
+                break
+
+        if not found:
+            self.error = "Unable to create sender because of " \
+                         "absence of address in the address table"
+            self.addr_timer.cancel()
+            self.timeout()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(10.0, Timeout(self))
+
+        # Create two receivers
+        self.receiver1_conn = event.container.connect(self.receiver1_host)
+        self.receiver2_conn = event.container.connect(self.receiver2_host)
+        self.receiver3_conn = event.container.connect(self.receiver3_host)
+
+        # Create all 3 receivers first.
+        self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+                                                         self.address)
+        self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+                                                         self.address)
+        self.receiver3 = event.container.create_receiver(self.receiver3_conn,
+                                                         self.address)
+        self.container = event.container
+
+        self.addr_timer = event.reactor.schedule(1.0, AddrTimer(self))
+
+    def on_sendable(self, event):
+        if self.n_sent < self.count:
+            msg = Message(body="Message %d" % self.n_sent)
+            msg.correlation_id = self.n_sent
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver1:
+            if self.recvd1_msgs.get(event.message.correlation_id):
+                self.dup_msg = event.message.correlation_id
+                self.receiver_name = "Receiver 1"
+                self.timeout()
+            self.n_rcvd1 += 1
+            self.recvd1_msgs[
+                event.message.correlation_id] = event.message.correlation_id
+
+            event.delivery.settle()
+
+        if event.receiver == self.receiver2:
+            if self.recvd2_msgs.get(event.message.correlation_id):
+                self.dup_msg = event.message.correlation_id
+                self.receiver_name = "Receiver 2"
+                self.timeout()
+            self.n_rcvd2 += 1
+            self.recvd2_msgs[
+                event.message.correlation_id] = event.message.correlation_id
+
+            event.delivery.settle()
+
+        if event.receiver == self.receiver3:
+            if self.recvd3_msgs.get(event.message.correlation_id):
+                self.dup_msg = event.message.correlation_id
+                self.receiver_name = "Receiver 3"
+                self.timeout()
+            self.n_rcvd3 += 1
+            self.recvd3_msgs[
+                event.message.correlation_id] = event.message.correlation_id
+
+            event.delivery.settle()
+
+    def on_settled(self, event):
+        if self.n_rcvd1 + self.n_rcvd2 + self.n_rcvd3 == self.count and \
+                self.n_rcvd2 !=0 and self.n_rcvd3 !=0:
+            self.timer.cancel()
+            self.receiver1_conn.close()
+            self.receiver2_conn.close()
+            self.receiver3_conn.close()
+            self.sender_conn.close()
+
+    def on_released(self, event):
+        self.n_released += 1
+
+    def run(self):
+        Container(self).run()
+
+
+if __name__== '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org