You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/02/04 23:26:07 UTC
svn commit: r1564538 - in /qpid/dispatch/trunk:
python/qpid_dispatch_internal/router/ src/
Author: tross
Date: Tue Feb 4 22:26:07 2014
New Revision: 1564538
URL: http://svn.apache.org/r1564538
Log:
DISPATCH-22 - Use inter-router link loss as an immediate indicator of a lost
neighbor. This greatly reduces the time to recompute after the
loss of a router in a topology.
Modified:
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
qpid/dispatch/trunk/src/python_embedded.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/router_pynode.c
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Tue Feb 4 22:26:07 2014
@@ -119,6 +119,13 @@ class RouterEngine:
traceback.print_tb(exc_traceback)
+ def linkLost(self, link_id):
+ """
+ Handle the loss of an inter-router link: log the event at LOG_INFO and
+ forward link_id to the neighbor engine's linkLost.
+ link_id is formatted with %d, so it must be an integer.
+ """
+ self.log(LOG_INFO, "Router Link Lost - link_id=%d" % link_id)
+ self.neighbor_engine.linkLost(link_id)
+
+
def handleTimerTick(self):
"""
"""
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py Tue Feb 4 22:26:07 2014
@@ -66,9 +66,18 @@ class NeighborEngine(object):
self.link_state_changed = True
self.container.new_neighbor(msg.id, link_id)
self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id))
- ##
- ## TODO - Use this function to detect area boundaries
- ##
+
+ def linkLost(self, link_id):
+ """
+ Handle the loss of the link identified by link_id.  The container's node
+ tracker is asked for the node-id recorded for that link; if it returns a
+ truthy node-id, that neighbor is deleted via _delete_neighbor.
+ """
+ node_id = self.container.node_tracker.link_id_to_node_id(link_id)
+ if node_id:
+ self._delete_neighbor(node_id)
+
+ def _delete_neighbor(self, key):
+ """
+ Remove the neighbor identified by key: pop it from self.hellos and ask
+ the link state to delete the peer.  This is the deletion sequence that
+ _expire_hellos previously performed inline for each expired key.
+ NOTE(review): indentation is not preserved in this view; presumably the
+ flag update, lost_neighbor notification and log line all run only when
+ del_peer returns a true value - confirm against the committed file.
+ """
+ self.hellos.pop(key)
+ if self.link_state.del_peer(key):
+ self.link_state_changed = True
+ self.container.lost_neighbor(key)
+ self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
def _expire_hellos(self, now):
to_delete = []
@@ -76,8 +85,5 @@ class NeighborEngine(object):
if now - last_seen > self.hello_max_age:
to_delete.append(key)
for key in to_delete:
- self.hellos.pop(key)
- if self.link_state.del_peer(key):
- self.link_state_changed = True
- self.container.lost_neighbor(key)
- self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+ self._delete_neighbor(key)
+
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py Tue Feb 4 22:26:07 2014
@@ -32,11 +32,12 @@ class NodeTracker(object):
The mask bit is used in the main router to represent sets of valid destinations for addresses.
"""
def __init__(self, container, max_routers):
- self.container = container
- self.max_routers = max_routers
- self.nodes = {} # id => RemoteNode
- self.maskbits = []
- self.next_maskbit = 1 # Reserve bit '0' to represent this router
+ self.container = container
+ self.max_routers = max_routers
+ self.nodes = {} # id => RemoteNode
+ self.nodes_by_link_id = {} # link-id => node-id
+ self.maskbits = []
+ self.next_maskbit = 1 # Reserve bit '0' to represent this router
for i in range(max_routers):
self.maskbits.append(None)
self.maskbits[0] = True
@@ -51,14 +52,16 @@ class NodeTracker(object):
A node, designated by node_id, has been discovered as a neighbor over a link with
a maskbit of link_maskbit.
"""
+ self.nodes_by_link_id[link_maskbit] = node_id
if node_id in self.nodes:
node = self.nodes[node_id]
if node.neighbor:
return
self.container.del_remote_router(node.maskbit)
node.neighbor = True
+ node.link_id = link_maskbit
else:
- node = RemoteNode(node_id, self._allocate_maskbit(), True)
+ node = RemoteNode(node_id, self._allocate_maskbit(), True, link_maskbit)
self.nodes[node_id] = node
self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit)
@@ -69,6 +72,8 @@ class NodeTracker(object):
"""
node = self.nodes[node_id]
node.neighbor = False
+ self.nodes_by_link_id.pop(node.link_id)
+ node.link_id = None
self.container.del_neighbor_router(node.maskbit)
if node.remote:
self.container.add_remote_router(self._address(node.id), node.maskbit)
@@ -83,7 +88,7 @@ class NodeTracker(object):
remote peer.
"""
if node_id not in self.nodes:
- node = RemoteNode(node_id, self._allocate_maskbit(), False)
+ node = RemoteNode(node_id, self._allocate_maskbit(), False, None)
self.nodes[node_id] = node
self.container.add_remote_router(self._address(node.id), node.maskbit)
else:
@@ -140,6 +145,12 @@ class NodeTracker(object):
return (added, deleted)
+ def link_id_to_node_id(self, link_id):
+ if link_id in self.nodes_by_link_id:
+ return self.nodes_by_link_id[link_id]
+ return None
+
+
def _allocate_maskbit(self):
if self.next_maskbit == None:
raise Exception("Exceeded Maximum Router Count")
@@ -165,10 +176,11 @@ class NodeTracker(object):
class RemoteNode(object):
- def __init__(self, node_id, maskbit, neighbor):
+ def __init__(self, node_id, maskbit, neighbor, link_id):
self.id = node_id
self.maskbit = maskbit
self.neighbor = neighbor
self.remote = not neighbor
+ self.link_id = link_id
self.addrs = {} # Address => Count at Node (1 only for the present)
Modified: qpid/dispatch/trunk/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Tue Feb 4 22:26:07 2014
@@ -31,7 +31,7 @@
static qd_dispatch_t *dispatch = 0;
static uint32_t ref_count = 0;
-static sys_mutex_t *lock = 0;
+static sys_mutex_t *ilock = 0;
static qd_log_source_t *log_source = 0;
static PyObject *dispatch_module = 0;
static PyObject *dispatch_python_pkgdir = 0;
@@ -46,7 +46,7 @@ void qd_python_initialize(qd_dispatch_t
{
log_source = qd_log_source("PYTHON");
dispatch = qd;
- lock = sys_mutex();
+ ilock = sys_mutex();
if (python_pkgdir)
dispatch_python_pkgdir = PyString_FromString(python_pkgdir);
}
@@ -55,26 +55,26 @@ void qd_python_initialize(qd_dispatch_t
void qd_python_finalize(void)
{
assert(ref_count == 0);
- sys_mutex_free(lock);
+ sys_mutex_free(ilock);
}
void qd_python_start(void)
{
- sys_mutex_lock(lock);
+ sys_mutex_lock(ilock);
if (ref_count == 0) {
Py_Initialize();
qd_python_setup();
qd_log(log_source, QD_LOG_TRACE, "Embedded Python Interpreter Initialized");
}
ref_count++;
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(ilock);
}
void qd_python_stop(void)
{
- sys_mutex_lock(lock);
+ sys_mutex_lock(ilock);
ref_count--;
if (ref_count == 0) {
Py_DECREF(dispatch_module);
@@ -82,7 +82,7 @@ void qd_python_stop(void)
Py_Finalize();
qd_log(log_source, QD_LOG_TRACE, "Embedded Python Interpreter Shut Down");
}
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(ilock);
}
@@ -461,7 +461,7 @@ static void qd_io_rx_handler(void *conte
return;
}
- sys_mutex_lock(lock);
+ sys_mutex_lock(ilock);
PyObject *pAP = qd_field_to_py(ap_map);
PyObject *pBody = qd_field_to_py(body_map);
@@ -475,7 +475,7 @@ static void qd_io_rx_handler(void *conte
if (pValue) {
Py_DECREF(pValue);
}
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(ilock);
qd_field_iterator_free(ap);
qd_field_iterator_free(body);
@@ -685,11 +685,11 @@ static void qd_python_setup(void)
void qd_python_lock(void)
{
- sys_mutex_lock(lock);
+ sys_mutex_lock(ilock);
}
void qd_python_unlock(void)
{
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(ilock);
}
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Tue Feb 4 22:26:07 2014
@@ -1105,6 +1105,7 @@ static int router_link_detach_handler(vo
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
qd_router_conn_t *shared = (qd_router_conn_t*) qd_link_get_conn_context(link);
qd_address_t *oaddr = 0;
+ int lost_link_mask_bit = -1;
if (shared) {
qd_link_set_conn_context(link, 0);
@@ -1138,8 +1139,10 @@ static int router_link_detach_handler(vo
//
// If this is an incoming inter-router link, we must free the mask_bit.
//
- if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_INCOMING)
+ if (rlink->link_type == QD_LINK_ROUTER && rlink->link_direction == QD_INCOMING) {
+ lost_link_mask_bit = rlink->mask_bit;
qd_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
+ }
//
// Remove the link from the master list-of-links.
@@ -1157,6 +1160,13 @@ static int router_link_detach_handler(vo
free(rlink->target);
free_qd_router_link_t(rlink);
+ //
+ // If we lost the link to a neighbor router, notify the route engine so it doesn't
+ // have to wait for the HELLO timeout to expire.
+ //
+ if (lost_link_mask_bit >= 0)
+ qd_router_link_lost(router, lost_link_mask_bit);
+
return 0;
}
Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Tue Feb 4 22:26:07 2014
@@ -201,6 +201,6 @@ void qd_router_del_node_ref_LH(qd_router
void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter);
void qd_router_mobile_removed(qd_router_t *router, const char *addr);
-
+void qd_router_link_lost(qd_router_t *router, int link_mask_bit);
#endif
Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1564538&r1=1564537&r2=1564538&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Tue Feb 4 22:26:07 2014
@@ -34,6 +34,7 @@ static PyObject *pyRouter = 0;
static PyObject *pyTick = 0;
static PyObject *pyAdded = 0;
static PyObject *pyRemoved = 0;
+static PyObject *pyLinkLost = 0;
@@ -622,6 +623,12 @@ void qd_router_python_setup(qd_router_t
qd_log(log_source, QD_LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
return;
}
+
+ pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost");
+ if (!pyLinkLost || !PyCallable_Check(pyLinkLost)) {
+ qd_log(log_source, QD_LOG_CRITICAL, "'RouterEngine' class has no linkLost method");
+ return;
+ }
}
@@ -694,3 +701,25 @@ void qd_router_mobile_removed(qd_router_
}
}
+
+//
+// Notify the Python router engine that the inter-router link identified by
+// link_mask_bit has been lost by calling its linkLost method with that value.
+// The call is made only when the linkLost attribute was resolved and the
+// router is running in interior mode.
+//
+void qd_router_link_lost(qd_router_t *router, int link_mask_bit)
+{
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ //
+ // Guard on pyLinkLost (the object actually called below), not pyRemoved.
+ //
+ if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
+ pValue = PyObject_CallObject(pyLinkLost, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
+ qd_python_unlock();
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org