You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/05/26 20:22:48 UTC
svn commit: r778827 - in /qpid/trunk/qpid/cpp/src/tests: cluster.py
testlib.py
Author: kpvdr
Date: Tue May 26 18:22:48 2009
New Revision: 778827
URL: http://svn.apache.org/viewvc?rev=778827&view=rev
Log:
Persistent cluster test added which checks for recovery of queue and messages after all nodes in a cluster are killed. Test does not run if no store is loaded.
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster.py
qpid/trunk/qpid/cpp/src/tests/testlib.py
Modified: qpid/trunk/qpid/cpp/src/tests/cluster.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.py?rev=778827&r1=778826&r2=778827&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.py Tue May 26 18:22:48 2009
@@ -270,14 +270,58 @@
except:
self.killAllClusters()
raise
+
+ def test_Cluster_12_KillAllNodesRecoverMessages(self):
+ """Create a cluster, add and delete messages, kill all nodes then recover cluster and messages"""
+ if not self._storeEnable:
+ print " No store loaded, skipped"
+ return
+ try:
+ clusterName = "cluster-12"
+ exchangeName = "test-exchange-12"
+ queueName = "test-queue-12"
+ self.createCheckCluster(clusterName, 4)
+ self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20)
+ rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10)
+ txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20)
+ rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20)
+ self.killNode(0, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20)
+ rxMsgs += self.receiveMsgs(1, clusterName, queueName, 20)
+ self.killNode(2, clusterName)
+ self.createClusterNode(0, clusterName)
+ self.createClusterNode(5, clusterName)
+ self.checkNumClusterBrokers(clusterName, 5)
+ txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20)
+ rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20)
+ self.killAllClusters()
+ self.checkNumClusterBrokers(clusterName, 0)
+ self.createCluster(clusterName)
+ self.createClusterNode(3, clusterName) # last node to be used
+ self.createClusterNode(0, clusterName)
+ self.createClusterNode(1, clusterName)
+ self.createClusterNode(2, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.createClusterNode(5, clusterName)
+ rxMsgs += self.receiveMsgs(0, clusterName, queueName, 10)
+ if txMsgs != rxMsgs:
+ print "txMsgs=%s" % txMsgs
+ print "rxMsgs=%s" % rxMsgs
+ self.fail("Send - receive message mismatch")
+ except:
+ self.killAllClusters()
+ raise
- def test_Cluster_12_TopicExchange(self):
+ def test_Cluster_13_TopicExchange(self):
"""Create topic exchange in a cluster and make sure it replicates correctly"""
try:
- clusterName = "cluster-12"
+ clusterName = "cluster-13"
self.createCheckCluster(clusterName, 4)
- topicExchangeName = "test-exchange-12"
- topicQueueNameKeyList = {"test-queue-12-A" : "#.A", "test-queue-12-B" : "#.B", "test-queue-12-C" : "C.#", "test-queue-12-D" : "D.#"}
+ topicExchangeName = "test-exchange-13"
+ topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "#.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.#"}
self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList)
# Place initial messages
@@ -293,16 +337,16 @@
self.createClusterNode(5, clusterName)
self.checkNumClusterBrokers(clusterName, 4)
# Pull 10 messages from each queue
- rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-12-A", 10) # (10, 20, 10, 10)
- rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-12-B", 10) # (10, 10, 10, 10)
- rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-12-C", 10) # (10, 10, 0, 10)
- rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-12-D", 10) # (10, 10, 0, 0)
+ rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) # (10, 20, 10, 10)
+ rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) # (10, 10, 10, 10)
+ rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-13-C", 10) # (10, 10, 0, 10)
+ rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) # (10, 10, 0, 0)
# Kill and add another node
self.killNode(4, clusterName)
self.createClusterNode(6, clusterName)
self.checkNumClusterBrokers(clusterName, 4)
# Add two more queues
- self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-12-E" : "#.bye.A", "test-queue-12-F" : "#.bye.B"})
+ self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"})
# Place more messages
txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0)
txMsgsA += txMsgs
@@ -319,12 +363,12 @@
self.killNode(6, clusterName)
self.checkNumClusterBrokers(clusterName, 1)
# Pull all remaining messages from each queue
- rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-12-A", 20)
- rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-12-B", 30)
- rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-12-C", 10)
- rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-12-D", 20)
- rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-12-E", 10)
- rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-12-F", 20)
+ rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 20)
+ rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 30)
+ rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10)
+ rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 20)
+ rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-13-E", 10)
+ rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-13-F", 20)
# Check messages
self.stopCheckAll()
if txMsgsA != rxMsgsA:
@@ -343,20 +387,20 @@
self.killAllClusters()
raise
- def test_Cluster_13_FanoutExchange(self):
+ def test_Cluster_14_FanoutExchange(self):
"""Create fanout exchange in a cluster and make sure it replicates correctly"""
try:
- clusterName = "cluster-13"
+ clusterName = "cluster-14"
self.createCheckCluster(clusterName, 4)
- fanoutExchangeName = "test-exchange-13"
- fanoutQueueNameList = ["test-queue-13-A", "test-queue-13-B", "test-queue-13-C"]
+ fanoutExchangeName = "test-exchange-14"
+ fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]
self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList)
# Place initial 20 messages, retrieve 10
txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
- rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10)
- rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10)
- rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-13-C", 10)
+ rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-14-A", 10)
+ rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-14-B", 10)
+ rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-14-C", 10)
# Kill and add some nodes
self.killNode(0, clusterName)
self.killNode(2, clusterName)
@@ -365,34 +409,34 @@
self.checkNumClusterBrokers(clusterName, 4)
# Place another 20 messages, retrieve 20
txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
- rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20)
- rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20)
- rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-13-C", 20)
+ rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20)
+ rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20)
+ rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-14-C", 20)
# Kill and add another node
self.killNode(4, clusterName)
self.createClusterNode(6, clusterName)
self.checkNumClusterBrokers(clusterName, 4)
# Add another 2 queues
- self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-13-D", "test-queue-13-E"])
+ self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-14-D", "test-queue-14-E"])
# Place another 20 messages, retrieve 20
tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
txMsg += tmp
- rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20)
- rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20)
- rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-13-C", 20)
- rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-13-D", 10)
- rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-13-E", 10)
+ rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20)
+ rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20)
+ rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-14-C", 20)
+ rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-14-D", 10)
+ rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-14-E", 10)
# Kill all nodes but one
self.killNode(1, clusterName)
self.killNode(3, clusterName)
self.killNode(6, clusterName)
self.checkNumClusterBrokers(clusterName, 1)
# Pull all remaining messages from each queue
- rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 10)
- rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 10)
- rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10)
- rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 10)
- rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-13-E", 10)
+ rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-14-A", 10)
+ rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-14-B", 10)
+ rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-14-C", 10)
+ rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-14-D", 10)
+ rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-14-E", 10)
# Check messages
self.stopCheckAll()
if txMsg != rxMsgA:
@@ -408,7 +452,6 @@
except:
self.killAllClusters()
raise
-
# Start the test here
Modified: qpid/trunk/qpid/cpp/src/tests/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testlib.py?rev=778827&r1=778826&r2=778827&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testlib.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/testlib.py Tue May 26 18:22:48 2009
@@ -188,7 +188,7 @@
args += " --load-module %s" % self._storeLib
self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
- def createCluster(self, clusterName, numberNodes):
+ def createCluster(self, clusterName, numberNodes = 0):
"""Create a cluster containing an initial number of nodes"""
self._clusterDict[clusterName] = {}
for n in range(0, numberNodes):
@@ -215,7 +215,9 @@
def getClusterTupleList(self, clusterName):
"""Get list of (pid, port) tuples of all nodes in named cluster"""
- return self._clusterDict[clusterName].values()
+ if clusterName in self._clusterDict:
+ return self._clusterDict[clusterName].values()
+ return []
def getNumClusterBrokers(self, clusterName):
"""Get total number of brokers in named cluster"""
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org