You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/09/08 20:32:46 UTC
[2/2] qpid-dispatch git commit: DISPATCH-584 - Implemented flood
limiting based on known topology radius.
DISPATCH-584 - Implemented flood limiting based on known topology radius.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/08b36786
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/08b36786
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/08b36786
Branch: refs/heads/master
Commit: 08b3678655f015aeacb62eb1466649adf788af0a
Parents: 3671d8c
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 8 16:28:45 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 8 16:28:45 2017 -0400
----------------------------------------------------------------------
include/qpid/dispatch/parse.h | 2 +-
python/qpid_dispatch_internal/router/node.py | 8 +++-
python/qpid_dispatch_internal/router/path.py | 19 +++++++--
src/router_node.c | 33 +++++++++++++---
src/router_private.h | 7 ++++
src/router_pynode.c | 14 +++++++
tests/router_engine_test.py | 47 +++++++++++++++++------
7 files changed, 107 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/include/qpid/dispatch/parse.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 2f5d1e5..7be09f0 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -203,7 +203,7 @@ int64_t qd_parse_as_long(qd_parsed_field_t *field);
bool qd_parse_as_bool(qd_parsed_field_t *field);
/**
- * Return the number of sub-field in a compound field. If the field is
+ * Return the number of sub-fields in a compound field. If the field is
* a list or array, this is the number of items in the list/array. If
* the field is a map, this is the number of key/value pairs in the map
* (i.e. half the number of actual sub-fields in the map).
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index 448e684..88d820f 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -148,10 +148,16 @@ class NodeTracker(object):
collection = {self.my_id : self.link_state}
for node_id, node in self.nodes.items():
collection[node_id] = node.link_state
- next_hops, costs, valid_origins = self.container.path_engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.container.path_engine.calculate_routes(collection)
self.container.log_ls(LOG_INFO, "Computed next hops: %r" % next_hops)
self.container.log_ls(LOG_INFO, "Computed costs: %r" % costs)
self.container.log_ls(LOG_INFO, "Computed valid origins: %r" % valid_origins)
+ self.container.log_ls(LOG_INFO, "Computed radius: %d" % radius)
+
+ ##
+ ## Update the topology radius
+ ##
+ self.container.router_adapter.set_radius(radius)
##
## Update the next hops and valid origins for each node
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/python/qpid_dispatch_internal/router/path.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/path.py b/python/qpid_dispatch_internal/router/path.py
index 1df094c..4e215e2 100644
--- a/python/qpid_dispatch_internal/router/path.py
+++ b/python/qpid_dispatch_internal/router/path.py
@@ -45,11 +45,14 @@ class PathEngine(object):
##
## Setup Dijkstra's Algorithm
##
+ hops = {}
cost = {}
prev = {}
for _id in link_states:
+ hops[_id] = None # infinite
cost[_id] = None # infinite
prev[_id] = None # undefined
+ hops[root] = 0 # no hops to the root node
cost[root] = 0 # no cost to the root node
unresolved = NodeSet(cost)
@@ -65,6 +68,7 @@ class PathEngine(object):
if unresolved.contains(v):
alt = cost[u] + v_cost
if cost[v] == None or alt < cost[v]:
+ hops[v] = hops[u] + 1
cost[v] = alt
prev[v] = u
unresolved.set_cost(v, alt)
@@ -76,13 +80,14 @@ class PathEngine(object):
for u, val in prev.items():
if not val:
prev.pop(u)
+ hops.pop(u)
cost.pop(u)
##
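## Return previous-node, cost, and hop-count maps. Prev is a map of all reachable, remote nodes to
## their predecessor node. Cost is a map of all reachable nodes and their costs. Hops is a map of
## the same reachable nodes to their distance in hops (not cost) from the root.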
##
- return prev, cost
+ return prev, cost, hops
def _calculate_valid_origins(self, nodeset, collection):
@@ -97,7 +102,7 @@ class PathEngine(object):
valid_origin[node] = []
for root in valid_origin.keys():
- prev, cost = self._calculate_tree_from_root(root, collection)
+ prev, cost, hops = self._calculate_tree_from_root(root, collection)
nodes = prev.keys()
while len(nodes) > 0:
u = nodes[0]
@@ -121,10 +126,16 @@ class PathEngine(object):
##
## Generate the shortest-path tree with the local node as root
##
- prev, cost = self._calculate_tree_from_root(self.id, collection)
+ prev, cost, hops = self._calculate_tree_from_root(self.id, collection)
nodes = prev.keys()
##
+ ## We will also compute the radius of the topology. This is the number of
+ ## hops (not cost) to the most distant router from the local node
+ ##
+ radius = max(hops.values()) if len(hops) > 0 else 0
+
+ ##
## Distill the path tree into a map of next hops for each node
##
next_hops = {}
@@ -147,7 +158,7 @@ class PathEngine(object):
##
valid_origins = self._calculate_valid_origins(prev.keys(), collection)
- return (next_hops, cost, valid_origins)
+ return (next_hops, cost, valid_origins, radius)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 9aa2845..3ed4eac 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -92,9 +92,10 @@ static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn,
}
-static qd_iterator_t *router_annotate_message(qd_router_t *router,
- qd_message_t *msg,
- qd_bitmask_t **link_exclusions)
+static qd_iterator_t *router_annotate_message(qd_router_t *router,
+ qd_message_t *msg,
+ qd_bitmask_t **link_exclusions,
+ uint32_t *distance)
{
qd_iterator_t *ingress_iter = 0;
@@ -105,6 +106,7 @@ static qd_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *phase = qd_message_get_phase(msg);
*link_exclusions = 0;
+ *distance = 0;
//
// QD_MA_TRACE:
@@ -117,6 +119,11 @@ static qd_iterator_t *router_annotate_message(qd_router_t *router,
if (trace) {
if (qd_parse_is_list(trace)) {
//
+ // Return the distance in hops that this delivery has traveled.
+ //
+ *distance = qd_parse_sub_count(trace);
+
+ //
// Create a link-exclusion map for the items in the trace. This map will
// contain a one-bit for each link that leads to a neighbor router that
// the message has already passed through.
@@ -351,10 +358,26 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
}
qd_message_message_annotations(msg);
- qd_bitmask_t *link_exclusions;
+ qd_bitmask_t *link_exclusions;
+ uint32_t distance;
- qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions);
+ qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance);
+ //
+ // If this delivery has traveled further than the known radius of the network topology (plus 1),
+ // release and settle the delivery. This can happen in the case of "flood" multicast where the
+ // deliveries follow all available paths. This will only discard messages that will reach their
+ // destinations via shorter paths.
+ //
+ if (distance > (router->topology_radius + 1)) {
+ qd_message_set_discard(msg, true);
+ pn_link_flow(pn_link, 1);
+ pn_delivery_update(pnd, PN_RELEASED);
+ pn_delivery_settle(pnd);
+ qd_message_free(msg);
+ return;
+ }
+
if (anonymous_link) {
qd_iterator_t *addr_iter = 0;
int phase = 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 4e9ac9c..33c526a 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -65,6 +65,13 @@ struct qd_router_t {
sys_mutex_t *lock;
qd_timer_t *timer;
+
+ //
+ // Store the "radius" of the current network topology. This is defined as the
+ // distance in hops (not cost) from the local router to the most distant known
+ // router in the topology.
+ //
+ int topology_radius;
};
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 3984f74..a791973 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -219,6 +219,19 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
}
+static PyObject* qd_set_radius(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ qd_router_t *router = adapter->router;
+
+ if (!PyArg_ParseTuple(args, "i", &router->topology_radius))
+ return 0;
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
static PyObject* qd_map_destination(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
@@ -281,6 +294,7 @@ static PyMethodDef RouterAdapter_methods[] = {
{"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"},
{"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"},
{"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"},
+ {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"},
{"map_destination", qd_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
{"unmap_destination", qd_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
{"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"},
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/08b36786/tests/router_engine_test.py
----------------------------------------------------------------------
diff --git a/tests/router_engine_test.py b/tests/router_engine_test.py
index 8132270..b9a4fc3 100644
--- a/tests/router_engine_test.py
+++ b/tests/router_engine_test.py
@@ -228,7 +228,7 @@ class PathTest(unittest.TestCase):
collection = { 'R1': LinkState(None, 'R1', 1, {'R2':1}),
'R2': LinkState(None, 'R2', 1, {'R1':1, 'R3':1}),
'R3': LinkState(None, 'R3', 1, {'R2':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 2)
self.assertEqual(next_hops['R2'], 'R2')
self.assertEqual(next_hops['R3'], 'R2')
@@ -238,6 +238,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R2'], [])
self.assertEqual(valid_origins['R3'], [])
+ self.assertEqual(radius, 2)
+
def test_topology2(self):
"""
@@ -256,7 +258,7 @@ class PathTest(unittest.TestCase):
'R4': LinkState(None, 'R4', 1, {'R2':1, 'R5':1}),
'R5': LinkState(None, 'R5', 1, {'R3':1, 'R4':1, 'R6':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 5)
self.assertEqual(next_hops['R2'], 'R2')
self.assertEqual(next_hops['R3'], 'R2')
@@ -275,6 +277,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R5'], [])
self.assertEqual(valid_origins['R6'], [])
+ self.assertEqual(radius, 4)
+
def test_topology3(self):
"""
@@ -293,7 +297,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':1, 'R5':1}),
'R5': LinkState(None, 'R5', 1, {'R1':1, 'R4':1, 'R6':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 5)
self.assertEqual(next_hops['R2'], 'R3')
self.assertEqual(next_hops['R3'], 'R3')
@@ -312,6 +316,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R5'], ['R2', 'R3'])
self.assertEqual(valid_origins['R6'], ['R2', 'R3'])
+ self.assertEqual(radius, 2)
+
def test_topology4(self):
"""
@@ -330,7 +336,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':1, 'R5':1}),
'R5': LinkState(None, 'R5', 1, {'R1':1, 'R4':1, 'R6':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 6)
self.assertEqual(next_hops['R2'], 'R3')
self.assertEqual(next_hops['R3'], 'R3')
@@ -352,6 +358,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R6'], ['R2', 'R3'])
self.assertEqual(valid_origins['R7'], ['R2', 'R3'])
+ self.assertEqual(radius, 3)
+
def test_topology5(self):
"""
@@ -370,7 +378,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':1, 'R5':1, 'R2':1}),
'R5': LinkState(None, 'R5', 1, {'R1':1, 'R4':1, 'R6':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 6)
self.assertEqual(next_hops['R2'], 'R2')
self.assertEqual(next_hops['R3'], 'R3')
@@ -399,6 +407,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R6'], ['R2', 'R3'])
self.assertEqual(valid_origins['R7'], ['R2', 'R3'])
+ self.assertEqual(radius, 3)
+
def test_topology5_with_asymmetry1(self):
"""
@@ -417,7 +427,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':1, 'R5':1, 'R2':1}),
'R5': LinkState(None, 'R5', 1, {'R1':1, 'R4':1, 'R6':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 6)
self.assertEqual(next_hops['R2'], 'R2')
self.assertEqual(next_hops['R3'], 'R3')
@@ -439,6 +449,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R6'], ['R2', 'R3'])
self.assertEqual(valid_origins['R7'], ['R2', 'R3'])
+ self.assertEqual(radius, 3)
+
def test_topology5_with_asymmetry2(self):
"""
@@ -457,7 +469,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':1, 'R5':1}),
'R5': LinkState(None, 'R5', 1, {'R1':1, 'R4':1, 'R6':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 6)
self.assertEqual(next_hops['R2'], 'R3')
self.assertEqual(next_hops['R3'], 'R3')
@@ -479,6 +491,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R6'], ['R2', 'R3'])
self.assertEqual(valid_origins['R7'], ['R2', 'R3'])
+ self.assertEqual(radius, 3)
+
def test_topology5_with_asymmetry3(self):
"""
@@ -497,7 +511,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':1, 'R5':1}),
'R5': LinkState(None, 'R5', 1, {'R1':1, 'R4':1}),
'R6': LinkState(None, 'R6', 1, {'R5':1, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 4)
self.assertEqual(next_hops['R2'], 'R3')
self.assertEqual(next_hops['R3'], 'R3')
@@ -513,6 +527,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R4'], [])
self.assertEqual(valid_origins['R5'], ['R2', 'R3'])
+ self.assertEqual(radius, 2)
+
def test_topology5_with_costs1(self):
"""
@@ -533,7 +549,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':3, 'R5':10, 'R2':20}),
'R5': LinkState(None, 'R5', 1, {'R1':10, 'R4':5, 'R6':2}),
'R6': LinkState(None, 'R6', 1, {'R5':2, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 6)
self.assertEqual(next_hops['R2'], 'R3')
self.assertEqual(next_hops['R3'], 'R3')
@@ -562,6 +578,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R6'], [])
self.assertEqual(valid_origins['R7'], [])
+ self.assertEqual(radius, 3)
+
def test_topology5_with_costs2(self):
"""
@@ -582,7 +600,7 @@ class PathTest(unittest.TestCase):
'R1': LinkState(None, 'R1', 1, {'R3':100, 'R5':10, 'R2':5}),
'R5': LinkState(None, 'R5', 1, {'R1':10, 'R4':100, 'R6':2}),
'R6': LinkState(None, 'R6', 1, {'R5':2, 'R7':1}) }
- next_hops, costs, valid_origins = self.engine.calculate_routes(collection)
+ next_hops, costs, valid_origins, radius = self.engine.calculate_routes(collection)
self.assertEqual(len(next_hops), 6)
self.assertEqual(next_hops['R2'], 'R2')
self.assertEqual(next_hops['R3'], 'R2')
@@ -611,6 +629,8 @@ class PathTest(unittest.TestCase):
self.assertEqual(valid_origins['R6'], ['R2', 'R3', 'R4'])
self.assertEqual(valid_origins['R7'], ['R2', 'R3', 'R4'])
+ self.assertEqual(radius, 3)
+
def test_topology6_path_vs_valid_origin(self):
"""
@@ -634,15 +654,18 @@ class PathTest(unittest.TestCase):
self.id = 'R3'
self.engine = PathEngine(self)
- r3_next_hops, r3_costs, r3_valid_origins = self.engine.calculate_routes(collection)
+ r3_next_hops, r3_costs, r3_valid_origins, r3_radius = self.engine.calculate_routes(collection)
self.id = 'R1'
self.engine = PathEngine(self)
- r1_next_hops, r1_costs, r1_valid_origins = self.engine.calculate_routes(collection)
+ r1_next_hops, r1_costs, r1_valid_origins, r1_radius = self.engine.calculate_routes(collection)
self.assertEqual(r1_next_hops['R6'], 'R2')
self.assertEqual(r3_valid_origins['R6'], [])
+ self.assertEqual(r1_radius, 3)
+ self.assertEqual(r3_radius, 2)
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org