You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/05 23:47:56 UTC

svn commit: r907119 - /qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects

Author: tross
Date: Fri Feb  5 22:47:56 2010
New Revision: 907119

URL: http://svn.apache.org/viewvc?rev=907119&view=rev
Log:
QPID-2029 - Added verification script contributed by John Dunning

Added:
    qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects

Added: qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects?rev=907119&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects (added)
+++ qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects Fri Feb  5 22:47:56 2010
@@ -0,0 +1,400 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, SchemaClass
+
+_host = "localhost"      # broker address used when none is given on the command line
+_connTimeout = 10        # seconds, handed to Session.addBroker(); --timeout overrides it
+_verbose = 0             # --verbose level; larger values print more detail
+_del_test = False        # set by --delete: create and delete objects before comparing
+pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")  # dotted-quad:port text
+_debug_recursion = 0     # nesting depth of Broker.getAbstractId(), shown in debug output
+
+def Usage ():
+    """Print the command-line help text to stdout, then exit with status 1."""
+    # An empty string produces the same blank line as a bare print statement.
+    for helpLine in (
+        "Usage:  verify_cluster_objects [OPTIONS] [broker-addr]", "",
+        "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]",
+        "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost", "",
+        "    This program contacts every node of a cluster, loads all manageable objects from",
+        "    those nodes and verifies that the management data is identical across the clusters.", "",
+        "Options:",
+        "    --timeout seconds (10)  Maximum time to wait for broker connection",
+        "    --verbose level (0)     Show details of objects and their IDs",
+        "    --delete                Delete some objects after creation, to test synchup", ""):
+        print helpLine
+    sys.exit (1)
+
+class IpAddr:
+    """IPv4 address and port parsed from "[user/password@]host[:port]" text."""
+    def __init__(self, text):
+        """Set dottedQuad, the 32-bit integer addr, and port (5672 if omitted)."""
+        # Cut credentials at the LAST "@": a password may itself contain "@".
+        text = text.rsplit("@", 1)[-1]
+        self.port = 5672
+        if ":" in text:
+            tokens = text.split(":")
+            text = tokens[0]
+            self.port = int(tokens[1])
+        self.dottedQuad = socket.gethostbyname(text)
+        nums = [int(num) for num in self.dottedQuad.split(".")]
+        self.addr = (nums[0] << 24) + (nums[1] << 16) + (nums[2] << 8) + nums[3]
+
+    def bestAddr(self, addrPortList):
+        """Return the first entry whose host (entry[0]) XORs smallest against self.addr."""
+        bestDiff = bestAddr = None
+        for addrPort in addrPortList:
+            diff = IpAddr(addrPort[0]).addr ^ self.addr
+            # None sentinel: a lone entry wins even if every bit differs; [] gives None.
+            if bestDiff is None or diff < bestDiff:
+                bestDiff, bestAddr = diff, addrPort
+        return bestAddr
+
+class Broker(object):
+    """One cluster node's manageable objects, indexed by class name and abstract id."""
+    def __init__(self, qmf, broker):
+        """Keep the session and broker, pick a bank-0 agent, read the broker object."""
+        self.broker = broker
+        self.qmf = qmf
+        # the last agent the session reports with agent bank 0 is used
+        for a in qmf.getAgents():
+            if a.getAgentBank() == 0:
+                self.brokerAgent = a
+
+        bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
+                              _agent=self.brokerAgent)[0]
+        self.currentTime = bobj.getTimestamps()[0]
+        try:
+            self.uptime = bobj.uptime
+        except Exception:
+            # best effort, but unlike a bare "except:" never hides Ctrl-C or sys.exit()
+            self.uptime = 0
+        self.tablesByName = {}  # class name -> {abstract id -> object}
+        self.package = "org.apache.qpid.broker"
+
+    def getUrl(self):
+        """Return the URL of the wrapped broker object."""
+        return self.broker.getUrl()
+
+    def getData(self):
+        """Call loadTable() for every table-kind schema class in self.package."""
+        if _verbose > 1:
+            print "Broker:", self.broker
+        for cls in self.qmf.getClasses(self.package):
+            if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
+                self.loadTable(cls)
+
+    # this should be a method on an object, but is kept here for now, until
+    # we finish sorting out the treatment of names in qmfv2
+    def getAbstractId(self, object):
+        """Return the hierarchical name of object as a string.
+
+        The name joins the values of the object's index properties with ":",
+        in property order; an index of type 10 is taken as a reference and
+        replaced by the abstract id of the object it points at.  Returns ""
+        as soon as a property named "systemRef" is met.
+        """
+        global _debug_recursion
+        result = u""
+        _debug_recursion += 1
+        debug_prefix = _debug_recursion
+        if _verbose > 9:
+            print debug_prefix, "  enter gai: props ", object._properties
+        for property, value in object._properties:
+            # property is an instance, you can ask its type, name, etc.
+            if _verbose > 9:
+                print debug_prefix, "  prop ", property, " val " , value, " idx ", \
+                    property.index, " type ", property.type
+
+            # special case system refs, as they will never be the same on
+            # distinct cluster nodes.  later we probably want a different
+            # way of representing these objects, like for instance don't
+            # include the system ref in the hierarchy.
+            if property.name == "systemRef":
+                _debug_recursion -= 1
+                return ""
+
+            # only index properties contribute to the name; skipping here keeps
+            # a stale value from an earlier property out of the result
+            if not property.index:
+                continue
+            if property.type == 10:
+                valstr = self._getReferencedId(object, value, debug_prefix)
+            elif property.name == "systemId":
+                # special case.  try to do something sensible about systemref objects
+                valstr = object.nodeName
+            else:
+                # yetch.  this needs to be abstracted someplace?  I don't
+                # think we have the infrastructure we need to make these id
+                # strings be sensible in the general case
+                valstr = value.__repr__() # I think...
+            if result != u"":
+                result += u":"
+            result += valstr
+            if _verbose > 9:
+                print debug_prefix, "    id ", object, " -> ", result
+        _debug_recursion -= 1
+        return result
+
+    def _getReferencedId(self, object, value, debug_prefix):
+        """Return the abstract id of the object value refers to, or u"<undecodable>"."""
+        try:
+            target = object._session.getObjects(_objectId = value, _broker=object._broker)[0]
+            if _verbose > 9:
+                print debug_prefix, "   r ", target
+                for rp, rv in target._properties:
+                    print debug_prefix, "   rrr ", rp, " idx-p ", rp.index, " v ", rv
+                print debug_prefix, "    recursing on ", target
+            valstr = self.getAbstractId(target)
+            if _verbose > 9:
+                print debug_prefix, "    recursing on ", target, " -> ", valstr
+            return valstr
+        except Exception, e:
+            if _verbose > 9:
+                print debug_prefix, "          except ", e
+            return u"<undecodable>"
+
+    def loadTable(self, cls):
+        """Index the objects of schema class cls by abstract id; exit(1) on a key collision."""
+        className = cls.getClassName()
+        if _verbose > 1:
+            print "  Class:", className
+        objects = self.qmf.getObjects(_class=className,
+                                      _package=cls.getPackageName(),
+                                      _agent=self.brokerAgent)
+        # tables-by-name maps class name to a table by object-name of
+        # objects.  ie use the class name ("broker", "queue", etc) to
+        # index tables-by-name, returning a second table, use the
+        # object name to index that to get an object.
+        table = self.tablesByName[className] = {}
+        for obj in objects:
+            # computed once per object: each call may issue further getObjects() queries
+            key = self.getAbstractId(obj)
+            # make sure we aren't colliding on name.  it's an internal
+            # error (ie, the name-generation code is busted) if we do
+            if key in table:
+                print "internal error: collision for %s on key %s\n" % (obj, key)
+                sys.exit(1)
+            table[key] = obj
+            if _verbose > 1:
+                print "   ", obj.getObjectId(), " ", obj.getIndex(), " ", key
+
+
+class BrokerManager:
+    """Connects to one broker, finds its cluster peers and compares their objects."""
+    def __init__(self):
+        self.brokerName  = None
+        self.url         = None   # set by SetBroker()
+        self.qmf         = None
+        self.broker      = None   # connection made by SetBroker()
+        self.brokerAgent = None   # bank-0 agent chosen by SetBroker()
+        self.brokers     = []     # one Broker per cluster member, filled by verify()
+        self.cluster     = None
+
+    def SetBroker(self, brokerUrl):
+        """Open a session to brokerUrl and remember an agent whose bank is 0."""
+        self.url = brokerUrl
+        self.qmf = Session()
+        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+        for a in self.qmf.getAgents():
+            if a.getAgentBank() == 0:
+                self.brokerAgent = a
+
+    def Disconnect(self):
+        """Remove every broker this manager added to the session."""
+        if self.broker:
+            self.qmf.delBroker(self.broker)
+            self.broker = None
+        # verify() swaps the first connection for one per cluster member;
+        # release those too instead of leaving them connected.
+        while self.brokers:
+            self.qmf.delBroker(self.brokers.pop().broker)
+
+    def _getCluster(self):
+        """Store the first "cluster" object in self.cluster, if the broker has one."""
+        packages = self.qmf.getPackages()
+        if "org.apache.qpid.cluster" not in packages:
+            return None
+        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+        if len(clusters) == 0:
+            print "Clustering is installed but not enabled on the broker."
+            return None
+        self.cluster = clusters[0]
+
+    def _getHostList(self, urlList):
+        """Return one "host:port" string for each "amqp:..." member URL in urlList."""
+        hosts = []
+        hostAddr = IpAddr(_host)
+        for url in urlList:
+            if not url.startswith("amqp:"):
+                raise Exception("Invalid URL 1")
+            addrList = []
+            for addr in str(url[5:]).split(","):
+                tokens = addr.split(":")
+                if len(tokens) != 3:
+                    raise Exception("Invalid URL 2")
+                addrList.append((tokens[1], tokens[2]))
+            # Find the address in the list that is most likely to be in the same subnet as the address
+            # with which we made the original QMF connection.  This increases the probability that we will
+            # be able to reach the cluster member.
+            best = hostAddr.bestAddr(addrList)
+            hosts.append(best[0] + ":" + best[1])
+        return hosts
+
+    # the main fun which tests for broker state "identity".  now that
+    # we're using qmf2 style object names across the board, that test
+    # means that we are ensuring that for all objects of a given
+    # class, an object of that class with the same object name exists
+    # on the peer broker.
+    def verify(self):
+        """Compare the objects of all cluster members, then exit the process.
+
+        Exit status is 0 when every compared class has the same object names
+        on every node, and 1 when names differ (they are printed first) or
+        when no cluster object was found.
+        """
+        if _verbose > 0:
+            print "Connecting to the cluster..."
+        self._getCluster()
+        if not self.cluster:
+            print "Failed - Not a cluster"
+            sys.exit(1)
+
+        memberList = self.cluster.members.split(";")
+        hostList = self._getHostList(memberList)
+        self.qmf.delBroker(self.broker)
+        self.broker = None
+        for host in hostList:
+            b = self.qmf.addBroker(host, _connTimeout)
+            self.brokers.append(Broker(self.qmf, b))
+            if _verbose > 0:
+                print "   ", b
+
+        # Wait until connections to all nodes are established before
+        # loading the management data.  This will ensure that the
+        # objects are all stable and the same.
+        if _verbose > 0:
+            print "Loading management data from nodes..."
+        for broker in self.brokers:
+            broker.getData()
+
+        # If we're testing delete-some-objects functionality, create a
+        # few widgets here and then delete them.
+        # NOTE(review): the tables were loaded above, so the comparison
+        # below never sees these objects -- confirm that is intended.
+        if _del_test:
+            if _verbose > 0:
+                print "Running delete test"
+            # just stick 'em in the first broker
+            b = self.brokers[0]
+            session = b.qmf.brokers[0].getAmqpSession()
+            for name in ("foo", "bar"):
+                session.queue_declare(queue=name, exclusive=True, auto_delete=True)
+                session.exchange_bind(exchange="amq.direct", queue=name, binding_key=name)
+            # now delete 'em
+            for name in ("foo", "bar"):
+                session.exchange_unbind(queue=name, exchange="amq.direct", binding_key=name)
+            session.queue_delete("bar")
+            session.queue_delete("foo")
+
+        # Verify that each node has the same set of objects (based on object name).
+        if _verbose > 0:
+            print "Verifying objects based on object name..."
+        failures = []
+        base = self.brokers[0]
+        for broker in self.brokers[1:]:
+            # walk over the class names, for each class (with some
+            # exceptions) walk over the objects of that class, making
+            # sure they match between broker A and broker B
+            for className in base.tablesByName:
+                if className in ["broker", "system", "connection"]:
+                    continue
+                tab1 = base.tablesByName[className]
+                tab2 = broker.tablesByName[className]
+                for key in tab1:
+                    if key not in tab2:
+                        failures.append("%s key %s not found on node %s" %
+                                        (className, key, broker.getUrl()))
+                for key in tab2:
+                    if key not in tab1:
+                        failures.append("%s key %s not found on node %s" %
+                                        (className, key, base.getUrl()))
+        if failures:
+            print "Failures:"
+            for failure in failures:
+                print "  %s" % failure
+            sys.exit(1)
+
+        if _verbose > 0:
+            print "Success"
+        sys.exit(0)
+
+##
+## Main Program
+##
+
+longOpts = ("verbose=", "timeout=", "delete")
+try:
+    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
+except getopt.GetoptError:
+    Usage()
+
+try:
+    encoding = locale.getpreferredencoding()
+    cargs = [a.decode(encoding) for a in encArgs]
+except Exception:
+    cargs = encArgs  # fall back to the undecoded arguments
+
+for optName, optValue in optlist:
+    try:
+        if optName == "--timeout":
+            _connTimeout = int(optValue) or None  # 0 is passed on as None
+        elif optName == "--verbose":
+            _verbose = int(optValue)
+        elif optName == "--delete":
+            _del_test = True
+        else:
+            Usage()
+    except ValueError:  # non-numeric option value: usage error, not a traceback
+        Usage()
+
+nargs = len(cargs)
+bm    = BrokerManager()
+
+if nargs == 1:
+    _host = cargs[0]
+
+try:
+    bm.SetBroker(_host)
+    bm.verify()
+except KeyboardInterrupt:
+    print
+except Exception,e:
+    print "Failed: %s - %s" % (e.__class__.__name__, e)
+    sys.exit(1)
+
+bm.Disconnect()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org