You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/03 21:23:43 UTC

svn commit: r918674 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.fail cpp/src/tests/cluster_tests.py python/qpid/brokertest.py tools/src/py/qpid-stat

Author: aconway
Date: Wed Mar  3 20:23:43 2010
New Revision: 918674

URL: http://svn.apache.org/viewvc?rev=918674&view=rev
Log:
Extended cluster_tests.test_management exposes a bug.

- kill and start brokers with clients running.
- added qpid-stat -b and testagent clients
- disabled in src/tests/cluster_tests.fail till bug is fixed.

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py
    qpid/trunk/qpid/tools/src/py/qpid-stat

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail Wed Mar  3 20:23:43 2010
@@ -1,2 +1,3 @@
+cluster_tests.LongTests.test_management
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Mar  3 20:23:43 2010
@@ -25,6 +25,7 @@
 from qpid.messaging import Message
 from threading import Thread
 from logging import getLogger
+from itertools import chain
 
 log = getLogger("qpid.cluster_tests")
 
@@ -149,21 +150,17 @@
         for i in range(i, len(cluster)): cluster[i].kill()
 
     def test_management(self):
-        """Run management in conjunction with other traffic."""
-        # Publish often to provoke errors
-        args=["--mgmt-pub-interval", 1]
-        # Use store if present
-        if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
+        """Run management clients and other clients concurrently."""
 
+        # FIXME aconway 2010-03-03: move to framework
         class ClientLoop(StoppableThread):
-            """Run an infinite client loop."""
+            """Run a client executable in a loop."""
             def __init__(self, broker, cmd):
                 StoppableThread.__init__(self)
                 self.broker=broker
-                self.cmd = cmd
+                self.cmd = cmd          # Client command.
                 self.lock = Lock()
-                self.process = None
-                self.stopped = False
+                self.process = None     # Client process.
                 self.start()
 
             def run(self):
@@ -172,37 +169,91 @@
                         self.lock.acquire()
                         try:
                             if self.stopped: break
-                            self.process = self.broker.test.popen(self.cmd,
-                                                                  expect=EXPECT_UNKNOWN)
+                            self.process = self.broker.test.popen(
+                                self.cmd, expect=EXPECT_UNKNOWN)
                         finally: self.lock.release()
                         try: exit = self.process.wait()
                         except: exit = 1
                         self.lock.acquire()
                         try:
-                            if exit != 0 and not self.stopped:
-                                self.process.unexpected("bad exit status in client loop")
+                            # Quit and ignore errors if stopped or expecting failure.
+                            if self.stopped: break
+                            if exit != 0:
+                                self.process.unexpected("client of %s exit status %s" %
+                                                        (self.broker.name, exit))
                         finally: self.lock.release()
                 except Exception, e:
-                    error=e
+                    self.error = RethrownException("Error in ClientLoop.run")
 
+            def expect_fail(self):
+                """Ignore exit status of the currently running client."""
+                self.lock.acquire()
+                stopped = True
+                self.lock.release()
+                
             def stop(self):
+                """Stop the running client and wait for it to exit"""
                 self.lock.acquire()
                 try:
                     self.stopped = True
-                    try: self.process.terminate()
-                    except: pass
+                    if self.process:
+                        try: self.process.kill() # Kill the client.
+                        except OSError: pass # The client might not be running.
                 finally: self.lock.release()
                 StoppableThread.stop(self)
+
+        # def test_management
+        args=["--mgmt-pub-interval", 1] # Publish management information every second.
+        # Use store if present.
+        if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
         cluster = self.cluster(3, args)
-        clients = []
-        for b in cluster:
-            clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()]))
-            clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())]))
+        
+        clients = [] # Ordinary clients that only connect to one broker.
+        mclients = [] # Management clients that connect to every broker in the cluster.
+
+        def start_clients(broker):
+            """Start ordinary clients for a broker"""
+            batch = []
+            for cmd in [
+                ["perftest", "--count", 1000,
+                 "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
+                ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
+                [os.path.join(self.rootdir, "testagent/testagent"), "localhost",
+                 str(broker.port())]
+                ]:
+                batch.append(ClientLoop(broker, cmd))
+            clients.append(batch)
+
+        def start_mclients(broker):
+            """Start management clients that make multiple connections"""
+            for cmd in [
+                ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
+                ]:
+                mclients.append(ClientLoop(broker, cmd))
+
         endtime = time.time() + self.duration()
+        alive = 0                       # First live cluster member
+        for b in cluster:
+            start_clients(b)
+            start_mclients(b)
+
         while time.time() < endtime:
-            for b in cluster: b.ready() # Will raise if broker crashed.
-            time.sleep(1)
-        for c in clients:
+            time.sleep(min(5,self.duration()))
+            for b in cluster[alive:]: b.ready() # Check if a broker crashed.
+            # Kill the first broker. Ignore errors on its clients and all the mclients
+            for c in clients[alive] + mclients: c.expect_fail()
+            clients[alive] = []
+            mclients = []
+            b = cluster[alive]
+            b.expect = EXPECT_EXIT_FAIL
+            b.kill()
+            # Start another broker and clients
+            alive += 1
+            b = cluster.start()
+            start_clients(b)
+            for b in cluster[alive:]: start_mclients(b)
+
+        for c in chain(mclients, *clients):
             c.stop()
 
 class StoreTests(BrokerTest):

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Mar  3 20:23:43 2010
@@ -188,12 +188,7 @@
             except:
                 self.unexpected("expected running, exit code %d" % self.wait())
         else:
-            # Give the process some time to exit.
-            # FIXME aconway 2010-03-02: use retry
-            delay = 0.1
-            while (self.poll() is None and delay < 1):
-                time.sleep(delay)
-                delay *= 2
+            retry(self.poll)
             if self.returncode is None: # Still haven't stopped
                 self.kill()
                 self.unexpected("still running")
@@ -229,7 +224,6 @@
         return self.returncode
 
     def send_signal(self, sig):
-        log.debug("kill -%s %s"%(sig, self.pname))
         self.was_shutdown = True
         os.kill(self.pid,sig)
         self.wait()

Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Wed Mar  3 20:23:43 2010
@@ -505,6 +505,7 @@
     print
 except Exception,e:
     print "Failed: %s - %s" % (e.__class__.__name__, e)
+    raise                               # FIXME aconway 2010-03-03: 
     sys.exit(1)
 
 bm.Disconnect()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org