You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/10/15 23:05:50 UTC
svn commit: r1532528 - in /qpid/trunk/qpid/extras/dispatch:
python/qpid/dispatch/router/ src/ tests/ tools/src/py/
Author: tross
Date: Tue Oct 15 21:05:50 2013
New Revision: 1532528
URL: http://svn.apache.org/r1532528
Log:
QPID-5216
- Removed unneeded python router code
- Added propagation of subscribed global addresses
- Broke out address statistics to include to/from-container counts
- Trace no longer optional, broke down and added loop prevention
- Don't allow endpoint subscriptions to subscribe to local-class addresses
Removed:
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
Modified:
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
qpid/trunk/qpid/extras/dispatch/src/router_agent.c
qpid/trunk/qpid/extras/dispatch/src/router_node.c
qpid/trunk/qpid/extras/dispatch/src/router_private.h
qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py
qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py Tue Oct 15 21:05:50 2013
@@ -31,18 +31,18 @@ class MobileAddressEngine(object):
Note that this routing table maps from the mobile address to the remote router where that address
is directly bound.
"""
- def __init__(self, container):
+ def __init__(self, container, node_tracker):
self.container = container
+ self.node_tracker = node_tracker
self.id = self.container.id
self.area = self.container.area
self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
self.mobile_seq = 0
- self.local_keys = []
- self.added_keys = []
- self.deleted_keys = []
- self.remote_lists = {} # map router_id => (sequence, list of keys)
+ self.local_addrs = []
+ self.added_addrs = []
+ self.deleted_addrs = []
+ self.remote_lists = {} # map router_id => (sequence, list of addrs)
self.remote_last_seen = {} # map router_id => time of last seen advertisement/update
- self.remote_changed = False
self.needed_mars = {}
@@ -51,48 +51,41 @@ class MobileAddressEngine(object):
self._send_mars()
##
- ## If local keys have changed, collect the changes and send a MAU with the diffs
+ ## If local addrs have changed, collect the changes and send a MAU with the diffs
## Note: it is important that the differential-MAU be sent before a RA is sent
##
- if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
+ if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
self.mobile_seq += 1
self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
- MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
- self.local_keys.extend(self.added_keys)
- for key in self.deleted_keys:
- self.local_keys.remove(key)
- self.added_keys = []
- self.deleted_keys = []
+ MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs))
+ self.local_addrs.extend(self.added_addrs)
+ for addr in self.deleted_addrs:
+ self.local_addrs.remove(addr)
+ self.added_addrs = []
+ self.deleted_addrs = []
self.container.mobile_sequence_changed(self.mobile_seq)
- ##
- ## If remotes have changed, start the process of updating local bindings
- ##
- if self.remote_changed:
- self.remote_changed = False
- self._update_remote_keys()
-
- def add_local_address(self, key):
+ def add_local_address(self, addr):
"""
"""
- if self.local_keys.count(key) == 0:
- if self.added_keys.count(key) == 0:
- self.added_keys.append(key)
+ if self.local_addrs.count(addr) == 0:
+ if self.added_addrs.count(addr) == 0:
+ self.added_addrs.append(addr)
else:
- if self.deleted_keys.count(key) > 0:
- self.deleted_keys.remove(key)
+ if self.deleted_addrs.count(addr) > 0:
+ self.deleted_addrs.remove(addr)
- def del_local_address(self, key):
+ def del_local_address(self, addr):
"""
"""
- if self.local_keys.count(key) > 0:
- if self.deleted_keys.count(key) == 0:
- self.deleted_keys.append(key)
+ if self.local_addrs.count(addr) > 0:
+ if self.deleted_addrs.count(addr) == 0:
+ self.deleted_addrs.append(addr)
else:
- if self.added_keys.count(key) > 0:
- self.added_keys.remove(key)
+ if self.added_addrs.count(addr) > 0:
+ self.added_addrs.remove(addr)
def handle_ra(self, msg, now):
@@ -131,7 +124,8 @@ class MobileAddressEngine(object):
return
self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
self.remote_last_seen[msg.id] = now
- self.remote_changed = True
+ (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
+ self._activate_remotes(msg.id, add_list, del_list)
else:
##
## Differential MAU
@@ -148,10 +142,12 @@ class MobileAddressEngine(object):
if msg.add_list and msg.add_list.__class__ == list:
_list.extend(msg.add_list)
if msg.del_list and msg.del_list.__class__ == list:
- for key in msg.del_list:
- _list.remove(key)
+ for addr in msg.del_list:
+ _list.remove(addr)
self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- self.remote_changed = True
+ self.node_tracker.add_addresses(msg.id, msg.add_list)
+ self.node_tracker.del_addresses(msg.id, msg.del_list)
+ self._activate_remotes(msg.id, msg.add_list, msg.del_list)
else:
self.needed_mars[(msg.id, msg.area, _seq)] = None
else:
@@ -163,14 +159,7 @@ class MobileAddressEngine(object):
return
if msg.have_seq < self.mobile_seq:
self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
- MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
-
- def _update_remote_keys(self):
- keys = {}
- for _id,(seq,key_list) in self.remote_lists.items():
- keys[_id] = key_list
- self.container.mobile_keys_changed(keys)
+ MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs))
def _expire_remotes(self, now):
@@ -186,3 +175,11 @@ class MobileAddressEngine(object):
self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
self.needed_mars = {}
+
+ def _activate_remotes(self, _id, added, deleted):
+ bit = self.node_tracker.maskbit_for_node(_id)
+ for a in added:
+ self.container.router_adapter.map_destination(a, bit)
+ for d in deleted:
+ self.container.router_adapter.unmap_destination(d, bit)
+
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py Tue Oct 15 21:05:50 2013
@@ -113,6 +113,33 @@ class NodeTracker(object):
return None
+ def add_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ for a in addrs:
+ node.addrs[a] = 1
+
+
+ def del_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ for a in addrs:
+ node.addrs.pop(a)
+
+
+ def overwrite_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ added = []
+ deleted = []
+ for a in addrs:
+ if a not in node.addrs.keys():
+ added.append(a)
+ for a in node.addrs.keys():
+ if a not in addrs:
+ deleted.append(a)
+ for a in addrs:
+ node.addrs[a] = 1
+ return (added, deleted)
+
+
def _allocate_maskbit(self):
if self.next_maskbit == None:
raise Exception("Exceeded Maximum Router Count")
@@ -143,4 +170,5 @@ class RemoteNode(object):
self.maskbit = maskbit
self.neighbor = neighbor
self.remote = not neighbor
+ self.addrs = {} # Address => Count at Node (1 only for the present)
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Tue Oct 15 21:05:50 2013
@@ -27,8 +27,6 @@ from link import LinkStateEngine
from path import PathEngine
from mobile import MobileAddressEngine
from routing import RoutingTableEngine
-from binding import BindingEngine
-from adapter import AdapterEngine
from node import NodeTracker
##
@@ -75,10 +73,8 @@ class RouterEngine:
self.neighbor_engine = NeighborEngine(self)
self.link_state_engine = LinkStateEngine(self)
self.path_engine = PathEngine(self)
- self.mobile_address_engine = MobileAddressEngine(self)
+ self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker)
self.routing_table_engine = RoutingTableEngine(self, self.node_tracker)
- self.binding_engine = BindingEngine(self)
- self.adapter_engine = AdapterEngine(self)
@@ -92,24 +88,26 @@ class RouterEngine:
return self.id
- def addLocalAddress(self, key):
+ def addressAdded(self, addr):
"""
"""
try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
+ if addr.find('Mtemp.') == 0:
return
- self.mobile_address_engine.add_local_address(key)
+ if addr.find('M') == 0:
+ self.mobile_address_engine.add_local_address(addr[1:])
except Exception, e:
self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
- def delLocalAddress(self, key):
+ def addressRemoved(self, addr):
"""
"""
try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
+ if addr.find('Mtemp.') == 0:
return
- self.mobile_address_engine.del_local_address(key)
+ if addr.find('M') == 0:
+ self.mobile_address_engine.del_local_address(addr[1:])
except Exception, e:
self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
@@ -124,8 +122,6 @@ class RouterEngine:
self.path_engine.tick(now)
self.mobile_address_engine.tick(now)
self.routing_table_engine.tick(now)
- self.binding_engine.tick(now)
- self.adapter_engine.tick(now)
self.node_tracker.tick(now)
except Exception, e:
self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
@@ -190,14 +186,10 @@ class RouterEngine:
return { 'help' : "Get list of supported values for kind",
'link-state' : "This router's link state",
'link-state-set' : "The set of link states from known routers",
- 'next-hops' : "Next hops to each known router",
- 'topo-table' : "Topological routing table",
- 'mobile-table' : "Mobile key routing table"
+ 'next-hops' : "Next hops to each known router"
}
if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
if kind == 'next-hops' : return self.routing_table_engine.next_hops
- if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']}
- if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']}
if kind == 'link-state-set' :
copy = {}
for _id,_ls in self.link_state_engine.collection.items():
@@ -249,7 +241,6 @@ class RouterEngine:
def next_hops_changed(self, next_hop_table):
self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
self.routing_table_engine.next_hops_changed(next_hop_table)
- self.binding_engine.next_hops_changed()
def valid_origins_changed(self, valid_origins):
self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
@@ -259,17 +250,9 @@ class RouterEngine:
self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
self.link_state_engine.set_mobile_sequence(mobile_seq)
- def mobile_keys_changed(self, keys):
- self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
- self.binding_engine.mobile_keys_changed(keys)
-
def get_next_hops(self):
return self.routing_table_engine.get_next_hops()
- def remote_routes_changed(self, key_class, routes):
- self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
- self.adapter_engine.remote_routes_changed(key_class, routes)
-
def new_neighbor(self, rid, link_id):
self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
self.node_tracker.new_neighbor(rid, link_id)
Modified: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_agent.c?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_agent.c Tue Oct 15 21:05:50 2013
@@ -140,6 +140,8 @@ static void dx_router_query_address(dx_r
dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress);
dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress);
dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit);
+ dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container);
+ dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container);
addr = DEQ_NEXT(addr);
dx_agent_value_complete(cor, addr != 0);
}
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Tue Oct 15 21:05:50 2013
@@ -107,7 +107,7 @@ void dx_router_del_node_ref_LH(dx_router
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
-static void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
{
if (addr == 0)
return;
@@ -314,7 +314,7 @@ static int router_writable_link_handler(
}
-static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop)
{
dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
@@ -333,25 +333,27 @@ static dx_field_iterator_t *router_annot
//
// Always emit a trace list: copy any existing trace entries, then append this router's ID.
//
+ dx_compose_insert_string(out_da, DX_DA_TRACE);
+ dx_compose_start_list(out_da);
if (trace) {
- dx_compose_insert_string(out_da, DX_DA_TRACE);
- dx_compose_start_list(out_da);
-
if (dx_parse_is_list(trace)) {
uint32_t idx = 0;
dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
while (trace_item) {
dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+ if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix))
+ *drop = 1;
+ dx_field_iterator_reset(iter);
dx_compose_insert_string_iterator(out_da, iter);
idx++;
trace_item = dx_parse_sub_value(trace, idx);
}
}
-
- dx_compose_insert_string(out_da, direct_prefix);
- dx_compose_end_list(out_da);
}
+ dx_compose_insert_string(out_da, direct_prefix);
+ dx_compose_end_list(out_da);
+
//
// If there is no ingress field, annotate the ingress as this router else
// keep the original field.
@@ -475,25 +477,26 @@ static void router_rx_handler(void* cont
// Interpret and update the delivery annotations of the message. As a convenience,
// this function returns the iterator to the ingress field (if it exists).
//
- dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
+ int drop = 0;
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
//
// Forward to the in-process handler for this message if there is one. The
// actual invocation of the handler will occur later after we've released
// the lock.
//
- if (addr->handler) {
+ if (!drop && addr->handler) {
in_process_copy = dx_message_copy(msg);
handler = addr->handler;
handler_context = addr->handler_context;
- addr->deliveries_egress++;
+ addr->deliveries_to_container++;
}
//
// If the address form is local (i.e. is prefixed by _local), don't forward
// outside of the router process.
//
- if (!is_local) {
+ if (!drop && !is_local) {
//
// Forward to all of the local links receiving this address.
//
@@ -725,6 +728,7 @@ static int router_outgoing_link_handler(
const char *r_src = pn_terminus_get_address(dx_link_remote_source(link));
int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link));
int is_router = dx_router_terminus_is_router(dx_link_remote_target(link));
+ int propagate = 0;
dx_field_iterator_t *iter = 0;
if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) {
@@ -745,15 +749,15 @@ static int router_outgoing_link_handler(
//
// If this is an endpoint link with a source address, make sure the address is
- // appropriate for endpoint links. If it is not a local or mobile address, (i.e.
- // a router or area address), it cannot be bound to an endpoint link.
+ // appropriate for endpoint links. If it is not a mobile address, it cannot be
+ // bound to an endpoint link.
//
if(r_src && !is_router && !is_dynamic) {
iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
unsigned char prefix = dx_field_iterator_octet(iter);
dx_field_iterator_reset(iter);
- if (prefix != 'L' && prefix != 'M') {
+ if (prefix != 'M') {
dx_field_iterator_free(iter);
pn_link_close(pn_link);
dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
@@ -819,15 +823,26 @@ static int router_outgoing_link_handler(
dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
- dx_field_iterator_free(iter);
rlink->owning_addr = addr;
dx_router_add_link_ref_LH(&addr->rlinks, rlink);
+
+ //
+ // If this is not a dynamic address and it is the first local subscription
+ // to the address, supply the address to the router module for propagation
+ // to other nodes.
+ //
+ propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
}
DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
+ if (propagate)
+ dx_router_global_added(router, iter);
+
+ if (iter)
+ dx_field_iterator_free(iter);
pn_link_open(pn_link);
return 0;
}
@@ -1059,6 +1074,8 @@ dx_router_t *dx_router(dx_dispatch_t *dx
router->dtag = 1;
router->pyRouter = 0;
router->pyTick = 0;
+ router->pyAdded = 0;
+ router->pyRemoved = 0;
//
// Create addresses for all of the routers in the topology. It will be registered
@@ -1172,7 +1189,7 @@ void dx_router_send(dx_dispatch_t
//
// Forward to all of the local links receiving this address.
//
- addr->deliveries_ingress++;
+ addr->deliveries_from_container++;
dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
Modified: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Tue Oct 15 21:05:50 2013
@@ -120,6 +120,8 @@ struct dx_address_t {
uint64_t deliveries_ingress;
uint64_t deliveries_egress;
uint64_t deliveries_transit;
+ uint64_t deliveries_to_container;
+ uint64_t deliveries_from_container;
};
ALLOC_DECLARE(dx_address_t);
@@ -150,6 +152,8 @@ struct dx_router_t {
PyObject *pyRouter;
PyObject *pyTick;
+ PyObject *pyAdded;
+ PyObject *pyRemoved;
dx_agent_class_t *class_router;
dx_agent_class_t *class_link;
@@ -158,11 +162,16 @@ struct dx_router_t {
};
+
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr);
void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter);
+void dx_router_global_removed(dx_router_t *router, const char *addr);
+
#endif
Modified: qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_pynode.c?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_pynode.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_pynode.c Tue Oct 15 21:05:50 2013
@@ -347,15 +347,48 @@ static PyObject* dx_del_neighbor_router(
static PyObject* dx_map_destination(PyObject *self, PyObject *args)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *addr;
- int router_maskbit;
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+ dx_address_t *addr;
+ dx_field_iterator_t *iter;
- if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- // TODO
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
+ DEQ_ITEM_INIT(addr);
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ }
+ dx_field_iterator_free(iter);
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+ sys_mutex_unlock(router->lock);
+
+ dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -364,15 +397,43 @@ static PyObject* dx_map_destination(PyOb
static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *addr;
- int router_maskbit;
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+ dx_address_t *addr;
+
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+ return 0;
- if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ dx_field_iterator_free(iter);
+
+ if (!addr) {
+ PyErr_SetString(PyExc_Exception, "Address Not Found");
+ sys_mutex_unlock(router->lock);
+ return 0;
+ }
+
+ dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ dx_router_check_addr_LH(router, addr);
+ sys_mutex_unlock(router->lock);
- // TODO
+ dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -534,6 +595,18 @@ void dx_router_python_setup(dx_router_t
dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
return;
}
+
+ router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded");
+ if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method");
+ return;
+ }
+
+ router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved");
+ if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
+ return;
+ }
}
@@ -557,3 +630,35 @@ void dx_pyrouter_tick(dx_router_t *route
}
}
+
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
+{
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ char *address = (char*) dx_field_iterator_copy(iter);
+
+ dx_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
+ pValue = PyObject_CallObject(router->pyAdded, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
+ dx_python_unlock();
+
+ free(address);
+ }
+}
+
+
+void dx_router_global_removed(dx_router_t *router, const char *addr)
+{
+}
+
Modified: qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py (original)
+++ qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py Tue Oct 15 21:05:50 2013
@@ -370,7 +370,7 @@ class RouterTest(unittest.TestCase):
da = rm.instructions
self.assertEqual(da.__class__, dict)
self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
- self.assertFalse('qdx.trace' in da)
+ self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
##
## Pre-existing ingress
@@ -388,7 +388,7 @@ class RouterTest(unittest.TestCase):
da = rm.instructions
self.assertEqual(da.__class__, dict)
self.assertEqual(da['qdx.ingress'], 'ingress-router')
- self.assertFalse('qdx.trace' in da)
+ self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
##
## Invalid trace type
Modified: qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat (original)
+++ qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat Tue Oct 15 21:05:50 2013
@@ -102,7 +102,7 @@ class BusManager:
self.M.start()
self.M.route("amqp:/*", "amqp://%s/$1" % host)
self.address = "amqp:/_local/agent"
- self.reply = "amqp:/reply-address/0001" # FIX THIS!
+ self.reply = "amqp:/temp.reply-address/0001" # FIX THIS!
self.M.subscribe(self.reply)
def Disconnect(self):
@@ -217,6 +217,8 @@ class BusManager:
heads.append(Header("in", Header.COMMAS))
heads.append(Header("out", Header.COMMAS))
heads.append(Header("thru", Header.COMMAS))
+ heads.append(Header("to-proc", Header.COMMAS))
+ heads.append(Header("from-proc", Header.COMMAS))
rows = []
request = Message()
@@ -243,6 +245,8 @@ class BusManager:
row.append(addr['deliveries-ingress'])
row.append(addr['deliveries-egress'])
row.append(addr['deliveries-transit'])
+ row.append(addr['deliveries-to-container'])
+ row.append(addr['deliveries-from-container'])
rows.append(row)
title = "Router Addresses"
sorter = Sorter(heads, rows, 'address', 0, True)
@@ -316,7 +320,6 @@ def main(argv=None):
print
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
- raise
bm.Disconnect() # try to deallocate brokers
return 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org