You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/27 23:21:13 UTC

svn commit: r903868 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py cpp/src/tests/test_env.sh.in python/qpid/brokertest.py

Author: aconway
Date: Wed Jan 27 22:21:13 2010
New Revision: 903868

URL: http://svn.apache.org/viewvc?rev=903868&view=rev
Log:
Test for management + cluster: run management tools in parallel with regular clients.

cluster_tests.py: added LongTests.test_management
brokertest.py: optionally drain test process output to *.out/*.err files. On by default.

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=903868&r1=903867&r2=903868&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jan 27 22:21:13 2010
@@ -31,6 +31,12 @@
 # Import scripts as modules
 qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
 
+def readfile(filename):
+    """Returns the content of the file named filename as a string"""
+    f = file(filename)
+    try: return f.read()
+    finally: f.close()
+
 class ShortTests(BrokerTest):
     """Short cluster functionality tests."""
 
@@ -81,7 +87,7 @@
 
         # Now update a new member and compare their dumps.
         cluster.start(args=["--test-store-dump", "updatee.dump"])
-        assert file("direct.dump").read() == file("updatee.dump").read()
+        assert readfile("direct.dump") == readfile("updatee.dump")
         os.remove("direct.dump")
         os.remove("updatee.dump")
         
@@ -94,14 +100,14 @@
         cluster.start()
 
         update_re = re.compile(r"member update: (.*) frameSeq=[0-9]+ configSeq=([0-9]+)")
-        matches = [ update_re.search(file(b.log).read()) for b in cluster ]
+        matches = [ update_re.search(readfile(b.log)) for b in cluster ]
         sequences = [ m.group(2) for m in matches]
         self.assertEqual(sequences, ["0", "1", "3"])
 
         # Check that configurations with same seq. number match
         configs={}
         for b in cluster:
-            matches = update_re.findall(file(b.log).read())
+            matches = update_re.findall(readfile(b.log))
             for m in matches:
                 seq=m[1]
                 config=re.sub("\((member|unknown)\)", "", m[0])
@@ -142,6 +148,62 @@
         receiver.stop(sender.sent)
         for i in range(i, len(cluster)): cluster[i].kill()
 
+    def test_management(self):
+        """Run management in conjunction with other traffic."""
+        # Publish often to provoke errors
+        args=["--mgmt-pub-interval", 1]
+        # Use store if present
+        if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
+
+        class ClientLoop(StoppableThread):
+            """Run an infinite client loop."""
+            def __init__(self, broker, cmd):
+                StoppableThread.__init__(self)
+                self.broker=broker
+                self.cmd = cmd
+                self.lock = Lock()
+                self.process = None
+                self.stopped = False
+                self.start()
+
+            def run(self):
+                try:
+                    while True:
+                        self.lock.acquire()
+                        try:
+                            if self.stopped: break
+                            self.process = self.broker.test.popen(self.cmd,
+                                                                  expect=EXPECT_UNKNOWN)
+                        finally: self.lock.release()
+                        try: exit = self.process.wait()
+                        except: exit = 1
+                        self.lock.acquire()
+                        try:
+                            if exit != 0 and not self.stopped:
+                                self.process.unexpected("bad exit status in client loop")
+                        finally: self.lock.release()
+                except Exception, e:
+                    error=e
+
+            def stop(self):
+                self.lock.acquire()
+                try:
+                    self.stopped = True
+                    try: self.process.terminate()
+                    except: pass
+                finally: self.lock.release()
+                StoppableThread.stop(self)
+        cluster = self.cluster(3, args)
+        clients = []
+        for b in cluster:
+            clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()]))
+            clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())]))
+        endtime = time.time() + self.duration()
+        while time.time() < endtime:
+            for b in cluster: b.ready() # Will raise if broker crashed.
+            time.sleep(1)
+        for c in clients:
+            c.stop()
 
 class StoreTests(BrokerTest):
     """
@@ -255,8 +317,8 @@
         self.assertRaises(Exception, lambda: a.ready())
         self.assertRaises(Exception, lambda: b.ready())
         msg = re.compile("critical.*no clean store")
-        assert msg.search(file(a.log).read())
-        assert msg.search(file(b.log).read())
+        assert msg.search(readfile(a.log))
+        assert msg.search(readfile(b.log))
 
         # FIXME aconway 2009-12-03: verify manual restore procedure
 

Modified: qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_env.sh.in?rev=903868&r1=903867&r2=903868&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_env.sh.in Wed Jan 27 22:21:13 2010
@@ -49,6 +49,9 @@
 export RECEIVER_EXEC=$QPID_TEST_EXEC_DIR/receiver
 export SENDER_EXEC=$QPID_TEST_EXEC_DIR/sender
 
+# Path
+export PATH=$top_builddir/src:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PATH
+
 # Modules
 export TEST_STORE_LIB=$testmoduledir/test_store.so
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=903868&r1=903867&r2=903868&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Jan 27 22:21:13 2010
@@ -36,6 +36,7 @@
 EXPECT_EXIT_OK=1           # Expect to exit with 0 status before end of test.
 EXPECT_EXIT_FAIL=2         # Expect to exit with non-0 status before end of test.
 EXPECT_RUNNING=3           # Expect to still be running at end of test
+EXPECT_UNKNOWN=4            # No expectation, don't check exit status.
     
 def is_running(pid):
     try:
@@ -62,14 +63,16 @@
             return func(*args, **kwargs)
         except Exception, e:
             raise Exception("%s: %s" %(self.msg, str(e)))
-        
+
 def error_line(f):
     try:
-        lines = file(f).readlines()
-        if len(lines) > 0: return ": %s" % (lines[-1])
-    except: pass
-    return ""
-    
+        ff = file(f)
+        try:
+            lines = ff.readlines()
+            if len(lines) > 0: return ": %s" % (lines[-1])
+            else: return ""
+        finally: ff.close()
+    except: return ""
 
 class Popen(popen2.Popen3):
     """
@@ -78,7 +81,41 @@
     Dumps command line, stdout, stderr to data dir for debugging.
     """
 
-    def __init__(self, cmd, expect=EXPECT_EXIT_OK):
+    class DrainThread(Thread):
+        """Thread to drain a file object and write the data to a file."""
+        def __init__(self, infile, outname):
+            Thread.__init__(self)
+            self.infile, self.outname = infile, outname
+            self.outfile = None
+
+        def run(self):
+            try:
+                for line in self.infile:
+                    if self.outfile is None:
+                        self.outfile = file(self.outname, "w")
+                    self.outfile.write(line)
+            finally:
+                if self.outfile is not None: self.outfile.close
+
+    class OutStream(ExceptionWrapper):
+        """Wrapper for output streams, handles exceptions & draining output"""
+        def __init__(self, infile, outfile, msg):
+            ExceptionWrapper.__init__(self, infile, msg)
+            self.infile, self.outfile = infile, outfile
+            self.thread = None
+
+        def drain(self):
+            if self.thread is None:
+                self.thread = Popen.DrainThread(self.infile, self.outfile)
+                self.thread.start()
+
+    def outfile(self, ext): return "%s.%s" % (self.pname, ext)
+
+    def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+        """Run cmd (should be a list of arguments)
+        expect - if set verify expectation at end of test.
+        drain  - if true (default) drain stdout/stderr to files.
+        """
         if type(cmd) is type(""): cmd = [cmd] # Make it a list.
         self.cmd  = [ str(x) for x in cmd ]
         popen2.Popen3.__init__(self, self.cmd, True)
@@ -86,25 +123,34 @@
         self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
         msg = "Process %s" % self.pname
         self.stdin = ExceptionWrapper(self.tochild, msg)
-        self.stdout = ExceptionWrapper(self.fromchild, msg)
-        self.stderr = ExceptionWrapper(self.childerr, msg)
-        self.dump(self.cmd_str(), "cmd")
+        self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
+        self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg)
+        f = file(self.outfile("cmd"), "w")
+        try: f.write(self.cmd_str())
+        finally: f.close()
         log.debug("Started process %s" % self.pname)
+        if drain: self.drain()
 
-    def dump(self, str, ext):
-        name = "%s.%s" % (self.pname, ext)
-        f = file(name, "w")
-        f.write(str)
-        f.close()
-        return name
+    def drain(self):
+        self.stdout.drain()
+        self.stderr.drain()
+
+    def drain_join(self):
+        self.stdout.thread.join()
+        self.stderr.thread.join()
 
     def unexpected(self,msg):
-        self.dump(self.stdout.read(), "out")
-        err = self.dump(self.stderr.read(), "err")
-        raise BadProcessStatus("%s %s%s" % (self.pname, msg, error_line(err)))
+        self.drain()
+        self.drain_join()
+        raise BadProcessStatus("%s %s%s" % (self.pname, msg,
+                                            error_line(self.outfile("err"))))
     
     def stop(self):                  # Clean up at end of test.
-        if self.expect == EXPECT_RUNNING:
+        self.drain()
+        if self.expect == EXPECT_UNKNOWN:
+            try: self.kill()            # Just make sure it's dead
+            except: pass
+        elif self.expect == EXPECT_RUNNING:
             try:
                 self.kill()
             except:
@@ -131,7 +177,8 @@
         self.wait()
         return outerr
 
-    def is_running(self): return self.poll() is None
+    def is_running(self):
+        return self.poll() is None
 
     def assert_running(self):
         if not self.is_running(): unexpected("Exit code %d" % self.returncode)
@@ -142,7 +189,9 @@
         return self.returncode
 
     def wait(self):
+        self.drain()
         self.returncode = popen2.Popen3.wait(self)
+        self.drain_join()
         return self.returncode
 
     def send_signal(self, sig):
@@ -186,7 +235,7 @@
         cmd += ["--log-to-stderr=no"] 
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
-        Popen.__init__(self, cmd, expect)
+        Popen.__init__(self, cmd, expect, drain=False)
         test.cleanup_stop(self)
         self.host = "localhost"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
@@ -336,10 +385,10 @@
         """Call thing.stop at end of test"""
         self.stopem.append(stopable)
 
-    def popen(self, cmd, expect=EXPECT_EXIT_OK):
+    def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
         """Start a process that will be killed at end of test, in the test dir."""
         os.chdir(self.dir)
-        p = Popen(cmd, expect)
+        p = Popen(cmd, expect, drain)
         self.cleanup_stop(p)
         return p
 
@@ -359,8 +408,8 @@
         for b in _brokers: b.connect().close()
 
 class RethrownException(Exception):
-    """Captures the original stack trace to be thrown later""" 
-    def __init__(self, e, msg=""):
+    """Captures the stack trace of the current exception to be thrown later"""
+    def __init__(self, msg=""):
         Exception.__init__(self, msg+"\n"+format_exc())
 
 class StoppableThread(Thread):
@@ -409,7 +458,7 @@
                 self.sender.stdin.write(str(self.sent)+"\n")
                 self.sender.stdin.flush()
                 self.sent += 1
-        except Exception, e: self.error = RethrownException(e, self.sender.pname)
+        except Exception: self.error = RethrownException(self.sender.pname)
 
     def notify_received(self, count):
         """Called by receiver to enable flow control. count = messages received so far."""
@@ -438,7 +487,8 @@
         Thread.__init__(self)
         self.test = broker.test
         self.receiver = self.test.popen(
-            [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
+            [self.test.receiver_exec, "--port", broker.port()],
+            expect=EXPECT_RUNNING, drain=False)
         self.stopat = None
         self.lock = Lock()
         self.error = None
@@ -460,8 +510,8 @@
                     self.received += 1
                     if self.sender:
                         self.sender.notify_received(self.received)
-        except Exception, e:
-            self.error = RethrownException(e, self.receiver.pname)
+        except Exception:
+            self.error = RethrownException(self.receiver.pname)
 
     def stop(self, count):
         """Returns when received >= count"""
@@ -500,5 +550,8 @@
     Import executable script at path as a module.
     Requires some trickery as scripts are not in standard module format
     """
-    name=os.path.split(path)[1].replace("-","_")
-    return imp.load_module(name, file(path), path, ("", "r", imp.PY_SOURCE))
+    f = file(path)
+    try:
+        name=os.path.split(path)[1].replace("-","_")
+        return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
+    finally: f.close()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org