You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/05/26 17:30:48 UTC

svn commit: r778751 - in /qpid/trunk/qpid: cpp/src/ cpp/src/tests/ python/qpid/

Author: kpvdr
Date: Tue May 26 15:30:47 2009
New Revision: 778751

URL: http://svn.apache.org/viewvc?rev=778751&view=rev
Log:
Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.

Added:
    qpid/trunk/qpid/cpp/src/tests/cluster.py
    qpid/trunk/qpid/cpp/src/tests/run_cluster_tests   (with props)
    qpid/trunk/qpid/cpp/src/tests/testlib.py
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/ais_check
    qpid/trunk/qpid/cpp/src/tests/cluster.mk
    qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
    qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
    qpid/trunk/qpid/python/qpid/testlib.py

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue May 26 15:30:47 2009
@@ -107,6 +107,16 @@
 AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
 INCLUDES = -Igen -I$(srcdir)/gen
 
+#
+# Destination for installed programs and tests defined here
+#
+qpidexecdir = $(libexecdir)/qpid
+qpidexec_PROGRAMS =
+qpidexec_SCRIPTS =
+qpidtestdir = $(qpidexecdir)/test
+qpidtest_PROGRAMS =
+qpidtest_SCRIPTS =
+
 ## Automake macros to build libraries and executables.
 qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\"
 libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue May 26 15:30:47 2009
@@ -39,6 +39,16 @@
 LONG_TESTS=
 
 #
+# Destination for installed programs and tests defined here
+#
+qpidexecdir = $(libexecdir)/qpid
+qpidexec_PROGRAMS =
+qpidexec_SCRIPTS =
+qpidtestdir = $(qpidexecdir)/test
+qpidtest_PROGRAMS =
+qpidtest_SCRIPTS =
+
+#
 # Unit test program
 #
 # Unit tests are built as a single program to reduce valgrind overhead
@@ -128,6 +138,21 @@
 include ssl.mk
 endif
 
+# receiver, sender are installed and therefore built as part of make, not make check
+qpidtest_PROGRAMS += receiver
+receiver_SOURCES = \
+  receiver.cpp \
+  TestOptions.h \
+  ConnectionOptions.h
+receiver_LDADD = $(lib_client)
+
+qpidtest_PROGRAMS += sender
+sender_SOURCES = \
+  sender.cpp \
+  TestOptions.h \
+  ConnectionOptions.h
+sender_LDADD = $(lib_client)
+
 #
 # Other test programs
 #
@@ -195,14 +220,6 @@
 txjob_SOURCES=txjob.cpp  TestOptions.h ConnectionOptions.h
 txjob_LDADD=$(lib_client) 
 
-check_PROGRAMS+=receiver
-receiver_SOURCES=receiver.cpp  TestOptions.h ConnectionOptions.h
-receiver_LDADD=$(lib_client) 
-
-check_PROGRAMS+=sender
-sender_SOURCES=sender.cpp  TestOptions.h ConnectionOptions.h
-sender_LDADD=$(lib_client) 
-
 check_PROGRAMS+=PollerTest
 PollerTest_SOURCES=PollerTest.cpp
 PollerTest_LDADD=$(lib_common) $(SOCKLIBS)

Modified: qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ qpid/trunk/qpid/cpp/src/tests/ais_check Tue May 26 15:30:47 2009
@@ -33,7 +33,6 @@
 
     Tests that depend on the openais library (used for clustering)
     will not be run because:
-
     $NOGROUP
     $NOAISEXEC
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Tue May 26 15:30:47 2009
@@ -29,17 +29,45 @@
 
 
 # ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS+=ais_check federated_cluster_test clustered_replication_test
-EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \
-  federated_cluster_test clustered_replication_test
+TESTS += \
+    ais_check \
+	run_cluster_tests \
+	federated_cluster_test \
+	clustered_replication_test
+	
+EXTRA_DIST += \
+	ais_check \
+	start_cluster \
+	stop_cluster \
+	restart_cluster \
+	cluster_python_tests \
+	cluster_python_tests_failing.txt \
+  	federated_cluster_test \
+  	clustered_replication_test \
+  	run_cluster_tests \
+  	testlib.py \
+  	cluster.py
+  	
+
+unit_test_LDADD += ../cluster.la
+
+LONG_TESTS += \
+	start_cluster \
+	cluster_python_tests \
+	stop_cluster
+
+qpidtest_PROGRAMS += cluster_test
+cluster_test_SOURCES = \
+  	cluster_test.cpp \
+  	unit_test.cpp \
+  	ClusterFixture.cpp \
+  	ClusterFixture.h \
+  	ForkedBroker.h \
+  	ForkedBroker.cpp \
+  	PartialFailure.cpp \
+  	ClusterFailover.cpp
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework
 
-check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
-	cluster_test.cpp PartialFailure.cpp ClusterFailover.cpp
+qpidtest_SCRIPTS += run_cluster_tests cluster.py testlib.py
 
-cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework 
-
-unit_test_LDADD+=../cluster.la
-
-LONG_TESTS+=start_cluster cluster_python_tests stop_cluster
 endif

Added: qpid/trunk/qpid/cpp/src/tests/cluster.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.py?rev=778751&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.py Tue May 26 15:30:47 2009
@@ -0,0 +1,419 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, unittest
+from testlib import TestBaseCluster
+
+class ClusterTests(TestBaseCluster):
+    """Basic cluster with async store tests"""
+    
+    def test_Cluster_01_Initialization(self):
+        """Start a single cluster containing several nodes, and stop it again"""
+        try:
+            clusterName = "cluster-01"
+            self.createCheckCluster(clusterName, 5)
+            self.checkNumBrokers(5)
+            self.stopCheckCluster(clusterName)
+        except:
+            self.killAllClusters()
+            raise
+
+    def test_Cluster_02_MultipleClusterInitialization(self):
+        """Start several clusters each with several nodes and stop them again"""
+        try:
+            for i in range(0, 5):
+                clusterName = "cluster-02.%d" % i
+                self.createCluster(clusterName, 5)
+            self.checkNumBrokers(25)
+            self.killCluster("cluster-02.2")
+            self.checkNumBrokers(20)
+            self.stopCheckAll()
+        except:
+            self.killAllClusters()
+            raise
+        
+    def test_Cluster_03_AddRemoveNodes(self):
+        """Create a multi-node cluster, then kill some nodes and add some new ones (not those killed)"""
+        try:
+            clusterName = "cluster-03"
+            self.createCheckCluster(clusterName, 3)
+            for i in range(4,9):
+                self.createClusterNode(i, clusterName)
+            self.checkNumClusterBrokers(clusterName, 8)
+            self.killNode(2, clusterName)
+            self.killNode(5, clusterName)
+            self.killNode(6, clusterName)
+            self.checkNumClusterBrokers(clusterName, 5)
+            self.createClusterNode(9, clusterName)
+            self.createClusterNode(10, clusterName)
+            self.checkNumClusterBrokers(clusterName, 7)
+            self.stopCheckAll()
+        except:
+            self.killAllClusters()
+            raise
+
+    def test_Cluster_04_RemoveRestoreNodes(self):
+        """Create a multi-node cluster, then kill some of the nodes and restart them"""
+        try:
+            clusterName = "cluster-04"
+            self.createCheckCluster(clusterName, 6)
+            self.checkNumBrokers(6)
+            self.killNode(1, clusterName)
+            self.killNode(3, clusterName)
+            self.killNode(4, clusterName)
+            self.checkNumBrokers(3)
+            self.createClusterNode(1, clusterName)
+            self.createClusterNode(3, clusterName)
+            self.createClusterNode(4, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            self.killNode(2, clusterName)
+            self.killNode(3, clusterName)
+            self.killNode(4, clusterName)
+            self.checkNumBrokers(3)
+            self.createClusterNode(2, clusterName)
+            self.createClusterNode(3, clusterName)
+            self.createClusterNode(4, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            self.stopCheckAll()
+        except:
+            self.killAllClusters()
+            raise
+        
+    def test_Cluster_05_KillAllNodesThenRecover(self):
+        """Create a multi-node cluster, then kill *all* nodes, then restart the cluster"""
+        try:
+            clusterName = "cluster-05"
+            self.createCheckCluster(clusterName, 6)
+            self.killClusterCheck(clusterName)
+            self.createCheckCluster(clusterName, 6)
+            self.stopCheckAll()
+        except:
+            self.killAllClusters()
+            raise
+    
+    def test_Cluster_06_PublishConsume(self):
+        """Publish then consume 100 messages from a single cluster"""
+        try:
+            clusterName = "cluster-06"
+            self.createCheckCluster(clusterName, 3)
+            self.sendReceiveMsgs(0, clusterName, "test-exchange-06", "test-queue-06", 100)
+            self.stopCheckAll()
+        except:
+            self.killAllClusters()
+            raise
+    
+    def test_Cluster_07_MultiplePublishConsume(self):
+        """Staggered publish and consume on a single cluster"""
+        try:
+            clusterName = "cluster-07"
+            exchangeName = "test-exchange-07"
+            queueName = "test-queue-07"
+            self.createCheckCluster(clusterName, 3)
+            self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
+            txMsgs  = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
+            rxMsgs  = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10
+            txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) # 30, 10
+            rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30
+            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 
+            rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
+            txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50
+            rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
+            self.stopCheckAll()
+            if txMsgs != rxMsgs:
+                print "txMsgs=%s" % txMsgs
+                print "rxMsgs=%s" % rxMsgs
+                self.fail("Send - receive message mismatch")
+        except:
+            self.killAllClusters()
+            raise
+    
+    def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self):
+        """Staggered publish and consume interleaved with adding and removing nodes on a single cluster"""
+        try:
+            clusterName = "cluster-08"
+            exchangeName = "test-exchange-08"
+            queueName = "test-queue-08"
+            self.createCheckCluster(clusterName, 3)
+            self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
+            txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
+            for i in range(3,6):
+                self.createClusterNode(i, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0
+            self.killNode(0, clusterName)
+            self.checkNumClusterBrokers(clusterName, 5)
+            rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10
+            self.killNode(2, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30
+            self.createClusterNode(6, clusterName)
+            self.checkNumClusterBrokers(clusterName, 5)
+            txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) # 30, 30 
+            rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50
+            self.createClusterNode(7, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 20) # 30, 50
+            rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
+            self.stopCheckAll()
+            if txMsgs != rxMsgs:
+                print "txMsgs=%s" % txMsgs
+                print "rxMsgs=%s" % rxMsgs
+                self.fail("Send - receive message mismatch")
+        except:
+            self.killAllClusters()
+            raise
+     
+    def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self):
+        """Publish and consume messages interleaved with removing and restoring previous nodes on a single cluster"""
+        try:
+            clusterName = "cluster-09"
+            exchangeName = "test-exchange-09"
+            queueName = "test-queue-09"
+            self.createCheckCluster(clusterName, 6)
+            self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
+            txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
+            self.killNode(2, clusterName)
+            self.checkNumClusterBrokers(clusterName, 5)
+            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0
+            self.killNode(0, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10
+            self.killNode(4, clusterName)
+            self.checkNumClusterBrokers(clusterName, 3)
+            rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30
+            self.createClusterNode(2, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30
+            self.createClusterNode(0, clusterName)
+            self.checkNumClusterBrokers(clusterName, 5)
+            rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
+            self.createClusterNode(4, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50
+            rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80
+            self.stopCheckAll()
+            if txMsgs != rxMsgs:
+                print "txMsgs=%s" % txMsgs
+                print "rxMsgs=%s" % rxMsgs
+                self.fail("Send - receive message mismatch")
+        except:
+            self.killAllClusters()
+            raise
+   
+    def test_Cluster_10_LinearNodeKillCreateProgression(self):
+        """Publish and consume messages while linearly killing all original nodes and replacing them with new ones"""
+        try:
+            clusterName = "cluster-10"
+            exchangeName = "test-exchange-10"
+            queueName = "test-queue-10"
+            self.createCheckCluster(clusterName, 4)
+            self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
+            txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20)
+            rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10)
+            for i in range(0, 16):
+                self.killNode(i, clusterName)
+                self.createClusterNode(i+4, clusterName)
+                self.checkNumClusterBrokers(clusterName, 4)
+                txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, queueName, 20)
+                rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20)
+            rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10)
+            self.stopCheckAll()
+            if txMsgs != rxMsgs:
+                print "txMsgs=%s" % txMsgs
+                print "rxMsgs=%s" % rxMsgs
+                self.fail("Send - receive message mismatch")
+        except:
+            self.killAllClusters()
+            raise
+    
+    def test_Cluster_11_CircularNodeKillRestoreProgression(self):
+        """Publish and consume messages while circularly killing all original nodes and restoring them again"""
+        try:
+            clusterName = "cluster-11"
+            exchangeName = "test-exchange-11"
+            queueName = "test-queue-11"
+            self.createCheckCluster(clusterName, 4)
+            self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
+            txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20)
+            rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10)
+            self.killNode(0, clusterName)
+            self.killNode(1, clusterName)
+            for i in range(0, 16):
+                self.killNode((i + 2) % 4, clusterName)
+                self.createClusterNode(i % 4, clusterName)
+                self.checkNumClusterBrokers(clusterName, 2)
+                txMsgs += self.sendMsgs((i + 3) % 4, clusterName, exchangeName, queueName, 20)
+                rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, queueName, 20)
+            rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10)
+            self.stopCheckAll()
+            if txMsgs != rxMsgs:
+                print "txMsgs=%s" % txMsgs
+                print "rxMsgs=%s" % rxMsgs
+                self.fail("Send - receive message mismatch")
+        except:
+            self.killAllClusters()
+            raise
+    
+    def test_Cluster_12_TopicExchange(self):
+        """Create topic exchange in a cluster and make sure it replicates correctly"""
+        try:
+            clusterName = "cluster-12"
+            self.createCheckCluster(clusterName, 4)
+            topicExchangeName = "test-exchange-12"
+            topicQueueNameKeyList = {"test-queue-12-A" : "#.A", "test-queue-12-B" : "#.B", "test-queue-12-C" : "C.#", "test-queue-12-D" : "D.#"}
+            self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList)
+            
+            # Place initial messages
+            txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0)
+            self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # Should not go to any queue
+            txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, "D.hello.A", 10) # (20, 0, 10, 10)
+            txMsgsA += txMsgsD
+            txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, "hello.B", 20) # (20, 20, 10, 10)
+            # Kill and add some nodes
+            self.killNode(0, clusterName)
+            self.killNode(2, clusterName)
+            self.createClusterNode(4, clusterName)
+            self.createClusterNode(5, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            # Pull 10 messages from each queue
+            rxMsgsA =  self.receiveMsgs(1, clusterName, "test-queue-12-A", 10) # (10, 20, 10, 10)           
+            rxMsgsB =  self.receiveMsgs(3, clusterName, "test-queue-12-B", 10) # (10, 10, 10, 10)                
+            rxMsgsC =  self.receiveMsgs(4, clusterName, "test-queue-12-C", 10) # (10, 10, 0, 10)            
+            rxMsgsD =  self.receiveMsgs(5, clusterName, "test-queue-12-D", 10) # (10, 10, 0, 0)
+            # Kill and add another node
+            self.killNode(4, clusterName)
+            self.createClusterNode(6, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            # Add two more queues
+            self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-12-E" : "#.bye.A", "test-queue-12-F" : "#.bye.B"})
+            # Place more messages
+            txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0)
+            txMsgsA += txMsgs
+            txMsgsC += txMsgs
+            txMsgsE  = txMsgs
+            self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # Should not go to any queue
+            txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, "D.bye.B", 20) # (20, 30, 10, 20, 10, 20)
+            txMsgsB += txMsgs
+            txMsgsD += txMsgs
+            txMsgsF  = txMsgs
+            # Kill all nodes but one
+            self.killNode(1, clusterName)
+            self.killNode(3, clusterName)
+            self.killNode(6, clusterName)
+            self.checkNumClusterBrokers(clusterName, 1)
+            # Pull all remaining messages from each queue
+            rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-12-A", 20)         
+            rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-12-B", 30)               
+            rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-12-C", 10)          
+            rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-12-D", 20)
+            rxMsgsE  = self.receiveMsgs(5, clusterName, "test-queue-12-E", 10)
+            rxMsgsF  = self.receiveMsgs(5, clusterName, "test-queue-12-F", 20)
+            # Check messages
+            self.stopCheckAll()
+            if txMsgsA != rxMsgsA:
+                self.fail("Send - receive message mismatch for queue A")
+            if txMsgsB != rxMsgsB:
+                self.fail("Send - receive message mismatch for queue B")
+            if txMsgsC != rxMsgsC:
+                self.fail("Send - receive message mismatch for queue C")
+            if txMsgsD != rxMsgsD:
+                self.fail("Send - receive message mismatch for queue D")
+            if txMsgsE != rxMsgsE:
+                self.fail("Send - receive message mismatch for queue E")
+            if txMsgsF != rxMsgsF:
+                self.fail("Send - receive message mismatch for queue F")
+        except:
+            self.killAllClusters()
+            raise  
+     
+    def test_Cluster_13_FanoutExchange(self):
+        """Create fanout exchange in a cluster and make sure it replicates correctly"""
+        try:
+            clusterName = "cluster-13"
+            self.createCheckCluster(clusterName, 4)
+            fanoutExchangeName = "test-exchange-13"
+            fanoutQueueNameList = ["test-queue-13-A", "test-queue-13-B", "test-queue-13-C"]
+            self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList)
+            
+            # Place initial 20 messages, retrieve 10
+            txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
+            rxMsgA =  self.receiveMsgs(1, clusterName, "test-queue-13-A", 10)     
+            rxMsgB =  self.receiveMsgs(3, clusterName, "test-queue-13-B", 10)           
+            rxMsgC =  self.receiveMsgs(0, clusterName, "test-queue-13-C", 10)       
+            # Kill and add some nodes
+            self.killNode(0, clusterName)
+            self.killNode(2, clusterName)
+            self.createClusterNode(4, clusterName)
+            self.createClusterNode(5, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            # Place another 20 messages, retrieve 20
+            txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
+            rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20)     
+            rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20)           
+            rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-13-C", 20)       
+            # Kill and add another node
+            self.killNode(4, clusterName)
+            self.createClusterNode(6, clusterName)
+            self.checkNumClusterBrokers(clusterName, 4)
+            # Add another 2 queues
+            self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-13-D", "test-queue-13-E"])
+            # Place another 20 messages, retrieve 20
+            tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
+            txMsg += tmp
+            rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20)     
+            rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20)           
+            rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-13-C", 20)       
+            rxMsgD  = self.receiveMsgs(6, clusterName, "test-queue-13-D", 10)       
+            rxMsgE  = self.receiveMsgs(6, clusterName, "test-queue-13-E", 10)       
+            # Kill all nodes but one
+            self.killNode(1, clusterName)
+            self.killNode(3, clusterName)
+            self.killNode(6, clusterName)
+            self.checkNumClusterBrokers(clusterName, 1)
+            # Pull all remaining messages from each queue
+            rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 10)           
+            rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 10)             
+            rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10)            
+            rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 10)            
+            rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-13-E", 10)            
+            # Check messages
+            self.stopCheckAll()
+            if txMsg != rxMsgA:
+                self.fail("Send - receive message mismatch for queue A")
+            if txMsg != rxMsgB:
+                self.fail("Send - receive message mismatch for queue B")
+            if txMsg != rxMsgC:
+                self.fail("Send - receive message mismatch for queue C")
+            if tmp != rxMsgD:
+                self.fail("Send - receive message mismatch for queue D")
+            if tmp != rxMsgE:
+                self.fail("Send - receive message mismatch for queue E")
+        except:
+            self.killAllClusters()
+            raise
+        
+
+# Start the test here
+  
+if __name__ == '__main__':
+    if os.getenv("STORE_ENABLE") != None:
+        print "NOTE: Store enabled for the following tests:"
+    if not unittest.main(): sys.exit(1)
+  

Modified: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Tue May 26 15:30:47 2009
@@ -63,9 +63,15 @@
 
     if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
         cat <<EOF
-Not running federation to cluster test because:
+
+    =========== WARNING: NOT RUNNING AIS TESTS ==============
+
+    Not running cluster replication test because:
     $NOGROUP
     $NOAISEXEC
+
+    ==========================================================
+
 EOF
         exit 0;
     fi

Modified: qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federated_cluster_test?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federated_cluster_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/federated_cluster_test Tue May 26 15:30:47 2009
@@ -132,9 +132,15 @@
 
     if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
         cat <<EOF
-Not running federation to cluster test because:
+
+    =========== WARNING: NOT RUNNING AIS TESTS ==============
+
+    Not running federation to cluster test because:
     $NOGROUP
     $NOAISEXEC
+
+    ==========================================================
+
 EOF
         exit 0;
     fi

Added: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=778751&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (added)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Tue May 26 15:30:47 2009
@@ -0,0 +1,70 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the cluster tests.
+TEST_DIR=$srcdir
+
+# Check AIS requirements
+id -nG | grep '\<ais\>' >/dev/null || NOGROUP="You are not a member of the ais group."
+ps -u root | grep 'aisexec\|corosync' >/dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root"
+
+if test -n "${NOGROUP}" -o -n "${NOAISEXEC}"; then
+    cat <<EOF
+
+    ========= WARNING: CLUSTERING TESTS DISABLED ==============
+
+    Tests that depend on the openais library (used for clustering)
+    will not be run because:
+
+    ${NOGROUP}
+    ${NOAISEXEC}
+
+    ===========================================================
+
+EOF
+	exit 0
+fi
+
+export PYTHONPATH=$srcdir
+export RUN_CLUSTER_TESTS=1
+export QPIDD=$srcdir/../qpidd
+export LIBCLUSTER=$srcdir/../.libs/cluster.so
+export RECEIVER=$srcdir/receiver
+export SENDER=$srcdir/sender
+
+#Make sure temp dir exists if this is the first to use it
+TMP_STORE_DIR=${TEST_DIR}/test_tmp
+if ! test -d ${TMP_STORE_DIR} ; then
+    mkdir -p ${TMP_STORE_DIR}
+   	mkdir -p ${TMP_STORE_DIR}/cluster
+else
+	rm -rf "${TMP_STORE_DIR}/cluster"
+    mkdir -p "${TMP_STORE_DIR}/cluster"
+fi
+export TMP_STORE_DIR
+
+
+AMQP_SPEC=${TEST_DIR}/../../../specs/amqp.0-10-qpid-errata.xml
+sg ais -c "${TEST_DIR}/cluster.py -v"
+RETCODE=$?
+if test x$RETCODE != x0; then 
+    echo "FAIL cluster tests"; exit 1;
+fi

Propchange: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/cpp/src/tests/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testlib.py?rev=778751&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testlib.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/testlib.py Tue May 26 15:30:47 2009
@@ -0,0 +1,486 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Support library for qpid python tests.
+#
+
+import os, signal, subprocess, unittest
+
+class TestBase(unittest.TestCase):
+    """
+    Base class for qpid tests. Provides broker start/stop/kill methods
+    """
+    
+    """
+    The following environment vars control if and how the test is run, and determine where many of the helper
+    executables/libs are to be found.
+    """
+    _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for durability to be enabled during the test
+    _storeLib = os.getenv("LIBSTORE")
+    _qpiddExec = os.getenv("QPIDD", "/usr/sbin/qpidd")
+    _tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid"))
+    
+    """Global message counter ensures unique messages"""
+    _msgCnt = 0
+    
+    # --- Helper functions for parameter handling ---
+    
+    def _paramBool(self, key, val, keyOnly = False):
+        if val == None:
+            return ""
+        if keyOnly:
+            if val:
+                return " --%s" % key
+            else:
+                return ""
+        else:
+            if val:
+                return " --%s yes" % key
+            else:
+                return " --%s no" % key
+    
+    def _paramNum(self, key, val):
+        if val != None:
+            return " --%s %d" % (key, val)
+        return ""
+    
+    def _paramString(self, key, val):
+        if val != None:
+            return " --%s %s" % (key, val)
+        return ""
+    
+    def _paramStringList(self, key, valList, val):
+        if val in valList:
+            return " --%s %s" % (key, val)
+        return ""
+    
+    # --- Helper functions for message creation ---
+    
+    def _makeMessage(self, msgSize):
+        msg = "Message-%04d" % self._msgCnt
+        self._msgCnt = self._msgCnt + 1
+        msgLen = len(msg)
+        if msgSize > msgLen:
+            for i in range(msgLen, msgSize):
+                if i == msgLen:
+                    msg += "-"
+                else:
+                    msg += chr(ord('a') + (i % 26))
+        return msg
+    
+    def _makeMessageList(self, numMsgs, msgSize):
+        if msgSize == None:
+            msgSize = 12
+        msgs = ""
+        for m in range(0, numMsgs):
+            msgs += "%s\n" % self._makeMessage(msgSize)
+        return msgs
+    
+    # --- Starting and stopping a broker ---
+    
+    def startBroker(self, qpiddArgs, logFile = None):
+        """Start a single broker daemon, returns tuple (pid, port)"""
+        if self._qpiddExec == None:
+            raise Exception("Environment variable QPIDD is not set")
+        cmd = "%s --daemon --port=0 %s" % (self._qpiddExec, qpiddArgs)
+        portStr = os.popen(cmd).read()
+        if len(portStr) == 0:
+            err = "Broker daemon startup failed."
+            if logFile != None:
+                err += " See log file %s" % logFile
+            raise Exception(err)
+        port = int(portStr)
+        pidStr = os.popen("%s -p %d -c" % (self._qpiddExec, port)).read()
+        try:
+            pid = int(pidStr)
+        except:
+            raise Exception("Unable to get pid: \"%s -p %d -c\" returned %s" % (self._qpiddExec, port, pidStr))
+        #print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs)
+        return (pid, port)
+    
+    def killBroker(self, pid):
+        """Kill a broker using kill -9"""
+        os.kill(pid, signal.SIGTERM)
+        #print "killed broker: pid=%d" % pid
+    
+    def stopBroker(self, port):
+        """Stop a broker using qpidd -q"""
+        ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % port, "-q")
+        if ret != 0:
+            raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret))
+        #print "stopped broker: port=%d" % port 
+
+
+
+class TestBaseCluster(TestBase):
+    """
+    Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes.
+    """
+    
+    """
+    The following environment vars control if and how the test is run, and determine where many of the helper
+    executables/libs are to be found.
+    """
+    _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True for these cluster tests to run
+    _clusterLib = os.getenv("LIBCLUSTER")
+    _qpidConfigExec = os.getenv("QPID_CONFIG", "/usr/bin/qpid-config")
+    _qpidRouteExec = os.getenv("QPID_ROUTE", "/usr/bin/qpid-route")
+    _receiverExec = os.getenv("RECEIVER", "/usr/libexec/qpid/test/receiver")
+    _senderExec = os.getenv("SENDER", "/usr/libexec/qpid/test/sender")
+    
+    
+    """
+    _clusterDict is a dictionary of clusters:
+        key = cluster name (string)
+        val = dictionary of node numbers:
+            key = integer node number
+            val = tuple containing (pid, port)
+    For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows:
+    {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}}
+    where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively.
+    """
+    _clusterDict = {}
+    
+    """Index for (pid, port) tuple"""
+    PID = 0
+    PORT = 1
+    
+    def run(self, res):
+        """ Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined."""
+        if not self._runClusterTests:
+            return
+        unittest.TestCase.run(self, res)
+    
+    
+    # --- Starting cluster node(s) ---
+    
+    def createClusterNode(self, nodeNumber, clusterName):
+        """Create a node and add it to the named cluster"""
+        if self._tempStoreDir == None:
+            raise Exception("Environment variable TMP_STORE_DIR is not set")
+        if self._clusterLib == None:
+            raise Exception("Environment variable LIBCLUSTER is not set")
+        name = "%s-%d" % (clusterName, nodeNumber)
+        dataDir = os.path.join(self._tempStoreDir, "cluster", name)
+        logFile = "%s.log" % dataDir
+        args = "--no-module-dir --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=notice+ --log-to-file=%s" % \
+            (self._clusterLib, dataDir, clusterName, logFile)
+        if self._storeEnable:
+            if self._storeLib == None:
+                raise Exception("Environment variable LIBSTORE is not set")
+            args += " --load-module %s" % self._storeLib
+        self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
+    
+    def createCluster(self, clusterName, numberNodes):
+        """Create a cluster containing an initial number of nodes"""
+        self._clusterDict[clusterName] = {}
+        for n in range(0, numberNodes):
+            self.createClusterNode(n, clusterName)
+    
+    # --- Cluster and node status ---
+    
+    def getTupleList(self):
+        """Get list of (pid, port) tuples of all known cluster brokers"""
+        tList = []
+        for l in self._clusterDict.itervalues():
+            for t in l.itervalues():
+                tList.append(t)
+        return tList
+    
+    def getNumBrokers(self):
+        """Get total number of brokers in all known clusters"""
+        return len(self.getTupleList())
+    
+    def checkNumBrokers(self, expected):
+        """Check that the total number of brokers in all known clusters is the expected value"""
+        if self.getNumBrokers() != expected:
+            raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
+
+    def getClusterTupleList(self, clusterName):
+        """Get list of (pid, port) tuples of all nodes in named cluster"""
+        return self._clusterDict[clusterName].values()
+    
+    def getNumClusterBrokers(self, clusterName):
+        """Get total number of brokers in named cluster"""
+        return len(self.getClusterTupleList(clusterName))
+    
+    def getNodeTuple(self, nodeNumber, clusterName):
+        """Get the (pid, port) tuple for the given cluster node"""
+        return self._clusterDict[clusterName][nodeNumber]
+    
+    def checkNumClusterBrokers(self, clusterName, expected):
+        """Check that the total number of brokers in the named cluster is the expected value"""
+        if self.getNumClusterBrokers(clusterName) != expected:
+            raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
+                            (clusterName, expected, self.getNumClusterBrokers(clusterName)))
+
+    def clusterExists(self, clusterName):
+        """ Return True if clusterName exists, False otherwise"""
+        return clusterName in self._clusterDict.keys()
+    
+    def clusterNodeExists(self, clusterName, nodeNumber):
+        """ Return True if nodeNumber in clusterName exists, False otherwise"""
+        if clusterName in self._clusterDict.keys():
+            return nodeNumber in self._clusterDict[nodeName]
+        return False
+    
+    def createCheckCluster(self, clusterName, size):
+        """Create a cluster using the given name and size, then check the number of brokers"""
+        self.createCluster(clusterName, size)
+        self.checkNumClusterBrokers(clusterName, size)
+    
+    # --- Kill cluster nodes using signal 9 ---
+    
+    def killNode(self, nodeNumber, clusterName, updateDict = True):
+        """Kill the given node in the named cluster using kill -9"""
+        self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID])
+        if updateDict:
+            del(self._clusterDict[clusterName][nodeNumber])
+    
+    def killCluster(self, clusterName, updateDict = True):
+        """Kill all nodes in the named cluster"""
+        for n in self._clusterDict[clusterName].iterkeys():
+            self.killNode(n, clusterName, False)
+        if updateDict:
+            del(self._clusterDict[clusterName])
+    
+    def killClusterCheck(self, clusterName):
+        """Kill the named cluster and check that the name is removed from the cluster dictionary"""
+        self.killCluster(clusterName)
+        if self.clusterExists(clusterName):
+            raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
+                            (clusterName, self.getNumClusterBrokers(clusterName)))
+    
+    def killAllClusters(self):
+        """Kill all known clusters"""
+        for n in self._clusterDict.iterkeys():
+            self.killCluster(n, False)
+        self._clusterDict.clear() 
+    
+    def killAllClustersCheck(self):
+        """Kill all known clusters and check that the cluster dictionary is empty"""
+        self.killAllClusters()
+        self.checkNumBrokers(0)
+    
+    # --- Stop cluster nodes using qpidd -q ---
+    
+    def stopNode(self, nodeNumber, clusterName, updateDict = True):
+        """Stop the given node in the named cluster using qpidd -q"""
+        self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT])
+        if updateDict:
+            del(self._clusterDict[clusterName][nodeNumber])
+    
+    def stopAllClusters(self):
+        """Stop all known clusters"""
+        for n in self._clusterDict.iterkeys():
+            self.stopCluster(n, False)
+        self._clusterDict.clear() 
+
+    
+    def stopCluster(self, clusterName, updateDict = True):
+        """Stop all nodes in the named cluster"""
+        for n in self._clusterDict[clusterName].iterkeys():
+            self.stopNode(n, clusterName, False)
+        if updateDict:
+            del(self._clusterDict[clusterName])
+    
+    def stopCheckCluster(self, clusterName):
+        """Stop the named cluster and check that the name is removed from the cluster dictionary"""
+        self.stopCluster(clusterName)
+        if self.clusterExists(clusterName):
+            raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
+    
+    def stopCheckAll(self):
+        """Stop all known clusters and check that the cluster dictionary is empty"""
+        self.stopAllClusters()
+        self.checkNumBrokers(0)
+    
+    # --- qpid-config functions ---
+    
+    def _qpidConfig(self, nodeNumber, clusterName, action):
+        """Configure some aspect of a qpid broker using the qpid_config executable"""
+        port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+        #print "%s -a localhost:%d %s" % (self._qpidConfigExec, port, action)
+        ret = os.spawnl(os.P_WAIT, self._qpidConfigExec, self._qpidConfigExec, "-a", "localhost:%d" % port,  *action.split())
+        if ret != 0:
+            raise Exception("_qpidConfig(): cluster=\"%s\" nodeNumber=%d port=%d action=\"%s\" returned %d" % \
+                            (clusterName, nodeNumber, port, action, ret))
+    
+    def addExchange(self, nodeNumber, clusterName, exchangeType, exchangeName, durable = False, sequence = False, \
+                    ive = False):
+        """Add a named exchange."""
+        action = "add exchange %s %s" % (exchangeType, exchangeName)
+        action += self._paramBool("durable", durable, True)
+        action += self._paramBool("sequence", sequence, True)
+        action += self._paramBool("ive", ive, True)
+        self._qpidConfig(nodeNumber, clusterName, action)
+    
+    def deleteExchange(self, nodeNumber, clusterName, exchangeName):
+        """Delete a named exchange"""
+        self._qpidConfig(nodeNumber, clusterName, "del exchange %s" % exchangeName)
+
+    def addQueue(self, nodeNumber, clusterName, queueName, configArgs = None):
+        """Add a queue using qpid-config."""
+        action = "add queue %s" % queueName
+        if self._storeEnable:
+            action += " --durable"
+        if configArgs != None:
+            action += " %s" % configArgs
+        self._qpidConfig(nodeNumber, clusterName, action)
+    
+    def delQueue(self, nodeNumber, clusterName, queueName):
+        """Delete a named queue using qpid-config."""
+        self._qpidConfig(nodeNumber, clusterName, "del queue %s" % queueName)
+    
+    def bind(self, nodeNumber, clusterName, exchangeName, queueName, key):
+        """Create an exchange-queue binding using qpid-config."""
+        self._qpidConfig(nodeNumber, clusterName, "bind %s %s %s" % (exchangeName, queueName, key))
+    
+    def unbind(self, nodeNumber, clusterName, exchangeName, queueName, key):
+        """Remove an exchange-queue binding using qpid-config."""
+        self._qpidConfig(nodeNumber, clusterName, "unbind %s %s %s" % (exchangeName, queueName, key))
+    
+    # --- qpid-route functions (federation) ---
+    
+    def brokerDict(self, nodeNumber, clusterName, host = "localhost", user = None, password = None):
+        """Returns a dictionary containing the broker info to be passed to route functions"""
+        port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+        return {"cluster": clusterName, "node":nodeNumber, "port":port, "host":host, "user":user, "password":password}
+    
+    def _brokerStr(self, brokerDict):
+        """Set up a broker string in the format [user/password@]host:port"""
+        str = ""
+        if brokerDict["user"] !=None and brokerDict["password"] != None:
+            str = "%s@%s" % (brokerDict["user"], brokerDict["password"])
+        str += "%s:%d" % (brokerDict["host"], brokerDict["port"])
+        return str
+    
+    def _qpidRoute(self, action):
+        """Set up a route using qpid-route"""
+        #print "%s %s" % (self._qpidRouteExec, action)
+        ret = os.spawnl(os.P_WAIT, self._qpidRouteExec, self._qpidRouteExec, *action.split())
+        if ret != 0:
+            raise Exception("_qpidRoute(): action=\"%s\" returned %d" % (action, ret))
+        
+    def routeDynamicAdd(self, destBrokerDict, srcBrokerDict, exchangeName):
+        self._qpidRoute("dynamic add %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName))
+         
+    def routeDynamicDelete(self, destBrokerDict, srcBrokerDict, exchangeName):
+        self._qpidRoute("dynamic del %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName))
+         
+    def routeAdd(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey):
+        self._qpidRoute("route add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey))
+         
+    def routeDelete(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey):
+        self._qpidRoute("route del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey))
+    
+    def routeQueueAdd(self, destBrokerDict, srcBrokerDict, exchangeName, queueName):
+        self._qpidRoute("queue add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName))
+    
+    def routeQueueDelete(self, destBrokerDict, srcBrokerDict, exchangeName, queueName):
+        self._qpidRoute("queue del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName))
+    
+    def routeLinkAdd(self, destBrokerDict, srcBrokerDict):
+        self._qpidRoute("link add %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict)))
+     
+    def routeLinkDelete(self, destBrokerDict, srcBrokerDict):
+        self._qpidRoute("link del %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict)))
+    
+    # --- Message send and receive functions ---
+    
+    def _receiver(self, action):
+        if self._receiverExec == None:
+            raise Exception("Environment variable RECEIVER is not set")
+        cmd = "%s %s" % (self._receiverExec, action)
+        #print cmd
+        return subprocess.Popen(cmd.split(), stdout = subprocess.PIPE)
+    
+    def _sender(self, action):
+        if self._senderExec == None:
+            raise Exception("Environment variable SENDER is not set")
+        cmd = "%s %s" % (self._senderExec, action)
+        #print cmd
+        return subprocess.Popen(cmd.split(), stdin = subprocess.PIPE)
+    
+    def createReciever(self, nodeNumber, clusterName, queueName, numMsgs = None, receiverArgs = None):
+        port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+        action = "--port %d --queue %s" % (port, queueName)
+        if numMsgs != None:
+            action += " --messages %d" % numMsgs
+        if receiverArgs != None:
+            action += " %s" % receiverArgs
+        return self._receiver(action)
+    
+    def createSender(self, nodeNumber, clusterName, exchangeName, routingKey, senderArgs = None):
+        port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+        action = "--port %d --exchange %s" % (port, exchangeName)
+        if routingKey != None and len(routingKey) > 0:
+            action += " --routing-key %s" % routingKey
+        if self._storeEnable:
+            action += " --durable yes"
+        if senderArgs != None:
+            action += " %s" % senderArgs
+        return self._sender(action)
+    
+    def createBindDirectExchangeQueue(self, nodeNumber, clusterName, exchangeName, queueName):
+        self.addExchange(nodeNumber, clusterName, "direct", exchangeName)
+        self.addQueue(nodeNumber, clusterName, queueName)
+        self.bind(nodeNumber, clusterName, exchangeName, queueName, queueName)
+    
+    def createBindTopicExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameKeyList):
+        self.addExchange(nodeNumber, clusterName, "topic", exchangeName)
+        for queueName, key in queueNameKeyList.iteritems():
+            self.addQueue(nodeNumber, clusterName, queueName)
+            self.bind(nodeNumber, clusterName, exchangeName, queueName, key)
+    
+    def createBindFanoutExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameList):
+        self.addExchange(nodeNumber, clusterName, "fanout", exchangeName)
+        for queueName in queueNameList:
+            self.addQueue(nodeNumber, clusterName, queueName)
+            self.bind(nodeNumber, clusterName, exchangeName, queueName, "")
+    
+    def sendMsgs(self, nodeNumber, clusterName, exchangeName, routingKey, numMsgs, msgSize = None, wait = True):
+        msgs = self._makeMessageList(numMsgs, msgSize)
+        sender = self.createSender(nodeNumber, clusterName, exchangeName, routingKey)
+        sender.stdin.write(msgs)
+        sender.stdin.close()
+        if wait:
+            sender.wait()
+        return msgs
+    
+    def receiveMsgs(self, nodeNumber, clusterName, queueName, numMsgs, wait = True):
+        receiver = self.createReciever(nodeNumber, clusterName, queueName, numMsgs)
+        cnt = 0
+        msgs = ""
+        while cnt < numMsgs:
+            rx = receiver.stdout.readline()
+            if rx == "" and receiver.poll() != None: break
+            msgs += rx
+            cnt = cnt + 1
+        if wait:
+            receiver.wait()
+        return msgs
+   
+    def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, queueName, numMsgs, wait = True, msgSize = None):
+        self.createBindDirectExchangeQueue(nodeNumber, clusterName, exchangeName, queueName)
+        txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, queueName, numMsgs, msgSize, wait)
+        rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, wait)
+        if txMsgs != rxMsgs:
+            self.fail("Send - receive message mismatch")

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Tue May 26 15:30:47 2009
@@ -21,7 +21,7 @@
 # Support library for qpid python tests.
 #
 
-import sys, re, unittest, os, signal, random, logging, traceback
+import sys, re, unittest, os, random, logging, traceback
 import qpid.client, qpid.spec, qmf.console
 import Queue
 from fnmatch import fnmatch
@@ -429,196 +429,3 @@
         session.message_subscribe(**keys)
         session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
         session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
-
-
-class TestBaseCluster(unittest.TestCase):
-    """
-    Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes.
-    """
-    _tempStoreDir = os.getenv("TMP_STORE_DIR")
-    _qpidd = os.getenv("QPIDD")
-    _storeLib = os.getenv("LIBSTORE")
-    _clusterLib = os.getenv("LIBCLUSTER")
-    
-    # --- Cluster helper functions ---
-        
-    """
-    _clusterDict is a dictionary of clusters:
-        key = cluster name (string)
-        val = dictionary of node numbers:
-            key = node number (int)
-            val = tuple containing (pid, port)
-    For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows:
-    {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}}
-    where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively.
-    """
-    _clusterDict = {}
-    
-    """Index for (pid, port) tuple"""
-    PID = 0
-    PORT = 1
-    
-    def startBroker(self, qpiddArgs, logFile = None):
-        """Start a single broker daemon, returns tuple (pid, port)"""
-        if self._qpidd == None:
-            raise Exception("Environment variable QPIDD is not set")
-        cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs)
-        portStr = os.popen(cmd).read()
-        if len(portStr) == 0:
-            err = "Broker daemon startup failed."
-            if logFile != None:
-                err += " See log file %s" % logFile
-            raise Exception(err)
-        port = int(portStr)
-        pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read())
-        #print "started broker: pid=%d, port=%d" % (pid, port)
-        return (pid, port)
-    
-    def createClusterNode(self, nodeNumber, clusterName):
-        """Create a node and add it to the named cluster"""
-        if self._tempStoreDir == None:
-            raise Exception("Environment variable TMP_STORE_DIR is not set")
-        if self._storeLib == None:
-            raise Exception("Environment variable LIBSTORE is not set")
-        if self._clusterLib == None:
-            raise Exception("Environment variable LIBCLUSTER is not set")
-        name = "%s-%d" % (clusterName, nodeNumber)
-        dataDir = os.path.join(self._tempStoreDir, "cluster", name)
-        logFile = "%s.log" % dataDir
-        args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \
-            (self._storeLib, self._clusterLib, dataDir, clusterName, logFile)
-        self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
-    
-    def createCluster(self, clusterName, numberNodes):
-        """Create a cluster containing an initial number of nodes"""
-        self._clusterDict[clusterName] = {}
-        for n in range(0, numberNodes):
-            self.createClusterNode(n, clusterName)
-    
-    def getTupleList(self):
-        """Get list of (pid, port) tuples of all known cluster brokers"""
-        tList = []
-        for l in self._clusterDict.itervalues():
-            for t in l.itervalues():
-                tList.append(t)
-        return tList
-    
-    def getNumBrokers(self):
-        """Get total number of brokers in all known clusters"""
-        return len(self.getTupleList())
-    
-    def checkNumBrokers(self, expected):
-        """Check that the total number of brokers in all known clusters is the expected value"""
-        if self.getNumBrokers() != expected:
-            raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
-
-    def getClusterTupleList(self, clusterName):
-        """Get list of (pid, port) tuples of all nodes in named cluster"""
-        return self._clusterDict[clusterName].values()
-    
-    def getNumClusterBrokers(self, clusterName):
-        """Get total number of brokers in named cluster"""
-        return len(self.getClusterTupleList(clusterName))
-    
-    def getNodeTuple(self, nodeNumber, clusterName):
-        """Get the (pid, port) tuple for the given cluster node"""
-        return self._clusterDict[clusterName][nodeNumber]
-    
-    def checkNumClusterBrokers(self, clusterName, expected):
-        """Check that the total number of brokers in the named cluster is the expected value"""
-        if self.getNumClusterBrokers(clusterName) != expected:
-            raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
-                            (clusterName, expected, self.getNumClusterBrokers(clusterName)))
-
-    def clusterExists(self, clusterName):
-        """ Return True if clusterName exists, False otherwise"""
-        return clusterName in self._clusterDict.keys()
-    
-    def clusterNodeExists(self, clusterName, nodeNumber):
-        """ Return True if nodeNumber in clusterName exists, False otherwise"""
-        if clusterName in self._clusterDict.keys():
-            return nodeNumber in self._clusterDict[nodeName]
-        return False
-    
-    def createCheckCluster(self, clusterName, size):
-        """Create a cluster using the given name and size, then check the number of brokers"""
-        self.createCluster(clusterName, size)
-        self.checkNumClusterBrokers(clusterName, size)
-    
-    # Kill cluster nodes using signal 9
-    
-    def killNode(self, nodeNumber, clusterName, updateDict = True):
-        """Kill the given node in the named cluster using kill -9"""
-        pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID]
-        os.kill(pid, signal.SIGTERM)
-        #print "killed broker: pid=%d" % pid
-        if updateDict:
-            del(self._clusterDict[clusterName][nodeNumber])
-    
-    def killCluster(self, clusterName, updateDict = True):
-        """Kill all nodes in the named cluster"""
-        for n in self._clusterDict[clusterName].iterkeys():
-            self.killNode(n, clusterName, False)
-        if updateDict:
-            del(self._clusterDict[clusterName])
-    
-    def killClusterCheck(self, clusterName):
-        """Kill the named cluster and check that the name is removed from the cluster dictionary"""
-        self.killCluster(clusterName)
-        if self.clusterExists(clusterName):
-            raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
-                            (clusterName, self.getNumClusterBrokers(clusterName)))
-    
-    def killAllClusters(self):
-        """Kill all known clusters"""
-        for n in self._clusterDict.iterkeys():
-            self.killCluster(n, False)
-        self._clusterDict.clear() 
-    
-    def killAllClustersCheck(self):
-        """Kill all known clusters and check that the cluster dictionary is empty"""
-        self.killAllClusters()
-        self.checkNumBrokers(0)
-    
-    # Stop cluster nodes using qpidd -q
-    
-    def stopNode(self, nodeNumber, clusterName, updateDict = True):
-        """Stop the given node in the named cluster using qpidd -q"""
-        port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
-        ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q")
-        if ret != 0:
-            raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \
-                            (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret))
-        #print "stopped broker: port=%d" % port 
-        if updateDict:
-            del(self._clusterDict[clusterName][nodeNumber])
-    
-    def stopAllClusters(self):
-        """Stop all known clusters"""
-        for n in self._clusterDict.iterkeys():
-            self.stopCluster(n, False)
-        self._clusterDict.clear() 
-
-    
-    def stopCluster(self, clusterName, updateDict = True):
-        """Stop all nodes in the named cluster"""
-        for n in self._clusterDict[clusterName].iterkeys():
-            self.stopNode(n, clusterName, False)
-        if updateDict:
-            del(self._clusterDict[clusterName])
-    
-    def stopCheckCluster(self, clusterName):
-        """Stop the named cluster and check that the name is removed from the cluster dictionary"""
-        self.stopCluster(clusterName)
-        if self.clusterExists(clusterName):
-            raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
-    def stopCheckAll(self):
-        """Kill all known clusters and check that the cluster dictionary is empty"""
-        self.stopAllClusters()
-        self.checkNumBrokers(0)
-    
-    def setUp(self):
-        pass
-    
-    def tearDown(self):
-        pass



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Re: [c++]: new cluster tests not working for me

Posted by Kim van der Riet <ki...@redhat.com>.
On Wed, 2009-05-27 at 14:27 +0100, Gordon Sim wrote:
> kpvdr@apache.org wrote:
> > Author: kpvdr
> > Date: Tue May 26 15:30:47 2009
> > New Revision: 778751
> > 
> > URL: http://svn.apache.org/viewvc?rev=778751&view=rev
> > Log:
> > Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.
> 
> Kim,
> 
> I had to make the attached changes to get these new tests to run from 
> svn (basically, the locations of the qpid-config and qpid-route tools 
> need to be set up for an svn checkout, and QPID_CONFIG is used to set 
> the config file for qpidd, so I changed the name of that variable).
> 
> Are you happy with these changes or am I ignoring something important in 
> how this has been designed to work?
> 
This is indeed a problem - thanks. I had the python client package
installed, which put some of these utils onto my path, so I did not
notice the path deficiencies. Fixed in r.779262.

Kim


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


[c++]: new cluster tests not working for me

Posted by Gordon Sim <gs...@redhat.com>.
kpvdr@apache.org wrote:
> Author: kpvdr
> Date: Tue May 26 15:30:47 2009
> New Revision: 778751
> 
> URL: http://svn.apache.org/viewvc?rev=778751&view=rev
> Log:
> Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.

Kim,

I had to make the attached changes to get these new tests to run from 
svn (basically, the locations of the qpid-config and qpid-route tools 
need to be set up for an svn checkout, and QPID_CONFIG is used to set 
the config file for qpidd, so I changed the name of that variable).

Are you happy with these changes or am I ignoring something important in 
how this has been designed to work?

--Gordon.