You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/10/05 03:24:46 UTC
svn commit: r1004496 - /qpid/trunk/qpid/java/testkit/testkit.py
Author: rajith
Date: Tue Oct 5 01:24:46 2010
New Revision: 1004496
URL: http://svn.apache.org/viewvc?rev=1004496&view=rev
Log:
Added more test cases to cover failover.
Modified:
qpid/trunk/qpid/java/testkit/testkit.py
Modified: qpid/trunk/qpid/java/testkit/testkit.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/testkit.py?rev=1004496&r1=1004495&r2=1004496&view=diff
==============================================================================
--- qpid/trunk/qpid/java/testkit/testkit.py (original)
+++ qpid/trunk/qpid/java/testkit/testkit.py Tue Oct 5 01:24:46 2010
@@ -100,6 +100,7 @@ class JavaClientTest(BrokerTest):
ssn = broker.connect().session()
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
i = run_time/error_ck_freq
+ is_error = False
for j in range(i):
not_empty = True
while not_empty:
@@ -107,13 +108,15 @@ class JavaClientTest(BrokerTest):
m = err_watcher.fetch(timeout=error_ck_freq)
ssn.acknowledge()
print "Java process notified of an error"
- self.check_for_error(m)
+ self.print_error(m)
+ is_error = True
except messaging.Empty, e:
not_empty = False
ssn.close()
+ return is_error
- def check_for_error(self,msg):
+ def print_error(self,msg):
print msg.properties.get("exception-trace")
def verify(self, receiver,sender):
@@ -125,8 +128,6 @@ class JavaClientTest(BrokerTest):
def start_sender_and_receiver(self,**options):
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
receiver_opts = options
receiver_opts["receiver"]=True
receiver = self.popen(self.client(**receiver_opts),
@@ -139,21 +140,28 @@ class JavaClientTest(BrokerTest):
return receiver, sender
+ def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
+ if options.get("durable",False)==True:
+ cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=count)
+ return cluster
+
class ConcurrencyTest(JavaClientTest):
"""A concurrency test suite for the JMS client"""
- skip = False
+ skip = True
def base_case(self,**options):
if self.skip :
print "Skipping test"
return
- if options["durable"]==True:
- cluster = Cluster(self, count=2,args=self.store_module_args())
- else:
- cluster = Cluster(self, count=2)
+ cluster = self.start_cluster(count=2,**options)
self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
+ options["port"] = port=cluster[0].port()
+
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
receiver, sender = self.start_sender_and_receiver(**options)
self.monitor_clients(broker=cluster[0],run_time=180)
self.verify(receiver,sender)
@@ -176,7 +184,7 @@ class ConcurrencyTest(JavaClientTest):
def test_multiplexing_con_with_durable_sub(self):
"""Tests multiple sessions with durable subs"""
- self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id())
+ self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
def test_multiplexing_con_with_sync_ack(self):
"""Tests multiple sessions with sync ack"""
@@ -191,4 +199,80 @@ class ConcurrencyTest(JavaClientTest):
def test_multiple_cons_and_ssns(self):
"""Tests multiple connections and sessions"""
- self.base_case(con_count=25,ssn_per_con=25,test_name=self.id())
+ self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
+
+
+class SoakTest(JavaClientTest):
+ """A soak test suite for the JMS client"""
+
+ def base_case(self,**options):
+ cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
+ options["port"] = port=cluster[0].port()
+ self.start_error_watcher(broker=cluster[0])
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver,sender = self.start_sender_and_receiver(**options)
+ is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
+
+ if (is_error):
+ print "The sender or receiver didn't start properly. Exiting test."
+ return
+ else:
+ "Print no error !"
+
+ # grace period for java clients to get the failover properly setup.
+ time.sleep(30)
+ error_msg= None
+ # Kill original brokers, start new ones.
+ try:
+ for i in range(8):
+ cluster[i].kill()
+ b=cluster.start()
+ self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+ print "iteration : " + str(i)
+ except ConnectError, e1:
+ error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
+
+ except SessionError, e2:
+ error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+
+ self.verify(receiver,sender)
+ if error_msg:
+ raise Exception(error_msg)
+
+
+ def test_failover(self) :
+ """Test basic failover"""
+
+ self.base_case(test_name=self.id())
+
+
+ def test_failover_with_durablesub(self):
+ """Test failover with durable subscriber"""
+
+ self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
+
+
+ def test_failover_with_sync_rcv(self):
+ """Test failover with sync receive"""
+
+ self.base_case(sync_rcv=True,test_name=self.id())
+
+
+ def test_failover_with_sync_ack(self):
+ """Test failover with sync ack"""
+
+ self.base_case(sync_ack=True,test_name=self.id())
+
+
+ def test_failover_with_noprefetch(self):
+ """Test failover with no prefetch"""
+
+ self.base_case(max_prefetch=1,test_name=self.id())
+
+
+ def test_failover_with_multiple_cons_and_ssns(self):
+ """Test failover with multiple connections and sessions"""
+
+ self.base_case(use_unique_dests=True,address="amq.topic",
+ con_count=10,ssn_per_con=25,test_name=self.id())
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org