You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/06/14 19:57:25 UTC

svn commit: r1135722 - in /qpid/trunk/qpid/cpp/src/tests: brokertest.py cluster_tests.py

Author: aconway
Date: Tue Jun 14 17:57:25 2011
New Revision: 1135722

URL: http://svn.apache.org/viewvc?rev=1135722&view=rev
Log:
NO-JIRA: Fix bugs in test_failover test and the brokertest.py framework.

- brokertest.py was not reliably detecting failed processes.
- test_failover was not setting the reconnect option on its connections.

Modified:
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1135722&r1=1135721&r2=1135722&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Jun 14 17:57:25 2011
@@ -157,8 +157,13 @@ class Popen(subprocess.Popen):
                 try: self.kill()            # Just make sure its dead
                 except: pass
             elif self.expect == EXPECT_RUNNING:
-                try: self.kill()
-                except: self.unexpected("expected running, exit code %d" % self.wait())
+                    if self.poll() != None:
+                        self.unexpected("expected running, exit code %d" % self.returncode)
+                    else:
+                        try:
+                            self.kill()
+                        except Exception,e:
+                            self.unexpected("exception from kill: %s" % str(e))
             else:
                 retry(lambda: self.poll() is not None)
                 if self.returncode is None: # Still haven't stopped
@@ -544,6 +549,7 @@ class NumberedSender(Thread):
              "--broker", "localhost:%s"%broker.port(),
              "--address", "%s;{create:always}"%queue,
              "--failover-updates",
+             "--connection-options", "{reconnect:true,reconnect-timeout:10}",
              "--content-stdin"
              ],
             expect=EXPECT_RUNNING,
@@ -562,6 +568,7 @@ class NumberedSender(Thread):
         try:
             self.sent = 0
             while not self.stopped:
+                self.sender.assert_running()
                 if self.max:
                     self.condition.acquire()
                     while not self.stopped and self.sent - self.received > self.max:
@@ -604,6 +611,7 @@ class NumberedReceiver(Thread):
              "--broker", "localhost:%s"%broker.port(),
              "--address", "%s;{create:always}"%queue,
              "--failover-updates",
+             "--connection-options", "{reconnect:true,reconnect_timeout:10}",
              "--forever"
              ],
             expect=EXPECT_RUNNING,
@@ -620,6 +628,7 @@ class NumberedReceiver(Thread):
             self.received = 0
             m = self.read_message()
             while m != -1:
+                self.receiver.assert_running()
                 assert(m <= self.received) # Check for missing messages
                 if (m == self.received): # Ignore duplicates
                     self.received += 1

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1135722&r1=1135721&r2=1135722&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Jun 14 17:57:25 2011
@@ -253,6 +253,7 @@ acl allow all all
         client was attached.
         """
         args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+        # First broker will be killed.
         cluster0 = self.cluster(1, args=args)
         cluster1 = self.cluster(1, args=args)
         assert 0 == subprocess.call(
@@ -287,6 +288,7 @@ acl allow all all
 
         # Force a change of elder
         cluster0.start()
+        cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
         cluster0[0].kill()
         time.sleep(2) # Allow a management interval to pass.
         # Verify logs are consistent
@@ -305,7 +307,7 @@ acl allow all all
              "--sequence=true",
              "--send-eos=1",
              "--messages=100000",
-             "--connection-options={reconnect:true}"
+             "--connection-options={reconnect:true,reconnect_timeout:10}"
              ])
         self.receiver = self.popen(
             ["qpid-receive",
@@ -313,7 +315,7 @@ acl allow all all
              "--address", queue,
              "--ignore-duplicates",
              "--check-redelivered",
-             "--connection-options={reconnect:true}",
+             "--connection-options={reconnect:true,reconnect_timeout:10}",
              "--forever"
              ])
         time.sleep(1)#give sender enough time to have some messages to replay
@@ -461,7 +463,7 @@ acl allow all all
                                                 "--content-size=%s" % self.size,
                                                 "--messages=%s" % self.count,
                                                 "--failover-updates",
-                                                "--connection-options={reconnect:true}",
+                                                "--connection-options={reconnect:true,reconnect_timeout:10}",
                                                 "--address=%s" % self.queue,
                                                 "--broker=%s" % self.broker.host_port()])
                 self.sender.wait()
@@ -493,7 +495,7 @@ acl allow all all
                                      "--timeout=1",
                                      "--print-content=no",
                                      "--failover-updates",
-                                     "--connection-options={reconnect:true}",
+                                     "--connection-options={reconnect:true,reconnect_timeout:10}",
                                      "--ack-frequency=1",
                                      "--address=flq",
                                      "--broker=%s" % cluster[1].host_port()])
@@ -518,14 +520,14 @@ acl allow all all
                                      "--timeout=1",
                                      "--print-content=no",
                                      "--failover-updates",
-                                     "--connection-options={reconnect:true}",
+                                     "--connection-options={reconnect:true,reconnect_timeout:10}",
                                      "--ack-frequency=1",
                                      "--address=flq",
                                      "--broker=%s" % cluster[2].host_port()])
         receiver.wait()
         q_obj.update()
         assert not q_obj.flowStopped
-        assert q_obj.msgDepth == 0
+        self.assertEqual(q_obj.msgDepth, 0)
 
         # verify that the sender has become unblocked
         sender.join(timeout=5)
@@ -701,18 +703,21 @@ class LongTests(BrokerTest):
 
         # Start sender and receiver threads
         cluster[0].declare_queue("test-queue")
-        sender = NumberedSender(cluster[1], 1000) # Max queue depth
-        receiver = NumberedReceiver(cluster[2], sender)
+        sender = NumberedSender(cluster[0], 1000) # Max queue depth
+        receiver = NumberedReceiver(cluster[0], sender)
         receiver.start()
         sender.start()
-
+        for b in cluster: b.ready()     # Make sure brokers are ready
         # Kill original brokers, start new ones for the duration.
         endtime = time.time() + self.duration()
         i = 0
         while time.time() < endtime:
+            sender.sender.assert_running()
+            receiver.receiver.assert_running()
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
+            for b in cluster[i:]: b.ready()
             ErrorGenerator(b)
             time.sleep(5)
         sender.stop()
@@ -853,29 +858,28 @@ class LongTests(BrokerTest):
 
         # Original cluster will all be killed so expect exit with failure
         cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
-        #for b in cluster: ErrorGenerator(b)
 
         # create a queue with rather draconian flow control settings
         ssn0 = cluster[0].connect().session()
         s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
 
-        receiver = NumberedReceiver(cluster[2])
+        receiver = NumberedReceiver(cluster[0])
         receiver.start()
-        senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+        senders = [NumberedSender(cluster[0]) for i in range(1,3)]
         for s in senders:
             s.start()
+        for b in cluster: b.ready()     # Make sure brokers are ready
 
         # Kill original brokers, start new ones for the duration.
         endtime = time.time() + self.duration();
         i = 0
         while time.time() < endtime:
+            for s in senders: s.sender.assert_running()
+            receiver.receiver.assert_running()
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
-            #ErrorGenerator(b)
             time.sleep(5)
-            #b = cluster[0]
-            #b.startQmf()
         for s in senders:
             s.stop()
         receiver.stop()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org