You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/02/03 20:18:14 UTC

qpid-dispatch git commit: DISPATCH-629 - Added protocol-version to inter-router messages for future backward compatibility.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 7d6c05376 -> 3bb2c53d8


DISPATCH-629 - Added protocol-version to inter-router messages for future backward compatibility.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3bb2c53d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3bb2c53d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3bb2c53d

Branch: refs/heads/master
Commit: 3bb2c53d8dcd708e50340e458b84a6507dbb8b13
Parents: 7d6c053
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Feb 3 15:17:09 2017 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Feb 3 15:17:09 2017 -0500

----------------------------------------------------------------------
 python/qpid_dispatch/management/qdrouter.json |  4 ++
 python/qpid_dispatch_internal/router/data.py  | 52 ++++++++++++++++------
 python/qpid_dispatch_internal/router/hello.py |  2 +-
 python/qpid_dispatch_internal/router/link.py  |  6 +--
 python/qpid_dispatch_internal/router/node.py  | 50 +++++++++++++++------
 tests/router_engine_test.py                   |  4 +-
 tools/qdstat                                  | 22 ++++-----
 7 files changed, 96 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index f4ab551..3fcadb4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1219,6 +1219,10 @@
                     "description": "Remote node identifier.",
                     "type": "string"
                 },
+                "protocolVersion": {
+                    "description": "Router-protocol version supported by the node.",
+                    "type": "integer"
+                },
                 "instance": {
                     "description": "Remote node boot number.",
                     "type": "integer"

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py
index 5b739b8..a2b669b 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -17,6 +17,12 @@
 # under the License.
 #
 
+##
+## Define the current protocol version.  Any messages that do not contain version
+## information shall be considered to be coming from routers using version 0.
+##
+ProtocolVersion = 1L
+
 def getMandatory(data, key, cls=None):
     """
     Get the value mapped to the requested key.    If it's not present, raise an exception.
@@ -107,20 +113,23 @@ class MessageHELLO(object):
             self.area = '0'
             self.seen_peers = getMandatory(body, 'seen', list)
             self.instance = getOptional(body, 'instance', 0, long)
+            self.version  = getOptional(body, 'pv', 0, long)
         else:
             self.id   = _id
             self.area = '0'
             self.seen_peers = _seen_peers
             self.instance = _instance
+            self.version  = ProtocolVersion
 
     def __repr__(self):
-        return "HELLO(id=%s area=%s inst=%d seen=%r)" % (self.id, self.area, self.instance, self.seen_peers)
+        return "HELLO(id=%s pv=%d area=%s inst=%d seen=%r)" % (self.id, self.version, self.area, self.instance, self.seen_peers)
 
     def get_opcode(self):
         return 'HELLO'
 
     def to_dict(self):
         return {'id'       : self.id,
+                'pv'       : self.version,
                 'area'     : self.area,
                 'instance' : self.instance,
                 'seen'     : self.seen_peers}
@@ -143,22 +152,25 @@ class MessageRA(object):
             self.ls_seq = getMandatory(body, 'ls_seq', long)
             self.mobile_seq = getMandatory(body, 'mobile_seq', long)
             self.instance = getOptional(body, 'instance', 0, long)
+            self.version  = getOptional(body, 'pv', 0, long)
         else:
             self.id = _id
             self.area = '0'
             self.ls_seq = long(_ls_seq)
             self.mobile_seq = long(_mobile_seq)
             self.instance = _instance
+            self.version  = ProtocolVersion
 
     def get_opcode(self):
         return 'RA'
 
     def __repr__(self):
-        return "RA(id=%s area=%s inst=%d ls_seq=%d mobile_seq=%d)" % \
-                (self.id, self.area, self.instance, self.ls_seq, self.mobile_seq)
+        return "RA(id=%s pv=%d area=%s inst=%d ls_seq=%d mobile_seq=%d)" % \
+                (self.id, self.version, self.area, self.instance, self.ls_seq, self.mobile_seq)
 
     def to_dict(self):
         return {'id'         : self.id,
+                'pv'         : self.version,
                 'area'       : self.area,
                 'instance'   : self.instance,
                 'ls_seq'     : self.ls_seq,
@@ -175,22 +187,25 @@ class MessageLSU(object):
             self.ls_seq = getMandatory(body, 'ls_seq', long)
             self.ls = LinkState(getMandatory(body, 'ls', dict))
             self.instance = getOptional(body, 'instance', 0, long)
+            self.version  = getOptional(body, 'pv', 0, long)
         else:
             self.id = _id
             self.area = '0'
             self.ls_seq = long(_ls_seq)
             self.ls = _ls
             self.instance = _instance
+            self.version  = ProtocolVersion
 
     def get_opcode(self):
         return 'LSU'
 
     def __repr__(self):
-        return "LSU(id=%s area=%s inst=%d ls_seq=%d ls=%r)" % \
-                (self.id, self.area, self.instance, self.ls_seq, self.ls)
+        return "LSU(id=%s pv=%d area=%s inst=%d ls_seq=%d ls=%r)" % \
+                (self.id, self.version, self.area, self.instance, self.ls_seq, self.ls)
 
     def to_dict(self):
         return {'id'       : self.id,
+                'pv'       : self.version,
                 'area'     : self.area,
                 'instance' : self.instance,
                 'ls_seq'   : self.ls_seq,
@@ -203,20 +218,23 @@ class MessageLSR(object):
     def __init__(self, body, _id=None):
         if body:
             self.id = getMandatory(body, 'id', str)
+            self.version = getOptional(body, 'pv', 0, long)
             self.area = '0'
         else:
             self.id = _id
+            self.version = ProtocolVersion
             self.area = '0'
 
     def get_opcode(self):
         return 'LSR'
 
     def __repr__(self):
-        return "LSR(id=%s area=%s)" % (self.id, self.area)
+        return "LSR(id=%s pv=%d area=%s)" % (self.id, self.version, self.area)
 
     def to_dict(self):
-        return {'id'     : self.id,
-                'area'   : self.area}
+        return {'id'      : self.id,
+                'pv'      : self.version,
+                'area'    : self.area}
 
 
 class MessageMAU(object):
@@ -225,6 +243,7 @@ class MessageMAU(object):
     def __init__(self, body, _id=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None):
         if body:
             self.id = getMandatory(body, 'id', str)
+            self.version = getOptional(body, 'pv', 0, long)
             self.area = '0'
             self.mobile_seq = getMandatory(body, 'mobile_seq', long)
             self.add_list = getOptional(body, 'add', None, list)
@@ -232,6 +251,7 @@ class MessageMAU(object):
             self.exist_list = getOptional(body, 'exist', None, list)
         else:
             self.id = _id
+            self.version = ProtocolVersion
             self.area = '0'
             self.mobile_seq = long(_seq)
             self.add_list = _add_list
@@ -248,13 +268,14 @@ class MessageMAU(object):
         if self.add_list != None:   _add   = ' add=%r'   % self.add_list
         if self.del_list != None:   _del   = ' del=%r'   % self.del_list
         if self.exist_list != None: _exist = ' exist=%r' % self.exist_list
-        return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \
-                (self.id, self.area, self.mobile_seq, _add, _del, _exist)
+        return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \
+                (self.id, self.version, self.area, self.mobile_seq, _add, _del, _exist)
 
     def to_dict(self):
-        body = { 'id'         : self.id,
-                 'area'       : self.area,
-                 'mobile_seq' : self.mobile_seq }
+        body = {'id'         : self.id,
+                'pv'         : self.version,
+                'area'       : self.area,
+                'mobile_seq' : self.mobile_seq }
         if self.add_list != None:   body['add']   = self.add_list
         if self.del_list != None:   body['del']   = self.del_list
         if self.exist_list != None: body['exist'] = self.exist_list
@@ -267,10 +288,12 @@ class MessageMAR(object):
     def __init__(self, body, _id=None, _have_seq=None):
         if body:
             self.id = getMandatory(body, 'id', str)
+            self.version = getOptional(body, 'pv', 0, long)
             self.area = '0'
             self.have_seq = getMandatory(body, 'have_seq', long)
         else:
             self.id = _id
+            self.version = ProtocolVersion
             self.area = '0'
             self.have_seq = long(_have_seq)
 
@@ -278,9 +301,10 @@ class MessageMAR(object):
         return 'MAR'
 
     def __repr__(self):
-        return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq)
+        return "MAR(id=%s pv=%d area=%s have_seq=%d)" % (self.id, self.version, self.area, self.have_seq)
 
     def to_dict(self):
         return {'id'       : self.id,
+                'pv'       : self.version,
                 'area'     : self.area,
                 'have_seq' : self.have_seq}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/hello.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/hello.py b/python/qpid_dispatch_internal/router/hello.py
index 8c3224a..cbd55ed 100644
--- a/python/qpid_dispatch_internal/router/hello.py
+++ b/python/qpid_dispatch_internal/router/hello.py
@@ -54,7 +54,7 @@ class HelloProtocol(object):
             return
         self.hellos[msg.id] = now
         if msg.is_seen(self.id):
-            self.node_tracker.neighbor_refresh(msg.id, msg.instance, link_id, cost, now)
+            self.node_tracker.neighbor_refresh(msg.id, msg.version, msg.instance, link_id, cost, now)
 
 
     def _expire_hellos(self, now):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/link.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/link.py b/python/qpid_dispatch_internal/router/link.py
index 37671e4..13bd1b5 100644
--- a/python/qpid_dispatch_internal/router/link.py
+++ b/python/qpid_dispatch_internal/router/link.py
@@ -49,19 +49,19 @@ class LinkStateEngine(object):
     def handle_ra(self, msg, now):
         if msg.id == self.id:
             return
-        self.node_tracker.ra_received(msg.id, msg.ls_seq, msg.mobile_seq, msg.instance, now)
+        self.node_tracker.ra_received(msg.id, msg.version, msg.ls_seq, msg.mobile_seq, msg.instance, now)
 
 
     def handle_lsu(self, msg, now):
         if msg.id == self.id:
             return
-        self.node_tracker.link_state_received(msg.id, msg.ls, msg.instance, now)
+        self.node_tracker.link_state_received(msg.id, msg.version, msg.ls, msg.instance, now)
 
 
     def handle_lsr(self, msg, now):
         if msg.id == self.id:
             return
-        self.node_tracker.router_learned(msg.id)
+        self.node_tracker.router_learned(msg.id, msg.version)
         my_ls = self.node_tracker.link_state
         smsg = MessageLSU(None, self.id, my_ls.ls_seq, my_ls, self.container.instance)
         self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id), smsg)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index 9822169..a9af3c6 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -18,7 +18,7 @@
 #
 
 from ..dispatch import LOG_INFO, LOG_TRACE, LOG_DEBUG
-from data import LinkState
+from data import LinkState, ProtocolVersion
 from .address import Address
 
 class NodeTracker(object):
@@ -55,6 +55,7 @@ class NodeTracker(object):
         """Refresh management attributes"""
         attributes.update({
             "id": self.my_id,
+            "protocolVersion": ProtocolVersion,
             "instance": self.container.instance, # Boot number, integer
             "linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
             "nextHop":  "(self)",
@@ -188,7 +189,7 @@ class NodeTracker(object):
             self.container.link_state_engine.send_ra(now)
 
 
-    def neighbor_refresh(self, node_id, instance, link_id, cost, now):
+    def neighbor_refresh(self, node_id, version, instance, link_id, cost, now):
         """
         Invoked when the hello protocol has received positive confirmation
         of continued bi-directional connectivity with a neighbor router.
@@ -198,10 +199,16 @@ class NodeTracker(object):
         ## If the node id is not known, create a new RouterNode to track it.
         ##
         if node_id not in self.nodes:
-            self.nodes[node_id] = RouterNode(self, node_id, instance)
+            self.nodes[node_id] = RouterNode(self, node_id, version, instance)
         node = self.nodes[node_id]
 
         ##
+        ## Add the version if we haven't already done so.
+        ##
+        if node.version == None:
+            node.version = version
+
+        ##
         ## Set the link_id to indicate this is a neighbor router.  If the link_id
         ## changed, update the index and add the neighbor to the local link state.
         ##
@@ -220,7 +227,7 @@ class NodeTracker(object):
         ## If the instance was updated (i.e. the neighbor restarted suddenly),
         ## schedule a topology recompute and a link-state-request to that router.
         ##
-        if node.update_instance(instance):
+        if node.update_instance(instance, version):
             self.recompute_topology = True
             node.request_link_state()
 
@@ -247,7 +254,7 @@ class NodeTracker(object):
         return result
 
 
-    def ra_received(self, node_id, ls_seq, mobile_seq, instance, now):
+    def ra_received(self, node_id, version, ls_seq, mobile_seq, instance, now):
         """
         Invoked when a router advertisement is received from another router.
         """
@@ -255,14 +262,20 @@ class NodeTracker(object):
         ## If the node id is not known, create a new RouterNode to track it.
         ##
         if node_id not in self.nodes:
-            self.nodes[node_id] = RouterNode(self, node_id, instance)
+            self.nodes[node_id] = RouterNode(self, node_id, version, instance)
         node = self.nodes[node_id]
 
         ##
+        ## Add the version if we haven't already done so.
+        ##
+        if node.version == None:
+            node.version = version
+
+        ##
         ## If the instance was updated (i.e. the router restarted suddenly),
         ## schedule a topology recompute and a link-state-request to that router.
         ##
-        if node.update_instance(instance):
+        if node.update_instance(instance, version):
             self.recompute_topology = True
             node.request_link_state()
 
@@ -286,15 +299,15 @@ class NodeTracker(object):
             node.mobile_address_request()
 
 
-    def router_learned(self, node_id):
+    def router_learned(self, node_id, version):
         """
         Invoked when we learn about another router by any means
         """
         if node_id not in self.nodes and node_id != self.my_id:
-            self.nodes[node_id] = RouterNode(self, node_id, None)
+            self.nodes[node_id] = RouterNode(self, node_id, version, None)
 
 
-    def link_state_received(self, node_id, link_state, instance, now):
+    def link_state_received(self, node_id, version, link_state, instance, now):
         """
         Invoked when a link state update is received from another router.
         """
@@ -302,10 +315,16 @@ class NodeTracker(object):
         ## If the node id is not known, create a new RouterNode to track it.
         ##
         if node_id not in self.nodes:
-            self.nodes[node_id] = RouterNode(self, node_id, instance)
+            self.nodes[node_id] = RouterNode(self, node_id, version, instance)
         node = self.nodes[node_id]
 
         ##
+        ## Add the version if we haven't already done so.
+        ##
+        if node.version == None:
+            node.version = version
+
+        ##
         ## If the new link state is more up-to-date than the stored link state,
         ## update it and schedule a topology recompute.
         ##
@@ -321,7 +340,7 @@ class NodeTracker(object):
             ##
             for peer in node.link_state.peers:
                 if peer not in self.nodes:
-                    self.router_learned(peer)
+                    self.router_learned(peer, None)
 
 
     def router_node(self, node_id):
@@ -359,11 +378,12 @@ class RouterNode(object):
     RouterNode is used to track remote routers in the router network.
     """
 
-    def __init__(self, parent, node_id, instance):
+    def __init__(self, parent, node_id, version, instance):
         self.parent                  = parent
         self.adapter                 = parent.container.router_adapter
         self.log                     = parent.container.log
         self.id                      = node_id
+        self.version                 = version
         self.instance                = instance
         self.maskbit                 = self.parent._allocate_maskbit()
         self.neighbor_refresh_time   = 0.0
@@ -385,6 +405,7 @@ class RouterNode(object):
         """Refresh management attributes"""
         attributes.update({
             "id": self.id,
+            "protocolVersion": self.version,
             "instance": self.instance, # Boot number, integer
             "linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
             "nextHop":  self.next_hop_router and self.next_hop_router.id,
@@ -531,7 +552,7 @@ class RouterNode(object):
             self.unmap_address(a)
 
 
-    def update_instance(self, instance):
+    def update_instance(self, instance, version):
         if instance == None:
             return False
         if self.instance == None:
@@ -541,6 +562,7 @@ class RouterNode(object):
             return False
 
         self.instance = instance
+        self.version  = version
         self.link_state.del_all_peers()
         self.unmap_all_addresses()
         self.log(LOG_INFO, "Detected Restart of Router Node %s" % self.id)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/tests/router_engine_test.py
----------------------------------------------------------------------
diff --git a/tests/router_engine_test.py b/tests/router_engine_test.py
index 8a18baa..8132270 100644
--- a/tests/router_engine_test.py
+++ b/tests/router_engine_test.py
@@ -25,7 +25,7 @@ import mock                     # Mock definitions for tests.
 sys.path.append(os.path.join(os.environ["SOURCE_DIR"], "python"))
 
 from qpid_dispatch_internal.router.engine import HelloProtocol, PathEngine, NodeTracker
-from qpid_dispatch_internal.router.data import LinkState, MessageHELLO
+from qpid_dispatch_internal.router.data import LinkState, MessageHELLO, ProtocolVersion
 from qpid_dispatch.management.entity import EntityBase
 from system_test import main_module
 
@@ -141,7 +141,7 @@ class NeighborTest(unittest.TestCase):
     def send(self, dest, msg):
         self.sent.append((dest, msg))
 
-    def neighbor_refresh(self, node_id, instance, link_id, cost, now):
+    def neighbor_refresh(self, node_id, ProtocolVersion, instance, link_id, cost, now):
         self.neighbors[node_id] = (instance, link_id, cost, now)
 
     def setUp(self):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index b85e204..6794495 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -66,6 +66,12 @@ def parse_args(argv):
     return opts, args
 
 
+def get(obj, attr):
+    if attr in obj.__dict__:
+        return obj.__dict__[attr]
+    return None
+
+
 class BusManager(Node):
 
     schema = QdSchema()
@@ -138,10 +144,7 @@ class BusManager(Node):
         heads.append(Header("tenant"))
 
         rows = []
-        cols = ('identity', 'host', 'container', 'role', 'dir', 'isAuthenticated', 'sasl',
-                'user', 'isEncrypted', 'sslProto', 'sslCipher', 'tenant')
-
-        objects = self.query('org.apache.qpid.dispatch.connection', cols, limit=self.opts.limit)
+        objects = self.query('org.apache.qpid.dispatch.connection', limit=self.opts.limit)
 
         for conn in objects:
             row = []
@@ -152,7 +155,7 @@ class BusManager(Node):
             row.append(conn.dir)
             row.append(self.connSecurity(conn))
             row.append(self.connAuth(conn))
-            row.append(self.noTrailingSlash(conn.tenant))
+            row.append(self.noTrailingSlash(get(conn, 'tenant')))
             rows.append(row)
         title = "Connections"
         dispRows = rows
@@ -307,14 +310,12 @@ class BusManager(Node):
         heads.append(Header("next-hop"))
         heads.append(Header("link"))
         if self.opts.verbose:
+            heads.append(Header("ver"))
             heads.append(Header("cost"))
             heads.append(Header("neighbors"))
             heads.append(Header("valid-origins"))
         rows = []
-        cols = ('id', 'nextHop', 'routerLink', 'lastTopoChange')
-        if self.opts.verbose:
-            cols += ('cost', 'linkState', 'validOrigins')
-        objects  = self.query('org.apache.qpid.dispatch.router.node', cols, limit=self.opts.limit)
+        objects = self.query('org.apache.qpid.dispatch.router.node', limit=self.opts.limit)
 
         # Find the most recent topo change in this neighborhood.
         lastTopoChange = 0.0
@@ -333,7 +334,8 @@ class BusManager(Node):
                 row.append(node.routerLink)
 
             if self.opts.verbose:
-                row.append(node.cost)
+                row.append(get(node, 'protocolVersion'))
+                row.append(get(node, 'cost'))
                 row.append('%r' % self._list_clean(node.linkState))
                 row.append('%r' % self._list_clean(node.validOrigins))
             rows.append(row)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org