You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/26 14:40:02 UTC
svn commit: r927847 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py python/qpid/brokertest.py
Author: aconway
Date: Fri Mar 26 13:40:02 2010
New Revision: 927847
URL: http://svn.apache.org/viewvc?rev=927847&view=rev
Log:
Fix a race condition that caused a deadlock in the failover test in cluster_tests.py.
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=927847&r1=927846&r2=927847&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Mar 26 13:40:02 2010
@@ -156,7 +156,7 @@ class LongTests(BrokerTest):
ErrorGenerator(b)
time.sleep(min(5,self.duration()/2))
sender.stop()
- receiver.stop(sender.sent)
+ receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self):
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=927847&r1=927846&r2=927847&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Mar 26 13:40:02 2010
@@ -497,7 +497,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
- Requires self.received(n) to be called each time messages are received.
+ Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
self.sender = broker.test.popen(
@@ -508,6 +508,10 @@ class NumberedSender(Thread):
self.stopped = False
self.error = None
+ def write_message(self, n):
+ self.sender.stdin.write(str(n)+"\n")
+ self.sender.stdin.flush()
+
def run(self):
try:
self.sent = 0
@@ -517,8 +521,7 @@ class NumberedSender(Thread):
while not self.stopped and self.sent - self.received > self.max:
self.condition.wait()
self.condition.release()
- self.sender.stdin.write(str(self.sent)+"\n")
- self.sender.stdin.flush()
+ self.write_message(self.sent)
self.sent += 1
except Exception: self.error = RethrownException(self.sender.pname)
@@ -531,10 +534,12 @@ class NumberedSender(Thread):
def stop(self):
self.condition.acquire()
- self.stopped = True
- self.condition.notify()
- self.condition.release()
+ try:
+ self.stopped = True
+ self.condition.notify()
+ finally: self.condition.release()
self.join()
+ self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
class NumberedReceiver(Thread):
@@ -551,35 +556,29 @@ class NumberedReceiver(Thread):
self.receiver = self.test.popen(
[self.test.receiver_exec, "--port", broker.port()],
expect=EXPECT_RUNNING, drain=False)
- self.stopat = None
self.lock = Lock()
self.error = None
self.sender = sender
- def continue_test(self):
- self.lock.acquire()
- ret = self.stopat is None or self.received < self.stopat
- self.lock.release()
- return ret
+ def read_message(self):
+ return int(self.receiver.stdout.readline())
def run(self):
try:
self.received = 0
- while self.continue_test():
- m = int(self.receiver.stdout.readline())
- assert(m <= self.received) # Allow for duplicates
- if (m == self.received):
+ m = self.read_message()
+ while m != -1:
+ assert(m <= self.received) # Check for missing messages
+ if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
self.sender.notify_received(self.received)
+ m = self.read_message()
except Exception:
self.error = RethrownException(self.receiver.pname)
- def stop(self, count):
- """Returns when received >= count"""
- self.lock.acquire()
- self.stopat = count
- self.lock.release()
+ def stop(self):
+ """Returns when termination message is received"""
self.join()
if self.error: raise self.error
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org