You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/27 23:21:13 UTC
svn commit: r903868 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py
cpp/src/tests/test_env.sh.in python/qpid/brokertest.py
Author: aconway
Date: Wed Jan 27 22:21:13 2010
New Revision: 903868
URL: http://svn.apache.org/viewvc?rev=903868&view=rev
Log:
Test for management + cluster: run management tools in parallel with regular clients.
cluster_tests.py: added LongTests.test_management
brokertest.py: optionally drain test process output to *.out/*.err files. On by default.
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=903868&r1=903867&r2=903868&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jan 27 22:21:13 2010
@@ -31,6 +31,12 @@
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
+def readfile(filename):
+ """Returns the content of the file named filename as a string"""
+ f = file(filename)
+ try: return f.read()
+ finally: f.close()
+
class ShortTests(BrokerTest):
"""Short cluster functionality tests."""
@@ -81,7 +87,7 @@
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
- assert file("direct.dump").read() == file("updatee.dump").read()
+ assert readfile("direct.dump") == readfile("updatee.dump")
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -94,14 +100,14 @@
cluster.start()
update_re = re.compile(r"member update: (.*) frameSeq=[0-9]+ configSeq=([0-9]+)")
- matches = [ update_re.search(file(b.log).read()) for b in cluster ]
+ matches = [ update_re.search(readfile(b.log)) for b in cluster ]
sequences = [ m.group(2) for m in matches]
self.assertEqual(sequences, ["0", "1", "3"])
# Check that configurations with same seq. number match
configs={}
for b in cluster:
- matches = update_re.findall(file(b.log).read())
+ matches = update_re.findall(readfile(b.log))
for m in matches:
seq=m[1]
config=re.sub("\((member|unknown)\)", "", m[0])
@@ -142,6 +148,62 @@
receiver.stop(sender.sent)
for i in range(i, len(cluster)): cluster[i].kill()
+ def test_management(self):
+ """Run management in conjunction with other traffic."""
+ # Publish often to provoke errors
+ args=["--mgmt-pub-interval", 1]
+ # Use store if present
+ if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
+
+ class ClientLoop(StoppableThread):
+ """Run an infinite client loop."""
+ def __init__(self, broker, cmd):
+ StoppableThread.__init__(self)
+ self.broker=broker
+ self.cmd = cmd
+ self.lock = Lock()
+ self.process = None
+ self.stopped = False
+ self.start()
+
+ def run(self):
+ try:
+ while True:
+ self.lock.acquire()
+ try:
+ if self.stopped: break
+ self.process = self.broker.test.popen(self.cmd,
+ expect=EXPECT_UNKNOWN)
+ finally: self.lock.release()
+ try: exit = self.process.wait()
+ except: exit = 1
+ self.lock.acquire()
+ try:
+ if exit != 0 and not self.stopped:
+ self.process.unexpected("bad exit status in client loop")
+ finally: self.lock.release()
+ except Exception, e:
+ error=e
+
+ def stop(self):
+ self.lock.acquire()
+ try:
+ self.stopped = True
+ try: self.process.terminate()
+ except: pass
+ finally: self.lock.release()
+ StoppableThread.stop(self)
+ cluster = self.cluster(3, args)
+ clients = []
+ for b in cluster:
+ clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()]))
+ clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())]))
+ endtime = time.time() + self.duration()
+ while time.time() < endtime:
+ for b in cluster: b.ready() # Will raise if broker crashed.
+ time.sleep(1)
+ for c in clients:
+ c.stop()
class StoreTests(BrokerTest):
"""
@@ -255,8 +317,8 @@
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(file(a.log).read())
- assert msg.search(file(b.log).read())
+ assert msg.search(readfile(a.log))
+ assert msg.search(readfile(b.log))
# FIXME aconway 2009-12-03: verify manual restore procedure
Modified: qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_env.sh.in?rev=903868&r1=903867&r2=903868&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_env.sh.in Wed Jan 27 22:21:13 2010
@@ -49,6 +49,9 @@
export RECEIVER_EXEC=$QPID_TEST_EXEC_DIR/receiver
export SENDER_EXEC=$QPID_TEST_EXEC_DIR/sender
+# Path
+export PATH=$top_builddir/src:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PATH
+
# Modules
export TEST_STORE_LIB=$testmoduledir/test_store.so
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=903868&r1=903867&r2=903868&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Jan 27 22:21:13 2010
@@ -36,6 +36,7 @@
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test.
EXPECT_RUNNING=3 # Expect to still be running at end of test
+EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
def is_running(pid):
try:
@@ -62,14 +63,16 @@
return func(*args, **kwargs)
except Exception, e:
raise Exception("%s: %s" %(self.msg, str(e)))
-
+
def error_line(f):
try:
- lines = file(f).readlines()
- if len(lines) > 0: return ": %s" % (lines[-1])
- except: pass
- return ""
-
+ ff = file(f)
+ try:
+ lines = ff.readlines()
+ if len(lines) > 0: return ": %s" % (lines[-1])
+ else: return ""
+ finally: ff.close()
+ except: return ""
class Popen(popen2.Popen3):
"""
@@ -78,7 +81,41 @@
Dumps command line, stdout, stderr to data dir for debugging.
"""
- def __init__(self, cmd, expect=EXPECT_EXIT_OK):
+ class DrainThread(Thread):
+ """Thread to drain a file object and write the data to a file."""
+ def __init__(self, infile, outname):
+ Thread.__init__(self)
+ self.infile, self.outname = infile, outname
+ self.outfile = None
+
+ def run(self):
+ try:
+ for line in self.infile:
+ if self.outfile is None:
+ self.outfile = file(self.outname, "w")
+ self.outfile.write(line)
+ finally:
+ if self.outfile is not None: self.outfile.close
+
+ class OutStream(ExceptionWrapper):
+ """Wrapper for output streams, handles exceptions & draining output"""
+ def __init__(self, infile, outfile, msg):
+ ExceptionWrapper.__init__(self, infile, msg)
+ self.infile, self.outfile = infile, outfile
+ self.thread = None
+
+ def drain(self):
+ if self.thread is None:
+ self.thread = Popen.DrainThread(self.infile, self.outfile)
+ self.thread.start()
+
+ def outfile(self, ext): return "%s.%s" % (self.pname, ext)
+
+ def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+ """Run cmd (should be a list of arguments)
+ expect - if set verify expectation at end of test.
+ drain - if true (default) drain stdout/stderr to files.
+ """
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
popen2.Popen3.__init__(self, self.cmd, True)
@@ -86,25 +123,34 @@
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.tochild, msg)
- self.stdout = ExceptionWrapper(self.fromchild, msg)
- self.stderr = ExceptionWrapper(self.childerr, msg)
- self.dump(self.cmd_str(), "cmd")
+ self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
+ self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg)
+ f = file(self.outfile("cmd"), "w")
+ try: f.write(self.cmd_str())
+ finally: f.close()
log.debug("Started process %s" % self.pname)
+ if drain: self.drain()
- def dump(self, str, ext):
- name = "%s.%s" % (self.pname, ext)
- f = file(name, "w")
- f.write(str)
- f.close()
- return name
+ def drain(self):
+ self.stdout.drain()
+ self.stderr.drain()
+
+ def drain_join(self):
+ self.stdout.thread.join()
+ self.stderr.thread.join()
def unexpected(self,msg):
- self.dump(self.stdout.read(), "out")
- err = self.dump(self.stderr.read(), "err")
- raise BadProcessStatus("%s %s%s" % (self.pname, msg, error_line(err)))
+ self.drain()
+ self.drain_join()
+ raise BadProcessStatus("%s %s%s" % (self.pname, msg,
+ error_line(self.outfile("err"))))
def stop(self): # Clean up at end of test.
- if self.expect == EXPECT_RUNNING:
+ self.drain()
+ if self.expect == EXPECT_UNKNOWN:
+ try: self.kill() # Just make sure its dead
+ except: pass
+ elif self.expect == EXPECT_RUNNING:
try:
self.kill()
except:
@@ -131,7 +177,8 @@
self.wait()
return outerr
- def is_running(self): return self.poll() is None
+ def is_running(self):
+ return self.poll() is None
def assert_running(self):
if not self.is_running(): unexpected("Exit code %d" % self.returncode)
@@ -142,7 +189,9 @@
return self.returncode
def wait(self):
+ self.drain()
self.returncode = popen2.Popen3.wait(self)
+ self.drain_join()
return self.returncode
def send_signal(self, sig):
@@ -186,7 +235,7 @@
cmd += ["--log-to-stderr=no"]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
- Popen.__init__(self, cmd, expect)
+ Popen.__init__(self, cmd, expect, drain=False)
test.cleanup_stop(self)
self.host = "localhost"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
@@ -336,10 +385,10 @@
"""Call thing.stop at end of test"""
self.stopem.append(stopable)
- def popen(self, cmd, expect=EXPECT_EXIT_OK):
+ def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
- p = Popen(cmd, expect)
+ p = Popen(cmd, expect, drain)
self.cleanup_stop(p)
return p
@@ -359,8 +408,8 @@
for b in _brokers: b.connect().close()
class RethrownException(Exception):
- """Captures the original stack trace to be thrown later"""
- def __init__(self, e, msg=""):
+ """Captures the stack trace of the current exception to be thrown later"""
+ def __init__(self, msg=""):
Exception.__init__(self, msg+"\n"+format_exc())
class StoppableThread(Thread):
@@ -409,7 +458,7 @@
self.sender.stdin.write(str(self.sent)+"\n")
self.sender.stdin.flush()
self.sent += 1
- except Exception, e: self.error = RethrownException(e, self.sender.pname)
+ except Exception: self.error = RethrownException(self.sender.pname)
def notify_received(self, count):
"""Called by receiver to enable flow control. count = messages received so far."""
@@ -438,7 +487,8 @@
Thread.__init__(self)
self.test = broker.test
self.receiver = self.test.popen(
- [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
+ [self.test.receiver_exec, "--port", broker.port()],
+ expect=EXPECT_RUNNING, drain=False)
self.stopat = None
self.lock = Lock()
self.error = None
@@ -460,8 +510,8 @@
self.received += 1
if self.sender:
self.sender.notify_received(self.received)
- except Exception, e:
- self.error = RethrownException(e, self.receiver.pname)
+ except Exception:
+ self.error = RethrownException(self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
@@ -500,5 +550,8 @@
Import executable script at path as a module.
Requires some trickery as scripts are not in standard module format
"""
- name=os.path.split(path)[1].replace("-","_")
- return imp.load_module(name, file(path), path, ("", "r", imp.PY_SOURCE))
+ f = file(path)
+ try:
+ name=os.path.split(path)[1].replace("-","_")
+ return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
+ finally: f.close()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org