You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/24 19:56:33 UTC
svn commit: r927154 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py
cpp/src/tests/run_long_cluster_tests python/qpid/brokertest.py
Author: aconway
Date: Wed Mar 24 18:56:33 2010
New Revision: 927154
URL: http://svn.apache.org/viewvc?rev=927154&view=rev
Log:
Fix resource leaks in cluster_tests; add a 5-minute test run to "make check-long".
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=927154&r1=927153&r2=927154&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Mar 24 18:56:33 2010
@@ -160,7 +160,7 @@ class LongTests(BrokerTest):
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self):
- """Run management clients and other clients concurrently."""
+ """Stress test: Run management clients and other clients concurrently."""
# TODO aconway 2010-03-03: move to brokertest framework
class ClientLoop(StoppableThread):
@@ -171,6 +171,7 @@ class LongTests(BrokerTest):
self.cmd = cmd # Client command.
self.lock = Lock()
self.process = None # Client process.
+ self._expect_fail = False
self.start()
def run(self):
@@ -195,7 +196,7 @@ class LongTests(BrokerTest):
try:
# Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
- if exit != 0:
+ if not self._expect_fail and exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
finally: self.lock.release()
@@ -205,12 +206,13 @@ class LongTests(BrokerTest):
def expect_fail(self):
"""Ignore exit status of the currently running client."""
self.lock.acquire()
- stopped = True
+ self._expect_fail = True
self.lock.release()
def stop(self):
"""Stop the running client and wait for it to exit"""
self.lock.acquire()
+ if self.stopped: return
try:
self.stopped = True
if self.process:
@@ -232,7 +234,7 @@ class LongTests(BrokerTest):
"""Start ordinary clients for a broker"""
batch = []
for cmd in [
- ["perftest", "--count", 1000,
+ ["perftest", "--count", 50000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
["testagent", "localhost", str(broker.port())] ]:
@@ -257,11 +259,12 @@ class LongTests(BrokerTest):
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker. Ignore errors on its clients and all the mclients
for c in clients[alive] + mclients: c.expect_fail()
- clients[alive] = []
- mclients = []
b = cluster[alive]
b.expect = EXPECT_EXIT_FAIL
b.kill()
+ for c in clients[alive] + mclients: c.stop()
+ clients[alive] = []
+ mclients = []
# Start another broker and clients
alive += 1
b = cluster.start()
Modified: qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests?rev=927154&r1=927153&r2=927154&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests Wed Mar 24 18:56:33 2010
@@ -20,4 +20,4 @@
#
srcdir=`dirname $0`
-$srcdir/run_cluster_tests long_cluster_tests
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=5
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=927154&r1=927153&r2=927154&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Mar 24 18:56:33 2010
@@ -81,7 +81,7 @@ def error_line(filename):
"""Get the last line of filename for error messages"""
result = ""
try:
- f = file(filename)
+ f = open(filename)
try:
for l in f: result = ": " + l
finally: f.close()
@@ -118,7 +118,7 @@ class Popen(popen2.Popen3):
try:
for line in self.infile:
if self.outfile is None:
- self.outfile = file(self.outname, "w")
+ self.outfile = open(self.outname, "w")
self.outfile.write(line)
finally:
self.infile.close()
@@ -146,57 +146,61 @@ class Popen(popen2.Popen3):
assert find_exe(cmd[0]), "executable not found: "+cmd[0]
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
+ self.returncode = None
popen2.Popen3.__init__(self, self.cmd, True)
self.expect = expect
- self.was_shutdown = False # Set if we deliberately kill/terminate the process
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.tochild, msg)
self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg)
- f = file(self.outfile("cmd"), "w")
+ f = open(self.outfile("cmd"), "w")
try: f.write(self.cmd_str())
finally: f.close()
log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
if drain: self.drain()
+ self._clean = False
def drain(self):
"""Start threads to drain stdout/err"""
self.stdout.drain()
self.stderr.drain()
- def drain_join(self):
- """Join the drain threads"""
- self.stdout.thread.join()
+ def _cleanup(self):
+ """Close pipes to sub-process"""
+ if self._clean: return
+ self._clean = True
+ self.stdin.close()
+ self.drain() # Drain output pipes.
+ self.stdout.thread.join() # Drain thread closes pipe.
self.stderr.thread.join()
def unexpected(self,msg):
- self.drain()
- self.drain_join()
+ self._cleanup()
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
def stop(self): # Clean up at end of test.
- self.drain()
- self.stdin.close()
- if self.expect == EXPECT_UNKNOWN:
- try: self.kill() # Just make sure its dead
- except: pass
- elif self.expect == EXPECT_RUNNING:
- try:
- self.kill()
- except:
- self.unexpected("expected running, exit code %d" % self.wait())
- else:
- retry(lambda: self.poll() is not None)
- if self.returncode is None: # Still haven't stopped
- self.kill()
- self.unexpected("still running")
- elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
- self.unexpected("exit code %d" % self.returncode)
- elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
- self.unexpected("expected error")
- self.stdin.close()
+ try:
+ if self.expect == EXPECT_UNKNOWN:
+ try: self.kill() # Just make sure its dead
+ except: pass
+ elif self.expect == EXPECT_RUNNING:
+ try:
+ self.kill()
+ except:
+ self.unexpected("expected running, exit code %d" % self.wait())
+ else:
+ retry(lambda: self.poll() is not None)
+ if self.returncode is None: # Still haven't stopped
+ self.kill()
+ self.unexpected("still running")
+ elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
+ self.unexpected("exit code %d" % self.returncode)
+ elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
+ self.unexpected("expected error")
+ finally:
+ self._cleanup()
def communicate(self, input=None):
if input:
@@ -213,20 +217,24 @@ class Popen(popen2.Popen3):
if not self.is_running(): unexpected("Exit code %d" % self.returncode)
def poll(self):
+ if self.returncode is not None: return self.returncode
self.returncode = popen2.Popen3.poll(self)
if (self.returncode == -1): self.returncode = None
+ if self.returncode is not None: self._cleanup()
return self.returncode
def wait(self):
+ if self.returncode is not None: return self.returncode
self.drain()
- self.returncode = popen2.Popen3.wait(self)
- self.drain_join()
+ try: self.returncode = popen2.Popen3.wait(self)
+ except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
+ self._cleanup()
return self.returncode
def send_signal(self, sig):
- self.was_shutdown = True
- os.kill(self.pid,sig)
- self.wait()
+ try: os.kill(self.pid,sig)
+ except OSError,e: raise OSError("Kill failed %s: %s"%(self.pname, e))
+ self._cleanup()
def terminate(self): self.send_signal(signal.SIGTERM)
def kill(self): self.send_signal(signal.SIGKILL)
@@ -347,7 +355,7 @@ class Broker(Popen):
"""Return true if the log file exists and contains a broker ready message"""
if self._log_ready: return True
if not os.path.exists(self.log): return False
- f = file(self.log)
+ f = open(self.log)
try:
for l in f:
if "notice Broker running" in l:
@@ -604,7 +612,7 @@ def import_script(path):
Import executable script at path as a module.
Requires some trickery as scripts are not in standard module format
"""
- f = file(path)
+ f = open(path)
try:
name=os.path.split(path)[1].replace("-","_")
return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org