You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 00:05:07 UTC
svn commit: r1534394 [5/22] - in /qpid/branches/linearstore/qpid: ./ cpp/
cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/
cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/
cpp/examples/qmf-agent/ cpp/include/qpid/ cp...
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py Mon Oct 21 22:04:51 2013
@@ -20,169 +20,170 @@
from data import MessageRA, MessageMAR, MessageMAU
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class MobileAddressEngine(object):
- """
- This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
- It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
- Note that this routing table maps from the mobile address to the remote router where that address
- is directly bound.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
- self.mobile_seq = 0
- self.local_keys = []
- self.added_keys = []
- self.deleted_keys = []
- self.remote_lists = {} # map router_id => (sequence, list of keys)
- self.remote_last_seen = {} # map router_id => time of last seen advertizement/update
- self.remote_changed = False
- self.needed_mars = {}
-
-
- def tick(self, now):
- self._expire_remotes(now)
- self._send_mars()
-
- ##
- ## If local keys have changed, collect the changes and send a MAU with the diffs
- ## Note: it is important that the differential-MAU be sent before a RA is sent
- ##
- if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
- self.mobile_seq += 1
- self.container.send('_topo.%s.all' % self.area,
- MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
- self.local_keys.extend(self.added_keys)
- for key in self.deleted_keys:
- self.local_keys.remove(key)
- self.added_keys = []
- self.deleted_keys = []
- self.container.mobile_sequence_changed(self.mobile_seq)
-
- ##
- ## If remotes have changed, start the process of updating local bindings
- ##
- if self.remote_changed:
- self.remote_changed = False
- self._update_remote_keys()
-
-
- def add_local_address(self, key):
"""
+ This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
+ It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
+ Note that this routing table maps from the mobile address to the remote router where that address
+ is directly bound.
"""
- if self.local_keys.count(key) == 0:
- if self.added_keys.count(key) == 0:
- self.added_keys.append(key)
- else:
- if self.deleted_keys.count(key) > 0:
- self.deleted_keys.remove(key)
+ def __init__(self, container, node_tracker):
+ self.container = container
+ self.node_tracker = node_tracker
+ self.id = self.container.id
+ self.area = self.container.area
+ self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
+ self.mobile_seq = 0
+ self.local_addrs = []
+ self.added_addrs = []
+ self.deleted_addrs = []
+ self.remote_lists = {} # map router_id => (sequence, list of addrs)
+ self.remote_last_seen = {} # map router_id => time of last seen advertizement/update
+ self.needed_mars = {}
+
+
+ def tick(self, now):
+ self._expire_remotes(now)
+ self._send_mars()
+
+ ##
+ ## If local addrs have changed, collect the changes and send a MAU with the diffs
+ ## Note: it is important that the differential-MAU be sent before a RA is sent
+ ##
+ if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
+ self.mobile_seq += 1
+ self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
+ MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs))
+ self.local_addrs.extend(self.added_addrs)
+ for addr in self.deleted_addrs:
+ self.local_addrs.remove(addr)
+ self.added_addrs = []
+ self.deleted_addrs = []
+ self.container.mobile_sequence_changed(self.mobile_seq)
+
+
+ def add_local_address(self, addr):
+ """
+ """
+ if self.local_addrs.count(addr) == 0:
+ if self.added_addrs.count(addr) == 0:
+ self.added_addrs.append(addr)
+ else:
+ if self.deleted_addrs.count(addr) > 0:
+ self.deleted_addrs.remove(addr)
- def del_local_address(self, key):
- """
- """
- if self.local_keys.count(key) > 0:
- if self.deleted_keys.count(key) == 0:
- self.deleted_keys.append(key)
- else:
- if self.added_keys.count(key) > 0:
- self.added_keys.remove(key)
-
-
- def handle_ra(self, msg, now):
- if msg.id == self.id:
- return
-
- if msg.mobile_seq == 0:
- return
-
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- self.remote_last_seen[msg.id] = now
- if _seq < msg.mobile_seq:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
- def handle_mau(self, msg, now):
- ##
- ## If the MAU is differential, we can only use it if its sequence is exactly one greater
- ## than our stored sequence. If not, we will ignore the content and schedule a MAR.
- ##
- ## If the MAU is absolute, we can use it in all cases.
- ##
- if msg.id == self.id:
- return
-
- if msg.exist_list:
- ##
- ## Absolute MAU
- ##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq >= msg.mobile_seq: # ignore duplicates
- return
- self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
- self.remote_last_seen[msg.id] = now
- self.remote_changed = True
- else:
- ##
- ## Differential MAU
- ##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq == msg.mobile_seq: # ignore duplicates
- return
- self.remote_last_seen[msg.id] = now
- if _seq + 1 == msg.mobile_seq:
- ##
- ## This is one greater than our stored value, incorporate the deltas
- ##
- if msg.add_list and msg.add_list.__class__ == list:
- _list.extend(msg.add_list)
- if msg.del_list and msg.del_list.__class__ == list:
- for key in msg.del_list:
- _list.remove(key)
- self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- self.remote_changed = True
+ def del_local_address(self, addr):
+ """
+ """
+ if self.local_addrs.count(addr) > 0:
+ if self.deleted_addrs.count(addr) == 0:
+ self.deleted_addrs.append(addr)
+ else:
+ if self.added_addrs.count(addr) > 0:
+ self.added_addrs.remove(addr)
+
+
+ def handle_ra(self, msg, now):
+ if msg.id == self.id:
+ return
+
+ if msg.mobile_seq == 0:
+ return
+
+ if msg.id in self.remote_lists:
+ _seq, _list = self.remote_lists[msg.id]
+ self.remote_last_seen[msg.id] = now
+ if _seq < msg.mobile_seq:
+ self.needed_mars[(msg.id, msg.area, _seq)] = None
+ else:
+ self.needed_mars[(msg.id, msg.area, 0)] = None
+
+
+ def handle_mau(self, msg, now):
+ ##
+ ## If the MAU is differential, we can only use it if its sequence is exactly one greater
+ ## than our stored sequence. If not, we will ignore the content and schedule a MAR.
+ ##
+ ## If the MAU is absolute, we can use it in all cases.
+ ##
+ if msg.id == self.id:
+ return
+
+ if msg.exist_list:
+ ##
+ ## Absolute MAU
+ ##
+ if msg.id in self.remote_lists:
+ _seq, _list = self.remote_lists[msg.id]
+ if _seq >= msg.mobile_seq: # ignore duplicates
+ return
+ self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
+ self.remote_last_seen[msg.id] = now
+ (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
+ self._activate_remotes(msg.id, add_list, del_list)
else:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
- def handle_mar(self, msg, now):
- if msg.id == self.id:
- return
- if msg.have_seq < self.mobile_seq:
- self.container.send('_topo.%s.%s' % (msg.area, msg.id),
- MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
-
- def _update_remote_keys(self):
- keys = {}
- for _id,(seq,key_list) in self.remote_lists.items():
- keys[_id] = key_list
- self.container.mobile_keys_changed(keys)
-
-
- def _expire_remotes(self, now):
- for _id, t in self.remote_last_seen.items():
- if now - t > self.mobile_addr_max_age:
- self.remote_lists.pop(_id)
- self.remote_last_seen.pop(_id)
- self.remote_changed = True
-
-
- def _send_mars(self):
- for _id, _area, _seq in self.needed_mars.keys():
- self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
- self.needed_mars = {}
+ ##
+ ## Differential MAU
+ ##
+ if msg.id in self.remote_lists:
+ _seq, _list = self.remote_lists[msg.id]
+ if _seq == msg.mobile_seq: # ignore duplicates
+ return
+ self.remote_last_seen[msg.id] = now
+ if _seq + 1 == msg.mobile_seq:
+ ##
+ ## This is one greater than our stored value, incorporate the deltas
+ ##
+ if msg.add_list and msg.add_list.__class__ == list:
+ _list.extend(msg.add_list)
+ if msg.del_list and msg.del_list.__class__ == list:
+ for addr in msg.del_list:
+ _list.remove(addr)
+ self.remote_lists[msg.id] = (msg.mobile_seq, _list)
+ if msg.add_list:
+ self.node_tracker.add_addresses(msg.id, msg.add_list)
+ if msg.del_list:
+ self.node_tracker.del_addresses(msg.id, msg.del_list)
+ self._activate_remotes(msg.id, msg.add_list, msg.del_list)
+ else:
+ self.needed_mars[(msg.id, msg.area, _seq)] = None
+ else:
+ self.needed_mars[(msg.id, msg.area, 0)] = None
+
+
+ def handle_mar(self, msg, now):
+ if msg.id == self.id:
+ return
+ if msg.have_seq < self.mobile_seq:
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
+ MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs))
+
+
+ def _expire_remotes(self, now):
+ for _id, t in self.remote_last_seen.items():
+ if now - t > self.mobile_addr_max_age:
+ self.remote_lists.pop(_id)
+ self.remote_last_seen.pop(_id)
+ self.remote_changed = True
+
+
+ def _send_mars(self):
+ for _id, _area, _seq in self.needed_mars.keys():
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
+ self.needed_mars = {}
+
+
+ def _activate_remotes(self, _id, added, deleted):
+ bit = self.node_tracker.maskbit_for_node(_id)
+ if added:
+ for a in added:
+ self.container.router_adapter.map_destination(a, bit)
+ if deleted:
+ for d in deleted:
+ self.container.router_adapter.unmap_destination(d, bit)
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py Mon Oct 21 22:04:51 2013
@@ -21,63 +21,63 @@ from data import LinkState, MessageHELLO
from time import time
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class NeighborEngine(object):
- """
- This module is responsible for maintaining this router's link-state. It runs the HELLO protocol
- with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
- link-state) changes.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.last_hello_time = 0.0
- self.hello_interval = container.config.hello_interval
- self.hello_max_age = container.config.hello_max_age
- self.hellos = {}
- self.link_state_changed = False
- self.link_state = LinkState(None, self.id, self.area, 0, [])
-
-
- def tick(self, now):
- self._expire_hellos(now)
-
- if now - self.last_hello_time >= self.hello_interval:
- self.last_hello_time = now
- self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
-
- if self.link_state_changed:
- self.link_state_changed = False
- self.link_state.bump_sequence()
- self.container.local_link_state_changed(self.link_state)
-
-
- def handle_hello(self, msg, now):
- if msg.id == self.id:
- return
- self.hellos[msg.id] = now
- if msg.is_seen(self.id):
- if self.link_state.add_peer(msg.id):
- self.link_state_changed = True
- self.container.new_neighbor(msg.id)
- self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id)
- ##
- ## TODO - Use this function to detect area boundaries
- ##
-
- def _expire_hellos(self, now):
- to_delete = []
- for key, last_seen in self.hellos.items():
- if now - last_seen > self.hello_max_age:
- to_delete.append(key)
- for key in to_delete:
- self.hellos.pop(key)
- if self.link_state.del_peer(key):
- self.link_state_changed = True
- self.container.lost_neighbor(key)
- self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+ """
+ This module is responsible for maintaining this router's link-state. It runs the HELLO protocol
+ with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
+ link-state) changes.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.last_hello_time = 0.0
+ self.hello_interval = container.config.hello_interval
+ self.hello_max_age = container.config.hello_max_age
+ self.hellos = {}
+ self.link_state_changed = False
+ self.link_state = LinkState(None, self.id, self.area, 0, [])
+
+
+ def tick(self, now):
+ self._expire_hellos(now)
+
+ if now - self.last_hello_time >= self.hello_interval:
+ self.last_hello_time = now
+ self.container.send('amqp:/_local/qdxhello', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
+
+ if self.link_state_changed:
+ self.link_state_changed = False
+ self.link_state.bump_sequence()
+ self.container.local_link_state_changed(self.link_state)
+
+
+ def handle_hello(self, msg, now, link_id):
+ if msg.id == self.id:
+ return
+ self.hellos[msg.id] = now
+ if msg.is_seen(self.id):
+ if self.link_state.add_peer(msg.id):
+ self.link_state_changed = True
+ self.container.new_neighbor(msg.id, link_id)
+ self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id))
+ ##
+ ## TODO - Use this function to detect area boundaries
+ ##
+
+ def _expire_hellos(self, now):
+ to_delete = []
+ for key, last_seen in self.hellos.items():
+ if now - last_seen > self.hello_max_age:
+ to_delete.append(key)
+ for key in to_delete:
+ self.hellos.pop(key)
+ if self.link_state.del_peer(key):
+ self.link_state_changed = True
+ self.container.lost_neighbor(key)
+ self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py Mon Oct 21 22:04:51 2013
@@ -18,86 +18,157 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class NodeTracker(object):
- """
- This module is responsible for tracking the set of router nodes that are known to this
- router. It tracks whether they are neighbor or remote and whether they are reachable.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.nodes = {} # id => RemoteNode
-
-
- def tick(self, now):
- pass
-
-
- def new_neighbor(self, node_id):
- if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id)
- self.nodes[node_id].set_neighbor()
- self._notify(self.nodes[node_id])
-
-
- def lost_neighbor(self, node_id):
- node = self.nodes[node_id]
- node.clear_neighbor()
- self._notify(node)
- if node.to_delete():
- self.nodes.pop(node_id)
-
-
- def new_node(self, node_id):
- if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id)
- self.nodes[node_id].set_remote()
- self._notify(self.nodes[node_id])
-
-
- def lost_node(self, node_id):
- node = self.nodes[node_id]
- node.clear_remote()
- self._notify(node)
- if node.to_delete():
- self.nodes.pop(node_id)
-
-
- def _notify(self, node):
- if node.to_delete():
- self.container.adapter.node_updated("R%s" % node.id, 0, 0)
- else:
- is_neighbor = 0
- if node.neighbor:
- is_neighbor = 1
- self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor)
+ """
+ This module is responsible for tracking the set of router nodes that are known to this
+ router. It tracks whether they are neighbor or remote and whether they are reachable.
+
+ This module is also responsible for assigning a unique mask bit value to each router.
+ The mask bit is used in the main router to represent sets of valid destinations for addresses.
+ """
+ def __init__(self, container, max_routers):
+ self.container = container
+ self.max_routers = max_routers
+ self.nodes = {} # id => RemoteNode
+ self.maskbits = []
+ self.next_maskbit = 1 # Reserve bit '0' to represent this router
+ for i in range(max_routers):
+ self.maskbits.append(None)
+ self.maskbits[0] = True
+
+
+ def tick(self, now):
+ pass
+
+
+ def new_neighbor(self, node_id, link_maskbit):
+ """
+ A node, designated by node_id, has been discovered as a neighbor over a link with
+ a maskbit of link_maskbit.
+ """
+ if node_id in self.nodes:
+ node = self.nodes[node_id]
+ if node.neighbor:
+ return
+ self.container.del_remote_router(node.maskbit)
+ node.neighbor = True
+ else:
+ node = RemoteNode(node_id, self._allocate_maskbit(), True)
+ self.nodes[node_id] = node
+ self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit)
+
+
+ def lost_neighbor(self, node_id):
+ """
+ We have lost contact with a neighboring node node_id.
+ """
+ node = self.nodes[node_id]
+ node.neighbor = False
+ self.container.del_neighbor_router(node.maskbit)
+ if node.remote:
+ self.container.add_remote_router(self._address(node.id), node.maskbit)
+ else:
+ self._free_maskbit(node.maskbit)
+ self.nodes.pop(node_id)
+
+
+ def new_node(self, node_id):
+ """
+ A node, designated by node_id, has been discovered through the an advertisement from a
+ remote peer.
+ """
+ if node_id not in self.nodes:
+ node = RemoteNode(node_id, self._allocate_maskbit(), False)
+ self.nodes[node_id] = node
+ self.container.add_remote_router(self._address(node.id), node.maskbit)
+ else:
+ node = self.nodes[node_id]
+ node.remote = True
+
+
+ def lost_node(self, node_id):
+ """
+ A remote node, node_id, has not been heard from for too long and is being deemed lost.
+ """
+ node = self.nodes[node_id]
+ if node.remote:
+ node.remote = False
+ if not node.neighbor:
+ self.container.del_remote_router(node.maskbit)
+ self._free_maskbit(node.maskbit)
+ self.nodes.pop(node_id)
+
+
+ def maskbit_for_node(self, node_id):
+ """
+ """
+ node = self.nodes[node_id]
+ if node:
+ return node.maskbit
+ return None
+
+
+ def add_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ for a in addrs:
+ node.addrs[a] = 1
+
+
+ def del_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ for a in addrs:
+ node.addrs.pop(a)
+
+
+ def overwrite_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ added = []
+ deleted = []
+ for a in addrs:
+ if a not in node.addrs.keys():
+ added.append(a)
+ for a in node.addrs.keys():
+ if a not in addrs:
+ deleted.append(a)
+ for a in addrs:
+ node.addrs[a] = 1
+ return (added, deleted)
+
+
+ def _allocate_maskbit(self):
+ if self.next_maskbit == None:
+ raise Exception("Exceeded Maximum Router Count")
+ result = self.next_maskbit
+ self.next_maskbit = None
+ self.maskbits[result] = True
+ for n in range(result + 1, self.max_routers):
+ if self.maskbits[n] == None:
+ self.next_maskbit = n
+ break
+ return result
+
+
+ def _free_maskbit(self, i):
+ self.maskbits[i] = None
+ if self.next_maskbit == None or i < self.next_maskbit:
+ self.next_maskbit = i
-class RemoteNode(object):
-
- def __init__(self, node_id):
- self.id = node_id
- self.neighbor = None
- self.remote = None
-
- def set_neighbor(self):
- self.neighbor = True
+ def _address(self, node_id):
+ return "amqp:/_topo/%s/%s" % (self.container.area, node_id)
- def set_remote(self):
- self.remote = True
- def clear_neighbor(self):
- self.neighbor = None
-
- def clear_remote(self):
- self.remote = None
+class RemoteNode(object):
- def to_delete(self):
- return self.neighbor or self.remote
+ def __init__(self, node_id, maskbit, neighbor):
+ self.id = node_id
+ self.maskbit = maskbit
+ self.neighbor = neighbor
+ self.remote = not neighbor
+ self.addrs = {} # Address => Count at Node (1 only for the present)
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py Mon Oct 21 22:04:51 2013
@@ -18,185 +18,218 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class PathEngine(object):
- """
- This module is responsible for computing the next-hop for every router/area in the domain
- based on the collection of link states that have been gathered.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.recalculate = False
- self.collection = None
-
-
- def tick(self, now_unused):
- if self.recalculate:
- self.recalculate = False
- self._calculate_routes()
-
-
- def ls_collection_changed(self, collection):
- self.recalculate = True
- self.collection = collection
-
-
- def _calculate_tree_from_root(self, root):
- ##
- ## Make a copy of the current collection of link-states that contains
- ## an empty link-state for nodes that are known-peers but are not in the
- ## collection currently. This is needed to establish routes to those nodes
- ## so we can trade link-state information with them.
- ##
- link_states = {}
- for _id, ls in self.collection.items():
- link_states[_id] = ls.peers
- for p in ls.peers:
- if p not in link_states:
- link_states[p] = []
-
- ##
- ## Setup Dijkstra's Algorithm
- ##
- cost = {}
- prev = {}
- for _id in link_states:
- cost[_id] = None # infinite
- prev[_id] = None # undefined
- cost[root] = 0 # no cost to the root node
- unresolved = NodeSet(cost)
-
- ##
- ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
- ##
- while not unresolved.empty():
- u = unresolved.lowest_cost()
- if cost[u] == None:
- # There are no more reachable nodes in unresolved
- break
- for v in link_states[u]:
- if unresolved.contains(v):
- alt = cost[u] + 1 # TODO - Use link cost instead of 1
- if cost[v] == None or alt < cost[v]:
- cost[v] = alt
- prev[v] = u
- unresolved.set_cost(v, alt)
-
- ##
- ## Remove unreachable nodes from the map. Note that this will also remove the
- ## root node (has no previous node) from the map.
- ##
- for u, val in prev.items():
- if not val:
- prev.pop(u)
-
- ##
- ## Return previous-node map. This is a map of all reachable, remote nodes to
- ## their predecessor node.
- ##
- return prev
-
-
- def _calculate_routes(self):
- ##
- ## Generate the shortest-path tree with the local node as root
- ##
- prev = self._calculate_tree_from_root(self.id)
- nodes = prev.keys()
-
- ##
- ## Distill the path tree into a map of next hops for each node
- ##
- next_hops = {}
- while len(nodes) > 0:
- u = nodes[0] # pick any destination
- path = [u]
- nodes.remove(u)
- v = prev[u]
- while v != self.id: # build a list of nodes in the path back to the root
- if v in nodes:
- path.append(v)
- nodes.remove(v)
- u = v
- v = prev[u]
- for w in path: # mark each node in the path as reachable via the next hop
- next_hops[w] = u
-
- ##
- ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
- ## for which the path from origin to dest passes through us. This is the set
- ## of valid origins for forwarding to the destination.
- ##
+ """
+ This module is responsible for computing the next-hop for every router/area in the domain
+ based on the collection of link states that have been gathered.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.recalculate = False
+ self.collection = None
- self.container.next_hops_changed(next_hops)
+ def tick(self, now_unused):
+ if self.recalculate:
+ self.recalculate = False
+ self._calculate_routes()
+
+
+ def ls_collection_changed(self, collection):
+ self.recalculate = True
+ self.collection = collection
+
+
+ def _calculate_tree_from_root(self, root):
+ ##
+ ## Make a copy of the current collection of link-states that contains
+ ## a fake link-state for nodes that are known-peers but are not in the
+ ## collection currently. This is needed to establish routes to those nodes
+ ## so we can trade link-state information with them.
+ ##
+ link_states = {}
+ for _id, ls in self.collection.items():
+ link_states[_id] = ls.peers
+ for p in ls.peers:
+ if p not in link_states:
+ link_states[p] = [_id]
+
+ ##
+ ## Setup Dijkstra's Algorithm
+ ##
+ cost = {}
+ prev = {}
+ for _id in link_states:
+ cost[_id] = None # infinite
+ prev[_id] = None # undefined
+ cost[root] = 0 # no cost to the root node
+ unresolved = NodeSet(cost)
+
+ ##
+ ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
+ ##
+ while not unresolved.empty():
+ u = unresolved.lowest_cost()
+ if cost[u] == None:
+ # There are no more reachable nodes in unresolved
+ break
+ for v in link_states[u]:
+ if unresolved.contains(v):
+ alt = cost[u] + 1 # TODO - Use link cost instead of 1
+ if cost[v] == None or alt < cost[v]:
+ cost[v] = alt
+ prev[v] = u
+ unresolved.set_cost(v, alt)
-class NodeSet(object):
- """
- This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
- Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
- repeatable ordering.
- """
- def __init__(self, cost_map):
- self.nodes = []
- for _id, cost in cost_map.items():
- ##
- ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
- ## during this initialization.
- ##
- if cost == 0:
- self.nodes.insert(0, (_id, cost))
- else:
##
- ## There is no need to sort unreachable nodes by ID
+ ## Remove unreachable nodes from the map. Note that this will also remove the
+ ## root node (has no previous node) from the map.
##
- self.nodes.append((_id, cost))
+ for u, val in prev.items():
+ if not val:
+ prev.pop(u)
+ ##
+ ## Return previous-node map. This is a map of all reachable, remote nodes to
+ ## their predecessor node.
+ ##
+ return prev
- def __repr__(self):
- return self.nodes.__repr__()
+ def _calculate_valid_origins(self, nodeset):
+ ##
+ ## Calculate the tree from each origin, determine the set of origins-per-dest
+ ## for which the path from origin to dest passes through us. This is the set
+ ## of valid origins for forwarding to the destination.
+ ##
+ valid_origin = {} # Map of destination => List of Valid Origins
+ for node in nodeset:
+ if node != self.id:
+ valid_origin[node] = []
+
+ for root in valid_origin.keys():
+ prev = self._calculate_tree_from_root(root)
+ nodes = prev.keys()
+ while len(nodes) > 0:
+ u = nodes[0]
+ path = [u]
+ nodes.remove(u)
+ v = prev[u]
+ while v != root:
+ if v in nodes:
+ if v != self.id:
+ path.append(v)
+ nodes.remove(v)
+ if v == self.id:
+ valid_origin[root].extend(path)
+ u = v
+ v = prev[u]
+ return valid_origin
- def empty(self):
- return len(self.nodes) == 0
+ def _calculate_routes(self):
+ ##
+ ## Generate the shortest-path tree with the local node as root
+ ##
+ prev = self._calculate_tree_from_root(self.id)
+ nodes = prev.keys()
- def contains(self, _id):
- for a, b in self.nodes:
- if a == _id:
- return True
- return False
+ ##
+ ## Distill the path tree into a map of next hops for each node
+ ##
+ next_hops = {}
+ while len(nodes) > 0:
+ u = nodes[0] # pick any destination
+ path = [u]
+ nodes.remove(u)
+ v = prev[u]
+ while v != self.id: # build a list of nodes in the path back to the root
+ if v in nodes:
+ path.append(v)
+ nodes.remove(v)
+ u = v
+ v = prev[u]
+ for w in path: # mark each node in the path as reachable via the next hop
+ next_hops[w] = u
+ self.container.next_hops_changed(next_hops)
- def lowest_cost(self):
- """
- Remove and return the lowest cost node ID.
- """
- _id, cost = self.nodes.pop(0)
- return _id
+ ##
+ ## Calculate the valid origins for remote routers
+ ##
+ valid_origin = self._calculate_valid_origins(prev.keys())
+ self.container.valid_origins_changed(valid_origin)
- def set_cost(self, _id, new_cost):
+
+class NodeSet(object):
"""
- Set the cost for an ID in the NodeSet and re-insert the ID so that the list
- remains sorted in increasing cost order.
+ This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
+ Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
+ repeatable ordering.
"""
- index = 0
- for i, c in self.nodes:
- if i == _id:
- break
- index += 1
- self.nodes.pop(index)
-
- index = 0
- for i, c in self.nodes:
- if c == None or new_cost < c or (new_cost == c and _id < i):
- break
- index += 1
+ def __init__(self, cost_map):
+ self.nodes = []
+ for _id, cost in cost_map.items():
+ ##
+ ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
+ ## during this initialization.
+ ##
+ if cost == 0:
+ self.nodes.insert(0, (_id, cost))
+ else:
+ ##
+ ## There is no need to sort unreachable nodes by ID
+ ##
+ self.nodes.append((_id, cost))
+
+
+ def __repr__(self):
+ return self.nodes.__repr__()
+
+
+ def empty(self):
+ return len(self.nodes) == 0
+
+
+ def contains(self, _id):
+ for a, b in self.nodes:
+ if a == _id:
+ return True
+ return False
+
+
+ def lowest_cost(self):
+ """
+ Remove and return the lowest cost node ID.
+ """
+ _id, cost = self.nodes.pop(0)
+ return _id
+
+
+ def set_cost(self, _id, new_cost):
+ """
+ Set the cost for an ID in the NodeSet and re-insert the ID so that the list
+ remains sorted in increasing cost order.
+ """
+ index = 0
+ for i, c in self.nodes:
+ if i == _id:
+ break
+ index += 1
+ self.nodes.pop(index)
+
+ index = 0
+ for i, c in self.nodes:
+ if c == None or new_cost < c or (new_cost == c and _id < i):
+ break
+ index += 1
+
+ self.nodes.insert(index, (_id, new_cost))
- self.nodes.insert(index, (_id, new_cost))
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Mon Oct 21 22:04:51 2013
@@ -27,254 +27,267 @@ from link import LinkStateEngine
from path import PathEngine
from mobile import MobileAddressEngine
from routing import RoutingTableEngine
-from binding import BindingEngine
-from adapter import AdapterEngine
from node import NodeTracker
+import sys
+import traceback
+
##
## Import the Dispatch adapters from the environment. If they are not found
## (i.e. we are in a test bench, etc.), load the stub versions.
##
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class RouterEngine:
- """
- """
-
- def __init__(self, router_adapter, router_id=None, area='area', config_override={}):
- """
- Initialize an instance of a router for a domain.
- """
- ##
- ## Record important information about this router instance
- ##
- self.domain = "domain"
- self.router_adapter = router_adapter
- self.log_adapter = LogAdapter("dispatch.router")
- self.io_adapter = IoAdapter(self, "qdxrouter")
-
- if router_id:
- self.id = router_id
- else:
- self.id = str(uuid4())
- self.area = area
- self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id))
-
- ##
- ## Setup configuration
- ##
- self.config = Configuration(config_override)
- self.log(LOG_INFO, "Config: %r" % self.config)
-
- ##
- ## Launch the sub-module engines
- ##
- self.neighbor_engine = NeighborEngine(self)
- self.link_state_engine = LinkStateEngine(self)
- self.path_engine = PathEngine(self)
- self.mobile_address_engine = MobileAddressEngine(self)
- self.routing_table_engine = RoutingTableEngine(self)
- self.binding_engine = BindingEngine(self)
- self.adapter_engine = AdapterEngine(self)
- self.node_tracker = NodeTracker(self)
-
-
-
- ##========================================================================================
- ## Adapter Entry Points - invoked from the adapter
- ##========================================================================================
- def getId(self):
- """
- Return the router's ID
- """
- return self.id
-
-
- def addLocalAddress(self, key):
"""
"""
- try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
- return
- self.mobile_address_engine.add_local_address(key)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
-
- def delLocalAddress(self, key):
- """
- """
- try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
- return
- self.mobile_address_engine.del_local_address(key)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
-
-
- def handleTimerTick(self):
- """
- """
- try:
- now = time()
- self.neighbor_engine.tick(now)
- self.link_state_engine.tick(now)
- self.path_engine.tick(now)
- self.mobile_address_engine.tick(now)
- self.routing_table_engine.tick(now)
- self.binding_engine.tick(now)
- self.adapter_engine.tick(now)
- self.node_tracker.tick(now)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
-
-
- def handleControlMessage(self, opcode, body):
- """
- """
- try:
- now = time()
- if opcode == 'HELLO':
- msg = MessageHELLO(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.neighbor_engine.handle_hello(msg, now)
-
- elif opcode == 'RA':
- msg = MessageRA(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_ra(msg, now)
- self.mobile_address_engine.handle_ra(msg, now)
-
- elif opcode == 'LSU':
- msg = MessageLSU(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_lsu(msg, now)
-
- elif opcode == 'LSR':
- msg = MessageLSR(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_lsr(msg, now)
-
- elif opcode == 'MAU':
- msg = MessageMAU(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mau(msg, now)
-
- elif opcode == 'MAR':
- msg = MessageMAR(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mar(msg, now)
-
- except Exception, e:
- self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
-
-
- def receive(self, message_properties, body):
- """
- This is the IoAdapter message-receive handler
- """
- try:
- self.handleControlMessage(message_properties['opcode'], body)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
- (message_properties, body, e))
-
- def getRouterData(self, kind):
- """
- """
- if kind == 'help':
- return { 'help' : "Get list of supported values for kind",
- 'link-state' : "This router's link state",
- 'link-state-set' : "The set of link states from known routers",
- 'next-hops' : "Next hops to each known router",
- 'topo-table' : "Topological routing table",
- 'mobile-table' : "Mobile key routing table"
- }
- if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
- if kind == 'next-hops' : return self.routing_table_engine.next_hops
- if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']}
- if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']}
- if kind == 'link-state-set' :
- copy = {}
- for _id,_ls in self.link_state_engine.collection.items():
- copy[_id] = _ls.to_dict()
- return copy
-
- return {'notice':'Use kind="help" to get a list of possibilities'}
-
-
- ##========================================================================================
- ## Adapter Calls - outbound calls to Dispatch
- ##========================================================================================
- def log(self, level, text):
- """
- Emit a log message to the host's event log
- """
- self.log_adapter.log(level, text)
-
-
- def send(self, dest, msg):
- """
- Send a control message to another router.
- """
- app_props = {'opcode' : msg.get_opcode() }
- self.io_adapter.send(dest, app_props, msg.to_dict())
- self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
-
-
- def node_updated(self, addr, reachable, neighbor):
- """
- """
- self.router_adapter(addr, reachable, neighbor)
-
- ##========================================================================================
- ## Interconnect between the Sub-Modules
- ##========================================================================================
- def local_link_state_changed(self, link_state):
- self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
- self.link_state_engine.new_local_link_state(link_state)
-
- def ls_collection_changed(self, collection):
- self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
- self.path_engine.ls_collection_changed(collection)
-
- def next_hops_changed(self, next_hop_table):
- self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
- self.routing_table_engine.next_hops_changed(next_hop_table)
- self.binding_engine.next_hops_changed()
-
- def mobile_sequence_changed(self, mobile_seq):
- self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
- self.link_state_engine.set_mobile_sequence(mobile_seq)
-
- def mobile_keys_changed(self, keys):
- self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
- self.binding_engine.mobile_keys_changed(keys)
-
- def get_next_hops(self):
- return self.routing_table_engine.get_next_hops()
-
- def remote_routes_changed(self, key_class, routes):
- self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
- self.adapter_engine.remote_routes_changed(key_class, routes)
-
- def new_neighbor(self, rid):
- self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid)
- self.node_tracker.new_neighbor(rid)
-
- def lost_neighbor(self, rid):
- self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
- self.node_tracker.lost_neighbor(rid)
-
- def new_node(self, rid):
- self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
- self.node_tracker.new_node(rid)
-
- def lost_node(self, rid):
- self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
- self.node_tracker.lost_node(rid)
+ def __init__(self, router_adapter, router_id, area, max_routers, config_override={}):
+ """
+ Initialize an instance of a router for a domain.
+ """
+ ##
+ ## Record important information about this router instance
+ ##
+ self.domain = "domain"
+ self.router_adapter = router_adapter
+ self.log_adapter = LogAdapter("dispatch.router")
+ self.io_adapter = IoAdapter(self, ("qdxrouter", "qdxhello"))
+ self.max_routers = max_routers
+ self.id = router_id
+ self.area = area
+ self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" %
+ (self.area, self.id, self.max_routers))
+
+ ##
+ ## Setup configuration
+ ##
+ self.config = Configuration(config_override)
+ self.log(LOG_INFO, "Config: %r" % self.config)
+
+ ##
+ ## Launch the sub-module engines
+ ##
+ self.node_tracker = NodeTracker(self, self.max_routers)
+ self.neighbor_engine = NeighborEngine(self)
+ self.link_state_engine = LinkStateEngine(self)
+ self.path_engine = PathEngine(self)
+ self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker)
+ self.routing_table_engine = RoutingTableEngine(self, self.node_tracker)
+
+
+
+ ##========================================================================================
+ ## Adapter Entry Points - invoked from the adapter
+ ##========================================================================================
+ def getId(self):
+ """
+ Return the router's ID
+ """
+ return self.id
+
+
+ def addressAdded(self, addr):
+ """
+ """
+ try:
+ if addr.find('Mtemp.') == 0:
+ return
+ if addr.find('M') == 0:
+ self.mobile_address_engine.add_local_address(addr[1:])
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
+
+
+ def addressRemoved(self, addr):
+ """
+ """
+ try:
+ if addr.find('Mtemp.') == 0:
+ return
+ if addr.find('M') == 0:
+ self.mobile_address_engine.del_local_address(addr[1:])
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
+
+
+ def handleTimerTick(self):
+ """
+ """
+ try:
+ now = time()
+ self.neighbor_engine.tick(now)
+ self.link_state_engine.tick(now)
+ self.path_engine.tick(now)
+ self.mobile_address_engine.tick(now)
+ self.routing_table_engine.tick(now)
+ self.node_tracker.tick(now)
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
+
+
+ def handleControlMessage(self, opcode, body, link_id):
+ """
+ """
+ try:
+ now = time()
+ if opcode == 'HELLO':
+ msg = MessageHELLO(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.neighbor_engine.handle_hello(msg, now, link_id)
+
+ elif opcode == 'RA':
+ msg = MessageRA(body)
+ self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.link_state_engine.handle_ra(msg, now)
+ self.mobile_address_engine.handle_ra(msg, now)
+
+ elif opcode == 'LSU':
+ msg = MessageLSU(body)
+ self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.link_state_engine.handle_lsu(msg, now)
+
+ elif opcode == 'LSR':
+ msg = MessageLSR(body)
+ self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.link_state_engine.handle_lsr(msg, now)
+
+ elif opcode == 'MAU':
+ msg = MessageMAU(body)
+ self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.mobile_address_engine.handle_mau(msg, now)
+
+ elif opcode == 'MAR':
+ msg = MessageMAR(body)
+ self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.mobile_address_engine.handle_mar(msg, now)
+
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ traceback.print_tb(exc_traceback)
+
+
+ def receive(self, message_properties, body, link_id):
+ """
+ This is the IoAdapter message-receive handler
+ """
+ try:
+ #self.log(LOG_DEBUG, "Raw Receive: mp=%r body=%r link_id=%r" % (message_properties, body, link_id))
+ self.handleControlMessage(message_properties['opcode'], body, link_id)
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
+ (message_properties, body, e))
+
+
+ def getRouterData(self, kind):
+ """
+ """
+ if kind == 'help':
+ return { 'help' : "Get list of supported values for kind",
+ 'link-state' : "This router's link state",
+ 'link-state-set' : "The set of link states from known routers",
+ 'next-hops' : "Next hops to each known router"
+ }
+ if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
+ if kind == 'next-hops' : return self.routing_table_engine.next_hops
+ if kind == 'link-state-set' :
+ copy = {}
+ for _id,_ls in self.link_state_engine.collection.items():
+ copy[_id] = _ls.to_dict()
+ return copy
+
+ return {'notice':'Use kind="help" to get a list of possibilities'}
+
+
+ ##========================================================================================
+ ## Adapter Calls - outbound calls to Dispatch
+ ##========================================================================================
+ def log(self, level, text):
+ """
+ Emit a log message to the host's event log
+ """
+ self.log_adapter.log(level, text)
+
+
+ def send(self, dest, msg):
+ """
+ Send a control message to another router.
+ """
+ app_props = {'opcode' : msg.get_opcode() }
+ self.io_adapter.send(dest, app_props, msg.to_dict())
+ if "qdxhello" in dest:
+ self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
+ else:
+ self.log(LOG_DEBUG, "SENT: %r dest=%s" % (msg, dest))
+
+
+ def node_updated(self, addr, reachable, neighbor):
+ """
+ """
+ self.router_adapter(addr, reachable, neighbor)
+
+
+ ##========================================================================================
+ ## Interconnect between the Sub-Modules
+ ##========================================================================================
+ def local_link_state_changed(self, link_state):
+ self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
+ self.link_state_engine.new_local_link_state(link_state)
+
+ def ls_collection_changed(self, collection):
+ self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
+ self.path_engine.ls_collection_changed(collection)
+
+ def next_hops_changed(self, next_hop_table):
+ self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
+ self.routing_table_engine.next_hops_changed(next_hop_table)
+
+ def valid_origins_changed(self, valid_origins):
+ self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
+ self.routing_table_engine.valid_origins_changed(valid_origins)
+
+ def mobile_sequence_changed(self, mobile_seq):
+ self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
+ self.link_state_engine.set_mobile_sequence(mobile_seq)
+
+ def get_next_hops(self):
+ return self.routing_table_engine.get_next_hops()
+
+ def new_neighbor(self, rid, link_id):
+ self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
+ self.node_tracker.new_neighbor(rid, link_id)
+
+ def lost_neighbor(self, rid):
+ self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
+ self.node_tracker.lost_neighbor(rid)
+
+ def new_node(self, rid):
+ self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
+ self.node_tracker.new_node(rid)
+
+ def lost_node(self, rid):
+ self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
+ self.node_tracker.lost_node(rid)
+
+ def add_neighbor_router(self, address, router_bit, link_bit):
+ self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, router_bit=%d, link_bit=%d" % \
+ (address, router_bit, link_bit))
+ self.router_adapter.add_neighbor_router(address, router_bit, link_bit)
+
+ def del_neighbor_router(self, router_bit):
+ self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % router_bit)
+ self.router_adapter.del_neighbor_router(router_bit)
+
+ def add_remote_router(self, address, router_bit):
+ self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, router_bit=%d" % (address, router_bit))
+ self.router_adapter.add_remote_router(address, router_bit)
+
+ def del_remote_router(self, router_bit):
+ self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % router_bit)
+ self.router_adapter.del_remote_router(router_bit)
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py Mon Oct 21 22:04:51 2013
@@ -18,39 +18,45 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class RoutingTableEngine(object):
- """
- This module is responsible for converting the set of next hops to remote routers to a routing
- table in the "topological" address class.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.next_hops = {}
-
-
- def tick(self, now):
- pass
-
-
- def next_hops_changed(self, next_hops):
- # Convert next_hops into routing table
- self.next_hops = next_hops
- new_table = []
- for _id, next_hop in next_hops.items():
- new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop))
- pair = ('_topo.%s.all' % (self.area), next_hop)
- if new_table.count(pair) == 0:
- new_table.append(pair)
+ """
+ This module is responsible for converting the set of next hops to remote routers to a routing
+ table in the "topological" address class.
+ """
+ def __init__(self, container, node_tracker):
+ self.container = container
+ self.node_tracker = node_tracker
+ self.id = self.container.id
+ self.area = self.container.area
+ self.next_hops = {}
+
+
+ def tick(self, now):
+ pass
+
+
+ def next_hops_changed(self, next_hops):
+ # Convert next_hops into routing table
+ self.next_hops = next_hops
+ for _id, next_hop in next_hops.items():
+ mb_id = self.node_tracker.maskbit_for_node(_id)
+ mb_nh = self.node_tracker.maskbit_for_node(next_hop)
+ self.container.router_adapter.set_next_hop(mb_id, mb_nh)
+
+
+ def valid_origins_changed(self, valid_origins):
+ for _id, vo in valid_origins.items():
+ mb_id = self.node_tracker.maskbit_for_node(_id)
+ mb_vo = []
+ for o in vo:
+ mb_vo.append(self.node_tracker.maskbit_for_node(o))
+ self.container.router_adapter.set_valid_origins(mb_id, mb_vo)
- self.container.remote_routes_changed('topological', new_table)
-
- def get_next_hops(self):
- return self.next_hops
+ def get_next_hops(self):
+ return self.next_hops
Modified: qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt Mon Oct 21 22:04:51 2013
@@ -18,7 +18,7 @@
##
-set(DEFAULT_CONFIG_PATH "/etc/qpid-dispatch.conf" CACHE string "Default Config File Path")
+set(DEFAULT_CONFIG_PATH "/etc/qpid/qpid-dispatch.conf" CACHE string "Default Config File Path")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
Modified: qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c Mon Oct 21 22:04:51 2013
@@ -117,7 +117,7 @@ int main(int argc, char **argv)
}
}
- dx_log_set_mask(0xFFFFFFFF);
+ dx_log_set_mask(0xFFFFFFFE);
dispatch = dx_dispatch(config_path);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c Mon Oct 21 22:04:51 2013
@@ -37,7 +37,7 @@
struct dx_agent_t {
dx_dispatch_t *dx;
- hash_t *class_hash;
+ dx_hash_t *class_hash;
dx_message_list_t in_fifo;
dx_message_list_t out_fifo;
sys_mutex_t *lock;
@@ -71,7 +71,7 @@ static void dx_agent_process_get(dx_agen
dx_field_iterator_t *cls_string = dx_parse_raw(cls);
const dx_agent_class_t *cls_record;
- hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
+ dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
if (cls_record == 0)
return;
@@ -138,11 +138,11 @@ static void dx_agent_process_get(dx_agen
//
// Create a message and send it.
//
- dx_message_t *msg = dx_allocate_message();
+ dx_message_t *msg = dx_message();
dx_message_compose_2(msg, field);
dx_router_send(agent->dx, reply_to, msg);
- dx_free_message(msg);
+ dx_message_free(msg);
dx_compose_free(field);
}
@@ -234,13 +234,13 @@ static void dx_agent_deferred_handler(vo
if (msg) {
dx_agent_process_request(agent, msg);
- dx_free_message(msg);
+ dx_message_free(msg);
}
} while (msg);
}
-static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id)
{
dx_agent_t *agent = (dx_agent_t*) context;
dx_message_t *copy = dx_message_copy(msg);
@@ -257,12 +257,12 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
dx_agent_t *agent = NEW(dx_agent_t);
agent->dx = dx;
- agent->class_hash = hash(6, 10, 1);
+ agent->class_hash = dx_hash(6, 10, 1);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent);
- agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
+ agent->address = dx_router_register_address(dx, "$management", dx_agent_rx_handler, agent);
return agent;
}
@@ -273,7 +273,7 @@ void dx_agent_free(dx_agent_t *agent)
dx_router_unregister_address(agent->address);
sys_mutex_free(agent->lock);
dx_timer_free(agent->timer);
- hash_free(agent->class_hash);
+ dx_hash_free(agent->class_hash);
free(agent);
}
@@ -295,7 +295,7 @@ dx_agent_class_t *dx_agent_register_clas
cls->query_handler = query_handler;
dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
- int result = hash_insert_const(agent->class_hash, iter, cls);
+ int result = dx_hash_insert_const(agent->class_hash, iter, cls, 0);
dx_field_iterator_free(iter);
if (result < 0)
assert(false);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c Mon Oct 21 22:04:51 2013
@@ -34,7 +34,7 @@ void dx_buffer_set_size(size_t size)
}
-dx_buffer_t *dx_allocate_buffer(void)
+dx_buffer_t *dx_buffer(void)
{
size_locked = 1;
dx_buffer_t *buf = new_dx_buffer_t();
@@ -45,7 +45,7 @@ dx_buffer_t *dx_allocate_buffer(void)
}
-void dx_free_buffer(dx_buffer_t *buf)
+void dx_buffer_free(dx_buffer_t *buf)
{
free_dx_buffer_t(buf);
}
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c Mon Oct 21 22:04:51 2013
@@ -43,7 +43,7 @@ static void dx_insert(dx_composed_field_
while (len > 0) {
if (buf == 0 || dx_buffer_capacity(buf) == 0) {
- buf = dx_allocate_buffer();
+ buf = dx_buffer();
if (buf == 0)
return;
DEQ_INSERT_TAIL(field->buffers, buf);
@@ -115,8 +115,8 @@ static void dx_overwrite_32(dx_field_loc
size_t cursor = field->offset;
dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
- dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
- dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 16));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 8));
dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF));
}
@@ -212,7 +212,7 @@ void dx_compose_free(dx_composed_field_t
dx_buffer_t *buf = DEQ_HEAD(field->buffers);
while (buf) {
DEQ_REMOVE_HEAD(field->buffers);
- dx_free_buffer(buf);
+ dx_buffer_free(buf);
buf = DEQ_HEAD(field->buffers);
}
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/container.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/container.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/container.c Mon Oct 21 22:04:51 2013
@@ -88,8 +88,8 @@ typedef struct container_class_t {
struct dx_container_t {
dx_dispatch_t *dx;
dx_server_t *server;
- hash_t *node_type_map;
- hash_t *node_map;
+ dx_hash_t *node_type_map;
+ dx_hash_t *node_map;
sys_mutex_t *lock;
dx_node_t *default_node;
dxc_node_type_list_t node_type_list;
@@ -108,7 +108,7 @@ static void setup_outgoing_link(dx_conta
if (source) {
iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
- hash_retrieve(container->node_map, iter, (void*) &node);
+ dx_hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
}
sys_mutex_unlock(container->lock);
@@ -149,7 +149,7 @@ static void setup_incoming_link(dx_conta
if (target) {
iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
- hash_retrieve(container->node_map, iter, (void*) &node);
+ dx_hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
}
sys_mutex_unlock(container->lock);
@@ -429,8 +429,8 @@ static void container_query_handler(void
container_class_t *cls = (container_class_t*) context;
if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) {
- dx_agent_value_uint(correlator, "node_type_count", hash_size(cls->container->node_type_map));
- dx_agent_value_uint(correlator, "node_count", hash_size(cls->container->node_map));
+ dx_agent_value_uint(correlator, "node_type_count", dx_hash_size(cls->container->node_type_map));
+ dx_agent_value_uint(correlator, "node_count", dx_hash_size(cls->container->node_map));
if (cls->container->default_node)
dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name);
else
@@ -463,8 +463,8 @@ dx_container_t *dx_container(dx_dispatch
container->dx = dx;
container->server = dx->server;
- container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
- container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
+ container->node_type_map = dx_hash(6, 4, 1); // 64 buckets, item batches of 4
+ container->node_map = dx_hash(10, 32, 0); // 1K buckets, item batches of 32
container->lock = sys_mutex();
container->default_node = 0;
DEQ_INIT(container->node_type_list);
@@ -507,7 +507,7 @@ int dx_container_register_node_type(dx_d
nt_item->ntype = nt;
sys_mutex_lock(container->lock);
- result = hash_insert_const(container->node_type_map, iter, nt);
+ result = dx_hash_insert_const(container->node_type_map, iter, nt, 0);
DEQ_INSERT_TAIL(container->node_type_list, nt_item);
sys_mutex_unlock(container->lock);
@@ -565,7 +565,7 @@ dx_node_t *dx_container_create_node(dx_d
if (name) {
dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
- result = hash_insert(container->node_map, iter, node);
+ result = dx_hash_insert(container->node_map, iter, node, 0);
sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
if (result < 0) {
@@ -591,7 +591,7 @@ void dx_container_destroy_node(dx_node_t
if (node->name) {
dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
- hash_remove(container->node_map, iter);
+ dx_hash_remove(container->node_map, iter);
sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
free(node->name);
@@ -651,12 +651,63 @@ void *dx_link_get_context(dx_link_t *lin
}
+void dx_link_set_conn_context(dx_link_t *link, void *context)
+{
+ pn_session_t *pn_sess = pn_link_session(link->pn_link);
+ if (!pn_sess)
+ return;
+ pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+ if (!pn_conn)
+ return;
+ dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+ if (!conn)
+ return;
+ dx_connection_set_link_context(conn, context);
+}
+
+
+void *dx_link_get_conn_context(dx_link_t *link)
+{
+ pn_session_t *pn_sess = pn_link_session(link->pn_link);
+ if (!pn_sess)
+ return 0;
+ pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+ if (!pn_conn)
+ return 0;
+ dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+ if (!conn)
+ return 0;
+ return dx_connection_get_link_context(conn);
+}
+
+
pn_link_t *dx_link_pn(dx_link_t *link)
{
return link->pn_link;
}
+dx_connection_t *dx_link_connection(dx_link_t *link)
+{
+ if (!link || !link->pn_link)
+ return 0;
+
+ pn_session_t *sess = pn_link_session(link->pn_link);
+ if (!sess)
+ return 0;
+
+ pn_connection_t *conn = pn_session_connection(sess);
+ if (!conn)
+ return 0;
+
+ dx_connection_t *ctx = pn_connection_get_context(conn);
+ if (!ctx)
+ return 0;
+
+ return ctx;
+}
+
+
pn_terminus_t *dx_link_source(dx_link_t *link)
{
return pn_link_source(link->pn_link);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c Mon Oct 21 22:04:51 2013
@@ -24,6 +24,7 @@
#include "dispatch_private.h"
#include "alloc_private.h"
#include "log_private.h"
+#include "router_private.h"
/**
* Private Function Prototypes
@@ -34,8 +35,8 @@ void dx_server_free(dx_server
dx_container_t *dx_container(dx_dispatch_t *dx);
void dx_container_setup_agent(dx_dispatch_t *dx);
void dx_container_free(dx_container_t *container);
-dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id);
-void dx_router_setup_agent(dx_dispatch_t *dx);
+dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *area, const char *id);
+void dx_router_setup_late(dx_dispatch_t *dx);
void dx_router_free(dx_router_t *router);
dx_agent_t *dx_agent(dx_dispatch_t *dx);
void dx_agent_free(dx_agent_t *agent);
@@ -53,10 +54,13 @@ dx_dispatch_t *dx_dispatch(const char *c
{
dx_dispatch_t *dx = NEW(dx_dispatch_t);
- int thread_count = 0;
- const char *container_name = 0;
- const char *router_area = 0;
- const char *router_id = 0;
+ int thread_count = 0;
+ const char *container_name = 0;
+ const char *router_mode_str = 0;
+ const char *router_area = 0;
+ const char *router_id = 0;
+
+ dx_router_mode_t router_mode = DX_ROUTER_MODE_STANDALONE;
DEQ_INIT(dx->config_listeners);
DEQ_INIT(dx->config_connectors);
@@ -78,8 +82,9 @@ dx_dispatch_t *dx_dispatch(const char *c
count = dx_config_item_count(dx->config, CONF_ROUTER);
if (count == 1) {
- router_area = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "area");
- router_id = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "router-id");
+ router_mode_str = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "mode");
+ router_area = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "area");
+ router_id = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "router-id");
}
}
@@ -89,6 +94,12 @@ dx_dispatch_t *dx_dispatch(const char *c
if (!container_name)
container_name = "00000000-0000-0000-0000-000000000000"; // TODO - gen a real uuid
+ if (router_mode_str && strcmp(router_mode_str, "interior") == 0)
+ router_mode = DX_ROUTER_MODE_INTERIOR;
+
+ if (router_mode_str && strcmp(router_mode_str, "edge") == 0)
+ router_mode = DX_ROUTER_MODE_EDGE;
+
if (!router_area)
router_area = "area";
@@ -97,13 +108,13 @@ dx_dispatch_t *dx_dispatch(const char *c
dx->server = dx_server(thread_count, container_name);
dx->container = dx_container(dx);
- dx->router = dx_router(dx, router_area, router_id);
+ dx->router = dx_router(dx, router_mode, router_area, router_id);
dx->agent = dx_agent(dx);
dx_alloc_setup_agent(dx);
dx_server_setup_agent(dx);
dx_container_setup_agent(dx);
- dx_router_setup_agent(dx);
+ dx_router_setup_late(dx);
return dx;
}
@@ -126,6 +137,7 @@ static void load_server_config(dx_dispat
{
config->host = dx_config_item_value_string(dx->config, section, i, "addr");
config->port = dx_config_item_value_string(dx->config, section, i, "port");
+ config->role = dx_config_item_value_string(dx->config, section, i, "role");
config->sasl_mechanisms =
dx_config_item_value_string(dx->config, section, i, "sasl-mechanisms");
config->ssl_enabled =
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c Mon Oct 21 22:04:51 2013
@@ -23,18 +23,18 @@
#include <stdio.h>
#include <string.h>
-typedef struct hash_item_t {
- DEQ_LINKS(struct hash_item_t);
+typedef struct dx_hash_item_t {
+ DEQ_LINKS(struct dx_hash_item_t);
unsigned char *key;
union {
void *val;
const void *val_const;
} v;
-} hash_item_t;
+} dx_hash_item_t;
-ALLOC_DECLARE(hash_item_t);
-ALLOC_DEFINE(hash_item_t);
-DEQ_DECLARE(hash_item_t, items_t);
+ALLOC_DECLARE(dx_hash_item_t);
+ALLOC_DEFINE(dx_hash_item_t);
+DEQ_DECLARE(dx_hash_item_t, items_t);
typedef struct bucket_t {
@@ -42,7 +42,7 @@ typedef struct bucket_t {
} bucket_t;
-struct hash_t {
+struct dx_hash_t {
bucket_t *buckets;
unsigned int bucket_count;
unsigned int bucket_mask;
@@ -52,8 +52,17 @@ struct hash_t {
};
+struct dx_hash_handle_t {
+ bucket_t *bucket;
+ dx_hash_item_t *item;
+};
+
+ALLOC_DECLARE(dx_hash_handle_t);
+ALLOC_DEFINE(dx_hash_handle_t);
+
+
// djb2 hash algorithm
-static unsigned long hash_function(dx_field_iterator_t *iter)
+static unsigned long dx_hash_function(dx_field_iterator_t *iter)
{
unsigned long hash = 5381;
int c;
@@ -68,10 +77,10 @@ static unsigned long hash_function(dx_fi
}
-hash_t *hash(int bucket_exponent, int batch_size, int value_is_const)
+dx_hash_t *dx_hash(int bucket_exponent, int batch_size, int value_is_const)
{
int i;
- hash_t *h = NEW(hash_t);
+ dx_hash_t *h = NEW(dx_hash_t);
if (!h)
return 0;
@@ -90,22 +99,22 @@ hash_t *hash(int bucket_exponent, int ba
}
-void hash_free(hash_t *h)
+void dx_hash_free(dx_hash_t *h)
{
// TODO - Implement this
}
-size_t hash_size(hash_t *h)
+size_t dx_hash_size(dx_hash_t *h)
{
return h ? h->size : 0;
}
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
+static dx_hash_item_t *dx_hash_internal_insert(dx_hash_t *h, dx_field_iterator_t *key, int *exists, dx_hash_handle_t **handle)
{
- unsigned long idx = hash_function(key) & h->bucket_mask;
- hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+ unsigned long idx = dx_hash_function(key) & h->bucket_mask;
+ dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
while (item) {
if (dx_field_iterator_equal(key, item->key))
@@ -115,10 +124,12 @@ static hash_item_t *hash_internal_insert
if (item) {
*exists = 1;
+ if (handle)
+ *handle = 0;
return item;
}
- item = new_hash_item_t();
+ item = new_dx_hash_item_t();
if (!item)
return 0;
@@ -128,14 +139,24 @@ static hash_item_t *hash_internal_insert
DEQ_INSERT_TAIL(h->buckets[idx].items, item);
h->size++;
*exists = 0;
+
+ //
+ // If a pointer to a handle-pointer was supplied, create a handle for this item.
+ //
+ if (handle) {
+ *handle = new_dx_hash_handle_t();
+ (*handle)->bucket = &h->buckets[idx];
+ (*handle)->item = item;
+ }
+
return item;
}
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t dx_hash_insert(dx_hash_t *h, dx_field_iterator_t *key, void *val, dx_hash_handle_t **handle)
{
- int exists = 0;
- hash_item_t *item = hash_internal_insert(h, key, &exists);
+ int exists = 0;
+ dx_hash_item_t *item = dx_hash_internal_insert(h, key, &exists, handle);
if (!item)
return DX_ERROR_ALLOC;
@@ -149,12 +170,12 @@ dx_error_t hash_insert(hash_t *h, dx_fie
}
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t dx_hash_insert_const(dx_hash_t *h, dx_field_iterator_t *key, const void *val, dx_hash_handle_t **handle)
{
assert(h->is_const);
- int error = 0;
- hash_item_t *item = hash_internal_insert(h, key, &error);
+ int error = 0;
+ dx_hash_item_t *item = dx_hash_internal_insert(h, key, &error, handle);
if (item)
item->v.val_const = val;
@@ -162,10 +183,10 @@ dx_error_t hash_insert_const(hash_t *h,
}
-static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key)
+static dx_hash_item_t *dx_hash_internal_retrieve(dx_hash_t *h, dx_field_iterator_t *key)
{
- unsigned long idx = hash_function(key) & h->bucket_mask;
- hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+ unsigned long idx = dx_hash_function(key) & h->bucket_mask;
+ dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
while (item) {
if (dx_field_iterator_equal(key, item->key))
@@ -177,9 +198,9 @@ static hash_item_t *hash_internal_retrie
}
-dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
+dx_error_t dx_hash_retrieve(dx_hash_t *h, dx_field_iterator_t *key, void **val)
{
- hash_item_t *item = hash_internal_retrieve(h, key);
+ dx_hash_item_t *item = dx_hash_internal_retrieve(h, key);
if (item)
*val = item->v.val;
else
@@ -189,11 +210,11 @@ dx_error_t hash_retrieve(hash_t *h, dx_f
}
-dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
+dx_error_t dx_hash_retrieve_const(dx_hash_t *h, dx_field_iterator_t *key, const void **val)
{
assert(h->is_const);
- hash_item_t *item = hash_internal_retrieve(h, key);
+ dx_hash_item_t *item = dx_hash_internal_retrieve(h, key);
if (item)
*val = item->v.val_const;
else
@@ -203,10 +224,10 @@ dx_error_t hash_retrieve_const(hash_t *h
}
-dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
+dx_error_t dx_hash_remove(dx_hash_t *h, dx_field_iterator_t *key)
{
- unsigned long idx = hash_function(key) & h->bucket_mask;
- hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+ unsigned long idx = dx_hash_function(key) & h->bucket_mask;
+ dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
while (item) {
if (dx_field_iterator_equal(key, item->key))
@@ -217,7 +238,7 @@ dx_error_t hash_remove(hash_t *h, dx_fie
if (item) {
free(item->key);
DEQ_REMOVE(h->buckets[idx].items, item);
- free_hash_item_t(item);
+ free_dx_hash_item_t(item);
h->size--;
return DX_ERROR_NONE;
}
@@ -225,3 +246,40 @@ dx_error_t hash_remove(hash_t *h, dx_fie
return DX_ERROR_NOT_FOUND;
}
+
+void dx_hash_handle_free(dx_hash_handle_t *handle)
+{
+ if (handle)
+ free_dx_hash_handle_t(handle);
+}
+
+
+const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle)
+{
+ if (handle)
+ return handle->item->key;
+ return 0;
+}
+
+
+dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle)
+{
+ unsigned char *key = 0;
+ dx_error_t error = dx_hash_remove_by_handle2(h, handle, &key);
+ if (key)
+ free(key);
+ return error;
+}
+
+
+dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key)
+{
+ if (!handle)
+ return DX_ERROR_NOT_FOUND;
+ *key = handle->item->key;
+ DEQ_REMOVE(handle->bucket->items, handle->item);
+ free_dx_hash_item_t(handle->item);
+ h->size--;
+ return DX_ERROR_NONE;
+}
+
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message.c Mon Oct 21 22:04:51 2013
@@ -376,7 +376,7 @@ static dx_field_location_t *dx_message_f
}
-dx_message_t *dx_allocate_message()
+dx_message_t *dx_message()
{
dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t();
if (!msg)
@@ -400,7 +400,7 @@ dx_message_t *dx_allocate_message()
}
-void dx_free_message(dx_message_t *in_msg)
+void dx_message_free(dx_message_t *in_msg)
{
uint32_t rc;
dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
@@ -417,14 +417,14 @@ void dx_free_message(dx_message_t *in_ms
dx_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
DEQ_REMOVE_HEAD(content->buffers);
- dx_free_buffer(buf);
+ dx_buffer_free(buf);
buf = DEQ_HEAD(content->buffers);
}
buf = DEQ_HEAD(content->new_delivery_annotations);
while (buf) {
DEQ_REMOVE_HEAD(content->new_delivery_annotations);
- dx_free_buffer(buf);
+ dx_buffer_free(buf);
buf = DEQ_HEAD(content->new_delivery_annotations);
}
@@ -474,9 +474,11 @@ dx_parsed_field_t *dx_message_delivery_a
!dx_parse_is_map(content->parsed_delivery_annotations)) {
dx_field_iterator_free(da);
dx_parse_free(content->parsed_delivery_annotations);
+ content->parsed_delivery_annotations = 0;
return 0;
}
+ dx_field_iterator_free(da);
return content->parsed_delivery_annotations;
}
@@ -506,7 +508,7 @@ dx_message_t *dx_message_receive(dx_deli
// link it and the delivery together.
//
if (!msg) {
- msg = (dx_message_pvt_t*) dx_allocate_message();
+ msg = (dx_message_pvt_t*) dx_message();
dx_delivery_set_context(delivery, (void*) msg);
}
@@ -517,7 +519,7 @@ dx_message_t *dx_message_receive(dx_deli
//
buf = DEQ_TAIL(msg->content->buffers);
if (!buf) {
- buf = dx_allocate_buffer();
+ buf = dx_buffer();
DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
@@ -538,7 +540,7 @@ dx_message_t *dx_message_receive(dx_deli
//
if (dx_buffer_size(buf) == 0) {
DEQ_REMOVE_TAIL(msg->content->buffers);
- dx_free_buffer(buf);
+ dx_buffer_free(buf);
}
dx_delivery_set_context(delivery, 0);
return (dx_message_t*) msg;
@@ -556,7 +558,7 @@ dx_message_t *dx_message_receive(dx_deli
// tail of the message's list.
//
if (dx_buffer_capacity(buf) == 0) {
- buf = dx_allocate_buffer();
+ buf = dx_buffer();
DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
} else
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h Mon Oct 21 22:04:51 2013
@@ -67,7 +67,7 @@ typedef struct {
sys_mutex_t *lock;
uint32_t ref_count; // The number of messages referencing this
dx_buffer_list_t buffers; // The buffer chain containing the message
- dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations
+ dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT)
dx_field_location_t section_message_header; // The message header list
dx_field_location_t section_delivery_annotation; // The delivery annotation map
dx_field_location_t section_message_annotation; // The message annotation map
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org