You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/10/04 19:58:55 UTC
svn commit: r1529242 - in /qpid/trunk/qpid/extras/dispatch:
python/qpid/dispatch/router/path.py
python/qpid/dispatch/router/router_engine.py
python/qpid/dispatch/router/routing.py src/router_node.c
src/router_pynode.c tests/router_engine_test.py
Author: tross
Date: Fri Oct 4 17:58:54 2013
New Revision: 1529242
URL: http://svn.apache.org/r1529242
Log:
QPID-4967 - work in progress on multi-router networks
- Added computation of valid origins for destinations
- Modified the forwarding algorithm to ensure that only one copy of a message is sent on
a given inter-router link
- Added test coverage for valid origins
Modified:
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
qpid/trunk/qpid/extras/dispatch/src/router_node.c
qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py Fri Oct 4 17:58:54 2013
@@ -49,7 +49,7 @@ class PathEngine(object):
def _calculate_tree_from_root(self, root):
##
## Make a copy of the current collection of link-states that contains
- ## an empty link-state for nodes that are known-peers but are not in the
+ ## a fake link-state for nodes that are known-peers but are not in the
## collection currently. This is needed to establish routes to those nodes
## so we can trade link-state information with them.
##
@@ -58,7 +58,7 @@ class PathEngine(object):
link_states[_id] = ls.peers
for p in ls.peers:
if p not in link_states:
- link_states[p] = []
+ link_states[p] = [_id]
##
## Setup Dijkstra's Algorithm
@@ -102,11 +102,42 @@ class PathEngine(object):
return prev
+ def _calculate_valid_origins(self, nodeset):
+ ##
+ ## Calculate the tree from each origin, determine the set of origins-per-dest
+ ## for which the path from origin to dest passes through us. This is the set
+ ## of valid origins for forwarding to the destination.
+ ##
+ valid_origin = {} # Map of destination => List of Valid Origins
+ for node in nodeset:
+ if node != self.id:
+ valid_origin[node] = []
+
+ for root in valid_origin.keys():
+ prev = self._calculate_tree_from_root(root)
+ nodes = prev.keys()
+ while len(nodes) > 0:
+ u = nodes[0]
+ path = [u]
+ nodes.remove(u)
+ v = prev[u]
+ while v != root:
+ if v in nodes:
+ if v != self.id:
+ path.append(v)
+ nodes.remove(v)
+ if v == self.id:
+ valid_origin[root].extend(path)
+ u = v
+ v = prev[u]
+ return valid_origin
+
+
def _calculate_routes(self):
##
## Generate the shortest-path tree with the local node as root
##
- prev = self._calculate_tree_from_root(self.id)
+ prev = self._calculate_tree_from_root(self.id)
nodes = prev.keys()
##
@@ -127,12 +158,14 @@ class PathEngine(object):
for w in path: # mark each node in the path as reachable via the next hop
next_hops[w] = u
+ self.container.next_hops_changed(next_hops)
+
##
- ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
- ## for which the path from origin to dest passes through us. This is the set
- ## of valid origins for forwarding to the destination.
+ ## Calculate the valid origins for remote routers
##
- self.container.next_hops_changed(next_hops)
+ valid_origin = self._calculate_valid_origins(prev.keys())
+ self.container.valid_origins_changed(valid_origin)
+
class NodeSet(object):
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Fri Oct 4 17:58:54 2013
@@ -247,6 +247,10 @@ class RouterEngine:
self.routing_table_engine.next_hops_changed(next_hop_table)
self.binding_engine.next_hops_changed()
+ def valid_origins_changed(self, valid_origins):
+ self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
+ self.routing_table_engine.valid_origins_changed(valid_origins)
+
def mobile_sequence_changed(self, mobile_seq):
self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
self.link_state_engine.set_mobile_sequence(mobile_seq)
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py Fri Oct 4 17:58:54 2013
@@ -48,6 +48,15 @@ class RoutingTableEngine(object):
self.container.router_adapter.set_next_hop(mb_id, mb_nh)
+ def valid_origins_changes(self, valid_origins):
+ for _id, vo in valid_origins.items():
+ mb_id = self.node_tracker.maskbit_for_node(_id)
+ mb_vo = []
+ for o in vo:
+ mb_vo.append(self.node_tracker.maskbit_for_node(o))
+ self.container.router_adapted.set_valid_origins(mb_id, mb_vo)
+
+
def get_next_hops(self):
return self.next_hops
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Oct 4 17:58:54 2013
@@ -502,12 +502,34 @@ static void router_rx_handler(void* cont
if (origin >= 0) {
dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
dx_router_link_t *dest_link;
+ dx_bitmask_t *link_set = dx_bitmask(0);
+
+ //
+ // Loop over the target nodes for this address. Build a set of outgoing links
+ // for which there are valid targets. We do this to avoid sending more than one
+ // message down a given link. It's possible that there are multiple destinations
+ // for this address that are all reachable over the same link. In this case, we
+ // will send only one copy of the message over the link and allow a downstream
+ // router to fan the message out.
+ //
while (dest_node_ref) {
if (dest_node_ref->router->next_hop)
dest_link = dest_node_ref->router->next_hop->peer_link;
else
dest_link = dest_node_ref->router->peer_link;
- if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) {
+ if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin))
+ dx_bitmask_set_bit(link_set, dest_link->mask_bit);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
+
+ //
+ // Send a copy of the message outbound on each identified link.
+ //
+ int link_bit;
+ while (dx_bitmask_first_set(link_set, &link_bit)) {
+ dx_bitmask_clear_bit(link_set, link_bit);
+ dest_link = router->out_links_by_mask_bit[link_bit];
+ if (dest_link) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
@@ -522,8 +544,9 @@ static void router_rx_handler(void* cont
dx_link_activate(dest_link->link);
}
- dest_node_ref = DEQ_NEXT(dest_node_ref);
}
+
+ dx_bitmask_free(link_set);
}
}
}
Modified: qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_pynode.c?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_pynode.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_pynode.c Fri Oct 4 17:58:54 2013
@@ -246,6 +246,61 @@ static PyObject* dx_set_next_hop(PyObjec
}
+static PyObject* dx_set_valid_origins(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+ PyObject *origin_list;
+ Py_ssize_t idx;
+
+ if (!PyArg_ParseTuple(args, "iO", &router_maskbit, &origin_list))
+ return 0;
+
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ if (!PyList_Check(origin_list)) {
+ PyErr_SetString(PyExc_Exception, "Expected List as argument 2");
+ return 0;
+ }
+
+ Py_ssize_t origin_count = PyList_Size(origin_list);
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ int maskbit;
+
+ for (idx = 0; idx < origin_count; idx++) {
+ maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
+
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Origin bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Origin router Not Found");
+ return 0;
+ }
+ }
+
+ dx_bitmask_clear_all(rnode->valid_origins);
+ for (idx = 0; idx < origin_count; idx++) {
+ maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
+ dx_bitmask_set_bit(rnode->valid_origins, maskbit);
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
@@ -326,6 +381,7 @@ static PyMethodDef RouterAdapter_methods
{"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
{"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"},
{"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
+ {"set_valid_origins", dx_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"},
{"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
{"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
{"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
Modified: qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py (original)
+++ qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py Fri Oct 4 17:58:54 2013
@@ -344,6 +344,7 @@ class PathTest(unittest.TestCase):
self.id = 'R1'
self.area = 'area'
self.next_hops = None
+ self.valid_origins = None
self.engine = PathEngine(self)
def log(self, level, text):
@@ -352,6 +353,9 @@ class PathTest(unittest.TestCase):
def next_hops_changed(self, nh):
self.next_hops = nh
+ def valid_origins_changed(self, vo):
+ self.valid_origins = vo
+
def test_topology1(self):
"""
@@ -369,6 +373,11 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R2'], 'R2')
self.assertEqual(self.next_hops['R3'], 'R2')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.assertEqual(self.valid_origins['R2'], [])
+ self.assertEqual(self.valid_origins['R3'], [])
+
def test_topology2(self):
"""
@@ -396,6 +405,17 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R5'], 'R2')
self.assertEqual(self.next_hops['R6'], 'R2')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.valid_origins['R6'].sort()
+ self.assertEqual(self.valid_origins['R2'], [])
+ self.assertEqual(self.valid_origins['R3'], [])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], [])
+ self.assertEqual(self.valid_origins['R6'], [])
+
def test_topology3(self):
"""
@@ -404,7 +424,7 @@ class PathTest(unittest.TestCase):
+----+ +----+ +----+
| |
+====+ +----+ +----+
- | R1 |------| R5 |------| R6 |
+ | R1 |------| R5 |------| R6 |
+====+ +----+ +----+
"""
@@ -423,6 +443,17 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R5'], 'R5')
self.assertEqual(self.next_hops['R6'], 'R5')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.valid_origins['R6'].sort()
+ self.assertEqual(self.valid_origins['R2'], ['R5', 'R6'])
+ self.assertEqual(self.valid_origins['R3'], ['R5', 'R6'])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+
def test_topology4(self):
"""
@@ -451,6 +482,19 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R6'], 'R5')
self.assertEqual(self.next_hops['R7'], 'R5')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.valid_origins['R6'].sort()
+ self.valid_origins['R7'].sort()
+ self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
def test_topology5(self):
"""
@@ -479,6 +523,19 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R6'], 'R5')
self.assertEqual(self.next_hops['R7'], 'R5')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.valid_origins['R6'].sort()
+ self.valid_origins['R7'].sort()
+ self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
def test_topology5_with_asymmetry1(self):
"""
@@ -507,6 +564,19 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R6'], 'R5')
self.assertEqual(self.next_hops['R7'], 'R5')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.valid_origins['R6'].sort()
+ self.valid_origins['R7'].sort()
+ self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
def test_topology5_with_asymmetry2(self):
"""
@@ -535,6 +605,19 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R6'], 'R5')
self.assertEqual(self.next_hops['R7'], 'R5')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.valid_origins['R6'].sort()
+ self.valid_origins['R7'].sort()
+ self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+ self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
def test_topology5_with_asymmetry3(self):
"""
@@ -561,6 +644,15 @@ class PathTest(unittest.TestCase):
self.assertEqual(self.next_hops['R4'], 'R3')
self.assertEqual(self.next_hops['R5'], 'R5')
+ self.valid_origins['R2'].sort()
+ self.valid_origins['R3'].sort()
+ self.valid_origins['R4'].sort()
+ self.valid_origins['R5'].sort()
+ self.assertEqual(self.valid_origins['R2'], ['R5'])
+ self.assertEqual(self.valid_origins['R3'], ['R5'])
+ self.assertEqual(self.valid_origins['R4'], [])
+ self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+
if __name__ == '__main__':
unittest.main()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org