You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/03 21:23:43 UTC
svn commit: r918674 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.fail
cpp/src/tests/cluster_tests.py python/qpid/brokertest.py
tools/src/py/qpid-stat
Author: aconway
Date: Wed Mar 3 20:23:43 2010
New Revision: 918674
URL: http://svn.apache.org/viewvc?rev=918674&view=rev
Log:
The extended cluster_tests.test_management exposes a bug.
- Kill and start brokers while clients are running.
- Added qpid-stat -b and testagent clients.
- Disabled in src/tests/cluster_tests.fail until the bug is fixed.
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/python/qpid/brokertest.py
qpid/trunk/qpid/tools/src/py/qpid-stat
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail Wed Mar 3 20:23:43 2010
@@ -1,2 +1,3 @@
+cluster_tests.LongTests.test_management
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Mar 3 20:23:43 2010
@@ -25,6 +25,7 @@
from qpid.messaging import Message
from threading import Thread
from logging import getLogger
+from itertools import chain
log = getLogger("qpid.cluster_tests")
@@ -149,21 +150,17 @@
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self):
- """Run management in conjunction with other traffic."""
- # Publish often to provoke errors
- args=["--mgmt-pub-interval", 1]
- # Use store if present
- if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
+ """Run management clients and other clients concurrently."""
+ # FIXME aconway 2010-03-03: move to framework
class ClientLoop(StoppableThread):
- """Run an infinite client loop."""
+ """Run a client executable in a loop."""
def __init__(self, broker, cmd):
StoppableThread.__init__(self)
self.broker=broker
- self.cmd = cmd
+ self.cmd = cmd # Client command.
self.lock = Lock()
- self.process = None
- self.stopped = False
+ self.process = None # Client process.
self.start()
def run(self):
@@ -172,37 +169,91 @@
self.lock.acquire()
try:
if self.stopped: break
- self.process = self.broker.test.popen(self.cmd,
- expect=EXPECT_UNKNOWN)
+ self.process = self.broker.test.popen(
+ self.cmd, expect=EXPECT_UNKNOWN)
finally: self.lock.release()
try: exit = self.process.wait()
except: exit = 1
self.lock.acquire()
try:
- if exit != 0 and not self.stopped:
- self.process.unexpected("bad exit status in client loop")
+ # Quit and ignore errors if stopped or expecting failure.
+ if self.stopped: break
+ if exit != 0:
+ self.process.unexpected("client of %s exit status %s" %
+ (self.broker.name, exit))
finally: self.lock.release()
except Exception, e:
- error=e
+ self.error = RethrownException("Error in ClientLoop.run")
+ def expect_fail(self):
+ """Ignore exit status of the currently running client."""
+ self.lock.acquire()
+ stopped = True
+ self.lock.release()
+
def stop(self):
+ """Stop the running client and wait for it to exit"""
self.lock.acquire()
try:
self.stopped = True
- try: self.process.terminate()
- except: pass
+ if self.process:
+ try: self.process.kill() # Kill the client.
+ except OSError: pass # The client might not be running.
finally: self.lock.release()
StoppableThread.stop(self)
+
+ # def test_management
+ args=["--mgmt-pub-interval", 1] # Publish management information every second.
+ # Use store if present.
+ if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args)
- clients = []
- for b in cluster:
- clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()]))
- clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())]))
+
+ clients = [] # Ordinary clients that only connect to one broker.
+ mclients = [] # Management clients that connect to every broker in the cluster.
+
+ def start_clients(broker):
+ """Start ordinary clients for a broker"""
+ batch = []
+ for cmd in [
+ ["perftest", "--count", 1000,
+ "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
+ ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
+ [os.path.join(self.rootdir, "testagent/testagent"), "localhost",
+ str(broker.port())]
+ ]:
+ batch.append(ClientLoop(broker, cmd))
+ clients.append(batch)
+
+ def start_mclients(broker):
+ """Start management clients that make multiple connections"""
+ for cmd in [
+ ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
+ ]:
+ mclients.append(ClientLoop(broker, cmd))
+
endtime = time.time() + self.duration()
+ alive = 0 # First live cluster member
+ for b in cluster:
+ start_clients(b)
+ start_mclients(b)
+
while time.time() < endtime:
- for b in cluster: b.ready() # Will raise if broker crashed.
- time.sleep(1)
- for c in clients:
+ time.sleep(min(5,self.duration()))
+ for b in cluster[alive:]: b.ready() # Check if a broker crashed.
+ # Kill the first broker. Ignore errors on its clients and all the mclients
+ for c in clients[alive] + mclients: c.expect_fail()
+ clients[alive] = []
+ mclients = []
+ b = cluster[alive]
+ b.expect = EXPECT_EXIT_FAIL
+ b.kill()
+ # Start another broker and clients
+ alive += 1
+ b = cluster.start()
+ start_clients(b)
+ for b in cluster[alive:]: start_mclients(b)
+
+ for c in chain(mclients, *clients):
c.stop()
class StoreTests(BrokerTest):
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Mar 3 20:23:43 2010
@@ -188,12 +188,7 @@
except:
self.unexpected("expected running, exit code %d" % self.wait())
else:
- # Give the process some time to exit.
- # FIXME aconway 2010-03-02: use retry
- delay = 0.1
- while (self.poll() is None and delay < 1):
- time.sleep(delay)
- delay *= 2
+ retry(self.poll)
if self.returncode is None: # Still haven't stopped
self.kill()
self.unexpected("still running")
@@ -229,7 +224,6 @@
return self.returncode
def send_signal(self, sig):
- log.debug("kill -%s %s"%(sig, self.pname))
self.was_shutdown = True
os.kill(self.pid,sig)
self.wait()
Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=918674&r1=918673&r2=918674&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Wed Mar 3 20:23:43 2010
@@ -505,6 +505,7 @@
print
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
+ raise # FIXME aconway 2010-03-03:
sys.exit(1)
bm.Disconnect()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org