You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/10/15 23:05:50 UTC

svn commit: r1532528 - in /qpid/trunk/qpid/extras/dispatch: python/qpid/dispatch/router/ src/ tests/ tools/src/py/

Author: tross
Date: Tue Oct 15 21:05:50 2013
New Revision: 1532528

URL: http://svn.apache.org/r1532528
Log:
QPID-5216
  - Removed unneeded python router code
  - Added propagation of subscribed global addresses
  - Broke out address statistics to include to/from-container counts
  - Trace no longer optional, broke down and added loop prevention
  - Don't allow endpoint subscriptions to subscribe to local-class addresses

Removed:
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
Modified:
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
    qpid/trunk/qpid/extras/dispatch/src/router_agent.c
    qpid/trunk/qpid/extras/dispatch/src/router_node.c
    qpid/trunk/qpid/extras/dispatch/src/router_private.h
    qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
    qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py
    qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py Tue Oct 15 21:05:50 2013
@@ -31,18 +31,18 @@ class MobileAddressEngine(object):
     Note that this routing table maps from the mobile address to the remote router where that address
     is directly bound.
     """
-    def __init__(self, container):
+    def __init__(self, container, node_tracker):
         self.container = container
+        self.node_tracker = node_tracker
         self.id = self.container.id
         self.area = self.container.area
         self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
         self.mobile_seq = 0
-        self.local_keys = []
-        self.added_keys = []
-        self.deleted_keys = []
-        self.remote_lists = {}      # map router_id => (sequence, list of keys)
+        self.local_addrs = []
+        self.added_addrs = []
+        self.deleted_addrs = []
+        self.remote_lists = {}      # map router_id => (sequence, list of addrs)
         self.remote_last_seen = {}  # map router_id => time of last seen advertizement/update
-        self.remote_changed = False
         self.needed_mars = {}
 
 
@@ -51,48 +51,41 @@ class MobileAddressEngine(object):
         self._send_mars()
 
         ##
-        ## If local keys have changed, collect the changes and send a MAU with the diffs
+        ## If local addrs have changed, collect the changes and send a MAU with the diffs
         ## Note: it is important that the differential-MAU be sent before a RA is sent
         ##
-        if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
+        if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
             self.mobile_seq += 1
             self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
-                                MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
-            self.local_keys.extend(self.added_keys)
-            for key in self.deleted_keys:
-                self.local_keys.remove(key)
-            self.added_keys = []
-            self.deleted_keys = []
+                                MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs))
+            self.local_addrs.extend(self.added_addrs)
+            for addr in self.deleted_addrs:
+                self.local_addrs.remove(addr)
+            self.added_addrs = []
+            self.deleted_addrs = []
             self.container.mobile_sequence_changed(self.mobile_seq)
 
-        ##
-        ## If remotes have changed, start the process of updating local bindings
-        ##
-        if self.remote_changed:
-            self.remote_changed = False
-            self._update_remote_keys()
 
-
-    def add_local_address(self, key):
+    def add_local_address(self, addr):
         """
         """
-        if self.local_keys.count(key) == 0:
-            if self.added_keys.count(key) == 0:
-                self.added_keys.append(key)
+        if self.local_addrs.count(addr) == 0:
+            if self.added_addrs.count(addr) == 0:
+                self.added_addrs.append(addr)
         else:
-            if self.deleted_keys.count(key) > 0:
-                self.deleted_keys.remove(key)
+            if self.deleted_addrs.count(addr) > 0:
+                self.deleted_addrs.remove(addr)
 
 
-    def del_local_address(self, key):
+    def del_local_address(self, addr):
         """
         """
-        if self.local_keys.count(key) > 0:
-            if self.deleted_keys.count(key) == 0:
-                self.deleted_keys.append(key)
+        if self.local_addrs.count(addr) > 0:
+            if self.deleted_addrs.count(addr) == 0:
+                self.deleted_addrs.append(addr)
         else:
-            if self.added_keys.count(key) > 0:
-                self.added_keys.remove(key)
+            if self.added_addrs.count(addr) > 0:
+                self.added_addrs.remove(addr)
 
 
     def handle_ra(self, msg, now):
@@ -131,7 +124,8 @@ class MobileAddressEngine(object):
                     return
             self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
             self.remote_last_seen[msg.id] = now
-            self.remote_changed = True
+            (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
+            self._activate_remotes(msg.id, add_list, del_list)
         else:
             ##
             ## Differential MAU
@@ -148,10 +142,12 @@ class MobileAddressEngine(object):
                     if msg.add_list and msg.add_list.__class__ == list:
                         _list.extend(msg.add_list)
                     if msg.del_list and msg.del_list.__class__ == list:
-                        for key in msg.del_list:
-                            _list.remove(key)
+                        for addr in msg.del_list:
+                            _list.remove(addr)
                     self.remote_lists[msg.id] = (msg.mobile_seq, _list)
-                    self.remote_changed = True
+                    self.node_tracker.add_addresses(msg.id, msg.add_list)
+                    self.node_tracker.del_addresses(msg.id, msg.del_list)
+                    self._activate_remotes(msg.id, msg.add_list, msg.del_list)
                 else:
                     self.needed_mars[(msg.id, msg.area, _seq)] = None
             else:
@@ -163,14 +159,7 @@ class MobileAddressEngine(object):
             return
         if msg.have_seq < self.mobile_seq:
             self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
-                                MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
-
-    def _update_remote_keys(self):
-        keys = {}
-        for _id,(seq,key_list) in self.remote_lists.items():
-            keys[_id] = key_list
-        self.container.mobile_keys_changed(keys)
+                                MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs))
 
 
     def _expire_remotes(self, now):
@@ -186,3 +175,11 @@ class MobileAddressEngine(object):
             self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
         self.needed_mars = {}
 
+
+    def _activate_remotes(self, _id, added, deleted):
+        bit = self.node_tracker.maskbit_for_node(_id)
+        for a in added:
+            self.container.router_adapter.map_destination(a, bit)
+        for d in deleted:
+            self.container.router_adapter.unmap_destination(d, bit)
+

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/node.py Tue Oct 15 21:05:50 2013
@@ -113,6 +113,33 @@ class NodeTracker(object):
         return None
 
 
+    def add_addresses(self, node_id, addrs):
+        node = self.nodes[node_id]
+        for a in addrs:
+            node.addrs[a] = 1
+
+
+    def del_addresses(self, node_id, addrs):
+        node = self.nodes[node_id]
+        for a in addrs:
+            node.addrs.pop(a)
+
+
+    def overwrite_addresses(self, node_id, addrs):
+        """
+        Replace the address set recorded for node_id with addrs.
+        Returns (added, deleted) relative to the previous set.
+        """
+        node    = self.nodes[node_id]
+        added   = [a for a in addrs if a not in node.addrs]
+        deleted = [a for a in node.addrs if a not in addrs]
+        ##
+        ## Rebuild (not merge) so deleted addresses are dropped from the node
+        ##
+        node.addrs = dict((a, 1) for a in addrs)
+        return (added, deleted)
+
+
     def _allocate_maskbit(self):
         if self.next_maskbit == None:
             raise Exception("Exceeded Maximum Router Count")
@@ -143,4 +170,5 @@ class RemoteNode(object):
         self.maskbit  = maskbit
         self.neighbor = neighbor
         self.remote   = not neighbor
+        self.addrs    = {}  # Address => Count at Node (1 only for the present)
 

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Tue Oct 15 21:05:50 2013
@@ -27,8 +27,6 @@ from link import LinkStateEngine
 from path import PathEngine
 from mobile import MobileAddressEngine
 from routing import RoutingTableEngine
-from binding import BindingEngine
-from adapter import AdapterEngine
 from node import NodeTracker
 
 ##
@@ -75,10 +73,8 @@ class RouterEngine:
         self.neighbor_engine       = NeighborEngine(self)
         self.link_state_engine     = LinkStateEngine(self)
         self.path_engine           = PathEngine(self)
-        self.mobile_address_engine = MobileAddressEngine(self)
+        self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker)
         self.routing_table_engine  = RoutingTableEngine(self, self.node_tracker)
-        self.binding_engine        = BindingEngine(self)
-        self.adapter_engine        = AdapterEngine(self)
 
 
 
@@ -92,24 +88,26 @@ class RouterEngine:
         return self.id
 
 
-    def addLocalAddress(self, key):
+    def addressAdded(self, addr):
         """
         """
         try:
-            if key.find('_topo') == 0 or key.find('_local') == 0:
+            if addr.find('Mtemp.') == 0:
                 return
-            self.mobile_address_engine.add_local_address(key)
+            if addr.find('M') == 0:
+                self.mobile_address_engine.add_local_address(addr[1:])
         except Exception, e:
             self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
 
 
-    def delLocalAddress(self, key):
+    def addressRemoved(self, addr):
         """
         """
         try:
-            if key.find('_topo') == 0 or key.find('_local') == 0:
+            if addr.find('Mtemp.') == 0:
                 return
-            self.mobile_address_engine.del_local_address(key)
+            if addr.find('M') == 0:
+                self.mobile_address_engine.del_local_address(addr[1:])
         except Exception, e:
             self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
 
@@ -124,8 +122,6 @@ class RouterEngine:
             self.path_engine.tick(now)
             self.mobile_address_engine.tick(now)
             self.routing_table_engine.tick(now)
-            self.binding_engine.tick(now)
-            self.adapter_engine.tick(now)
             self.node_tracker.tick(now)
         except Exception, e:
             self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
@@ -190,14 +186,10 @@ class RouterEngine:
             return { 'help'           : "Get list of supported values for kind",
                      'link-state'     : "This router's link state",
                      'link-state-set' : "The set of link states from known routers",
-                     'next-hops'      : "Next hops to each known router",
-                     'topo-table'     : "Topological routing table",
-                     'mobile-table'   : "Mobile key routing table"
+                     'next-hops'      : "Next hops to each known router"
                      }
         if kind == 'link-state'     : return self.neighbor_engine.link_state.to_dict()
         if kind == 'next-hops'      : return self.routing_table_engine.next_hops
-        if kind == 'topo-table'     : return {'table': self.adapter_engine.key_classes['topological']}
-        if kind == 'mobile-table'   : return {'table': self.adapter_engine.key_classes['mobile-key']}
         if kind == 'link-state-set' :
             copy = {}
             for _id,_ls in self.link_state_engine.collection.items():
@@ -249,7 +241,6 @@ class RouterEngine:
     def next_hops_changed(self, next_hop_table):
         self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
         self.routing_table_engine.next_hops_changed(next_hop_table)
-        self.binding_engine.next_hops_changed()
 
     def valid_origins_changed(self, valid_origins):
         self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
@@ -259,17 +250,9 @@ class RouterEngine:
         self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
         self.link_state_engine.set_mobile_sequence(mobile_seq)
 
-    def mobile_keys_changed(self, keys):
-        self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
-        self.binding_engine.mobile_keys_changed(keys)
-
     def get_next_hops(self):
         return self.routing_table_engine.get_next_hops()
 
-    def remote_routes_changed(self, key_class, routes):
-        self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
-        self.adapter_engine.remote_routes_changed(key_class, routes)
-
     def new_neighbor(self, rid, link_id):
         self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
         self.node_tracker.new_neighbor(rid, link_id)

Modified: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_agent.c?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_agent.c Tue Oct 15 21:05:50 2013
@@ -140,6 +140,8 @@ static void dx_router_query_address(dx_r
         dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress);
         dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress);
         dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit);
+        dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container);
+        dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container);
         addr = DEQ_NEXT(addr);
         dx_agent_value_complete(cor, addr != 0);
     }

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Tue Oct 15 21:05:50 2013
@@ -107,7 +107,7 @@ void dx_router_del_node_ref_LH(dx_router
  * Depending on its policy, the address may be eligible for being closed out
  * (i.e. Logging its terminal statistics and freeing its resources).
  */
-static void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
 {
     if (addr == 0)
         return;
@@ -314,7 +314,7 @@ static int router_writable_link_handler(
 }
 
 
-static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop)
 {
     dx_parsed_field_t   *in_da        = dx_message_delivery_annotations(msg);
     dx_composed_field_t *out_da       = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
@@ -333,25 +333,27 @@ static dx_field_iterator_t *router_annot
     //
     // If there is a trace field, append this router's ID to the trace.
     //
+    dx_compose_insert_string(out_da, DX_DA_TRACE);
+    dx_compose_start_list(out_da);
     if (trace) {
-        dx_compose_insert_string(out_da, DX_DA_TRACE);
-        dx_compose_start_list(out_da);
-
         if (dx_parse_is_list(trace)) {
             uint32_t idx = 0;
             dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
             while (trace_item) {
                 dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+                if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix))
+                    *drop = 1;
+                dx_field_iterator_reset(iter);
                 dx_compose_insert_string_iterator(out_da, iter);
                 idx++;
                 trace_item = dx_parse_sub_value(trace, idx);
             }
         }
-
-        dx_compose_insert_string(out_da, direct_prefix);
-        dx_compose_end_list(out_da);
     }
 
+    dx_compose_insert_string(out_da, direct_prefix);
+    dx_compose_end_list(out_da);
+
     //
     // If there is no ingress field, annotate the ingress as this router else
     // keep the original field.
@@ -475,25 +477,26 @@ static void router_rx_handler(void* cont
                 // Interpret and update the delivery annotations of the message.  As a convenience,
                 // this function returns the iterator to the ingress field (if it exists).
                 //
-                dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
+                int drop = 0;
+                dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
 
                 //
                 // Forward to the in-process handler for this message if there is one.  The
                 // actual invocation of the handler will occur later after we've released
                 // the lock.
                 //
-                if (addr->handler) {
+                if (!drop && addr->handler) {
                     in_process_copy = dx_message_copy(msg);
                     handler         = addr->handler;
                     handler_context = addr->handler_context;
-                    addr->deliveries_egress++;
+                    addr->deliveries_to_container++;
                 }
 
                 //
                 // If the address form is local (i.e. is prefixed by _local), don't forward
                 // outside of the router process.
                 //
-                if (!is_local) {
+                if (!drop && !is_local) {
                     //
                     // Forward to all of the local links receiving this address.
                     //
@@ -725,6 +728,7 @@ static int router_outgoing_link_handler(
     const char  *r_src   = pn_terminus_get_address(dx_link_remote_source(link));
     int is_dynamic       = pn_terminus_is_dynamic(dx_link_remote_source(link));
     int is_router        = dx_router_terminus_is_router(dx_link_remote_target(link));
+    int propagate        = 0;
     dx_field_iterator_t *iter = 0;
 
     if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) {
@@ -745,15 +749,15 @@ static int router_outgoing_link_handler(
 
     //
     // If this is an endpoint link with a source address, make sure the address is
-    // appropriate for endpoint links.  If it is not a local or mobile address, (i.e.
-    // a router or area address), it cannot be bound to an endpoint link.
+    // appropriate for endpoint links.  If it is not a mobile address, it cannot be
+    // bound to an endpoint link.
     //
     if(r_src && !is_router && !is_dynamic) {
         iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
         unsigned char prefix = dx_field_iterator_octet(iter);
         dx_field_iterator_reset(iter);
 
-        if (prefix != 'L' && prefix != 'M') {
+        if (prefix != 'M') {
             dx_field_iterator_free(iter);
             pn_link_close(pn_link);
             dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
@@ -819,15 +823,26 @@ static int router_outgoing_link_handler(
             dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
             DEQ_INSERT_TAIL(router->addrs, addr);
         }
-        dx_field_iterator_free(iter);
 
         rlink->owning_addr = addr;
         dx_router_add_link_ref_LH(&addr->rlinks, rlink);
+
+        //
+        // If this is not a dynamic address and it is the first local subscription
+        // to the address, supply the address to the router module for propagation
+        // to other nodes.
+        //
+        propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
     }
 
     DEQ_INSERT_TAIL(router->links, rlink);
     sys_mutex_unlock(router->lock);
 
+    if (propagate)
+        dx_router_global_added(router, iter);
+
+    if (iter)
+        dx_field_iterator_free(iter);
     pn_link_open(pn_link);
     return 0;
 }
@@ -1059,6 +1074,8 @@ dx_router_t *dx_router(dx_dispatch_t *dx
     router->dtag               = 1;
     router->pyRouter           = 0;
     router->pyTick             = 0;
+    router->pyAdded            = 0;
+    router->pyRemoved          = 0;
 
     //
     // Create addresses for all of the routers in the topology.  It will be registered
@@ -1172,7 +1189,7 @@ void dx_router_send(dx_dispatch_t       
         //
         // Forward to all of the local links receiving this address.
         //
-        addr->deliveries_ingress++;
+        addr->deliveries_from_container++;
         dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
         while (dest_link_ref) {
             dx_routed_event_t *re = new_dx_routed_event_t();

Modified: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Tue Oct 15 21:05:50 2013
@@ -120,6 +120,8 @@ struct dx_address_t {
     uint64_t deliveries_ingress;
     uint64_t deliveries_egress;
     uint64_t deliveries_transit;
+    uint64_t deliveries_to_container;
+    uint64_t deliveries_from_container;
 };
 
 ALLOC_DECLARE(dx_address_t);
@@ -150,6 +152,8 @@ struct dx_router_t {
 
     PyObject               *pyRouter;
     PyObject               *pyTick;
+    PyObject               *pyAdded;
+    PyObject               *pyRemoved;
 
     dx_agent_class_t       *class_router;
     dx_agent_class_t       *class_link;
@@ -158,11 +162,16 @@ struct dx_router_t {
 };
 
 
+
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr);
 void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
 void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
 
 void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
 void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
 
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter);
+void dx_router_global_removed(dx_router_t *router, const char *addr);
+
 
 #endif

Modified: qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_pynode.c?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_pynode.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_pynode.c Tue Oct 15 21:05:50 2013
@@ -347,15 +347,48 @@ static PyObject* dx_del_neighbor_router(
 
 static PyObject* dx_map_destination(PyObject *self, PyObject *args)
 {
-    //RouterAdapter *adapter = (RouterAdapter*) self;
-    //dx_router_t   *router  = adapter->router;
-    const char *addr;
-    int         router_maskbit;
+    RouterAdapter       *adapter = (RouterAdapter*) self;
+    dx_router_t         *router  = adapter->router;
+    const char          *addr_string;
+    int                  maskbit;
+    dx_address_t        *addr;
+    dx_field_iterator_t *iter;
 
-    if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+    if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
         return 0;
 
-    // TODO
+    if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+        PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+        return 0;
+    }
+        
+    if (router->routers_by_mask_bit[maskbit] == 0) {
+        PyErr_SetString(PyExc_Exception, "Router Not Found");
+        return 0;
+    }
+
+    iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+    sys_mutex_lock(router->lock);
+    dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+    if (!addr) {
+        addr = new_dx_address_t();
+        memset(addr, 0, sizeof(dx_address_t));
+        DEQ_ITEM_INIT(addr);
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
+        dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+        DEQ_ITEM_INIT(addr);
+        DEQ_INSERT_TAIL(router->addrs, addr);
+    }
+    dx_field_iterator_free(iter);
+
+    dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+    dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+    sys_mutex_unlock(router->lock);
+
+    dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -364,15 +397,43 @@ static PyObject* dx_map_destination(PyOb
 
 static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
 {
-    //RouterAdapter *adapter = (RouterAdapter*) self;
-    //dx_router_t   *router  = adapter->router;
-    const char *addr;
-    int         router_maskbit;
+    RouterAdapter *adapter = (RouterAdapter*) self;
+    dx_router_t   *router  = adapter->router;
+    const char    *addr_string;
+    int            maskbit;
+    dx_address_t  *addr;
+
+    if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+        return 0;
 
-    if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+    if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+        PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
         return 0;
+    }
+        
+    if (router->routers_by_mask_bit[maskbit] == 0) {
+        PyErr_SetString(PyExc_Exception, "Router Not Found");
+        return 0;
+    }
+
+    dx_router_node_t    *rnode = router->routers_by_mask_bit[maskbit];
+    dx_field_iterator_t *iter  = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+    sys_mutex_lock(router->lock);
+    dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+    dx_field_iterator_free(iter);
+
+    if (!addr) {
+        PyErr_SetString(PyExc_Exception, "Address Not Found");
+        sys_mutex_unlock(router->lock);
+        return 0;
+    }
+        
+    dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+    dx_router_check_addr_LH(router, addr);
+    sys_mutex_unlock(router->lock);
 
-    // TODO
+    dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -534,6 +595,18 @@ void dx_router_python_setup(dx_router_t 
         dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
         return;
     }
+
+    router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded");
+    if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) {
+        dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method");
+        return;
+    }
+
+    router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved");
+    if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) {
+        dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
+        return;
+    }
 }
 
 
@@ -557,3 +630,35 @@ void dx_pyrouter_tick(dx_router_t *route
     }
 }
 
+
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
+{
+    PyObject *pArgs;
+    PyObject *pValue;
+
+    if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
+        dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+        char *address = (char*) dx_field_iterator_copy(iter);
+
+        dx_python_lock();
+        pArgs  = PyTuple_New(1);
+        PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
+        pValue = PyObject_CallObject(router->pyAdded, pArgs);
+        if (PyErr_Occurred()) {
+            PyErr_Print();
+        }
+        Py_DECREF(pArgs);
+        if (pValue) {
+            Py_DECREF(pValue);
+        }
+        dx_python_unlock();
+
+        free(address);
+    }
+}
+
+
+void dx_router_global_removed(dx_router_t *router, const char *addr)
+{
+}
+

Modified: qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py (original)
+++ qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py Tue Oct 15 21:05:50 2013
@@ -370,7 +370,7 @@ class RouterTest(unittest.TestCase):
       da = rm.instructions
       self.assertEqual(da.__class__, dict)
       self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
-      self.assertFalse('qdx.trace' in da)
+      self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
 
     ##
     ## Pre-existing ingress
@@ -388,7 +388,7 @@ class RouterTest(unittest.TestCase):
       da = rm.instructions
       self.assertEqual(da.__class__, dict)
       self.assertEqual(da['qdx.ingress'], 'ingress-router')
-      self.assertFalse('qdx.trace' in da)
+      self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
 
     ##
     ## Invalid trace type

Modified: qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat?rev=1532528&r1=1532527&r2=1532528&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat (original)
+++ qpid/trunk/qpid/extras/dispatch/tools/src/py/qdstat Tue Oct 15 21:05:50 2013
@@ -102,7 +102,7 @@ class BusManager:
         self.M.start()
         self.M.route("amqp:/*", "amqp://%s/$1" % host)
         self.address = "amqp:/_local/agent"
-        self.reply   = "amqp:/reply-address/0001"    # FIX THIS!
+        self.reply   = "amqp:/temp.reply-address/0001"    # FIX THIS!
         self.M.subscribe(self.reply)
 
     def Disconnect(self):
@@ -217,6 +217,8 @@ class BusManager:
         heads.append(Header("in", Header.COMMAS))
         heads.append(Header("out", Header.COMMAS))
         heads.append(Header("thru", Header.COMMAS))
+        heads.append(Header("to-proc", Header.COMMAS))
+        heads.append(Header("from-proc", Header.COMMAS))
         rows = []
 
         request = Message()
@@ -243,6 +245,8 @@ class BusManager:
             row.append(addr['deliveries-ingress'])
             row.append(addr['deliveries-egress'])
             row.append(addr['deliveries-transit'])
+            row.append(addr['deliveries-to-container'])
+            row.append(addr['deliveries-from-container'])
             rows.append(row)
         title = "Router Addresses"
         sorter = Sorter(heads, rows, 'address', 0, True)
@@ -316,7 +320,6 @@ def main(argv=None):
         print
     except Exception,e:
         print "Failed: %s - %s" % (e.__class__.__name__, e)
-        raise
 
     bm.Disconnect()   # try to deallocate brokers
     return 1



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org