You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/02/03 20:18:14 UTC
qpid-dispatch git commit: DISPATCH-629 - Added protocol-version to
inter-router messages for future backward compatibility.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 7d6c05376 -> 3bb2c53d8
DISPATCH-629 - Added protocol-version to inter-router messages for future backward compatibility.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3bb2c53d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3bb2c53d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3bb2c53d
Branch: refs/heads/master
Commit: 3bb2c53d8dcd708e50340e458b84a6507dbb8b13
Parents: 7d6c053
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Feb 3 15:17:09 2017 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Feb 3 15:17:09 2017 -0500
----------------------------------------------------------------------
python/qpid_dispatch/management/qdrouter.json | 4 ++
python/qpid_dispatch_internal/router/data.py | 52 ++++++++++++++++------
python/qpid_dispatch_internal/router/hello.py | 2 +-
python/qpid_dispatch_internal/router/link.py | 6 +--
python/qpid_dispatch_internal/router/node.py | 50 +++++++++++++++------
tests/router_engine_test.py | 4 +-
tools/qdstat | 22 ++++-----
7 files changed, 96 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index f4ab551..3fcadb4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1219,6 +1219,10 @@
"description": "Remote node identifier.",
"type": "string"
},
+ "protocolVersion": {
+ "description": "Router-protocol version supported by the node.",
+ "type": "integer"
+ },
"instance": {
"description": "Remote node boot number.",
"type": "integer"
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py
index 5b739b8..a2b669b 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -17,6 +17,12 @@
# under the License.
#
+##
+## Define the current protocol version. Any messages that do not contain version
+## information shall be considered to be coming from routers using version 0.
+##
+ProtocolVersion = 1L
+
def getMandatory(data, key, cls=None):
"""
Get the value mapped to the requested key. If it's not present, raise an exception.
@@ -107,20 +113,23 @@ class MessageHELLO(object):
self.area = '0'
self.seen_peers = getMandatory(body, 'seen', list)
self.instance = getOptional(body, 'instance', 0, long)
+ self.version = getOptional(body, 'pv', 0, long)
else:
self.id = _id
self.area = '0'
self.seen_peers = _seen_peers
self.instance = _instance
+ self.version = ProtocolVersion
def __repr__(self):
- return "HELLO(id=%s area=%s inst=%d seen=%r)" % (self.id, self.area, self.instance, self.seen_peers)
+ return "HELLO(id=%s pv=%d area=%s inst=%d seen=%r)" % (self.id, self.version, self.area, self.instance, self.seen_peers)
def get_opcode(self):
return 'HELLO'
def to_dict(self):
return {'id' : self.id,
+ 'pv' : self.version,
'area' : self.area,
'instance' : self.instance,
'seen' : self.seen_peers}
@@ -143,22 +152,25 @@ class MessageRA(object):
self.ls_seq = getMandatory(body, 'ls_seq', long)
self.mobile_seq = getMandatory(body, 'mobile_seq', long)
self.instance = getOptional(body, 'instance', 0, long)
+ self.version = getOptional(body, 'pv', 0, long)
else:
self.id = _id
self.area = '0'
self.ls_seq = long(_ls_seq)
self.mobile_seq = long(_mobile_seq)
self.instance = _instance
+ self.version = ProtocolVersion
def get_opcode(self):
return 'RA'
def __repr__(self):
- return "RA(id=%s area=%s inst=%d ls_seq=%d mobile_seq=%d)" % \
- (self.id, self.area, self.instance, self.ls_seq, self.mobile_seq)
+ return "RA(id=%s pv=%d area=%s inst=%d ls_seq=%d mobile_seq=%d)" % \
+ (self.id, self.version, self.area, self.instance, self.ls_seq, self.mobile_seq)
def to_dict(self):
return {'id' : self.id,
+ 'pv' : self.version,
'area' : self.area,
'instance' : self.instance,
'ls_seq' : self.ls_seq,
@@ -175,22 +187,25 @@ class MessageLSU(object):
self.ls_seq = getMandatory(body, 'ls_seq', long)
self.ls = LinkState(getMandatory(body, 'ls', dict))
self.instance = getOptional(body, 'instance', 0, long)
+ self.version = getOptional(body, 'pv', 0, long)
else:
self.id = _id
self.area = '0'
self.ls_seq = long(_ls_seq)
self.ls = _ls
self.instance = _instance
+ self.version = ProtocolVersion
def get_opcode(self):
return 'LSU'
def __repr__(self):
- return "LSU(id=%s area=%s inst=%d ls_seq=%d ls=%r)" % \
- (self.id, self.area, self.instance, self.ls_seq, self.ls)
+ return "LSU(id=%s pv=%d area=%s inst=%d ls_seq=%d ls=%r)" % \
+ (self.id, self.version, self.area, self.instance, self.ls_seq, self.ls)
def to_dict(self):
return {'id' : self.id,
+ 'pv' : self.version,
'area' : self.area,
'instance' : self.instance,
'ls_seq' : self.ls_seq,
@@ -203,20 +218,23 @@ class MessageLSR(object):
def __init__(self, body, _id=None):
if body:
self.id = getMandatory(body, 'id', str)
+ self.version = getOptional(body, 'pv', 0, long)
self.area = '0'
else:
self.id = _id
+ self.version = ProtocolVersion
self.area = '0'
def get_opcode(self):
return 'LSR'
def __repr__(self):
- return "LSR(id=%s area=%s)" % (self.id, self.area)
+ return "LSR(id=%s pv=%d area=%s)" % (self.id, self.version, self.area)
def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area}
+ return {'id' : self.id,
+ 'pv' : self.version,
+ 'area' : self.area}
class MessageMAU(object):
@@ -225,6 +243,7 @@ class MessageMAU(object):
def __init__(self, body, _id=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None):
if body:
self.id = getMandatory(body, 'id', str)
+ self.version = getOptional(body, 'pv', 0, long)
self.area = '0'
self.mobile_seq = getMandatory(body, 'mobile_seq', long)
self.add_list = getOptional(body, 'add', None, list)
@@ -232,6 +251,7 @@ class MessageMAU(object):
self.exist_list = getOptional(body, 'exist', None, list)
else:
self.id = _id
+ self.version = ProtocolVersion
self.area = '0'
self.mobile_seq = long(_seq)
self.add_list = _add_list
@@ -248,13 +268,14 @@ class MessageMAU(object):
if self.add_list != None: _add = ' add=%r' % self.add_list
if self.del_list != None: _del = ' del=%r' % self.del_list
if self.exist_list != None: _exist = ' exist=%r' % self.exist_list
- return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \
- (self.id, self.area, self.mobile_seq, _add, _del, _exist)
+ return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \
+ (self.id, self.version, self.area, self.mobile_seq, _add, _del, _exist)
def to_dict(self):
- body = { 'id' : self.id,
- 'area' : self.area,
- 'mobile_seq' : self.mobile_seq }
+ body = {'id' : self.id,
+ 'pv' : self.version,
+ 'area' : self.area,
+ 'mobile_seq' : self.mobile_seq }
if self.add_list != None: body['add'] = self.add_list
if self.del_list != None: body['del'] = self.del_list
if self.exist_list != None: body['exist'] = self.exist_list
@@ -267,10 +288,12 @@ class MessageMAR(object):
def __init__(self, body, _id=None, _have_seq=None):
if body:
self.id = getMandatory(body, 'id', str)
+ self.version = getOptional(body, 'pv', 0, long)
self.area = '0'
self.have_seq = getMandatory(body, 'have_seq', long)
else:
self.id = _id
+ self.version = ProtocolVersion
self.area = '0'
self.have_seq = long(_have_seq)
@@ -278,9 +301,10 @@ class MessageMAR(object):
return 'MAR'
def __repr__(self):
- return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq)
+ return "MAR(id=%s pv=%d area=%s have_seq=%d)" % (self.id, self.version, self.area, self.have_seq)
def to_dict(self):
return {'id' : self.id,
+ 'pv' : self.version,
'area' : self.area,
'have_seq' : self.have_seq}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/hello.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/hello.py b/python/qpid_dispatch_internal/router/hello.py
index 8c3224a..cbd55ed 100644
--- a/python/qpid_dispatch_internal/router/hello.py
+++ b/python/qpid_dispatch_internal/router/hello.py
@@ -54,7 +54,7 @@ class HelloProtocol(object):
return
self.hellos[msg.id] = now
if msg.is_seen(self.id):
- self.node_tracker.neighbor_refresh(msg.id, msg.instance, link_id, cost, now)
+ self.node_tracker.neighbor_refresh(msg.id, msg.version, msg.instance, link_id, cost, now)
def _expire_hellos(self, now):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/link.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/link.py b/python/qpid_dispatch_internal/router/link.py
index 37671e4..13bd1b5 100644
--- a/python/qpid_dispatch_internal/router/link.py
+++ b/python/qpid_dispatch_internal/router/link.py
@@ -49,19 +49,19 @@ class LinkStateEngine(object):
def handle_ra(self, msg, now):
if msg.id == self.id:
return
- self.node_tracker.ra_received(msg.id, msg.ls_seq, msg.mobile_seq, msg.instance, now)
+ self.node_tracker.ra_received(msg.id, msg.version, msg.ls_seq, msg.mobile_seq, msg.instance, now)
def handle_lsu(self, msg, now):
if msg.id == self.id:
return
- self.node_tracker.link_state_received(msg.id, msg.ls, msg.instance, now)
+ self.node_tracker.link_state_received(msg.id, msg.version, msg.ls, msg.instance, now)
def handle_lsr(self, msg, now):
if msg.id == self.id:
return
- self.node_tracker.router_learned(msg.id)
+ self.node_tracker.router_learned(msg.id, msg.version)
my_ls = self.node_tracker.link_state
smsg = MessageLSU(None, self.id, my_ls.ls_seq, my_ls, self.container.instance)
self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id), smsg)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index 9822169..a9af3c6 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -18,7 +18,7 @@
#
from ..dispatch import LOG_INFO, LOG_TRACE, LOG_DEBUG
-from data import LinkState
+from data import LinkState, ProtocolVersion
from .address import Address
class NodeTracker(object):
@@ -55,6 +55,7 @@ class NodeTracker(object):
"""Refresh management attributes"""
attributes.update({
"id": self.my_id,
+ "protocolVersion": ProtocolVersion,
"instance": self.container.instance, # Boot number, integer
"linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
"nextHop": "(self)",
@@ -188,7 +189,7 @@ class NodeTracker(object):
self.container.link_state_engine.send_ra(now)
- def neighbor_refresh(self, node_id, instance, link_id, cost, now):
+ def neighbor_refresh(self, node_id, version, instance, link_id, cost, now):
"""
Invoked when the hello protocol has received positive confirmation
of continued bi-directional connectivity with a neighbor router.
@@ -198,10 +199,16 @@ class NodeTracker(object):
## If the node id is not known, create a new RouterNode to track it.
##
if node_id not in self.nodes:
- self.nodes[node_id] = RouterNode(self, node_id, instance)
+ self.nodes[node_id] = RouterNode(self, node_id, version, instance)
node = self.nodes[node_id]
##
+ ## Add the version if we haven't already done so.
+ ##
+ if node.version == None:
+ node.version = version
+
+ ##
## Set the link_id to indicate this is a neighbor router. If the link_id
## changed, update the index and add the neighbor to the local link state.
##
@@ -220,7 +227,7 @@ class NodeTracker(object):
## If the instance was updated (i.e. the neighbor restarted suddenly),
## schedule a topology recompute and a link-state-request to that router.
##
- if node.update_instance(instance):
+ if node.update_instance(instance, version):
self.recompute_topology = True
node.request_link_state()
@@ -247,7 +254,7 @@ class NodeTracker(object):
return result
- def ra_received(self, node_id, ls_seq, mobile_seq, instance, now):
+ def ra_received(self, node_id, version, ls_seq, mobile_seq, instance, now):
"""
Invoked when a router advertisement is received from another router.
"""
@@ -255,14 +262,20 @@ class NodeTracker(object):
## If the node id is not known, create a new RouterNode to track it.
##
if node_id not in self.nodes:
- self.nodes[node_id] = RouterNode(self, node_id, instance)
+ self.nodes[node_id] = RouterNode(self, node_id, version, instance)
node = self.nodes[node_id]
##
+ ## Add the version if we haven't already done so.
+ ##
+ if node.version == None:
+ node.version = version
+
+ ##
## If the instance was updated (i.e. the router restarted suddenly),
## schedule a topology recompute and a link-state-request to that router.
##
- if node.update_instance(instance):
+ if node.update_instance(instance, version):
self.recompute_topology = True
node.request_link_state()
@@ -286,15 +299,15 @@ class NodeTracker(object):
node.mobile_address_request()
- def router_learned(self, node_id):
+ def router_learned(self, node_id, version):
"""
Invoked when we learn about another router by any means
"""
if node_id not in self.nodes and node_id != self.my_id:
- self.nodes[node_id] = RouterNode(self, node_id, None)
+ self.nodes[node_id] = RouterNode(self, node_id, version, None)
- def link_state_received(self, node_id, link_state, instance, now):
+ def link_state_received(self, node_id, version, link_state, instance, now):
"""
Invoked when a link state update is received from another router.
"""
@@ -302,10 +315,16 @@ class NodeTracker(object):
## If the node id is not known, create a new RouterNode to track it.
##
if node_id not in self.nodes:
- self.nodes[node_id] = RouterNode(self, node_id, instance)
+ self.nodes[node_id] = RouterNode(self, node_id, version, instance)
node = self.nodes[node_id]
##
+ ## Add the version if we haven't already done so.
+ ##
+ if node.version == None:
+ node.version = version
+
+ ##
## If the new link state is more up-to-date than the stored link state,
## update it and schedule a topology recompute.
##
@@ -321,7 +340,7 @@ class NodeTracker(object):
##
for peer in node.link_state.peers:
if peer not in self.nodes:
- self.router_learned(peer)
+ self.router_learned(peer, None)
def router_node(self, node_id):
@@ -359,11 +378,12 @@ class RouterNode(object):
RouterNode is used to track remote routers in the router network.
"""
- def __init__(self, parent, node_id, instance):
+ def __init__(self, parent, node_id, version, instance):
self.parent = parent
self.adapter = parent.container.router_adapter
self.log = parent.container.log
self.id = node_id
+ self.version = version
self.instance = instance
self.maskbit = self.parent._allocate_maskbit()
self.neighbor_refresh_time = 0.0
@@ -385,6 +405,7 @@ class RouterNode(object):
"""Refresh management attributes"""
attributes.update({
"id": self.id,
+ "protocolVersion": self.version,
"instance": self.instance, # Boot number, integer
"linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
"nextHop": self.next_hop_router and self.next_hop_router.id,
@@ -531,7 +552,7 @@ class RouterNode(object):
self.unmap_address(a)
- def update_instance(self, instance):
+ def update_instance(self, instance, version):
if instance == None:
return False
if self.instance == None:
@@ -541,6 +562,7 @@ class RouterNode(object):
return False
self.instance = instance
+ self.version = version
self.link_state.del_all_peers()
self.unmap_all_addresses()
self.log(LOG_INFO, "Detected Restart of Router Node %s" % self.id)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/tests/router_engine_test.py
----------------------------------------------------------------------
diff --git a/tests/router_engine_test.py b/tests/router_engine_test.py
index 8a18baa..8132270 100644
--- a/tests/router_engine_test.py
+++ b/tests/router_engine_test.py
@@ -25,7 +25,7 @@ import mock # Mock definitions for tests.
sys.path.append(os.path.join(os.environ["SOURCE_DIR"], "python"))
from qpid_dispatch_internal.router.engine import HelloProtocol, PathEngine, NodeTracker
-from qpid_dispatch_internal.router.data import LinkState, MessageHELLO
+from qpid_dispatch_internal.router.data import LinkState, MessageHELLO, ProtocolVersion
from qpid_dispatch.management.entity import EntityBase
from system_test import main_module
@@ -141,7 +141,7 @@ class NeighborTest(unittest.TestCase):
def send(self, dest, msg):
self.sent.append((dest, msg))
- def neighbor_refresh(self, node_id, instance, link_id, cost, now):
+ def neighbor_refresh(self, node_id, ProtocolVersion, instance, link_id, cost, now):
self.neighbors[node_id] = (instance, link_id, cost, now)
def setUp(self):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index b85e204..6794495 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -66,6 +66,12 @@ def parse_args(argv):
return opts, args
+def get(obj, attr):
+ if attr in obj.__dict__:
+ return obj.__dict__[attr]
+ return None
+
+
class BusManager(Node):
schema = QdSchema()
@@ -138,10 +144,7 @@ class BusManager(Node):
heads.append(Header("tenant"))
rows = []
- cols = ('identity', 'host', 'container', 'role', 'dir', 'isAuthenticated', 'sasl',
- 'user', 'isEncrypted', 'sslProto', 'sslCipher', 'tenant')
-
- objects = self.query('org.apache.qpid.dispatch.connection', cols, limit=self.opts.limit)
+ objects = self.query('org.apache.qpid.dispatch.connection', limit=self.opts.limit)
for conn in objects:
row = []
@@ -152,7 +155,7 @@ class BusManager(Node):
row.append(conn.dir)
row.append(self.connSecurity(conn))
row.append(self.connAuth(conn))
- row.append(self.noTrailingSlash(conn.tenant))
+ row.append(self.noTrailingSlash(get(conn, 'tenant')))
rows.append(row)
title = "Connections"
dispRows = rows
@@ -307,14 +310,12 @@ class BusManager(Node):
heads.append(Header("next-hop"))
heads.append(Header("link"))
if self.opts.verbose:
+ heads.append(Header("ver"))
heads.append(Header("cost"))
heads.append(Header("neighbors"))
heads.append(Header("valid-origins"))
rows = []
- cols = ('id', 'nextHop', 'routerLink', 'lastTopoChange')
- if self.opts.verbose:
- cols += ('cost', 'linkState', 'validOrigins')
- objects = self.query('org.apache.qpid.dispatch.router.node', cols, limit=self.opts.limit)
+ objects = self.query('org.apache.qpid.dispatch.router.node', limit=self.opts.limit)
# Find the most recent topo change in this neighborhood.
lastTopoChange = 0.0
@@ -333,7 +334,8 @@ class BusManager(Node):
row.append(node.routerLink)
if self.opts.verbose:
- row.append(node.cost)
+ row.append(get(node, 'protocolVersion'))
+ row.append(get(node, 'cost'))
row.append('%r' % self._list_clean(node.linkState))
row.append('%r' % self._list_clean(node.validOrigins))
rows.append(row)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org