You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/24 19:56:33 UTC

svn commit: r927154 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py cpp/src/tests/run_long_cluster_tests python/qpid/brokertest.py

Author: aconway
Date: Wed Mar 24 18:56:33 2010
New Revision: 927154

URL: http://svn.apache.org/viewvc?rev=927154&view=rev
Log:
Fix resource leaks in cluster_tests, add 5 minute test run to make check-long.

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=927154&r1=927153&r2=927154&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Mar 24 18:56:33 2010
@@ -160,7 +160,7 @@ class LongTests(BrokerTest):
         for i in range(i, len(cluster)): cluster[i].kill()
 
     def test_management(self):
-        """Run management clients and other clients concurrently."""
+        """Stress test: Run management clients and other clients concurrently."""
 
         # TODO aconway 2010-03-03: move to brokertest framework
         class ClientLoop(StoppableThread):
@@ -171,6 +171,7 @@ class LongTests(BrokerTest):
                 self.cmd = cmd          # Client command.
                 self.lock = Lock()
                 self.process = None     # Client process.
+                self._expect_fail = False
                 self.start()
 
             def run(self):
@@ -195,7 +196,7 @@ class LongTests(BrokerTest):
                         try:
                             # Quit and ignore errors if stopped or expecting failure.
                             if self.stopped: break
-                            if exit != 0:
+                            if not self._expect_fail and exit != 0:
                                 self.process.unexpected(
                                     "client of %s exit code %s"%(self.broker.name, exit))
                         finally: self.lock.release()
@@ -205,12 +206,13 @@ class LongTests(BrokerTest):
             def expect_fail(self):
                 """Ignore exit status of the currently running client."""
                 self.lock.acquire()
-                stopped = True
+                self._expect_fail = True
                 self.lock.release()
                 
             def stop(self):
                 """Stop the running client and wait for it to exit"""
                 self.lock.acquire()
+                if self.stopped: return
                 try:
                     self.stopped = True
                     if self.process:
@@ -232,7 +234,7 @@ class LongTests(BrokerTest):
             """Start ordinary clients for a broker"""
             batch = []
             for cmd in [
-                ["perftest", "--count", 1000,
+                ["perftest", "--count", 50000,
                  "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
                 ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
                 ["testagent", "localhost", str(broker.port())] ]:
@@ -257,11 +259,12 @@ class LongTests(BrokerTest):
             for b in cluster[alive:]: b.ready() # Check if a broker crashed.
             # Kill the first broker. Ignore errors on its clients and all the mclients
             for c in clients[alive] + mclients: c.expect_fail()
-            clients[alive] = []
-            mclients = []
             b = cluster[alive]
             b.expect = EXPECT_EXIT_FAIL
             b.kill()
+            for c in clients[alive] + mclients: c.stop()
+            clients[alive] = []
+            mclients = []
             # Start another broker and clients
             alive += 1
             b = cluster.start()

Modified: qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests?rev=927154&r1=927153&r2=927154&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests Wed Mar 24 18:56:33 2010
@@ -20,4 +20,4 @@
 #
 
 srcdir=`dirname $0`
-$srcdir/run_cluster_tests long_cluster_tests
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=5

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=927154&r1=927153&r2=927154&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Mar 24 18:56:33 2010
@@ -81,7 +81,7 @@ def error_line(filename):
     """Get the last line of filename for error messages"""
     result = ""
     try:
-        f = file(filename)
+        f = open(filename)
         try:
             for l in f: result = ": " + l
         finally: f.close()
@@ -118,7 +118,7 @@ class Popen(popen2.Popen3):
             try:
                 for line in self.infile:
                     if self.outfile is None:
-                        self.outfile = file(self.outname, "w")
+                        self.outfile = open(self.outname, "w")
                     self.outfile.write(line)
             finally:
                 self.infile.close()
@@ -146,57 +146,61 @@ class Popen(popen2.Popen3):
         assert find_exe(cmd[0]), "executable not found: "+cmd[0]
         if type(cmd) is type(""): cmd = [cmd] # Make it a list.
         self.cmd  = [ str(x) for x in cmd ]
+        self.returncode = None
         popen2.Popen3.__init__(self, self.cmd, True)
         self.expect = expect
-        self.was_shutdown = False # Set if we deliberately kill/terminate the process
         self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
         msg = "Process %s" % self.pname
         self.stdin = ExceptionWrapper(self.tochild, msg)
         self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
         self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg)
-        f = file(self.outfile("cmd"), "w")
+        f = open(self.outfile("cmd"), "w")
         try: f.write(self.cmd_str())
         finally: f.close()
         log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
         if drain: self.drain()
+        self._clean = False
 
     def drain(self):
         """Start threads to drain stdout/err"""
         self.stdout.drain()
         self.stderr.drain()
 
-    def drain_join(self):
-        """Join the drain threads"""
-        self.stdout.thread.join()
+    def _cleanup(self):
+        """Close pipes to sub-process"""
+        if self._clean: return
+        self._clean = True
+        self.stdin.close()
+        self.drain()                    # Drain output pipes.
+        self.stdout.thread.join()       # Drain thread closes pipe.
         self.stderr.thread.join()
 
     def unexpected(self,msg):
-        self.drain()
-        self.drain_join()
+        self._cleanup()
         err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
         raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
     
     def stop(self):                  # Clean up at end of test.
-        self.drain()
-        self.stdin.close()
-        if self.expect == EXPECT_UNKNOWN:
-            try: self.kill()            # Just make sure its dead
-            except: pass
-        elif self.expect == EXPECT_RUNNING:
-            try:
-                self.kill()
-            except:
-                self.unexpected("expected running, exit code %d" % self.wait())
-        else:
-            retry(lambda: self.poll() is not None)
-            if self.returncode is None: # Still haven't stopped
-                self.kill()
-                self.unexpected("still running")
-            elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
-                self.unexpected("exit code %d" % self.returncode)
-            elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
-                self.unexpected("expected error")
-        self.stdin.close()
+        try:
+            if self.expect == EXPECT_UNKNOWN:
+                try: self.kill()            # Just make sure its dead
+                except: pass
+            elif self.expect == EXPECT_RUNNING:
+                try:
+                    self.kill()
+                except:
+                    self.unexpected("expected running, exit code %d" % self.wait())
+            else:
+                retry(lambda: self.poll() is not None)
+                if self.returncode is None: # Still haven't stopped
+                    self.kill()
+                    self.unexpected("still running")
+                elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
+                    self.unexpected("exit code %d" % self.returncode)
+                elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
+                    self.unexpected("expected error")
+        finally:
+            self._cleanup()
                
     def communicate(self, input=None):
         if input:
@@ -213,20 +217,24 @@ class Popen(popen2.Popen3):
         if not self.is_running(): unexpected("Exit code %d" % self.returncode)
 
     def poll(self):
+        if self.returncode is not None: return self.returncode
         self.returncode = popen2.Popen3.poll(self)
         if (self.returncode == -1): self.returncode = None
+        if self.returncode is not None: self._cleanup()
         return self.returncode
 
     def wait(self):
+        if self.returncode is not None: return self.returncode
         self.drain()
-        self.returncode = popen2.Popen3.wait(self)
-        self.drain_join()
+        try: self.returncode = popen2.Popen3.wait(self)
+        except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
+        self._cleanup()
         return self.returncode
 
     def send_signal(self, sig):
-        self.was_shutdown = True
-        os.kill(self.pid,sig)
-        self.wait()
+        try: os.kill(self.pid,sig)
+        except OSError,e: raise OSError("Kill failed %s: %s"%(self.pname, e))
+        self._cleanup()
 
     def terminate(self): self.send_signal(signal.SIGTERM)
     def kill(self): self.send_signal(signal.SIGKILL)
@@ -347,7 +355,7 @@ class Broker(Popen):
         """Return true if the log file exists and contains a broker ready message"""
         if self._log_ready: return True
         if not os.path.exists(self.log): return False
-        f = file(self.log)
+        f = open(self.log)
         try:
             for l in f:
                 if "notice Broker running" in l:
@@ -604,7 +612,7 @@ def import_script(path):
     Import executable script at path as a module.
     Requires some trickery as scripts are not in standard module format
     """
-    f = file(path)
+    f = open(path)
     try:
         name=os.path.split(path)[1].replace("-","_")
         return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org