You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/05/06 18:57:39 UTC

svn commit: r772359 - /qpid/trunk/qpid/python/qpid/testlib.py

Author: kpvdr
Date: Wed May  6 16:57:38 2009
New Revision: 772359

URL: http://svn.apache.org/viewvc?rev=772359&view=rev
Log:
Added the ability to start and stop a test broker from within the python test framework. Also added some cluster test functionality

Modified:
    qpid/trunk/qpid/python/qpid/testlib.py

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=772359&r1=772358&r2=772359&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Wed May  6 16:57:38 2009
@@ -53,6 +53,7 @@
 class TestRunner:
 
     SPEC_FOLDER = "../specs"
+    qpidd = os.getenv("QPIDD")
 
     """Runs unit tests.
 
@@ -73,6 +74,9 @@
                            0-10-errata - use the 0-10 specification with qpid errata.
   -e/--errata <errata.xml> : file containing amqp XML errata
   -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to
+  -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD
+                             env to point to broker executable. broker-args will be
+                             prepended with "--daemon --port=0"
   -v/--verbose             : verbose - lists tests as they are run.
   -d/--debug               : enable debug logging.
   -i/--ignore <test>       : ignore the named test.
@@ -82,6 +86,34 @@
   """
         sys.exit(1)
 
+    def startBroker(self, brokerArgs):
+        """Start a single broker daemon"""
+        if TestRunner.qpidd == None:
+            self._die("QPIDD environment var must point to qpidd when using -B/--start-broker")
+        cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs)
+        portStr = os.popen(cmd).read()
+        if len(portStr) == 0:
+            self._die("%s failed to start" % TestRunner.qpidd)
+        port = int(portStr)
+        pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read())
+        print "Started broker: pid=%d, port=%d" % (pid, port)
+        self.brokerTuple = (pid, port)
+        self.setBroker("localhost:%d" % port)
+    
+    def stopBroker(self):
+        """Stop the broker using qpidd -q"""
+        if self.brokerTuple:
+            ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q")
+            if ret != 0:
+                self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret))
+            print "Stopped broker: pid=%d, port=%d" % self.brokerTuple
+    
+    def killBroker(self):
+        """Kill the broker using kill -9 (SIGTERM)"""
+        if self.brokerTuple:
+            os.kill(self.brokerTuple[0], signal.SIGTERM)
+            print "Killed broker: pid=%d, port=%d" % self.brokerTuple
+
     def setBroker(self, broker):
         try:
             self.url = URL(broker)
@@ -122,17 +154,22 @@
         self.skip_self_test = False
 
         try:
-            opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:",
+            opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:",
                                       ["help", "spec", "errata=", "broker=",
-                                       "verbose", "skip-self-test", "ignore",
+                                       "start-broker=", "verbose", "skip-self-test", "ignore",
                                        "ignore-file", "spec-folder"])
         except GetoptError, e:
             self._die(str(e))
+        # check for mutually exclusive options
+        if "-B" in opts or "--start-broker" in opts:
+            if "-b" in opts or "--broker" in opts:
+                self._die("Cannot use -B/--start-broker and -b/broker options together")
         for opt, value in opts:
             if opt in ("-?", "-h", "--help"): self._die()
             if opt in ("-s", "--spec"): self.specfile = value
             if opt in ("-e", "--errata"): self.errata.append(value)
             if opt in ("-b", "--broker"): self.setBroker(value)
+            if opt in ("-B", "--start-broker"): self.startBroker(value)
             if opt in ("-v", "--verbose"): self.verbose = 2
             if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
             if opt in ("-i", "--ignore"): self.ignore.append(value)
@@ -182,6 +219,7 @@
         return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
 
     def run(self, args=sys.argv[1:]):
+        self.brokerTuple = None
         self._parseargs(args)
         runner = unittest.TextTestRunner(descriptions=False,
                                          verbosity=self.verbose)
@@ -193,6 +231,7 @@
             for t in self.ignore: print t
             print "======================================="
 
+        self.stopBroker()
         return result.wasSuccessful()
 
     def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
@@ -390,3 +429,196 @@
         session.message_subscribe(**keys)
         session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
         session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
+
+
+class TestBaseCluster(unittest.TestCase):
+    """
+    Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes.
+    """
+    _tempDir = os.getenv("TMPDIR")
+    _qpidd = os.getenv("QPIDD")
+    _storeLib = os.getenv("LIBSTORE")
+    _clusterLib = os.getenv("LIBCLUSTER")
+    
+    # --- Cluster helper functions ---
+        
+    """
+    _clusterDict is a dictionary of clusters:
+        key = cluster name (string)
+        val = dictionary of node numbers:
+            key = node number (int)
+            val = tuple containing (pid, port)
+    For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows:
+    {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}}
+    where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively.
+    """
+    _clusterDict = {}
+    
+    """Index for (pid, port) tuple"""
+    PID = 0
+    PORT = 1
+    
+    def startBroker(self, qpiddArgs, logFile = None):
+        """Start a single broker daemon, returns tuple (pid, port)"""
+        if self._qpidd == None:
+            raise Exception("Environment variable QPIDD is not set")
+        cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs)
+        portStr = os.popen(cmd).read()
+        if len(portStr) == 0:
+            err = "Broker daemon startup failed."
+            if logFile != None:
+                err += " See log file %s" % logFile
+            raise Exception(err)
+        port = int(portStr)
+        pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read())
+        #print "started broker: pid=%d, port=%d" % (pid, port)
+        return (pid, port)
+    
+    def createClusterNode(self, nodeNumber, clusterName):
+        """Create a node and add it to the named cluster"""
+        if self._tempDir == None:
+            raise Exception("Environment variable TMPDIR is not set")
+        if self._storeLib == None:
+            raise Exception("Environment variable LIBSTORE is not set")
+        if self._clusterLib == None:
+            raise Exception("Environment variable LIBCLUSTER is not set")
+        name = "%s-%d" % (clusterName, nodeNumber)
+        dataDir = os.path.join(self._tempDir, "cluster", name)
+        logFile = "%s.log" % dataDir
+        args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \
+            (self._storeLib, self._clusterLib, dataDir, clusterName, logFile)
+        self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
+    
+    def createCluster(self, clusterName, numberNodes):
+        """Create a cluster containing an initial number of nodes"""
+        self._clusterDict[clusterName] = {}
+        for n in range(0, numberNodes):
+            self.createClusterNode(n, clusterName)
+    
+    def getTupleList(self):
+        """Get list of (pid, port) tuples of all known cluster brokers"""
+        tList = []
+        for l in self._clusterDict.itervalues():
+            for t in l.itervalues():
+                tList.append(t)
+        return tList
+    
+    def getNumBrokers(self):
+        """Get total number of brokers in all known clusters"""
+        return len(self.getTupleList())
+    
+    def checkNumBrokers(self, expected):
+        """Check that the total number of brokers in all known clusters is the expected value"""
+        if self.getNumBrokers() != expected:
+            raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
+
+    def getClusterTupleList(self, clusterName):
+        """Get list of (pid, port) tuples of all nodes in named cluster"""
+        return self._clusterDict[clusterName].values()
+    
+    def getNumClusterBrokers(self, clusterName):
+        """Get total number of brokers in named cluster"""
+        return len(self.getClusterTupleList(clusterName))
+    
+    def getNodeTuple(self, nodeNumber, clusterName):
+        """Get the (pid, port) tuple for the given cluster node"""
+        return self._clusterDict[clusterName][nodeNumber]
+    
+    def checkNumClusterBrokers(self, clusterName, expected):
+        """Check that the total number of brokers in the named cluster is the expected value"""
+        if self.getNumClusterBrokers(clusterName) != expected:
+            raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
+                            (clusterName, expected, self.getNumClusterBrokers(clusterName)))
+
+    def clusterExists(self, clusterName):
+        """ Return True if clusterName exists, False otherwise"""
+        return clusterName in self._clusterDict.keys()
+    
+    def clusterNodeExists(self, clusterName, nodeNumber):
+        """ Return True if nodeNumber in clusterName exists, False otherwise"""
+        if clusterName in self._clusterDict.keys():
+            return nodeNumber in self._clusterDict[nodeName]
+        return False
+    
+    def createCheckCluster(self, clusterName, size):
+        """Create a cluster using the given name and size, then check the number of brokers"""
+        self.createCluster(clusterName, size)
+        self.checkNumClusterBrokers(clusterName, size)
+    
+    # Kill cluster nodes using signal 9
+    
+    def killNode(self, nodeNumber, clusterName, updateDict = True):
+        """Kill the given node in the named cluster using kill -9"""
+        pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID]
+        os.kill(pid, signal.SIGTERM)
+        #print "killed broker: pid=%d" % pid
+        if updateDict:
+            del(self._clusterDict[clusterName][nodeNumber])
+    
+    def killCluster(self, clusterName, updateDict = True):
+        """Kill all nodes in the named cluster"""
+        for n in self._clusterDict[clusterName].iterkeys():
+            self.killNode(n, clusterName, False)
+        if updateDict:
+            del(self._clusterDict[clusterName])
+    
+    def killClusterCheck(self, clusterName):
+        """Kill the named cluster and check that the name is removed from the cluster dictionary"""
+        self.killCluster(clusterName)
+        if self.clusterExists(clusterName):
+            raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
+                            (clusterName, self.getNumClusterBrokers(clusterName)))
+    
+    def killAllClusters(self):
+        """Kill all known clusters"""
+        for n in self._clusterDict.iterkeys():
+            self.killCluster(n, False)
+        self._clusterDict.clear() 
+    
+    def killAllClustersCheck(self):
+        """Kill all known clusters and check that the cluster dictionary is empty"""
+        self.killAllClusters()
+        self.checkNumBrokers(0)
+    
+    # Stop cluster nodes using qpidd -q
+    
+    def stopNode(self, nodeNumber, clusterName, updateDict = True):
+        """Stop the given node in the named cluster using qpidd -q"""
+        port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+        ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q")
+        if ret != 0:
+            raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \
+                            (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret))
+        #print "stopped broker: port=%d" % port 
+        if updateDict:
+            del(self._clusterDict[clusterName][nodeNumber])
+    
+    def stopAllClusters(self):
+        """Stop all known clusters"""
+        for n in self._clusterDict.iterkeys():
+            self.stopCluster(n, False)
+        self._clusterDict.clear() 
+
+    
+    def stopCluster(self, clusterName, updateDict = True):
+        """Stop all nodes in the named cluster"""
+        for n in self._clusterDict[clusterName].iterkeys():
+            self.stopNode(n, clusterName, False)
+        if updateDict:
+            del(self._clusterDict[clusterName])
+    
+    def stopCheckCluster(self, clusterName):
+        """Stop the named cluster and check that the name is removed from the cluster dictionary"""
+        self.stopCluster(clusterName)
+        if self.clusterExists(clusterName):
+            raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
+    def stopCheckAll(self):
+        """Kill all known clusters and check that the cluster dictionary is empty"""
+        self.stopAllClusters()
+        self.checkNumBrokers(0)
+    
+    def setUp(self):
+        pass
+    
+    def tearDown(self):
+        pass



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org