You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/26 14:40:02 UTC

svn commit: r927847 - in /qpid/trunk/qpid: cpp/src/tests/cluster_tests.py python/qpid/brokertest.py

Author: aconway
Date: Fri Mar 26 13:40:02 2010
New Revision: 927847

URL: http://svn.apache.org/viewvc?rev=927847&view=rev
Log:
Fix race condition causing deadlock in cluster_tests.py, failover_test.

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=927847&r1=927846&r2=927847&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Mar 26 13:40:02 2010
@@ -156,7 +156,7 @@ class LongTests(BrokerTest):
             ErrorGenerator(b)
             time.sleep(min(5,self.duration()/2))
         sender.stop()
-        receiver.stop(sender.sent)
+        receiver.stop()
         for i in range(i, len(cluster)): cluster[i].kill()
 
     def test_management(self):

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=927847&r1=927846&r2=927847&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Mar 26 13:40:02 2010
@@ -497,7 +497,7 @@ class NumberedSender(Thread):
     def __init__(self, broker, max_depth=None):
         """
         max_depth: enable flow control, ensure sent - received <= max_depth.
-        Requires self.received(n) to be called each time messages are received.
+        Requires self.notify_received(n) to be called each time messages are received.
         """
         Thread.__init__(self)
         self.sender = broker.test.popen(
@@ -508,6 +508,10 @@ class NumberedSender(Thread):
         self.stopped = False
         self.error = None
 
+    def write_message(self, n):
+        self.sender.stdin.write(str(n)+"\n")
+        self.sender.stdin.flush()
+
     def run(self):
         try:
             self.sent = 0
@@ -517,8 +521,7 @@ class NumberedSender(Thread):
                     while not self.stopped and self.sent - self.received > self.max:
                         self.condition.wait()
                     self.condition.release()
-                self.sender.stdin.write(str(self.sent)+"\n")
-                self.sender.stdin.flush()
+                self.write_message(self.sent)
                 self.sent += 1
         except Exception: self.error = RethrownException(self.sender.pname)
 
@@ -531,10 +534,12 @@ class NumberedSender(Thread):
 
     def stop(self):
         self.condition.acquire()
-        self.stopped = True
-        self.condition.notify()
-        self.condition.release()
+        try:
+            self.stopped = True
+            self.condition.notify()
+        finally: self.condition.release()
         self.join()
+        self.write_message(-1)          # end-of-messages marker.
         if self.error: raise self.error
         
 class NumberedReceiver(Thread):
@@ -551,35 +556,29 @@ class NumberedReceiver(Thread):
         self.receiver = self.test.popen(
             [self.test.receiver_exec, "--port", broker.port()],
             expect=EXPECT_RUNNING, drain=False)
-        self.stopat = None
         self.lock = Lock()
         self.error = None
         self.sender = sender
 
-    def continue_test(self):
-        self.lock.acquire()
-        ret = self.stopat is None or self.received < self.stopat
-        self.lock.release()
-        return ret
+    def read_message(self):
+        return int(self.receiver.stdout.readline())
     
     def run(self):
         try:
             self.received = 0
-            while self.continue_test():
-                m = int(self.receiver.stdout.readline())
-                assert(m <= self.received) # Allow for duplicates
-                if (m == self.received):
+            m = self.read_message()
+            while m != -1:
+                assert(m <= self.received) # Check for missing messages
+                if (m == self.received): # Ignore duplicates
                     self.received += 1
                     if self.sender:
                         self.sender.notify_received(self.received)
+                m = self.read_message()
         except Exception:
             self.error = RethrownException(self.receiver.pname)
 
-    def stop(self, count):
-        """Returns when received >= count"""
-        self.lock.acquire()
-        self.stopat = count
-        self.lock.release()
+    def stop(self):
+        """Returns when termination message is received"""
         self.join()
         if self.error: raise self.error
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org