You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/03/11 23:29:14 UTC

svn commit: r1080786 - in /qpid/trunk/qpid/cpp/src/tests: brokertest.py cluster_tests.py

Author: aconway
Date: Fri Mar 11 22:29:14 2011
New Revision: 1080786

URL: http://svn.apache.org/viewvc?rev=1080786&view=rev
Log:
QPID-3129: cluster_tests.LongTests.test_failover hangs

- simplified brokertest.py by using subprocess.Popen file redirection instead of drain threads.
- fixed the hang in test_failover

Modified:
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1080786&r1=1080785&r2=1080786&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Mar 11 22:29:14 2011
@@ -62,24 +62,6 @@ def is_running(pid):
 class BadProcessStatus(Exception):
     pass
 
-class ExceptionWrapper:
-    """Proxy object that adds a message to exceptions raised"""
-    def __init__(self, obj, msg):
-        self.obj = obj
-        self.msg = msg
-
-    def __getattr__(self, name):
-        func = getattr(self.obj, name)
-        if type(func) != callable:
-            return func
-        return lambda *args, **kwargs: self._wrap(func, args, kwargs)
-
-    def _wrap(self, func, args, kwargs):
-        try:
-            return func(*args, **kwargs)
-        except Exception, e:
-            raise Exception("%s: %s" %(self.msg, str(e)))
-
 def error_line(filename, n=1):
     """Get the last n line(s) of filename for error messages"""
     result = []
@@ -89,7 +71,8 @@ def error_line(filename, n=1):
             for l in f:
                 if len(result) == n:  result.pop(0)
                 result.append("    "+l)
-        finally: f.close()
+        finally:
+            f.close()
     except: return ""
     return ":\n" + "".join(result)
 
@@ -106,88 +89,63 @@ def retry(function, timeout=10, delay=.0
         delay *= 2
     return True
 
+class AtomicCounter:
+    def __init__(self):
+        self.count = 0
+        self.lock = Lock()
+
+    def next(self):
+        self.lock.acquire();
+        ret = self.count
+        self.count += 1
+        self.lock.release();
+        return ret
+
+_popen_id = AtomicCounter() # Popen identifier for use in output file names.
+
+# Constants for file descriptor arguments to Popen
+FILE = "FILE"                       # Write to file named after process
+PIPE = subprocess.PIPE
+
 class Popen(subprocess.Popen):
     """
     Can set and verify expectation of process status at end of test.
     Dumps command line, stdout, stderr to data dir for debugging.
     """
 
-    class DrainThread(Thread):
-        """Thread to drain a file object and write the data to a file."""
-        def __init__(self, infile, outname):
-            Thread.__init__(self)
-            self.infile, self.outname = infile, outname
-            self.outfile = None
-
-        def run(self):
-            try:
-                for line in self.infile:
-                    if self.outfile is None:
-                        self.outfile = open(self.outname, "w")
-                    self.outfile.write(line)
-            finally:
-                self.infile.close()
-                if self.outfile is not None: self.outfile.close()
-
-    class OutStream(ExceptionWrapper):
-        """Wrapper for output streams, handles exceptions & draining output"""
-        def __init__(self, infile, outfile, msg):
-            ExceptionWrapper.__init__(self, infile, msg)
-            self.infile, self.outfile = infile, outfile
-            self.thread = None
-
-        def drain(self):
-            if self.thread is None:
-                self.thread = Popen.DrainThread(self.infile, self.outfile)
-                self.thread.start()
-
-    def outfile(self, ext): return "%s.%s" % (self.pname, ext)
-
-    def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
-        """Run cmd (should be a list of arguments)
+    def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
+        """Run cmd (should be a list of program and arguments)
         expect - if set verify expectation at end of test.
-        drain  - if true (default) drain stdout/stderr to files.
+        stdout, stderr - can have the same values as for subprocess.Popen as well as
+          FILE (the default) which means write to a file named after the process.
+        stdin - like subprocess.Popen but defauts to PIPE
         """
         self._clean = False
         self._clean_lock = Lock()
         assert find_exe(cmd[0]), "executable not found: "+cmd[0]
         if type(cmd) is type(""): cmd = [cmd] # Make it a list.
         self.cmd  = [ str(x) for x in cmd ]
-        self.returncode = None
         self.expect = expect
-        try:
-            subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True)
-        except ValueError:     # Windows can't do close_fds
-            subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE)
-        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
-        msg = "Process %s" % self.pname
-        self.stdin = ExceptionWrapper(self.stdin, msg)
-        self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg)
-        self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg)
+        self.id = _popen_id.next()
+        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
+        if stdout == FILE: stdout = open(self.outfile("out"), "w")
+        if stderr == FILE: stderr = open(self.outfile("err"), "w")
+        try:
+            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+                                      stdin=stdin, stdout=stdout, stderr=stderr,
+                                      close_fds=True)
+        except ValueError: # Windows can't do close_fds
+            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+                                      stdin=stdin, stdout=stdout, stderr=stderr)
+
         f = open(self.outfile("cmd"), "w")
-        try: f.write(self.cmd_str())
+        try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
         finally: f.close()
         log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
-        if drain: self.drain()
 
-        def __str__(self): return "Popen<%s>"%(self.pname)
+    def __str__(self): return "Popen<%s>"%(self.pname)
 
-    def drain(self):
-        """Start threads to drain stdout/err"""
-        self.stdout.drain()
-        self.stderr.drain()
-
-    def _cleanup(self):
-        """Close pipes to sub-process"""
-        self._clean_lock.acquire()
-        try:
-            if self._clean: return
-            self._clean = True
-            self.stdin.close()
-            self.drain()                    # Drain output pipes.
-            self.stdout.thread.join()       # Drain thread closes pipe.
-            self.stderr.thread.join()
-        finally: self._clean_lock.release()
+    def outfile(self, ext): return "%s.%s" % (self.pname, ext)
 
     def unexpected(self,msg):
         err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
@@ -199,10 +157,8 @@ class Popen(subprocess.Popen):
                 try: self.kill()            # Just make sure its dead
                 except: pass
             elif self.expect == EXPECT_RUNNING:
-                try:
-                    self.kill()
-                except:
-                    self.unexpected("expected running, exit code %d" % self.wait())
+                try: self.kill()
+                except: self.unexpected("expected running, exit code %d" % self.wait())
             else:
                 retry(lambda: self.poll() is not None)
                 if self.returncode is None: # Still haven't stopped
@@ -216,38 +172,19 @@ class Popen(subprocess.Popen):
             self.wait()                 # Clean up the process.
 
     def communicate(self, input=None):
-        if input:
-            self.stdin.write(input)
-            self.stdin.close()
-        outerr = (self.stdout.read(), self.stderr.read())
-        self.wait()
-        return outerr
+        ret = subprocess.Popen.communicate(self, input)
+        self.cleanup()
+        return ret
 
-    def is_running(self):
-        return self.poll() is None
+    def is_running(self): return self.poll() is None
 
     def assert_running(self):
         if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
 
-    def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
-        if self.returncode is None:
-            # Pass _deadstate only if it has been set, there is no _deadstate
-            # parameter in Python 2.6
-            if _deadstate is None: ret = subprocess.Popen.poll(self)
-            else: ret = subprocess.Popen.poll(self, _deadstate)
-
-            if (ret != -1):
-                self.returncode = ret
-                self._cleanup()
-        return self.returncode
-
     def wait(self):
-        if self.returncode is None:
-            self.drain()
-            try: self.returncode = subprocess.Popen.wait(self)
-            except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
-            self._cleanup()
-        return self.returncode
+        ret = subprocess.Popen.wait(self)
+        self._cleanup()
+        return ret
 
     def terminate(self):
         try: subprocess.Popen.terminate(self)
@@ -256,6 +193,7 @@ class Popen(subprocess.Popen):
                 os.kill( self.pid , signal.SIGTERM)
             except AttributeError: # no os.kill, using taskkill.. (Windows only)
                 os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
+        self._cleanup()
 
     def kill(self):
         try: subprocess.Popen.kill(self)
@@ -264,6 +202,20 @@ class Popen(subprocess.Popen):
                 os.kill( self.pid , signal.SIGKILL)
             except AttributeError: # no os.kill, using taskkill.. (Windows only)
                 os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
+        self._cleanup()
+
+    def _cleanup(self):
+        """Clean up after a dead process"""
+        self._clean_lock.acquire()
+        if not self._clean:
+            self._clean = True
+            try: self.stdin.close()
+            except: pass
+            try: self.stdout.close()
+            except: pass
+            try: self.stderr.close()
+            except: pass
+        self._clean_lock.release()
 
     def cmd_str(self): return " ".join([str(s) for s in self.cmd])
 
@@ -323,7 +275,7 @@ class Broker(Popen):
             cmd += ["--log-enable=%s" % log_level]
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
-        Popen.__init__(self, cmd, expect, drain=False)
+        Popen.__init__(self, cmd, expect, stdout=PIPE)
         test.cleanup_stop(self)
         self._host = "127.0.0.1"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
@@ -427,7 +379,9 @@ class Broker(Popen):
             "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
 
     def store_state(self):
-        uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
+        f = open(os.path.join(self.datadir, "cluster", "store.status"))
+        try: uuids = f.readlines()
+        finally: f.close()
         null_uuid="00000000-0000-0000-0000-000000000000\n"
         if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
         if uuids[0] == null_uuid: return "empty"
@@ -509,10 +463,10 @@ class BrokerTest(TestCase):
         """Call thing.stop at end of test"""
         self.stopem.append(stopable)
 
-    def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+    def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
         """Start a process that will be killed at end of test, in the test dir."""
         os.chdir(self.dir)
-        p = Popen(cmd, expect, drain)
+        p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
         self.cleanup_stop(p)
         return p
 
@@ -582,7 +536,8 @@ class NumberedSender(Thread):
              "--failover-updates",
              "--content-stdin"
              ],
-            expect=EXPECT_RUNNING)
+            expect=EXPECT_RUNNING,
+            stdin=PIPE)
         self.condition = Condition()
         self.max = max_depth
         self.received = 0
@@ -642,7 +597,7 @@ class NumberedReceiver(Thread):
              "--forever"
              ],
             expect=EXPECT_RUNNING,
-            drain=False)
+            stdout=PIPE)
         self.lock = Lock()
         self.error = None
         self.sender = sender

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1080786&r1=1080785&r2=1080786&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Mar 11 22:29:14 2011
@@ -526,24 +526,24 @@ class LongTests(BrokerTest):
                             if self.stopped: break
                             self.process = self.broker.test.popen(
                                 self.cmd, expect=EXPECT_UNKNOWN)
-                        finally: self.lock.release()
-                        try: exit = self.process.wait()
+                        finally:
+                            self.lock.release()
+                        try:
+                            exit = self.process.wait()
                         except OSError, e:
-                            # Seems to be a race in wait(), it throws
-                            # "no such process" during test shutdown.
-                            # Doesn't indicate a test error, ignore.
-                            return
+                            # Process may already have been killed by self.stop()
+                            break
                         except Exception, e:
                             self.process.unexpected(
                                 "client of %s: %s"%(self.broker.name, e))
                         self.lock.acquire()
                         try:
-                            # Quit and ignore errors if stopped or expecting failure.
                             if self.stopped: break
                             if exit != 0:
                                 self.process.unexpected(
                                     "client of %s exit code %s"%(self.broker.name, exit))
-                        finally: self.lock.release()
+                        finally:
+                            self.lock.release()
                 except Exception, e:
                     self.error = RethrownException("Error in ClientLoop.run")
 
@@ -588,7 +588,8 @@ class LongTests(BrokerTest):
             mclients.append(ClientLoop(broker, cmd))
 
         endtime = time.time() + self.duration()
-        runtime = self.duration() / 4   # First run is longer, use quarter of duration.
+        # For long duration, first run is a quarter of the duration.
+        runtime = max(5, self.duration() / 4.0)
         alive = 0                       # First live cluster member
         for i in range(len(cluster)): start_clients(cluster[i])
         start_mclients(cluster[alive])
@@ -614,14 +615,13 @@ class LongTests(BrokerTest):
             start_mclients(cluster[alive])
         for c in chain(mclients, *clients):
             c.stop()
-
         # Verify that logs are consistent
         cluster_test_logs.verify_logs()
 
     def test_management_qmf2(self):
         self.test_management(args=["--mgmt-qmf2=yes"])
 
-    def test_connect_consistent(self):   # FIXME aconway 2011-01-18:
+    def test_connect_consistent(self):
         args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
         cluster = self.cluster(2, args=args)
         end = time.time() + self.duration()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org