You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/11/01 16:24:39 UTC
qpid-dispatch git commit: DISPATCH-1160 - Added edge address tracking
module to interior router. This closes #410.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 9af66023d -> 6cf2f20ab
DISPATCH-1160 - Added edge address tracking module to interior router
This closes #410
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6cf2f20a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6cf2f20a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6cf2f20a
Branch: refs/heads/master
Commit: 6cf2f20ab297d963330e635c53393cba9dc7e8a4
Parents: 9af6602
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Thu Oct 18 11:46:42 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Nov 1 11:54:15 2018 -0400
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
.../edge_addr_tracking/edge_addr_tracking.c | 373 ++++++++++
src/router_core/router_core.c | 13 +
src/router_core/router_core_private.h | 7 +
tests/system_tests_edge_router.py | 709 ++++++++++++++++++-
5 files changed, 1098 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b8eb36f..d7f3405 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -105,6 +105,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/edge_router/addr_proxy.c
router_core/modules/edge_router/connection_manager.c
router_core/modules/test_hooks/core_test_hooks.c
+ router_core/modules/edge_addr_tracking/edge_addr_tracking.c
router_node.c
router_pynode.c
schema_enum.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
new file mode 100644
index 0000000..43244e9
--- /dev/null
+++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+#include <stdio.h>
+
+typedef struct qdr_addr_tracking_module_context_t qdr_addr_tracking_module_context_t;
+typedef struct qdr_addr_endpoint_state_t qdr_addr_endpoint_state_t;
+
+struct qdr_addr_endpoint_state_t {
+ DEQ_LINKS(qdr_addr_endpoint_state_t);
+ qdrc_endpoint_t *endpoint;
+ qdr_connection_t *conn; // The connection associated with the endpoint.
+ qdr_addr_tracking_module_context_t *mc;
+};
+
+DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
+ALLOC_DECLARE(qdr_addr_endpoint_state_t);
+ALLOC_DEFINE(qdr_addr_endpoint_state_t);
+
+struct qdr_addr_tracking_module_context_t {
+ qdr_core_t *core;
+ qdr_addr_endpoint_state_list_t endpoint_state_list;
+ qdrc_event_subscription_t *event_sub;
+ qdrc_endpoint_desc_t addr_tracking_endpoint;
+};
+
+
+static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t *addr, bool insert_addr)
+{
+ qd_message_t *msg = qd_message();
+
+ //
+ // Start header
+ //
+ qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(fld);
+ qd_compose_insert_bool(fld, 0); // durable
+ qd_compose_end_list(fld);
+
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+ qd_compose_start_list(body);
+
+ const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
+
+ qd_compose_insert_string(body, addr_str);
+ qd_compose_insert_bool(body, insert_addr);
+ qd_compose_end_list(body);
+
+ // Finally, compose and return the message so it can be sent out.
+ qd_message_compose_3(msg, fld, body);
+
+ return msg;
+}
+
+static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
+ while(endpoint_state) {
+ if (endpoint_state->conn == conn) {
+ return endpoint_state;
+ }
+ endpoint_state = DEQ_NEXT(endpoint_state);
+ }
+ return 0;
+}
+
+
+static void qdrc_address_endpoint_first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
+{
+ qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
+
+ qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
+
+ ZERO(endpoint_state);
+ endpoint_state->endpoint = endpoint;
+ endpoint_state->mc = bc;
+ endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint);
+
+ DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
+
+
+ //
+ // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link
+ // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
+ //
+ if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
+ *link_context = endpoint_state;
+ qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
+ }
+ else {
+ //
+ // We simply detach any links that don't match the above condition.
+ //
+ *link_context = 0;
+ qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
+ }
+}
+
+
+static void qdrc_address_endpoint_on_first_detach(void *link_context,
+ qdr_error_t *error)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link_context;
+ qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0);
+ qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+ DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+ free_qdr_addr_endpoint_state_t(endpoint_state);
+}
+
+
+static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
+{
+ if (!addr)
+ return false;
+
+ bool can_send = false;
+ if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) {
+ // There is at least one receiver for this address somewhere in the router network
+ can_send = true;
+ }
+ if (!can_send) {
+ if (DEQ_SIZE(addr->rlinks) == 1) {
+ qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+ if (link_ref->link->conn != conn)
+ can_send=true;
+ }
+ }
+ return can_send;
+}
+
+
+static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr)
+{
+ if (!addr)
+ return;
+
+ if (!endpoint)
+ return;
+
+ qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
+ qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
+
+ qdrc_endpoint_send_CT(core, endpoint, dlv, true);
+}
+
+static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
+{
+ // We only care about mobile addresses.
+ if(!qdr_address_is_mobile_CT(addr))
+ return;
+
+ qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context;
+ switch (event) {
+ case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
+ //
+ // This address transitioned from zero to one local destination. If this address already has one or more remote destinations, do nothing.
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+ }
+ case QDRC_EVENT_ADDR_BECAME_DEST : {
+ //
+ // This address transitioned from zero to one destination. If this address already had local destinations
+ //
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+
+ case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
+ // The address no longer has any local destinations.
+ // If there are no remote destinations either, we have to tell the edge routers to delete their sender links
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the disappearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+
+ break;
+ }
+ case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
+ //
+ // This address transitioned from N destinations to one local dest
+ // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it
+ //
+ assert(DEQ_SIZE(addr->rlinks) == 1);
+ //
+ // There should be only one rlink in the rlinks list
+ //
+ qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+ qdr_link_t *link = rlink_ref->link;
+
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ if (endpoint_state->conn == link->conn) {
+ qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+ break;
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+ case QDRC_EVENT_ADDR_TWO_DEST: {
+ //
+ // The address transitioned from one local dest to two destinations. The second destination might be local or remote.
+ //
+ qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+ qdr_link_t *link = rlink_ref->link;
+
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ if (link->conn == endpoint_state->conn) {
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ break;
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ break;
+ }
+
+ default:
+ break;
+ }
+}
+
+static void on_link_event(void *context, qdrc_event_t event, qdr_link_t *link)
+{
+ switch (event) {
+ case QDRC_EVENT_LINK_EDGE_DATA_ATTACHED :
+ {
+ qdr_addr_tracking_module_context_t *mc = (qdr_addr_tracking_module_context_t *) context;
+ qdr_address_t *addr = link->owning_addr;
+ if (addr && qdr_address_is_mobile_CT(addr)) {
+ qdr_addr_endpoint_state_t *endpoint_state = qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list, link->conn, link);
+ link->edge_context = endpoint_state;
+
+ if (qdrc_can_send_address(addr, link->conn) && endpoint_state) {
+ qdrc_send_message(mc->core, addr, endpoint_state->endpoint, true);
+ }
+ }
+ break;
+ }
+ default:
+ break;
+ }
+}
+
+
+static void qdrc_edge_address_tracking_module_init_CT(qdr_core_t *core, void **module_context)
+{
+ //
+ // Address tracking is enabled only on interior routers
+ //
+ if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
+ return;
+
+ qdr_addr_tracking_module_context_t *context = NEW(qdr_addr_tracking_module_context_t);
+ ZERO(context);
+ context->core = core;
+ *module_context = context;
+
+
+ //
+ // Bind to the static address QD_TERMINUS_EDGE_ADDRESS_TRACKING
+ //
+ context->addr_tracking_endpoint.label = "qdrc_edge_address_tracking_module_init_CT";
+ context->addr_tracking_endpoint.on_first_attach = qdrc_address_endpoint_first_attach;
+ context->addr_tracking_endpoint.on_first_detach = qdrc_address_endpoint_on_first_detach;
+ qdrc_endpoint_bind_mobile_address_CT(core, QD_TERMINUS_EDGE_ADDRESS_TRACKING, '0', &context->addr_tracking_endpoint, context);
+
+ //
+ // Subscribe to address and link events.
+ //
+ context->event_sub = qdrc_event_subscribe_CT(core,
+ QDRC_EVENT_ADDR_BECAME_LOCAL_DEST | QDRC_EVENT_ADDR_ONE_LOCAL_DEST |
+ QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST | QDRC_EVENT_ADDR_BECAME_DEST | QDRC_EVENT_ADDR_TWO_DEST |
+ QDRC_EVENT_LINK_EDGE_DATA_ATTACHED,
+ 0,
+ on_link_event,
+ on_addr_event,
+ context);
+}
+
+
+static void qdrc_edge_address_tracking_module_final_CT(void *module_context)
+{
+ if (module_context == 0)
+ return;
+
+ qdr_addr_tracking_module_context_t *mc = ( qdr_addr_tracking_module_context_t *)module_context;
+
+ // If there are any endpoint states still hanging around, clean them up.
+ qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(mc->endpoint_state_list);
+ while (endpoint_state) {
+ DEQ_REMOVE_HEAD(mc->endpoint_state_list);
+ free_qdr_addr_endpoint_state_t(endpoint_state);
+ endpoint_state = DEQ_HEAD(mc->endpoint_state_list);
+ }
+ qdrc_event_unsubscribe_CT(mc->core, mc->event_sub);
+ free(mc);
+}
+
+
+QDR_CORE_MODULE_DECLARE("edge_addr_tracking", qdrc_edge_address_tracking_module_init_CT, qdrc_edge_address_tracking_module_final_CT)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 119a5a1..c990f7b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -362,6 +362,19 @@ qdr_address_t *qdr_add_mobile_address_CT(qdr_core_t *core, const char *prefix, c
}
+bool qdr_address_is_mobile_CT(qdr_address_t *addr)
+{
+ if (!addr)
+ return false;
+
+ const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
+
+ if (addr_str && addr_str[0] == QD_ITER_HASH_PREFIX_MOBILE)
+ return true;
+
+ return false;
+}
+
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr)
{
if (addr) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index d87aec6..ec51fd7 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -841,6 +841,13 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain);
void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr);
void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
+
+/**
+ * Returns true if the passed in address is a mobile address, false otherwise
+ * If the first character of the address_key (obtained using its hash_handle) is M, the address is mobile.
+ */
+bool qdr_address_is_mobile_CT(qdr_address_t *addr);
+
void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6cf2f20a/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index d7d16ae..1012dd3 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -22,6 +22,16 @@ from proton import Message, Timeout
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties
+from qpid_dispatch.management.client import Node
+
+
+class AddrTimer(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.check_address()
+
class RouterTest(TestCase):
@@ -146,7 +156,6 @@ class RouterTest(TestCase):
self.assertEqual(None, test.error)
def test_12_mobile_address_edge_to_interior(self):
- self.skipTest("Temporarily disabled")
test = MobileAddressTest(self.routers[0].addresses[0],
self.routers[2].addresses[0],
"test_12")
@@ -154,7 +163,6 @@ class RouterTest(TestCase):
self.assertEqual(None, test.error)
def test_13_mobile_address_edge_to_edge_one_interior(self):
- self.skipTest("Temporarily disabled")
test = MobileAddressTest(self.routers[2].addresses[0],
self.routers[3].addresses[0],
"test_13")
@@ -162,13 +170,279 @@ class RouterTest(TestCase):
self.assertEqual(None, test.error)
def test_14_mobile_address_edge_to_edge_two_interior(self):
- self.skipTest("Temporarily disabled")
test = MobileAddressTest(self.routers[2].addresses[0],
self.routers[4].addresses[0],
"test_14")
test.run()
self.assertEqual(None, test.error)
+ # One sender two receiver tests.
+ # One sender and two receivers on the same edge
+ def test_15_mobile_address_same_edge(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "test_15")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # One sender and two receivers on the different edges. The edges are
+ # hanging off the same interior router.
+ def test_16_mobile_address_edge_to_another_edge_same_interior(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ "test_16")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on the interior and sender on the edge
+ def test_17_mobile_address_edge_to_interior(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[0].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "test_17")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on the edge and the sender on the interior
+ def test_18_mobile_address_interior_to_edge(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ "test_18")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on the edge and the sender on the 'other' interior
+ def test_19_mobile_address_other_interior_to_edge(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[1].addresses[0],
+ "test_19")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on the edge and the sender on the edge of
+ # the 'other' interior
+ def test_20_mobile_address_edge_to_edge_two_interiors(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[5].addresses[0],
+ "test_20")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # One receiver in an edge, another one in interior and the sender
+ # is on the edge of another interior
+ def test_21_mobile_address_edge_interior_receivers(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[4].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[2].addresses[0],
+ "test_21")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers, one on each interior router, and an edge sender
+ # connected to the first interior
+ def test_22_mobile_address_edge_sender_two_interior_receivers(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[0].addresses[0],
+ self.routers[1].addresses[0],
+ self.routers[3].addresses[0],
+ "test_22")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_23_mobile_address_edge_sender_two_edge_receivers(self):
+ test = MobileAddressOneSenderTwoReceiversTest(self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[2].addresses[0],
+ "test_23")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # 1 Sender and 3 receivers all on the same edge
+ def test_24_multicast_mobile_address_same_edge(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.24")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # 1 Sender and receiver on one edge and 2 receivers on another edge
+ # all in the same interior
+ def test_25_multicast_mobile_address_different_edges_same_interior(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[3].addresses[0],
+ "multicast.25",
+ self.routers[0].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on each edge, one receiver on interior and sender
+ # on the edge
+ def test_26_multicast_mobile_address_edge_to_interior(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.26",
+ self.routers[0].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on the interior
+ def test_27_multicast_mobile_address_interior_to_edge(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.27",
+ self.routers[0].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on an interior that is not connected
+ # to the edges.
+ def test_28_multicast_mobile_address_other_interior_to_edge(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[1].addresses[0],
+ "multicast.28")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Sender on an interior and 3 receivers connected to three different edges
+ def test_29_multicast_mobile_address_edge_to_edge_two_interiors(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.29")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_30_multicast_mobile_address_all_edges(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ "multicast.30",
+ self.routers[0].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+
+ ######### Multicast Large message tests ######################
+
+ # 1 Sender and 3 receivers all on the same edge
+ def test_31_multicast_mobile_address_same_edge(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.31", large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # 1 Sender on one edge and 3 receivers on another edge all in the same
+ # interior
+ def test_32_multicast_mobile_address_different_edges_same_interior(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[3].addresses[0],
+ "multicast.32",
+ self.routers[0].addresses[0],
+ large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Two receivers on each edge, one receiver on interior and sender
+ # on the edge
+ def test_33_multicast_mobile_address_edge_to_interior(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ self.routers[2].addresses[0],
+ "multicast.33", large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on the interior
+ def test_34_multicast_mobile_address_interior_to_edge(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.34", large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Receivers on the edge and sender on an interior that is not connected
+ # to the edges.
+ def test_35_multicast_mobile_address_other_interior_to_edge(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[1].addresses[0],
+ "multicast.35",
+ self.routers[0].addresses[0],
+ large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ # Sender on an interior and 3 receivers connected to three different edges
+ def test_36_multicast_mobile_address_edge_to_edge_two_interiors(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[0].addresses[0],
+ "multicast.36", large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_37_multicast_mobile_address_all_edges(self):
+ test = MobileAddressMulticastTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ "multicast.37",
+ self.routers[0].addresses[0],
+ large_msg=True)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_38_mobile_addr_event_three_receivers_same_interior(self):
+ test = MobileAddressEventTest(self.routers[2].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[3].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ "test_38")
+
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_39_mobile_addr_event_three_receivers_diff_interior(self):
+ # This will test the QDRC_EVENT_ADDR_TWO_DEST event
+ test = MobileAddressEventTest(self.routers[2].addresses[0],
+ self.routers[4].addresses[0],
+ self.routers[5].addresses[0],
+ self.routers[2].addresses[0],
+ self.routers[0].addresses[0],
+ "test_39")
+
+ test.run()
+ self.assertEqual(None, test.error)
+
+
class Timeout(object):
def __init__(self, parent):
@@ -296,7 +570,10 @@ class MobileAddressTest(MessagingHandler):
self.receiver_conn = None
self.sender_conn = None
+
self.receiver = None
+ self.sender = None
+
self.count = 300
self.rel_count = 50
self.n_rcvd = 0
@@ -320,7 +597,8 @@ class MobileAddressTest(MessagingHandler):
def on_sendable(self, event):
while self.n_sent < self.count:
- self.sender.send(Message(body="Message %d" % self.n_sent))
+ message = Message(body="Message %d" % self.n_sent)
+ self.sender.send(message)
self.n_sent += 1
def on_message(self, event):
@@ -345,5 +623,426 @@ class MobileAddressTest(MessagingHandler):
Container(self).run()
-if __name__ == '__main__':
+class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, sender_host, address):
+ super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and two receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.sender = None
+
+ self.count = 300
+ self.rel_count = 50
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd_msg_bodies = dict()
+ self.dup_msg = None
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "Duplicate message %s received " % self.dup_msg
+ else:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
+ (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address)
+
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+ self.address)
+
+ # Create one sender
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.sender = event.container.create_sender(self.sender_conn,
+ self.address)
+
+ def on_sendable(self, event):
+ while self.n_sent < self.count:
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if self.recvd_msg_bodies.get(event.message.body):
+ self.dup_msg = event.message.body
+ self.timeout()
+ else:
+ self.recvd_msg_bodies[event.message.body] = event.message.body
+
+ if event.receiver == self.receiver1:
+ self.n_rcvd1 += 1
+ if event.receiver == self.receiver2:
+ self.n_rcvd2 += 1
+
+ if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
+ self.all_msgs_received = True
+
+ def on_settled(self, event):
+ self.n_settled += 1
+ if self.n_settled == self.count:
+ self.receiver1.close()
+ self.receiver2.close()
+ for i in range(self.rel_count):
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == self.rel_count and self.all_msgs_received:
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class MobileAddressMulticastTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+ sender_host, address, check_addr_host=None, large_msg=False):
+ super(MobileAddressMulticastTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.receiver3_host = receiver3_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and two receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.receiver3_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.receiver3 = None
+ self.sender = None
+
+ self.count = 200
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_rcvd3 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd1_msgs = dict()
+ self.recvd2_msgs = dict()
+ self.recvd3_msgs = dict()
+ self.dup_msg_rcvd = False
+ self.dup_msg = None
+ self.receiver_name = None
+ self.large_msg = large_msg
+ self.body = ""
+ self.r_attaches = 0
+ self.reactor = None
+ self.addr_timer = None
+
+ # The maximum number of times we are going to try to check if the
+ # address has propagated.
+ self.max_attempts = 5
+ self.num_attempts = 0
+ self.num_attempts = 0
+ self.container = None
+ self.check_addr_host = check_addr_host
+ if not self.check_addr_host:
+ self.check_addr_host = self.sender_host
+
+ if self.large_msg:
+ for i in range(10000):
+ self.body += "0123456789101112131415"
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "%s received duplicate message %s" % \
+ (self.receiver_name, self.dup_msg)
+ else:
+ if not self.error:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+ "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+ (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+ self.n_rcvd3, self.address)
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ if self.sender_conn:
+ self.sender_conn.close()
+
+ def create_sndr(self):
+ self.sender_conn = self.container.connect(self.sender_host)
+ self.sender = self.container.create_sender(self.sender_conn,
+ self.address)
+
+ def check_address(self):
+ local_node = Node.connect(self.check_addr_host, timeout=TIMEOUT)
+ outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
+ found = False
+ self.num_attempts += 1
+ for result in outs.results:
+ if self.address in result[0]:
+ found = True
+ self.create_sndr()
+ local_node.close()
+ self.addr_timer.cancel()
+ break
+
+ if not found:
+
+ if self.num_attempts < self.max_attempts:
+ self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
+ else:
+ self.error = "Unable to create sender because of " \
+ "absence of address in the address table"
+ self.timeout()
+ local_node.close()
+
+ def on_start(self, event):
+ if self.large_msg:
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+ else:
+ self.timer = event.reactor.schedule(20.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver3_conn = event.container.connect(self.receiver3_host)
+ self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+ self.address)
+ self.receiver3 = event.container.create_receiver(self.receiver3_conn,
+ self.address)
+ self.container = event.container
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver1 or \
+ event.receiver == self.receiver2 or \
+ event.receiver == self.receiver3:
+ self.r_attaches += 1
+ if self.r_attaches == 3:
+ self.reactor = event.reactor
+ self.addr_timer = self.reactor.schedule(1.0, AddrTimer(self))
+
+ def on_sendable(self, event):
+ while self.n_sent < self.count:
+ msg = None
+ if self.large_msg:
+ msg = Message(body=self.body)
+ else:
+ msg = Message(body="Message %d" % self.n_sent)
+ msg.correlation_id = self.n_sent
+ self.sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver1:
+ if self.recvd1_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 1"
+ self.timeout()
+ self.n_rcvd1 += 1
+ self.recvd1_msgs[event.message.correlation_id] = event.message.correlation_id
+ if event.receiver == self.receiver2:
+ if self.recvd2_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 2"
+ self.timeout()
+ self.n_rcvd2 += 1
+ self.recvd2_msgs[event.message.correlation_id] = event.message.correlation_id
+ if event.receiver == self.receiver3:
+ if self.recvd3_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 3"
+ self.timeout()
+ self.n_rcvd3 += 1
+ self.recvd3_msgs[event.message.correlation_id] = event.message.correlation_id
+
+ if self.n_rcvd1 == self.count and self.n_rcvd2 == self.count and \
+ self.n_rcvd3 == self.count:
+ self.timer.cancel()
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ self.sender_conn.close()
+
+ def run(self):
+ Container(self).run()
+
+
+class MobileAddressEventTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+ sender_host, interior_host, address):
+ super(MobileAddressEventTest, self).__init__(auto_accept=False)
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.receiver3_host = receiver3_host
+ self.sender_host = sender_host
+ self.address = address
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.receiver3_conn = None
+ self.sender_conn = None
+ self.recvd1_msgs = dict()
+ self.recvd2_msgs = dict()
+ self.recvd3_msgs = dict()
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_rcvd3 = 0
+ self.timer = None
+ self.receiver1 = None
+ self.receiver2 = None
+ self.receiver3 = None
+ self.sender = None
+ self.interior_host = interior_host
+ self.container = None
+ self.count = 600
+ self.dup_msg = None
+ self.receiver_name = None
+ self.n_sent = 0
+ self.error = None
+ self.r_attaches = 0
+ self.n_released = 0
+ self.n_settled = 0
+ self.addr_timer = None
+ self.container = None
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "%s received duplicate message %s" % \
+ (self.receiver_name, self.dup_msg)
+ else:
+ if not self.error:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+ "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+ (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+ self.n_rcvd3, self.address)
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ if self.sender_conn:
+ self.sender_conn.close()
+
+ def check_address(self):
+ local_node = Node.connect(self.interior_host, timeout=TIMEOUT)
+ outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
+ remote_count = outs.attribute_names.index("remoteCount")
+ found = False
+ for result in outs.results:
+
+ if self.address in result[0]:
+ found = True
+ self.sender_conn = self.container.connect(self.sender_host)
+ self.sender = self.container.create_sender(self.sender_conn,
+ self.address)
+ break
+
+ if not found:
+ self.error = "Unable to create sender because of " \
+ "absence of address in the address table"
+ self.addr_timer.cancel()
+ self.timeout()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver3_conn = event.container.connect(self.receiver3_host)
+
+ # Create all 3 receivers first.
+ self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+ self.address)
+ self.receiver3 = event.container.create_receiver(self.receiver3_conn,
+ self.address)
+ self.container = event.container
+
+ self.addr_timer = event.reactor.schedule(1.0, AddrTimer(self))
+
+ def on_sendable(self, event):
+ if self.n_sent < self.count:
+ msg = Message(body="Message %d" % self.n_sent)
+ msg.correlation_id = self.n_sent
+ self.sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver1:
+ if self.recvd1_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 1"
+ self.timeout()
+ self.n_rcvd1 += 1
+ self.recvd1_msgs[
+ event.message.correlation_id] = event.message.correlation_id
+
+ event.delivery.settle()
+
+ if event.receiver == self.receiver2:
+ if self.recvd2_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 2"
+ self.timeout()
+ self.n_rcvd2 += 1
+ self.recvd2_msgs[
+ event.message.correlation_id] = event.message.correlation_id
+
+ event.delivery.settle()
+
+ if event.receiver == self.receiver3:
+ if self.recvd3_msgs.get(event.message.correlation_id):
+ self.dup_msg = event.message.correlation_id
+ self.receiver_name = "Receiver 3"
+ self.timeout()
+ self.n_rcvd3 += 1
+ self.recvd3_msgs[
+ event.message.correlation_id] = event.message.correlation_id
+
+ event.delivery.settle()
+
+ def on_settled(self, event):
+ if self.n_rcvd1 + self.n_rcvd2 + self.n_rcvd3 == self.count and \
+ self.n_rcvd2 !=0 and self.n_rcvd3 !=0:
+ self.timer.cancel()
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ self.sender_conn.close()
+
+ def on_released(self, event):
+ self.n_released += 1
+
+ def run(self):
+ Container(self).run()
+
+
+if __name__== '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org