You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/02/04 23:26:07 UTC

svn commit: r1564538 - in /qpid/dispatch/trunk: python/qpid_dispatch_internal/router/ src/

Author: tross
Date: Tue Feb  4 22:26:07 2014
New Revision: 1564538

URL: http://svn.apache.org/r1564538
Log:
DISPATCH-22 - Use inter-router link loss as an immediate indicator of a lost
              neighbor.  This greatly reduces the time to recompute after the
              loss of a router in a topology.

Modified:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
    qpid/dispatch/trunk/src/python_embedded.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h
    qpid/dispatch/trunk/src/router_pynode.c

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Tue Feb  4 22:26:07 2014
@@ -119,6 +119,13 @@ class RouterEngine:
             traceback.print_tb(exc_traceback)
 
 
+    def linkLost(self, link_id):
+        """Log the loss of router link 'link_id' and pass it to the neighbor engine.
+        """
+        self.log(LOG_INFO, "Router Link Lost - link_id=%d" % link_id)
+        self.neighbor_engine.linkLost(link_id)
+
+
     def handleTimerTick(self):
         """
         """

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py Tue Feb  4 22:26:07 2014
@@ -66,9 +66,18 @@ class NeighborEngine(object):
                 self.link_state_changed = True
                 self.container.new_neighbor(msg.id, link_id)
                 self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id))
-        ##
-        ## TODO - Use this function to detect area boundaries
-        ##
+
+    def linkLost(self, link_id):
+        node_id = self.container.node_tracker.link_id_to_node_id(link_id)
+        if node_id:
+            self._delete_neighbor(node_id)
+
+    def _delete_neighbor(self, key):
+        self.hellos.pop(key)
+        if self.link_state.del_peer(key):
+            self.link_state_changed = True
+            self.container.lost_neighbor(key)
+            self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
 
     def _expire_hellos(self, now):
         to_delete = []
@@ -76,8 +85,5 @@ class NeighborEngine(object):
             if now - last_seen > self.hello_max_age:
                 to_delete.append(key)
         for key in to_delete:
-            self.hellos.pop(key)
-            if self.link_state.del_peer(key):
-                self.link_state_changed = True
-                self.container.lost_neighbor(key)
-                self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+            self._delete_neighbor(key)
+

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py Tue Feb  4 22:26:07 2014
@@ -32,11 +32,12 @@ class NodeTracker(object):
     The mask bit is used in the main router to represent sets of valid destinations for addresses.
     """
     def __init__(self, container, max_routers):
-        self.container    = container
-        self.max_routers  = max_routers
-        self.nodes        = {}  # id => RemoteNode
-        self.maskbits     = []
-        self.next_maskbit = 1   # Reserve bit '0' to represent this router
+        self.container        = container
+        self.max_routers      = max_routers
+        self.nodes            = {}  # id => RemoteNode
+        self.nodes_by_link_id = {}  # link-id => node-id
+        self.maskbits         = []
+        self.next_maskbit     = 1   # Reserve bit '0' to represent this router
         for i in range(max_routers):
             self.maskbits.append(None)
         self.maskbits[0] = True
@@ -51,14 +52,16 @@ class NodeTracker(object):
         A node, designated by node_id, has been discovered as a neighbor over a link with
         a maskbit of link_maskbit.
         """
+        self.nodes_by_link_id[link_maskbit] = node_id
         if node_id in self.nodes:
             node = self.nodes[node_id]
             if node.neighbor:
                 return
             self.container.del_remote_router(node.maskbit)
             node.neighbor = True
+            node.link_id  = link_maskbit
         else:
-            node = RemoteNode(node_id, self._allocate_maskbit(), True)
+            node = RemoteNode(node_id, self._allocate_maskbit(), True, link_maskbit)
             self.nodes[node_id] = node
         self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit)
 
@@ -69,6 +72,8 @@ class NodeTracker(object):
         """
         node = self.nodes[node_id]
         node.neighbor = False
+        self.nodes_by_link_id.pop(node.link_id)
+        node.link_id = None
         self.container.del_neighbor_router(node.maskbit)
         if node.remote:
             self.container.add_remote_router(self._address(node.id), node.maskbit)
@@ -83,7 +88,7 @@ class NodeTracker(object):
         remote peer.
         """
         if node_id not in self.nodes:
-            node = RemoteNode(node_id, self._allocate_maskbit(), False)
+            node = RemoteNode(node_id, self._allocate_maskbit(), False, None)
             self.nodes[node_id] = node
             self.container.add_remote_router(self._address(node.id), node.maskbit)
         else:
@@ -140,6 +145,12 @@ class NodeTracker(object):
         return (added, deleted)
 
 
+    def link_id_to_node_id(self, link_id):
+        if link_id in self.nodes_by_link_id:
+            return self.nodes_by_link_id[link_id]
+        return None
+
+
     def _allocate_maskbit(self):
         if self.next_maskbit == None:
             raise Exception("Exceeded Maximum Router Count")
@@ -165,10 +176,11 @@ class NodeTracker(object):
 
 class RemoteNode(object):
 
-    def __init__(self, node_id, maskbit, neighbor):
+    def __init__(self, node_id, maskbit, neighbor, link_id):
         self.id       = node_id
         self.maskbit  = maskbit
         self.neighbor = neighbor
         self.remote   = not neighbor
+        self.link_id  = link_id
         self.addrs    = {}  # Address => Count at Node (1 only for the present)
 

Modified: qpid/dispatch/trunk/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Tue Feb  4 22:26:07 2014
@@ -31,7 +31,7 @@
 
 static qd_dispatch_t   *dispatch   = 0;
 static uint32_t         ref_count  = 0;
-static sys_mutex_t     *lock       = 0;
+static sys_mutex_t     *ilock      = 0;
 static qd_log_source_t *log_source = 0;
 static PyObject        *dispatch_module = 0;
 static PyObject        *dispatch_python_pkgdir = 0;
@@ -46,7 +46,7 @@ void qd_python_initialize(qd_dispatch_t 
 {
     log_source = qd_log_source("PYTHON");
     dispatch = qd;
-    lock = sys_mutex();
+    ilock = sys_mutex();
     if (python_pkgdir)
         dispatch_python_pkgdir = PyString_FromString(python_pkgdir);
 }
@@ -55,26 +55,26 @@ void qd_python_initialize(qd_dispatch_t 
 void qd_python_finalize(void)
 {
     assert(ref_count == 0);
-    sys_mutex_free(lock);
+    sys_mutex_free(ilock);
 }
 
 
 void qd_python_start(void)
 {
-    sys_mutex_lock(lock);
+    sys_mutex_lock(ilock);
     if (ref_count == 0) {
         Py_Initialize();
         qd_python_setup();
         qd_log(log_source, QD_LOG_TRACE, "Embedded Python Interpreter Initialized");
     }
     ref_count++;
-    sys_mutex_unlock(lock);
+    sys_mutex_unlock(ilock);
 }
 
 
 void qd_python_stop(void)
 {
-    sys_mutex_lock(lock);
+    sys_mutex_lock(ilock);
     ref_count--;
     if (ref_count == 0) {
         Py_DECREF(dispatch_module);
@@ -82,7 +82,7 @@ void qd_python_stop(void)
         Py_Finalize();
         qd_log(log_source, QD_LOG_TRACE, "Embedded Python Interpreter Shut Down");
     }
-    sys_mutex_unlock(lock);
+    sys_mutex_unlock(ilock);
 }
 
 
@@ -461,7 +461,7 @@ static void qd_io_rx_handler(void *conte
         return;
     }
 
-    sys_mutex_lock(lock);
+    sys_mutex_lock(ilock);
     PyObject *pAP   = qd_field_to_py(ap_map);
     PyObject *pBody = qd_field_to_py(body_map);
 
@@ -475,7 +475,7 @@ static void qd_io_rx_handler(void *conte
     if (pValue) {
         Py_DECREF(pValue);
     }
-    sys_mutex_unlock(lock);
+    sys_mutex_unlock(ilock);
 
     qd_field_iterator_free(ap);
     qd_field_iterator_free(body);
@@ -685,11 +685,11 @@ static void qd_python_setup(void)
 
 void qd_python_lock(void)
 {
-    sys_mutex_lock(lock);
+    sys_mutex_lock(ilock);
 }
 
 void qd_python_unlock(void)
 {
-    sys_mutex_unlock(lock);
+    sys_mutex_unlock(ilock);
 }
 

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Tue Feb  4 22:26:07 2014
@@ -1105,6 +1105,7 @@ static int router_link_detach_handler(vo
     qd_router_link_t *rlink  = (qd_router_link_t*) qd_link_get_context(link);
     qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
     qd_address_t     *oaddr  = 0;
+    int               lost_link_mask_bit = -1;
 
     if (shared) {
         qd_link_set_conn_context(link, 0);
@@ -1138,8 +1139,10 @@ static int router_link_detach_handler(vo
     //
     // If this is an incoming inter-router link, we must free the mask_bit.
     //
-    if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_INCOMING)
+    if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_INCOMING) {
+        lost_link_mask_bit = rlink->mask_bit;
         qd_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
+    }
 
     //
     // Remove the link from the master list-of-links.
@@ -1157,6 +1160,13 @@ static int router_link_detach_handler(vo
         free(rlink->target);
     free_qd_router_link_t(rlink);
 
+    //
+    // If we lost the link to a neighbor router, notify the route engine so it doesn't
+    // have to wait for the HELLO timeout to expire.
+    //
+    if (lost_link_mask_bit >= 0)
+        qd_router_link_lost(router, lost_link_mask_bit);
+
     return 0;
 }
 

Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Tue Feb  4 22:26:07 2014
@@ -201,6 +201,6 @@ void qd_router_del_node_ref_LH(qd_router
 
 void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter);
 void qd_router_mobile_removed(qd_router_t *router, const char *addr);
-
+void qd_router_link_lost(qd_router_t *router, int link_mask_bit);
 
 #endif

Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Tue Feb  4 22:26:07 2014
@@ -34,6 +34,7 @@ static PyObject        *pyRouter   = 0;
 static PyObject        *pyTick     = 0;
 static PyObject        *pyAdded    = 0;
 static PyObject        *pyRemoved  = 0;
+static PyObject        *pyLinkLost = 0;
 
 
 
@@ -622,6 +623,12 @@ void qd_router_python_setup(qd_router_t 
         qd_log(log_source, QD_LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
         return;
     }
+
+    pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost");
+    if (!pyLinkLost || !PyCallable_Check(pyLinkLost)) {
+        qd_log(log_source, QD_LOG_CRITICAL, "'RouterEngine' class has no linkLost method");
+        return;
+    }
 }
 
 
@@ -694,3 +701,25 @@ void qd_router_mobile_removed(qd_router_
     }
 }
 
+
+// Tell the Python RouterEngine (its linkLost method) that the inter-router link
+// identified by link_mask_bit is gone.  Does nothing unless in interior mode.
+void qd_router_link_lost(qd_router_t *router, int link_mask_bit)
+{
+    PyObject *pArgs;
+    PyObject *pValue;
+
+    // Test pyLinkLost (the callable used below); setup may leave it null.
+    if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+        qd_python_lock();
+        pArgs = PyTuple_New(1);
+        PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
+        pValue = PyObject_CallObject(pyLinkLost, pArgs);
+        if (PyErr_Occurred())
+            PyErr_Print();
+        Py_DECREF(pArgs);
+        Py_XDECREF(pValue);
+        qd_python_unlock();
+    }
+}
+



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org