You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/10/05 03:24:46 UTC

svn commit: r1004496 - /qpid/trunk/qpid/java/testkit/testkit.py

Author: rajith
Date: Tue Oct  5 01:24:46 2010
New Revision: 1004496

URL: http://svn.apache.org/viewvc?rev=1004496&view=rev
Log:
Added more test cases to cover failover.

Modified:
    qpid/trunk/qpid/java/testkit/testkit.py

Modified: qpid/trunk/qpid/java/testkit/testkit.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/testkit.py?rev=1004496&r1=1004495&r2=1004496&view=diff
==============================================================================
--- qpid/trunk/qpid/java/testkit/testkit.py (original)
+++ qpid/trunk/qpid/java/testkit/testkit.py Tue Oct  5 01:24:46 2010
@@ -100,6 +100,7 @@ class JavaClientTest(BrokerTest):
         ssn = broker.connect().session()
         err_watcher = ssn.receiver("control; {create:always}", capacity=1)
         i = run_time/error_ck_freq
+        is_error = False   
         for j in range(i):    
             not_empty = True
             while not_empty:            
@@ -107,13 +108,15 @@ class JavaClientTest(BrokerTest):
                     m = err_watcher.fetch(timeout=error_ck_freq)
                     ssn.acknowledge()
                     print "Java process notified of an error"
-                    self.check_for_error(m) 
+                    self.print_error(m) 
+                    is_error = True
                 except messaging.Empty, e: 
                     not_empty = False              
                     
         ssn.close()
+        return is_error
 
-    def check_for_error(self,msg):
+    def print_error(self,msg):
         print msg.properties.get("exception-trace")
 
     def verify(self, receiver,sender):
@@ -125,8 +128,6 @@ class JavaClientTest(BrokerTest):
 
     def start_sender_and_receiver(self,**options):
 
-        options["use_unique_dests"]=True
-        options["address"]="amq.topic"
         receiver_opts = options
         receiver_opts["receiver"]=True
         receiver = self.popen(self.client(**receiver_opts),
@@ -139,21 +140,28 @@ class JavaClientTest(BrokerTest):
 
         return receiver, sender
 
+    def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
+        if options.get("durable",False)==True:
+            cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
+        else:
+            cluster = Cluster(self, count=count)    
+        return cluster  
+
 class ConcurrencyTest(JavaClientTest):
     """A concurrency test suite for the JMS client"""
-    skip = False
+    skip = True
 
     def base_case(self,**options):
         if self.skip :
             print "Skipping test"
             return
 
-        if options["durable"]==True:
-            cluster = Cluster(self, count=2,args=self.store_module_args())
-        else:
-            cluster = Cluster(self, count=2)            
+        cluster = self.start_cluster(count=2,**options)      
         self.start_error_watcher(broker=cluster[0])
-        options["port"] = port=cluster[0].port()  
+        options["port"] = port=cluster[0].port() 
+
+        options["use_unique_dests"]=True
+        options["address"]="amq.topic" 
         receiver, sender = self.start_sender_and_receiver(**options)
         self.monitor_clients(broker=cluster[0],run_time=180)
         self.verify(receiver,sender)
@@ -176,7 +184,7 @@ class ConcurrencyTest(JavaClientTest):
     def test_multiplexing_con_with_durable_sub(self):
         """Tests multiple sessions with durable subs""" 
 
-        self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id())
+        self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
 
     def test_multiplexing_con_with_sync_ack(self):
         """Tests multiple sessions with sync ack""" 
@@ -191,4 +199,80 @@ class ConcurrencyTest(JavaClientTest):
     def test_multiple_cons_and_ssns(self):
         """Tests multiple connections and sessions""" 
 
-        self.base_case(con_count=25,ssn_per_con=25,test_name=self.id())
+        self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
+
+
+class SoakTest(JavaClientTest):
+    """A soak test suite for the JMS client"""
+
+    def base_case(self,**options):
+        cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
+        options["port"] = port=cluster[0].port()  
+        self.start_error_watcher(broker=cluster[0])
+        options["use_unique_dests"]=True
+        options["address"]="amq.topic" 
+        receiver,sender = self.start_sender_and_receiver(**options)
+        is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
+
+        if (is_error):
+            print "The sender or receiver didn't start properly. Exiting test."
+            return       
+        else:
+            "Print no error !" 
+
+        # grace period for java clients to get the failover properly setup.
+        time.sleep(30) 
+        error_msg= None
+        # Kill original brokers, start new ones.
+        try:
+            for i in range(8):
+                cluster[i].kill()
+                b=cluster.start()
+                self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+                print "iteration : " + str(i)
+        except ConnectError, e1:
+            error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
+
+        except SessionError, e2:
+            error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+
+        self.verify(receiver,sender)
+        if error_msg:      
+            raise Exception(error_msg)      
+
+     
+    def test_failover(self) :
+        """Test basic failover""" 
+
+        self.base_case(test_name=self.id())
+
+
+    def test_failover_with_durablesub(self):
+        """Test failover with durable subscriber""" 
+
+        self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
+
+
+    def test_failover_with_sync_rcv(self):
+        """Test failover with sync receive""" 
+
+        self.base_case(sync_rcv=True,test_name=self.id())
+
+
+    def test_failover_with_sync_ack(self):
+        """Test failover with sync ack""" 
+
+        self.base_case(sync_ack=True,test_name=self.id())
+
+
+    def test_failover_with_noprefetch(self):
+        """Test failover with no prefetch""" 
+
+        self.base_case(max_prefetch=1,test_name=self.id())
+
+
+    def test_failover_with_multiple_cons_and_ssns(self):
+        """Test failover with multiple connections and sessions""" 
+
+        self.base_case(use_unique_dests=True,address="amq.topic",
+                       con_count=10,ssn_per_con=25,test_name=self.id())



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org