You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/03 18:11:10 UTC

svn commit: r918578 - in /qpid/trunk: .gitignore qpid/python/qpid/brokertest.py

Author: aconway
Date: Wed Mar  3 17:11:09 2010
New Revision: 918578

URL: http://svn.apache.org/viewvc?rev=918578&view=rev
Log:
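Minor improvements to the brokertest framework.

- Fixed a bug in the use of host(): it is now a method, and callers invoke it as host().
- Check that the executable exists before starting a process.
- More efficient error_line implementation (iterates over the file instead of reading all lines into memory).
- Check both the *.err and *.out files for the error line.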

Modified:
    qpid/trunk/.gitignore
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/.gitignore
URL: http://svn.apache.org/viewvc/qpid/trunk/.gitignore?rev=918578&r1=918577&r2=918578&view=diff
==============================================================================
--- qpid/trunk/.gitignore (original)
+++ qpid/trunk/.gitignore Wed Mar  3 17:11:09 2010
@@ -7,7 +7,7 @@
 *.lo
 *.vglog
 .dirstamp
-Makefile*
+Makefile.in
 config\.*
 configure
 .deps
@@ -22,7 +22,6 @@
 qpid/cpp/src/gen/
 *gen.mk
 *.timestamp
-rgen.timestamp
 *.pcl
 qpid/cpp/managementgen/management-types.xml
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=918578&r1=918577&r2=918578&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Mar  3 17:11:09 2010
@@ -37,7 +37,21 @@
 EXPECT_EXIT_FAIL=2         # Expect to exit with non-0 status before end of test.
 EXPECT_RUNNING=3           # Expect to still be running at end of test
 EXPECT_UNKNOWN=4            # No expectation, don't check exit status.
-    
+
+def is_exe(fpath):
+    return os.path.exists(fpath) and os.access(fpath, os.X_OK)
+
+def find_exe(program):
+    """Find an executable in the system PATH"""
+    dir, name = os.path.split(program)
+    if dir:
+        if is_exe(program): return program
+    else:
+        for path in os.environ["PATH"].split(os.pathsep):
+            exe_file = os.path.join(path, program)
+            if is_exe(exe_file): return exe_file
+    return None
+
 def is_running(pid):
     try:
         os.kill(pid, 0)
@@ -64,15 +78,28 @@
         except Exception, e:
             raise Exception("%s: %s" %(self.msg, str(e)))
 
-def error_line(f):
+def error_line(filename):
+    """Get the last line of filename for error messages"""
+    result = ""
     try:
-        ff = file(f)
+        f = file(filename)
         try:
-            lines = ff.readlines()
-            if len(lines) > 0: return ": %s" % (lines[-1])
-            else: return ""
-        finally: ff.close()
+            for l in f: result = ": " + l
+        finally: f.close()
     except: return ""
+    return result
+
+def retry(function, timeout=1, delay=.001):
+    """Call function until it returns True or timeout expires.
+    Double the delay for each retry. Return True if function
+    returns true, False if timeout expires."""
+    elapsed = 0
+    while not function():
+        elapsed += delay
+        if elapsed > timeout: return False
+        delay *= 2
+        time.sleep(delay)
+    return True
 
 class Popen(popen2.Popen3):
     """
@@ -95,7 +122,8 @@
                         self.outfile = file(self.outname, "w")
                     self.outfile.write(line)
             finally:
-                if self.outfile is not None: self.outfile.close
+                self.infile.close()
+                if self.outfile is not None: self.outfile.close()
 
     class OutStream(ExceptionWrapper):
         """Wrapper for output streams, handles excpetions & draining output"""
@@ -116,10 +144,12 @@
         expect - if set verify expectation at end of test.
         drain  - if true (default) drain stdout/stderr to files.
         """
+        assert find_exe(cmd[0])
         if type(cmd) is type(""): cmd = [cmd] # Make it a list.
         self.cmd  = [ str(x) for x in cmd ]
         popen2.Popen3.__init__(self, self.cmd, True)
         self.expect = expect
+        self.was_shutdown = False # Set if we deliberately kill/terminate the process
         self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
         msg = "Process %s" % self.pname
         self.stdin = ExceptionWrapper(self.tochild, msg)
@@ -128,22 +158,24 @@
         f = file(self.outfile("cmd"), "w")
         try: f.write(self.cmd_str())
         finally: f.close()
-        log.debug("Started process %s" % self.pname)
+        log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
         if drain: self.drain()
 
     def drain(self):
+        """Start threads to drain stdout/err"""
         self.stdout.drain()
         self.stderr.drain()
 
     def drain_join(self):
+        """Join the drain threads"""
         self.stdout.thread.join()
         self.stderr.thread.join()
 
     def unexpected(self,msg):
         self.drain()
         self.drain_join()
-        raise BadProcessStatus("%s %s%s" % (self.pname, msg,
-                                            error_line(self.outfile("err"))))
+        err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
+        raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
     
     def stop(self):                  # Clean up at end of test.
         self.drain()
@@ -157,6 +189,7 @@
                 self.unexpected("expected running, exit code %d" % self.wait())
         else:
             # Give the process some time to exit.
+            # FIXME aconway 2010-03-02: use retry
             delay = 0.1
             while (self.poll() is None and delay < 1):
                 time.sleep(delay)
@@ -168,6 +201,7 @@
                 self.unexpected("exit code %d" % self.returncode)
             elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
                 self.unexpected("expected error")
+        self.stdin.close()
                
     def communicate(self, input=None):
         if input:
@@ -195,6 +229,8 @@
         return self.returncode
 
     def send_signal(self, sig):
+        log.debug("kill -%s %s"%(sig, self.pname))
+        self.was_shutdown = True
         os.kill(self.pid,sig)
         self.wait()
 
@@ -219,13 +255,13 @@
             self.log = "%s-%d.log" % (self.name, i)
             i += 1
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
+    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
         self.test = test
-        self._port = None
-        cmd = [BrokerTest.qpidd_exec, "--port=0", "--no-module-dir", "--auth=no"] + args
+        self._port=port
+        cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir", "--auth=no"] + args
         if name: self.name = name
         else:
             self.name = "broker%d" % Broker._broker_count
@@ -237,12 +273,14 @@
         cmd += ["--data-dir", self.datadir]
         Popen.__init__(self, cmd, expect, drain=False)
         test.cleanup_stop(self)
-        self.host = "localhost"
+        self._host = "localhost"
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
 
+    def host(self): return self._host
+
     def port(self):
         # Read port from broker process stdout if not already read.
-        if (self._port is None):
+        if (self._port == 0):
             try: self._port = int(self.stdout.readline())
             except ValueError, e:
                 raise Exception("Can't get port for broker %s (%s)%s" %
@@ -254,11 +292,11 @@
 
     def connect(self):
         """New API connection to the broker."""
-        return messaging.Connection.open(self.host, self.port())
+        return messaging.Connection.open(self.host(), self.port())
 
     def connect_old(self):
         """Old API connection to the broker."""
-        socket = qpid.util.connect(self.host,self.port())
+        socket = qpid.util.connect(self.host(),self.port())
         connection = qpid.connection.Connection (sock=socket)
         connection.start()
         return connection;
@@ -308,11 +346,28 @@
         s.connection.close()
         return m
 
-    def host_port(self): return "%s:%s" % (self.host, self.port())
+    def host_port(self): return "%s:%s" % (self.host(), self.port())
+
+    def log_ready(self):
+        """Return true if the log file exists and contains a broker ready message"""
+        if not os.path.exists(self.log): return False
+        ready_msg = re.compile("notice Broker running")
+        f = file(self.log)
+        try:
+            for l in f:
+                if ready_msg.search(l): return True
+            return False
+        finally: f.close()
 
+    # FIXME aconway 2010-03-02: rename to wait_ready
     def ready(self):
         """Wait till broker is ready to serve clients"""
-        self.connect().close()
+        # First make sure the broker is listening by checking the log.
+        if not retry(lambda: self.log_ready()):
+            raise Exception("Timed out waiting for broker %s" % self.name)
+        # Make a connection, this will wait for extended cluster init to finish.
+        try: self.connect().close()
+        except: raise RethrownException("Broker %s failed ready test %s"%self.name)
 
 class Cluster:
     """A cluster of brokers in a test."""
@@ -332,11 +387,10 @@
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
         self.start_n(count, expect=expect, wait=wait)
 
-    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
-        log.debug("Cluster %s starting member %s" % (self.name, name))
-        self._brokers.append(self.test.broker(self.args+args, name, expect, wait))
+        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
         return self._brokers[-1]
 
     def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
@@ -392,10 +446,13 @@
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True,port=0):
         """Create and return a broker ready for use"""
-        b = Broker(self, args=args, name=name, expect=expect)
-        if (wait): b.connect().close()
+        b = Broker(self, args=args, name=name, expect=expect, port=port)
+        if (wait):
+            try: b.ready()
+            except Exception, e:
+                raise Exception("Failed to start broker %s: %s" % ( b.name, e))
         return b
 
     def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org