You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/05/06 18:57:39 UTC
svn commit: r772359 - /qpid/trunk/qpid/python/qpid/testlib.py
Author: kpvdr
Date: Wed May 6 16:57:38 2009
New Revision: 772359
URL: http://svn.apache.org/viewvc?rev=772359&view=rev
Log:
Added the ability to start and stop a test broker from within the python test framework. Also added some cluster test functionality
Modified:
qpid/trunk/qpid/python/qpid/testlib.py
Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=772359&r1=772358&r2=772359&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Wed May 6 16:57:38 2009
@@ -53,6 +53,7 @@
class TestRunner:
SPEC_FOLDER = "../specs"
+ qpidd = os.getenv("QPIDD")
"""Runs unit tests.
@@ -73,6 +74,9 @@
0-10-errata - use the 0-10 specification with qpid errata.
-e/--errata <errata.xml> : file containing amqp XML errata
-b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to
+ -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD
+ env to point to broker executable. broker-args will be
+ prepended with "--daemon --port=0"
-v/--verbose : verbose - lists tests as they are run.
-d/--debug : enable debug logging.
-i/--ignore <test> : ignore the named test.
@@ -82,6 +86,34 @@
"""
sys.exit(1)
+ def startBroker(self, brokerArgs):
+ """Start a single broker daemon"""
+ if TestRunner.qpidd == None:
+ self._die("QPIDD environment var must point to qpidd when using -B/--start-broker")
+ cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs)
+ portStr = os.popen(cmd).read()
+ if len(portStr) == 0:
+ self._die("%s failed to start" % TestRunner.qpidd)
+ port = int(portStr)
+ pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read())
+ print "Started broker: pid=%d, port=%d" % (pid, port)
+ self.brokerTuple = (pid, port)
+ self.setBroker("localhost:%d" % port)
+
+ def stopBroker(self):
+ """Stop the broker using qpidd -q"""
+ if self.brokerTuple:
+ ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q")
+ if ret != 0:
+ self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret))
+ print "Stopped broker: pid=%d, port=%d" % self.brokerTuple
+
+ def killBroker(self):
+ """Kill the broker using kill -9 (SIGTERM)"""
+ if self.brokerTuple:
+ os.kill(self.brokerTuple[0], signal.SIGTERM)
+ print "Killed broker: pid=%d, port=%d" % self.brokerTuple
+
def setBroker(self, broker):
try:
self.url = URL(broker)
@@ -122,17 +154,22 @@
self.skip_self_test = False
try:
- opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:",
+ opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:",
["help", "spec", "errata=", "broker=",
- "verbose", "skip-self-test", "ignore",
+ "start-broker=", "verbose", "skip-self-test", "ignore",
"ignore-file", "spec-folder"])
except GetoptError, e:
self._die(str(e))
+ # check for mutually exclusive options
+ if "-B" in opts or "--start-broker" in opts:
+ if "-b" in opts or "--broker" in opts:
+ self._die("Cannot use -B/--start-broker and -b/broker options together")
for opt, value in opts:
if opt in ("-?", "-h", "--help"): self._die()
if opt in ("-s", "--spec"): self.specfile = value
if opt in ("-e", "--errata"): self.errata.append(value)
if opt in ("-b", "--broker"): self.setBroker(value)
+ if opt in ("-B", "--start-broker"): self.startBroker(value)
if opt in ("-v", "--verbose"): self.verbose = 2
if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
if opt in ("-i", "--ignore"): self.ignore.append(value)
@@ -182,6 +219,7 @@
return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
def run(self, args=sys.argv[1:]):
+ self.brokerTuple = None
self._parseargs(args)
runner = unittest.TextTestRunner(descriptions=False,
verbosity=self.verbose)
@@ -193,6 +231,7 @@
for t in self.ignore: print t
print "======================================="
+ self.stopBroker()
return result.wasSuccessful()
def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
@@ -390,3 +429,196 @@
session.message_subscribe(**keys)
session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
+
+
+class TestBaseCluster(unittest.TestCase):
+ """
+ Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes.
+ """
+ _tempDir = os.getenv("TMPDIR")
+ _qpidd = os.getenv("QPIDD")
+ _storeLib = os.getenv("LIBSTORE")
+ _clusterLib = os.getenv("LIBCLUSTER")
+
+ # --- Cluster helper functions ---
+
+ """
+ _clusterDict is a dictionary of clusters:
+ key = cluster name (string)
+ val = dictionary of node numbers:
+ key = node number (int)
+ val = tuple containing (pid, port)
+ For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows:
+ {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}}
+ where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively.
+ """
+ _clusterDict = {}
+
+ """Index for (pid, port) tuple"""
+ PID = 0
+ PORT = 1
+
+ def startBroker(self, qpiddArgs, logFile = None):
+ """Start a single broker daemon, returns tuple (pid, port)"""
+ if self._qpidd == None:
+ raise Exception("Environment variable QPIDD is not set")
+ cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs)
+ portStr = os.popen(cmd).read()
+ if len(portStr) == 0:
+ err = "Broker daemon startup failed."
+ if logFile != None:
+ err += " See log file %s" % logFile
+ raise Exception(err)
+ port = int(portStr)
+ pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read())
+ #print "started broker: pid=%d, port=%d" % (pid, port)
+ return (pid, port)
+
+ def createClusterNode(self, nodeNumber, clusterName):
+ """Create a node and add it to the named cluster"""
+ if self._tempDir == None:
+ raise Exception("Environment variable TMPDIR is not set")
+ if self._storeLib == None:
+ raise Exception("Environment variable LIBSTORE is not set")
+ if self._clusterLib == None:
+ raise Exception("Environment variable LIBCLUSTER is not set")
+ name = "%s-%d" % (clusterName, nodeNumber)
+ dataDir = os.path.join(self._tempDir, "cluster", name)
+ logFile = "%s.log" % dataDir
+ args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \
+ (self._storeLib, self._clusterLib, dataDir, clusterName, logFile)
+ self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
+
+ def createCluster(self, clusterName, numberNodes):
+ """Create a cluster containing an initial number of nodes"""
+ self._clusterDict[clusterName] = {}
+ for n in range(0, numberNodes):
+ self.createClusterNode(n, clusterName)
+
+ def getTupleList(self):
+ """Get list of (pid, port) tuples of all known cluster brokers"""
+ tList = []
+ for l in self._clusterDict.itervalues():
+ for t in l.itervalues():
+ tList.append(t)
+ return tList
+
+ def getNumBrokers(self):
+ """Get total number of brokers in all known clusters"""
+ return len(self.getTupleList())
+
+ def checkNumBrokers(self, expected):
+ """Check that the total number of brokers in all known clusters is the expected value"""
+ if self.getNumBrokers() != expected:
+ raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
+
+ def getClusterTupleList(self, clusterName):
+ """Get list of (pid, port) tuples of all nodes in named cluster"""
+ return self._clusterDict[clusterName].values()
+
+ def getNumClusterBrokers(self, clusterName):
+ """Get total number of brokers in named cluster"""
+ return len(self.getClusterTupleList(clusterName))
+
+ def getNodeTuple(self, nodeNumber, clusterName):
+ """Get the (pid, port) tuple for the given cluster node"""
+ return self._clusterDict[clusterName][nodeNumber]
+
+ def checkNumClusterBrokers(self, clusterName, expected):
+ """Check that the total number of brokers in the named cluster is the expected value"""
+ if self.getNumClusterBrokers(clusterName) != expected:
+ raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
+ (clusterName, expected, self.getNumClusterBrokers(clusterName)))
+
+ def clusterExists(self, clusterName):
+ """ Return True if clusterName exists, False otherwise"""
+ return clusterName in self._clusterDict.keys()
+
+ def clusterNodeExists(self, clusterName, nodeNumber):
+ """ Return True if nodeNumber in clusterName exists, False otherwise"""
+ if clusterName in self._clusterDict.keys():
+ return nodeNumber in self._clusterDict[nodeName]
+ return False
+
+ def createCheckCluster(self, clusterName, size):
+ """Create a cluster using the given name and size, then check the number of brokers"""
+ self.createCluster(clusterName, size)
+ self.checkNumClusterBrokers(clusterName, size)
+
+ # Kill cluster nodes using signal 9
+
+ def killNode(self, nodeNumber, clusterName, updateDict = True):
+ """Kill the given node in the named cluster using kill -9"""
+ pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID]
+ os.kill(pid, signal.SIGTERM)
+ #print "killed broker: pid=%d" % pid
+ if updateDict:
+ del(self._clusterDict[clusterName][nodeNumber])
+
+ def killCluster(self, clusterName, updateDict = True):
+ """Kill all nodes in the named cluster"""
+ for n in self._clusterDict[clusterName].iterkeys():
+ self.killNode(n, clusterName, False)
+ if updateDict:
+ del(self._clusterDict[clusterName])
+
+ def killClusterCheck(self, clusterName):
+ """Kill the named cluster and check that the name is removed from the cluster dictionary"""
+ self.killCluster(clusterName)
+ if self.clusterExists(clusterName):
+ raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
+ (clusterName, self.getNumClusterBrokers(clusterName)))
+
+ def killAllClusters(self):
+ """Kill all known clusters"""
+ for n in self._clusterDict.iterkeys():
+ self.killCluster(n, False)
+ self._clusterDict.clear()
+
+ def killAllClustersCheck(self):
+ """Kill all known clusters and check that the cluster dictionary is empty"""
+ self.killAllClusters()
+ self.checkNumBrokers(0)
+
+ # Stop cluster nodes using qpidd -q
+
+ def stopNode(self, nodeNumber, clusterName, updateDict = True):
+ """Stop the given node in the named cluster using qpidd -q"""
+ port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+ ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q")
+ if ret != 0:
+ raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \
+ (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret))
+ #print "stopped broker: port=%d" % port
+ if updateDict:
+ del(self._clusterDict[clusterName][nodeNumber])
+
+ def stopAllClusters(self):
+ """Stop all known clusters"""
+ for n in self._clusterDict.iterkeys():
+ self.stopCluster(n, False)
+ self._clusterDict.clear()
+
+
+ def stopCluster(self, clusterName, updateDict = True):
+ """Stop all nodes in the named cluster"""
+ for n in self._clusterDict[clusterName].iterkeys():
+ self.stopNode(n, clusterName, False)
+ if updateDict:
+ del(self._clusterDict[clusterName])
+
+ def stopCheckCluster(self, clusterName):
+ """Stop the named cluster and check that the name is removed from the cluster dictionary"""
+ self.stopCluster(clusterName)
+ if self.clusterExists(clusterName):
+ raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
+ def stopCheckAll(self):
+ """Kill all known clusters and check that the cluster dictionary is empty"""
+ self.stopAllClusters()
+ self.checkNumBrokers(0)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org