You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2018/11/12 18:18:31 UTC
qpid-dispatch git commit: DISPATCH-1181: add hint about treatment to
MAU and use that on receipt if there is no locally defined treatment
Repository: qpid-dispatch
Updated Branches:
refs/heads/master a78e15815 -> 7699d55b0
DISPATCH-1181: add hint about treatment to MAU and use that on receipt if there is no locally defined treatment
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7699d55b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7699d55b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7699d55b
Branch: refs/heads/master
Commit: 7699d55b057e5493fef37e3b3454feb7b35edfc3
Parents: a78e158
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Nov 9 22:43:10 2018 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Nov 12 10:03:46 2018 +0000
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 4 +-
python/qpid_dispatch_internal/router/data.py | 10 ++-
python/qpid_dispatch_internal/router/engine.py | 4 +-
python/qpid_dispatch_internal/router/mobile.py | 15 +++-
python/qpid_dispatch_internal/router/node.py | 4 +-
src/router_core/connections.c | 7 +-
src/router_core/exchange_bindings.c | 3 +-
src/router_core/route_tables.c | 34 +++++++--
src/router_core/router_core.c | 4 +-
src/router_core/router_core_private.h | 5 +-
src/router_pynode.c | 10 ++-
tests/system_tests_two_routers.py | 84 +++++++++++++++++++++
12 files changed, 158 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 4d61275..b8d48ed 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -85,10 +85,10 @@ void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_m
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
void qdr_core_set_cost(qdr_core_t *core, int router_maskbit, int cost);
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash, int treatment_hint);
void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash);
-typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash);
+typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash, qd_address_treatment_t treatment);
typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash);
typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py
index 6c1db7f..810f98d 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -250,7 +250,7 @@ class MessageLSR(object):
class MessageMAU(object):
"""
"""
- def __init__(self, body, _id=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None):
+ def __init__(self, body, _id=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None, _hints=None):
if body:
self.id = getMandatory(body, 'id', PY_TEXT_TYPE)
self.version = getOptional(body, 'pv', 0, PY_LONG_TYPE)
@@ -259,6 +259,7 @@ class MessageMAU(object):
self.add_list = getOptional(body, 'add', None, list)
self.del_list = getOptional(body, 'del', None, list)
self.exist_list = getOptional(body, 'exist', None, list)
+ self.hints = getOptional(body, 'hints', None, list)
else:
self.id = _id
self.version = ProtocolVersion
@@ -267,6 +268,7 @@ class MessageMAU(object):
self.add_list = _add_list
self.del_list = _del_list
self.exist_list = _exist_list
+ self.hints = _hints
def get_opcode(self):
return 'MAU'
@@ -278,8 +280,9 @@ class MessageMAU(object):
if self.add_list != None: _add = ' add=%r' % self.add_list
if self.del_list != None: _del = ' del=%r' % self.del_list
if self.exist_list != None: _exist = ' exist=%r' % self.exist_list
- return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \
- (self.id, self.version, self.area, self.mobile_seq, _add, _del, _exist)
+ if self.hints != None: _hints = ' hints=%r' % self.hints
+ return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s%s)" % \
+ (self.id, self.version, self.area, self.mobile_seq, _add, _del, _exist, _hints)
def to_dict(self):
body = {'id' : self.id,
@@ -289,6 +292,7 @@ class MessageMAU(object):
if self.add_list != None: body['add'] = self.add_list
if self.del_list != None: body['del'] = self.del_list
if self.exist_list != None: body['exist'] = self.exist_list
+ if self.hints != None: body['hints'] = self.hints
return body
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py
index 13d7a78..753d743 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -98,12 +98,12 @@ class RouterEngine(object):
raise ValueError("No router configuration found")
return self._config
- def addressAdded(self, addr):
+ def addressAdded(self, addr, treatment):
"""
"""
try:
if addr[0] in 'MCDEFH':
- self.mobile_address_engine.add_local_address(addr)
+ self.mobile_address_engine.add_local_address(addr, treatment)
except Exception:
self.log_ma(LOG_ERROR, "Exception in new-address processing\n%s" % format_exc(LOG_STACK_LIMIT))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/mobile.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/mobile.py b/python/qpid_dispatch_internal/router/mobile.py
index 6e768c2..b4c73f9 100644
--- a/python/qpid_dispatch_internal/router/mobile.py
+++ b/python/qpid_dispatch_internal/router/mobile.py
@@ -43,6 +43,7 @@ class MobileAddressEngine(object):
self.added_addrs = []
self.deleted_addrs = []
self.sent_deltas = {}
+ self.treatments = {}
def tick(self, now):
@@ -52,7 +53,8 @@ class MobileAddressEngine(object):
##
if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
self.mobile_seq += 1
- msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, self.deleted_addrs)
+ hints = [self.treatments[a] for a in self.added_addrs]
+ msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, self.deleted_addrs, _hints=hints)
self.sent_deltas[self.mobile_seq] = msg
if len(self.sent_deltas) > MAX_KEPT_DELTAS:
@@ -68,9 +70,10 @@ class MobileAddressEngine(object):
return self.mobile_seq
- def add_local_address(self, addr):
+ def add_local_address(self, addr, treatment):
"""
"""
+ self.treatments[addr] = treatment
if self.local_addrs.count(addr) == 0:
if self.added_addrs.count(addr) == 0:
self.added_addrs.append(addr)
@@ -82,6 +85,7 @@ class MobileAddressEngine(object):
def del_local_address(self, addr):
"""
"""
+ del self.treatments[addr]
if self.local_addrs.count(addr) > 0:
if self.deleted_addrs.count(addr) == 0:
self.deleted_addrs.append(addr)
@@ -118,8 +122,13 @@ class MobileAddressEngine(object):
## This message represents the next expected sequence, incorporate the deltas
##
node.mobile_address_sequence += 1
+ treatments = msg.hints or []
for a in msg.add_list:
- node.map_address(a)
+ if len(treatments):
+ treatment = treatments.pop(0)
+ else:
+ treatment = -1
+ node.map_address(a, treatment)
for a in msg.del_list:
node.unmap_address(a)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index 8d8841e..16930fb 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -541,9 +541,9 @@ class RouterNode(object):
return False
- def map_address(self, addr):
+ def map_address(self, addr, treatment):
self.mobile_addresses.append(addr)
- self.adapter.map_destination(addr, self.maskbit)
+ self.adapter.map_destination(addr, treatment, self.maskbit)
self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 1ca6324..db271f4 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1015,12 +1015,17 @@ qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connec
qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter)
{
+ return qdr_treatment_for_address_hash_with_default_CT(core, iter, core->qd->default_treatment);
+}
+
+qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment)
+{
#define HASH_STORAGE_SIZE 1000
char storage[HASH_STORAGE_SIZE + 1];
char *copy = storage;
bool on_heap = false;
int length = qd_iterator_length(iter);
- qd_address_treatment_t trt = core->qd->default_treatment;
+ qd_address_treatment_t trt = default_treatment;
if (length > HASH_STORAGE_SIZE) {
copy = (char*) malloc(length + 1);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/exchange_bindings.c
----------------------------------------------------------------------
diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c
index 43a5ca4..15d369e 100644
--- a/src/router_core/exchange_bindings.c
+++ b/src/router_core/exchange_bindings.c
@@ -927,7 +927,8 @@ static qdr_exchange_t *qdr_exchange(qdr_core_t *core,
}
qdr_post_mobile_added_CT(core,
- (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle));
+ (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle),
+ ex->qdr_addr->treatment);
}
return ex;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 84834fe..28b1414 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -108,11 +108,12 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask
}
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash)
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash, int treatment_hint)
{
qdr_action_t *action = qdr_action(qdr_map_destination_CT, "map_destination");
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address_hash);
+ action->args.route_table.treatment_hint = treatment_hint;
qdr_action_enqueue(core, action);
}
@@ -557,11 +558,30 @@ static void qdr_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, boo
qd_bitmask_free(valid_origins);
}
+static qd_address_treatment_t default_treatment(qdr_core_t *core, int hint) {
+ switch (hint) {
+ case QD_TREATMENT_MULTICAST_FLOOD:
+ return QD_TREATMENT_MULTICAST_FLOOD;
+ case QD_TREATMENT_MULTICAST_ONCE:
+ return QD_TREATMENT_MULTICAST_ONCE;
+ case QD_TREATMENT_ANYCAST_CLOSEST:
+ return QD_TREATMENT_ANYCAST_CLOSEST;
+ case QD_TREATMENT_ANYCAST_BALANCED:
+ return QD_TREATMENT_ANYCAST_BALANCED;
+ case QD_TREATMENT_LINK_BALANCED:
+ return QD_TREATMENT_LINK_BALANCED;
+ case QD_TREATMENT_UNAVAILABLE:
+ return QD_TREATMENT_UNAVAILABLE;
+ default:
+ return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment;
+ }
+}
static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
+ int treatment_hint = action->args.route_table.treatment_hint;
if (discard) {
qdr_field_free(address);
@@ -584,8 +604,11 @@ static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qdr_address_CT(core, qdr_treatment_for_address_hash_CT(core, iter));
- if (!addr) break;
+ addr = qdr_address_CT(core, qdr_treatment_for_address_hash_with_default_CT(core, iter, default_treatment(core, treatment_hint)));
+ if (!addr) {
+ qd_log(core->log, QD_LOG_CRITICAL, "map_destination: ignored");
+ break;
+ }
qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(core->addrs, addr);
@@ -730,7 +753,7 @@ static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work)
{
char *address_hash = qdr_field_copy(work->field);
if (address_hash) {
- core->rt_mobile_added(core->rt_context, address_hash);
+ core->rt_mobile_added(core->rt_context, address_hash, work->treatment);
free(address_hash);
}
@@ -756,10 +779,11 @@ static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work)
}
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash)
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment)
{
qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added);
work->field = qdr_field(address_hash);
+ work->treatment = treatment;
qdr_post_general_work_CT(core, work);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index af7a736..6b7e2b8 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -431,7 +431,7 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li
if (DEQ_SIZE(addr->rlinks) == 1) {
const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key == QD_ITER_HASH_PREFIX_MOBILE))
- qdr_post_mobile_added_CT(core, key);
+ qdr_post_mobile_added_CT(core, key, addr->treatment);
qdr_addr_start_inlinks_CT(core, addr);
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr);
} else if (DEQ_SIZE(addr->rlinks) == 2 && qd_bitmask_cardinality(addr->rnodes) == 0)
@@ -476,7 +476,7 @@ void qdr_core_bind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_co
qdr_add_connection_ref(&addr->conns, conn);
if (DEQ_SIZE(addr->conns) == 1) {
const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
- qdr_post_mobile_added_CT(core, key);
+ qdr_post_mobile_added_CT(core, key, addr->treatment);
qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index ec51fd7..19a1d1e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -94,6 +94,7 @@ struct qdr_action_t {
int router_maskbit;
int nh_router_maskbit;
int cost;
+ int treatment_hint;
qd_bitmask_t *router_set;
qdr_field_t *address;
} route_table;
@@ -193,6 +194,7 @@ struct qdr_general_work_t {
void *on_message_context;
qd_message_t *msg;
uint64_t in_conn_id;
+ int treatment;
};
ALLOC_DECLARE(qdr_general_work_t);
@@ -884,7 +886,7 @@ qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment);
void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
@@ -897,6 +899,7 @@ void qdr_connection_free(qdr_connection_t *conn);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase, int *priority);
qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter);
+qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment);
qdr_edge_t *qdr_edge(qdr_core_t *);
void qdr_edge_free(qdr_edge_t *);
void qdr_edge_connection_opened(qdr_edge_t *edge, qdr_connection_t *conn);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 1d64cfc..1d0c8d5 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -242,9 +242,10 @@ static PyObject* qd_map_destination(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
const char *addr_string;
+ int treatment;
int maskbit;
- if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+ if (!PyArg_ParseTuple(args, "sii", &addr_string, &treatment, &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -252,7 +253,7 @@ static PyObject* qd_map_destination(PyObject *self, PyObject *args)
return 0;
}
- qdr_core_map_destination(router->router_core, maskbit, addr_string);
+ qdr_core_map_destination(router->router_core, maskbit, addr_string, treatment);
Py_INCREF(Py_None);
return Py_None;
@@ -316,7 +317,7 @@ static PyTypeObject RouterAdapterType = {
};
-static void qd_router_mobile_added(void *context, const char *address_hash)
+static void qd_router_mobile_added(void *context, const char *address_hash, qd_address_treatment_t treatment)
{
qd_router_t *router = (qd_router_t*) context;
PyObject *pArgs;
@@ -324,8 +325,9 @@ static void qd_router_mobile_added(void *context, const char *address_hash)
if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
+ pArgs = PyTuple_New(2);
PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(address_hash));
+ PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) treatment));
pValue = PyObject_CallObject(pyAdded, pArgs);
qd_error_py();
Py_DECREF(pArgs);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 615f059..778b40f 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -1382,6 +1382,90 @@ class TwoRouterConnection(TestCase):
self.assertTrue(self.success)
+class PropagationTest(TestCase):
+
+ inter_router_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router and a messenger"""
+ super(PropagationTest, cls).setUpClass()
+
+ def router(name, extra_config):
+
+ config = [
+ ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
+
+ ('listener', {'port': cls.tester.get_port()}),
+
+ ] + extra_config
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+ router('A', [('listener', {'role': 'inter-router', 'port': inter_router_port}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'})])
+ router('B', [('connector', {'role': 'inter-router', 'port': inter_router_port})])
+
+ cls.routers[0].wait_router_connected('QDR.B')
+ cls.routers[1].wait_router_connected('QDR.A')
+
+ def test_propagation_of_locally_undefined_address(self):
+ test = MulticastTestClient(self.routers[0].addresses[0], self.routers[1].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+ self.assertEqual(test.received, 2)
+
+class CreateReceiver(MessagingHandler):
+ def __init__(self, connection, address):
+ super(CreateReceiver, self).__init__()
+ self.connection = connection
+ self.address = address
+
+ def on_timer_task(self, event):
+ event.container.create_receiver(self.connection, self.address)
+
+class DelayedSend(MessagingHandler):
+ def __init__(self, connection, address, message):
+ super(DelayedSend, self).__init__()
+ self.connection = connection
+ self.address = address
+ self.message = message
+
+ def on_timer_task(self, event):
+ event.container.create_sender(self.connection, self.address).send(self.message)
+
+class MulticastTestClient(MessagingHandler):
+ def __init__(self, router1, router2):
+ super(MulticastTestClient, self).__init__()
+ self.routers = [router1, router2]
+ self.received = 0
+ self.error = None
+
+ def on_start(self, event):
+ self.connections = [event.container.connect(r) for r in self.routers]
+ event.container.create_receiver(self.connections[0], "multicast")
+ # wait for knowledge of receiver1 to propagate to second router
+ event.container.schedule(5, CreateReceiver(self.connections[1], "multicast"))
+ event.container.schedule(7, DelayedSend(self.connections[1], "multicast", Message(body="testing1,2,3")))
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+
+ def on_message(self, event):
+ self.received += 1
+ event.connection.close()
+ if self.received == 2:
+ self.timer.cancel()
+
+ def timeout(self):
+ self.error = "Timeout Expired:received=%d" % self.received
+ for c in self.connections:
+ c.close()
+
+ def run(self):
+ Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org