Posted to commits@qpid.apache.org by kp...@apache.org on 2009/05/26 17:30:48 UTC
svn commit: r778751 - in /qpid/trunk/qpid: cpp/src/ cpp/src/tests/ python/qpid/
Author: kpvdr
Date: Tue May 26 15:30:47 2009
New Revision: 778751
URL: http://svn.apache.org/viewvc?rev=778751&view=rev
Log:
Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.
Added:
qpid/trunk/qpid/cpp/src/tests/cluster.py
qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (with props)
qpid/trunk/qpid/cpp/src/tests/testlib.py
Modified:
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/ais_check
qpid/trunk/qpid/cpp/src/tests/cluster.mk
qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
qpid/trunk/qpid/python/qpid/testlib.py
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue May 26 15:30:47 2009
@@ -107,6 +107,16 @@
AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
INCLUDES = -Igen -I$(srcdir)/gen
+#
+# Destinations for installed programs and tests are defined here
+#
+qpidexecdir = $(libexecdir)/qpid
+qpidexec_PROGRAMS =
+qpidexec_SCRIPTS =
+qpidtestdir = $(qpidexecdir)/test
+qpidtest_PROGRAMS =
+qpidtest_SCRIPTS =
+
## Automake macros to build libraries and executables.
qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\"
libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
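The new qpidexecdir/qpidtestdir variables hang the install locations off Automake's standard libexecdir. As a rough sketch of how those paths resolve for a typical prefix (the values below are illustrative; the actual locations depend on how configure was invoked):

```shell
# Sketch: how the new install destinations resolve under a typical
# prefix. Actual values depend on the configure invocation.
prefix=/usr/local
libexecdir=$prefix/libexec
qpidexecdir=$libexecdir/qpid        # installed helper programs/scripts
qpidtestdir=$qpidexecdir/test       # installed test programs/scripts
echo "$qpidtestdir"
```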
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue May 26 15:30:47 2009
@@ -39,6 +39,16 @@
LONG_TESTS=
#
+# Destinations for installed programs and tests are defined here
+#
+qpidexecdir = $(libexecdir)/qpid
+qpidexec_PROGRAMS =
+qpidexec_SCRIPTS =
+qpidtestdir = $(qpidexecdir)/test
+qpidtest_PROGRAMS =
+qpidtest_SCRIPTS =
+
+#
# Unit test program
#
# Unit tests are built as a single program to reduce valgrind overhead
@@ -128,6 +138,21 @@
include ssl.mk
endif
+# receiver, sender are installed and therefore built as part of make, not make check
+qpidtest_PROGRAMS += receiver
+receiver_SOURCES = \
+ receiver.cpp \
+ TestOptions.h \
+ ConnectionOptions.h
+receiver_LDADD = $(lib_client)
+
+qpidtest_PROGRAMS += sender
+sender_SOURCES = \
+ sender.cpp \
+ TestOptions.h \
+ ConnectionOptions.h
+sender_LDADD = $(lib_client)
+
#
# Other test programs
#
@@ -195,14 +220,6 @@
txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h
txjob_LDADD=$(lib_client)
-check_PROGRAMS+=receiver
-receiver_SOURCES=receiver.cpp TestOptions.h ConnectionOptions.h
-receiver_LDADD=$(lib_client)
-
-check_PROGRAMS+=sender
-sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h
-sender_LDADD=$(lib_client)
-
check_PROGRAMS+=PollerTest
PollerTest_SOURCES=PollerTest.cpp
PollerTest_LDADD=$(lib_common) $(SOCKLIBS)
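The effect of moving receiver and sender between primaries, in brief: programs under check_PROGRAMS are built only by `make check` and never installed, while programs listed under an install primary such as qpidtest_PROGRAMS are built by plain `make` and installed into $(qpidtestdir). A minimal illustrative fragment of the distinction:

```
# Built by "make" and installed into $(qpidtestdir):
qpidtest_PROGRAMS += receiver

# Built only for "make check", never installed:
check_PROGRAMS += PollerTest
```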
Modified: qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ qpid/trunk/qpid/cpp/src/tests/ais_check Tue May 26 15:30:47 2009
@@ -33,7 +33,6 @@
Tests that depend on the openais library (used for clustering)
will not be run because:
-
$NOGROUP
$NOAISEXEC
Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Tue May 26 15:30:47 2009
@@ -29,17 +29,45 @@
# ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS+=ais_check federated_cluster_test clustered_replication_test
-EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \
- federated_cluster_test clustered_replication_test
+TESTS += \
+ ais_check \
+ run_cluster_tests \
+ federated_cluster_test \
+ clustered_replication_test
+
+EXTRA_DIST += \
+ ais_check \
+ start_cluster \
+ stop_cluster \
+ restart_cluster \
+ cluster_python_tests \
+ cluster_python_tests_failing.txt \
+ federated_cluster_test \
+ clustered_replication_test \
+ run_cluster_tests \
+ testlib.py \
+ cluster.py
+
+
+unit_test_LDADD += ../cluster.la
+
+LONG_TESTS += \
+ start_cluster \
+ cluster_python_tests \
+ stop_cluster
+
+qpidtest_PROGRAMS += cluster_test
+cluster_test_SOURCES = \
+ cluster_test.cpp \
+ unit_test.cpp \
+ ClusterFixture.cpp \
+ ClusterFixture.h \
+ ForkedBroker.h \
+ ForkedBroker.cpp \
+ PartialFailure.cpp \
+ ClusterFailover.cpp
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework
-check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
- cluster_test.cpp PartialFailure.cpp ClusterFailover.cpp
+qpidtest_SCRIPTS += run_cluster_tests cluster.py testlib.py
-cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework
-
-unit_test_LDADD+=../cluster.la
-
-LONG_TESTS+=start_cluster cluster_python_tests stop_cluster
endif
Added: qpid/trunk/qpid/cpp/src/tests/cluster.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.py?rev=778751&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.py Tue May 26 15:30:47 2009
@@ -0,0 +1,419 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, unittest
+from testlib import TestBaseCluster
+
+class ClusterTests(TestBaseCluster):
+ """Basic cluster with async store tests"""
+
+ def test_Cluster_01_Initialization(self):
+ """Start a single cluster containing several nodes, and stop it again"""
+ try:
+ clusterName = "cluster-01"
+ self.createCheckCluster(clusterName, 5)
+ self.checkNumBrokers(5)
+ self.stopCheckCluster(clusterName)
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_02_MultipleClusterInitialization(self):
+ """Start several clusters each with several nodes and stop them again"""
+ try:
+ for i in range(0, 5):
+ clusterName = "cluster-02.%d" % i
+ self.createCluster(clusterName, 5)
+ self.checkNumBrokers(25)
+ self.killCluster("cluster-02.2")
+ self.checkNumBrokers(20)
+ self.stopCheckAll()
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_03_AddRemoveNodes(self):
+ """Create a multi-node cluster, then kill some nodes and add some new ones (not those killed)"""
+ try:
+ clusterName = "cluster-03"
+ self.createCheckCluster(clusterName, 3)
+ for i in range(4,9):
+ self.createClusterNode(i, clusterName)
+ self.checkNumClusterBrokers(clusterName, 8)
+ self.killNode(2, clusterName)
+ self.killNode(5, clusterName)
+ self.killNode(6, clusterName)
+ self.checkNumClusterBrokers(clusterName, 5)
+ self.createClusterNode(9, clusterName)
+ self.createClusterNode(10, clusterName)
+ self.checkNumClusterBrokers(clusterName, 7)
+ self.stopCheckAll()
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_04_RemoveRestoreNodes(self):
+ """Create a multi-node cluster, then kill some of the nodes and restart them"""
+ try:
+ clusterName = "cluster-04"
+ self.createCheckCluster(clusterName, 6)
+ self.checkNumBrokers(6)
+ self.killNode(1, clusterName)
+ self.killNode(3, clusterName)
+ self.killNode(4, clusterName)
+ self.checkNumBrokers(3)
+ self.createClusterNode(1, clusterName)
+ self.createClusterNode(3, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ self.killNode(2, clusterName)
+ self.killNode(3, clusterName)
+ self.killNode(4, clusterName)
+ self.checkNumBrokers(3)
+ self.createClusterNode(2, clusterName)
+ self.createClusterNode(3, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ self.stopCheckAll()
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_05_KillAllNodesThenRecover(self):
+ """Create a multi-node cluster, then kill *all* nodes, then restart the cluster"""
+ try:
+ clusterName = "cluster-05"
+ self.createCheckCluster(clusterName, 6)
+ self.killClusterCheck(clusterName)
+ self.createCheckCluster(clusterName, 6)
+ self.stopCheckAll()
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_06_PublishConsume(self):
+ """Publish then consume 100 messages from a single cluster"""
+ try:
+ clusterName = "cluster-06"
+ self.createCheckCluster(clusterName, 3)
+ self.sendReceiveMsgs(0, clusterName, "test-exchange-06", "test-queue-06", 100)
+ self.stopCheckAll()
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_07_MultiplePublishConsume(self):
+ """Staggered publish and consume on a single cluster"""
+ try:
+ clusterName = "cluster-07"
+ exchangeName = "test-exchange-07"
+ queueName = "test-queue-07"
+ self.createCheckCluster(clusterName, 3)
+ self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
+ rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10
+ txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) # 30, 10
+ rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30
+ txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30
+ rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
+ txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50
+ rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
+ self.stopCheckAll()
+ if txMsgs != rxMsgs:
+ print "txMsgs=%s" % txMsgs
+ print "rxMsgs=%s" % rxMsgs
+ self.fail("Send - receive message mismatch")
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self):
+ """Staggered publish and consume interleaved with adding and removing nodes on a single cluster"""
+ try:
+ clusterName = "cluster-08"
+ exchangeName = "test-exchange-08"
+ queueName = "test-queue-08"
+ self.createCheckCluster(clusterName, 3)
+ self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
+ for i in range(3,6):
+ self.createClusterNode(i, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0
+ self.killNode(0, clusterName)
+ self.checkNumClusterBrokers(clusterName, 5)
+ rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10
+ self.killNode(2, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30
+ self.createClusterNode(6, clusterName)
+ self.checkNumClusterBrokers(clusterName, 5)
+ txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) # 30, 30
+ rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50
+ self.createClusterNode(7, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 20) # 30, 50
+ rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
+ self.stopCheckAll()
+ if txMsgs != rxMsgs:
+ print "txMsgs=%s" % txMsgs
+ print "rxMsgs=%s" % rxMsgs
+ self.fail("Send - receive message mismatch")
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self):
+ """Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster"""
+ try:
+ clusterName = "cluster-09"
+ exchangeName = "test-exchange-09"
+ queueName = "test-queue-09"
+ self.createCheckCluster(clusterName, 6)
+ self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
+ self.killNode(2, clusterName)
+ self.checkNumClusterBrokers(clusterName, 5)
+ txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0
+ self.killNode(0, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10
+ self.killNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 3)
+ rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30
+ self.createClusterNode(2, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30
+ self.createClusterNode(0, clusterName)
+ self.checkNumClusterBrokers(clusterName, 5)
+ rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
+ self.createClusterNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50
+ rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80
+ self.stopCheckAll()
+ if txMsgs != rxMsgs:
+ print "txMsgs=%s" % txMsgs
+ print "rxMsgs=%s" % rxMsgs
+ self.fail("Send - receive message mismatch")
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_10_LinearNodeKillCreateProgression(self):
+ """Publish and consume messages while linearly killing all original nodes and replacing them with new ones"""
+ try:
+ clusterName = "cluster-10"
+ exchangeName = "test-exchange-10"
+ queueName = "test-queue-10"
+ self.createCheckCluster(clusterName, 4)
+ self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20)
+ rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10)
+ for i in range(0, 16):
+ self.killNode(i, clusterName)
+ self.createClusterNode(i+4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, queueName, 20)
+ rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20)
+ rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10)
+ self.stopCheckAll()
+ if txMsgs != rxMsgs:
+ print "txMsgs=%s" % txMsgs
+ print "rxMsgs=%s" % rxMsgs
+ self.fail("Send - receive message mismatch")
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_11_CircularNodeKillRestoreProgression(self):
+ """Publish and consume messages while circularly killing all original nodes and restoring them again"""
+ try:
+ clusterName = "cluster-11"
+ exchangeName = "test-exchange-11"
+ queueName = "test-queue-11"
+ self.createCheckCluster(clusterName, 4)
+ self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20)
+ rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10)
+ self.killNode(0, clusterName)
+ self.killNode(1, clusterName)
+ for i in range(0, 16):
+ self.killNode((i + 2) % 4, clusterName)
+ self.createClusterNode(i % 4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 2)
+ txMsgs += self.sendMsgs((i + 3) % 4, clusterName, exchangeName, queueName, 20)
+ rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, queueName, 20)
+ rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10)
+ self.stopCheckAll()
+ if txMsgs != rxMsgs:
+ print "txMsgs=%s" % txMsgs
+ print "rxMsgs=%s" % rxMsgs
+ self.fail("Send - receive message mismatch")
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_12_TopicExchange(self):
+ """Create topic exchange in a cluster and make sure it replicates correctly"""
+ try:
+ clusterName = "cluster-12"
+ self.createCheckCluster(clusterName, 4)
+ topicExchangeName = "test-exchange-12"
+ topicQueueNameKeyList = {"test-queue-12-A" : "#.A", "test-queue-12-B" : "#.B", "test-queue-12-C" : "C.#", "test-queue-12-D" : "D.#"}
+ self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList)
+
+ # Place initial messages
+ txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0)
+ self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # Should not go to any queue
+ txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, "D.hello.A", 10) # (20, 0, 10, 10)
+ txMsgsA += txMsgsD
+ txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, "hello.B", 20) # (20, 20, 10, 10)
+ # Kill and add some nodes
+ self.killNode(0, clusterName)
+ self.killNode(2, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.createClusterNode(5, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ # Pull 10 messages from each queue
+ rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-12-A", 10) # (10, 20, 10, 10)
+ rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-12-B", 10) # (10, 10, 10, 10)
+ rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-12-C", 10) # (10, 10, 0, 10)
+ rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-12-D", 10) # (10, 10, 0, 0)
+ # Kill and add another node
+ self.killNode(4, clusterName)
+ self.createClusterNode(6, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ # Add two more queues
+ self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-12-E" : "#.bye.A", "test-queue-12-F" : "#.bye.B"})
+ # Place more messages
+ txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0)
+ txMsgsA += txMsgs
+ txMsgsC += txMsgs
+ txMsgsE = txMsgs
+ self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # Should not go to any queue
+ txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, "D.bye.B", 20) # (20, 30, 10, 20, 10, 20)
+ txMsgsB += txMsgs
+ txMsgsD += txMsgs
+ txMsgsF = txMsgs
+ # Kill all nodes but one
+ self.killNode(1, clusterName)
+ self.killNode(3, clusterName)
+ self.killNode(6, clusterName)
+ self.checkNumClusterBrokers(clusterName, 1)
+ # Pull all remaining messages from each queue
+ rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-12-A", 20)
+ rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-12-B", 30)
+ rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-12-C", 10)
+ rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-12-D", 20)
+ rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-12-E", 10)
+ rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-12-F", 20)
+ # Check messages
+ self.stopCheckAll()
+ if txMsgsA != rxMsgsA:
+ self.fail("Send - receive message mismatch for queue A")
+ if txMsgsB != rxMsgsB:
+ self.fail("Send - receive message mismatch for queue B")
+ if txMsgsC != rxMsgsC:
+ self.fail("Send - receive message mismatch for queue C")
+ if txMsgsD != rxMsgsD:
+ self.fail("Send - receive message mismatch for queue D")
+ if txMsgsE != rxMsgsE:
+ self.fail("Send - receive message mismatch for queue E")
+ if txMsgsF != rxMsgsF:
+ self.fail("Send - receive message mismatch for queue F")
+ except:
+ self.killAllClusters()
+ raise
+
+ def test_Cluster_13_FanoutExchange(self):
+ """Create fanout exchange in a cluster and make sure it replicates correctly"""
+ try:
+ clusterName = "cluster-13"
+ self.createCheckCluster(clusterName, 4)
+ fanoutExchangeName = "test-exchange-13"
+ fanoutQueueNameList = ["test-queue-13-A", "test-queue-13-B", "test-queue-13-C"]
+ self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList)
+
+ # Place initial 20 messages, retrieve 10
+ txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
+ rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10)
+ rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10)
+ rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-13-C", 10)
+ # Kill and add some nodes
+ self.killNode(0, clusterName)
+ self.killNode(2, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.createClusterNode(5, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ # Place another 20 messages, retrieve 20
+ txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
+ rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20)
+ rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20)
+ rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-13-C", 20)
+ # Kill and add another node
+ self.killNode(4, clusterName)
+ self.createClusterNode(6, clusterName)
+ self.checkNumClusterBrokers(clusterName, 4)
+ # Add another 2 queues
+ self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-13-D", "test-queue-13-E"])
+ # Place another 20 messages, retrieve 20
+ tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
+ txMsg += tmp
+ rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20)
+ rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20)
+ rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-13-C", 20)
+ rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-13-D", 10)
+ rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-13-E", 10)
+ # Kill all nodes but one
+ self.killNode(1, clusterName)
+ self.killNode(3, clusterName)
+ self.killNode(6, clusterName)
+ self.checkNumClusterBrokers(clusterName, 1)
+ # Pull all remaining messages from each queue
+ rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 10)
+ rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 10)
+ rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10)
+ rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 10)
+ rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-13-E", 10)
+ # Check messages
+ self.stopCheckAll()
+ if txMsg != rxMsgA:
+ self.fail("Send - receive message mismatch for queue A")
+ if txMsg != rxMsgB:
+ self.fail("Send - receive message mismatch for queue B")
+ if txMsg != rxMsgC:
+ self.fail("Send - receive message mismatch for queue C")
+ if tmp != rxMsgD:
+ self.fail("Send - receive message mismatch for queue D")
+ if tmp != rxMsgE:
+ self.fail("Send - receive message mismatch for queue E")
+ except:
+ self.killAllClusters()
+ raise
+
+
+# Start the test here
+
+if __name__ == '__main__':
+ if os.getenv("STORE_ENABLE") != None:
+ print "NOTE: Store enabled for the following tests:"
+ if not unittest.main(): sys.exit(1)
+
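To run these tests by hand against an installed broker, rather than via `make check`, the environment the tests expect can be set up roughly as follows. The variable names are the ones run_cluster_tests and testlib.py read; the paths shown are only illustrative:

```shell
# Sketch: environment for running cluster.py by hand. Variable names
# come from run_cluster_tests/testlib.py; the paths are illustrative.
export RUN_CLUSTER_TESTS=1                          # enable the cluster tests
export QPIDD=/usr/sbin/qpidd                        # broker executable
export LIBCLUSTER=/usr/lib/qpid/daemon/cluster.so   # cluster plugin
export TMP_STORE_DIR=/tmp/qpid-test                 # scratch/store dir
# export STORE_ENABLE=1                             # uncomment to test persistence
echo "cluster tests enabled: $RUN_CLUSTER_TESTS"
# then: ./cluster.py -v
```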
Modified: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Tue May 26 15:30:47 2009
@@ -63,9 +63,15 @@
if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
cat <<EOF
-Not running federation to cluster test because:
+
+ =========== WARNING: NOT RUNNING AIS TESTS ==============
+
+ Not running cluster replication test because:
$NOGROUP
$NOAISEXEC
+
+ ==========================================================
+
EOF
exit 0;
fi
Modified: qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federated_cluster_test?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federated_cluster_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/federated_cluster_test Tue May 26 15:30:47 2009
@@ -132,9 +132,15 @@
if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
cat <<EOF
-Not running federation to cluster test because:
+
+ =========== WARNING: NOT RUNNING AIS TESTS ==============
+
+ Not running federation to cluster test because:
$NOGROUP
$NOAISEXEC
+
+ ==========================================================
+
EOF
exit 0;
fi
Added: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=778751&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (added)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Tue May 26 15:30:47 2009
@@ -0,0 +1,70 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the cluster tests.
+TEST_DIR=$srcdir
+
+# Check AIS requirements
+id -nG | grep '\<ais\>' >/dev/null || NOGROUP="You are not a member of the ais group."
+ps -u root | grep 'aisexec\|corosync' >/dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root"
+
+if test -n "${NOGROUP}" -o -n "${NOAISEXEC}"; then
+ cat <<EOF
+
+ ========= WARNING: CLUSTERING TESTS DISABLED ==============
+
+ Tests that depend on the openais library (used for clustering)
+ will not be run because:
+
+ ${NOGROUP}
+ ${NOAISEXEC}
+
+ ===========================================================
+
+EOF
+ exit 0
+fi
+
+export PYTHONPATH=$srcdir
+export RUN_CLUSTER_TESTS=1
+export QPIDD=$srcdir/../qpidd
+export LIBCLUSTER=$srcdir/../.libs/cluster.so
+export RECEIVER=$srcdir/receiver
+export SENDER=$srcdir/sender
+
+# Make sure the temp dir exists if this is the first test run to use it
+TMP_STORE_DIR=${TEST_DIR}/test_tmp
+if ! test -d ${TMP_STORE_DIR} ; then
+ mkdir -p ${TMP_STORE_DIR}
+ mkdir -p ${TMP_STORE_DIR}/cluster
+else
+ rm -rf "${TMP_STORE_DIR}/cluster"
+ mkdir -p "${TMP_STORE_DIR}/cluster"
+fi
+export TMP_STORE_DIR
+
+
+AMQP_SPEC=${TEST_DIR}/../../../specs/amqp.0-10-qpid-errata.xml
+sg ais -c "${TEST_DIR}/cluster.py -v"
+RETCODE=$?
+if test x$RETCODE != x0; then
+ echo "FAIL cluster tests"; exit 1;
+fi
Propchange: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/trunk/qpid/cpp/src/tests/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testlib.py?rev=778751&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testlib.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/testlib.py Tue May 26 15:30:47 2009
@@ -0,0 +1,486 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Support library for qpid python tests.
+#
+
+import os, signal, subprocess, unittest
+
+class TestBase(unittest.TestCase):
+ """
+ Base class for qpid tests. Provides broker start/stop/kill methods
+ """
+
+ """
+ The following environment vars control if and how the test is run, and determine where many of the helper
+ executables/libs are to be found.
+ """
+ _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for durability to be enabled during the test
+ _storeLib = os.getenv("LIBSTORE")
+ _qpiddExec = os.getenv("QPIDD", "/usr/sbin/qpidd")
+ _tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid"))
+
+ """Global message counter ensures unique messages"""
+ _msgCnt = 0
+
+ # --- Helper functions for parameter handling ---
+
+ def _paramBool(self, key, val, keyOnly = False):
+ if val == None:
+ return ""
+ if keyOnly:
+ if val:
+ return " --%s" % key
+ else:
+ return ""
+ else:
+ if val:
+ return " --%s yes" % key
+ else:
+ return " --%s no" % key
+
+ def _paramNum(self, key, val):
+ if val != None:
+ return " --%s %d" % (key, val)
+ return ""
+
+ def _paramString(self, key, val):
+ if val != None:
+ return " --%s %s" % (key, val)
+ return ""
+
+ def _paramStringList(self, key, valList, val):
+ if val in valList:
+ return " --%s %s" % (key, val)
+ return ""
+
+ # --- Helper functions for message creation ---
+
+ def _makeMessage(self, msgSize):
+ msg = "Message-%04d" % self._msgCnt
+ self._msgCnt = self._msgCnt + 1
+ msgLen = len(msg)
+ if msgSize > msgLen:
+ for i in range(msgLen, msgSize):
+ if i == msgLen:
+ msg += "-"
+ else:
+ msg += chr(ord('a') + (i % 26))
+ return msg
+
+ def _makeMessageList(self, numMsgs, msgSize):
+ if msgSize == None:
+ msgSize = 12
+ msgs = ""
+ for m in range(0, numMsgs):
+ msgs += "%s\n" % self._makeMessage(msgSize)
+ return msgs
+
+ # --- Starting and stopping a broker ---
+
+ def startBroker(self, qpiddArgs, logFile = None):
+ """Start a single broker daemon, returns tuple (pid, port)"""
+ if self._qpiddExec == None:
+ raise Exception("Environment variable QPIDD is not set")
+ cmd = "%s --daemon --port=0 %s" % (self._qpiddExec, qpiddArgs)
+ portStr = os.popen(cmd).read()
+ if len(portStr) == 0:
+ err = "Broker daemon startup failed."
+ if logFile != None:
+ err += " See log file %s" % logFile
+ raise Exception(err)
+ port = int(portStr)
+ pidStr = os.popen("%s -p %d -c" % (self._qpiddExec, port)).read()
+ try:
+ pid = int(pidStr)
+ except:
+ raise Exception("Unable to get pid: \"%s -p %d -c\" returned %s" % (self._qpiddExec, port, pidStr))
+ #print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs)
+ return (pid, port)
+
+ def killBroker(self, pid):
+ """Kill a broker using kill -9"""
+ os.kill(pid, signal.SIGTERM)
+ #print "killed broker: pid=%d" % pid
+
+ def stopBroker(self, port):
+ """Stop a broker using qpidd -q"""
+ ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % port, "-q")
+ if ret != 0:
+ raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret))
+ #print "stopped broker: port=%d" % port
+
+
+
+class TestBaseCluster(TestBase):
+ """
+ Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes.
+ """
+
+ """
+ The following environment variables control whether and how the tests are run, and determine where many of
+ the helper executables/libraries are to be found.
+ """
+ _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True for these cluster tests to run
+ _clusterLib = os.getenv("LIBCLUSTER")
+ _qpidConfigExec = os.getenv("QPID_CONFIG", "/usr/bin/qpid-config")
+ _qpidRouteExec = os.getenv("QPID_ROUTE", "/usr/bin/qpid-route")
+ _receiverExec = os.getenv("RECEIVER", "/usr/libexec/qpid/test/receiver")
+ _senderExec = os.getenv("SENDER", "/usr/libexec/qpid/test/sender")
+
+
+ """
+ _clusterDict is a dictionary of clusters:
+ key = cluster name (string)
+ val = dictionary of node numbers:
+ key = integer node number
+ val = tuple containing (pid, port)
+ For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows:
+ {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}}
+ where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively.
+ """
+ _clusterDict = {}
+
+ """Index for (pid, port) tuple"""
+ PID = 0
+ PORT = 1
+
+ def run(self, res):
+ """ Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined."""
+ if not self._runClusterTests:
+ return
+ unittest.TestCase.run(self, res)
+
+
+ # --- Starting cluster node(s) ---
+
+ def createClusterNode(self, nodeNumber, clusterName):
+ """Create a node and add it to the named cluster"""
+ if self._tempStoreDir == None:
+ raise Exception("Environment variable TMP_STORE_DIR is not set")
+ if self._clusterLib == None:
+ raise Exception("Environment variable LIBCLUSTER is not set")
+ name = "%s-%d" % (clusterName, nodeNumber)
+ dataDir = os.path.join(self._tempStoreDir, "cluster", name)
+ logFile = "%s.log" % dataDir
+ args = "--no-module-dir --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=notice+ --log-to-file=%s" % \
+ (self._clusterLib, dataDir, clusterName, logFile)
+ if self._storeEnable:
+ if self._storeLib == None:
+ raise Exception("Environment variable LIBSTORE is not set")
+ args += " --load-module %s" % self._storeLib
+ self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
+
+ def createCluster(self, clusterName, numberNodes):
+ """Create a cluster containing an initial number of nodes"""
+ self._clusterDict[clusterName] = {}
+ for n in range(0, numberNodes):
+ self.createClusterNode(n, clusterName)
+
+ # --- Cluster and node status ---
+
+ def getTupleList(self):
+ """Get list of (pid, port) tuples of all known cluster brokers"""
+ tList = []
+ for l in self._clusterDict.itervalues():
+ for t in l.itervalues():
+ tList.append(t)
+ return tList
+
+ def getNumBrokers(self):
+ """Get total number of brokers in all known clusters"""
+ return len(self.getTupleList())
+
+ def checkNumBrokers(self, expected):
+ """Check that the total number of brokers in all known clusters is the expected value"""
+ if self.getNumBrokers() != expected:
+ raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
+
+ def getClusterTupleList(self, clusterName):
+ """Get list of (pid, port) tuples of all nodes in named cluster"""
+ return self._clusterDict[clusterName].values()
+
+ def getNumClusterBrokers(self, clusterName):
+ """Get total number of brokers in named cluster"""
+ return len(self.getClusterTupleList(clusterName))
+
+ def getNodeTuple(self, nodeNumber, clusterName):
+ """Get the (pid, port) tuple for the given cluster node"""
+ return self._clusterDict[clusterName][nodeNumber]
+
+ def checkNumClusterBrokers(self, clusterName, expected):
+ """Check that the total number of brokers in the named cluster is the expected value"""
+ if self.getNumClusterBrokers(clusterName) != expected:
+ raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
+ (clusterName, expected, self.getNumClusterBrokers(clusterName)))
+
+ def clusterExists(self, clusterName):
+ """ Return True if clusterName exists, False otherwise"""
+ return clusterName in self._clusterDict.keys()
+
+ def clusterNodeExists(self, clusterName, nodeNumber):
+ """ Return True if nodeNumber in clusterName exists, False otherwise"""
+ if clusterName in self._clusterDict.keys():
+ return nodeNumber in self._clusterDict[clusterName]
+ return False
+
+ def createCheckCluster(self, clusterName, size):
+ """Create a cluster using the given name and size, then check the number of brokers"""
+ self.createCluster(clusterName, size)
+ self.checkNumClusterBrokers(clusterName, size)
+
+ # --- Kill cluster nodes using signal 9 ---
+
+ def killNode(self, nodeNumber, clusterName, updateDict = True):
+ """Kill the given node in the named cluster using kill -9"""
+ self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID])
+ if updateDict:
+ del(self._clusterDict[clusterName][nodeNumber])
+
+ def killCluster(self, clusterName, updateDict = True):
+ """Kill all nodes in the named cluster"""
+ for n in self._clusterDict[clusterName].iterkeys():
+ self.killNode(n, clusterName, False)
+ if updateDict:
+ del(self._clusterDict[clusterName])
+
+ def killClusterCheck(self, clusterName):
+ """Kill the named cluster and check that the name is removed from the cluster dictionary"""
+ self.killCluster(clusterName)
+ if self.clusterExists(clusterName):
+ raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
+ (clusterName, self.getNumClusterBrokers(clusterName)))
+
+ def killAllClusters(self):
+ """Kill all known clusters"""
+ for n in self._clusterDict.iterkeys():
+ self.killCluster(n, False)
+ self._clusterDict.clear()
+
+ def killAllClustersCheck(self):
+ """Kill all known clusters and check that the cluster dictionary is empty"""
+ self.killAllClusters()
+ self.checkNumBrokers(0)
+
+ # --- Stop cluster nodes using qpidd -q ---
+
+ def stopNode(self, nodeNumber, clusterName, updateDict = True):
+ """Stop the given node in the named cluster using qpidd -q"""
+ self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT])
+ if updateDict:
+ del(self._clusterDict[clusterName][nodeNumber])
+
+ def stopAllClusters(self):
+ """Stop all known clusters"""
+ for n in self._clusterDict.iterkeys():
+ self.stopCluster(n, False)
+ self._clusterDict.clear()
+
+
+ def stopCluster(self, clusterName, updateDict = True):
+ """Stop all nodes in the named cluster"""
+ for n in self._clusterDict[clusterName].iterkeys():
+ self.stopNode(n, clusterName, False)
+ if updateDict:
+ del(self._clusterDict[clusterName])
+
+ def stopCheckCluster(self, clusterName):
+ """Stop the named cluster and check that the name is removed from the cluster dictionary"""
+ self.stopCluster(clusterName)
+ if self.clusterExists(clusterName):
+ raise Exception("Unable to stop cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
+
+ def stopCheckAll(self):
+ """Stop all known clusters and check that the cluster dictionary is empty"""
+ self.stopAllClusters()
+ self.checkNumBrokers(0)
+
+ # --- qpid-config functions ---
+
+ def _qpidConfig(self, nodeNumber, clusterName, action):
+ """Configure some aspect of a qpid broker using the qpid-config executable"""
+ port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+ #print "%s -a localhost:%d %s" % (self._qpidConfigExec, port, action)
+ ret = os.spawnl(os.P_WAIT, self._qpidConfigExec, self._qpidConfigExec, "-a", "localhost:%d" % port, *action.split())
+ if ret != 0:
+ raise Exception("_qpidConfig(): cluster=\"%s\" nodeNumber=%d port=%d action=\"%s\" returned %d" % \
+ (clusterName, nodeNumber, port, action, ret))
+
+ def addExchange(self, nodeNumber, clusterName, exchangeType, exchangeName, durable = False, sequence = False, \
+ ive = False):
+ """Add a named exchange."""
+ action = "add exchange %s %s" % (exchangeType, exchangeName)
+ action += self._paramBool("durable", durable, True)
+ action += self._paramBool("sequence", sequence, True)
+ action += self._paramBool("ive", ive, True)
+ self._qpidConfig(nodeNumber, clusterName, action)
+
+ def deleteExchange(self, nodeNumber, clusterName, exchangeName):
+ """Delete a named exchange"""
+ self._qpidConfig(nodeNumber, clusterName, "del exchange %s" % exchangeName)
+
+ def addQueue(self, nodeNumber, clusterName, queueName, configArgs = None):
+ """Add a queue using qpid-config."""
+ action = "add queue %s" % queueName
+ if self._storeEnable:
+ action += " --durable"
+ if configArgs != None:
+ action += " %s" % configArgs
+ self._qpidConfig(nodeNumber, clusterName, action)
+
+ def delQueue(self, nodeNumber, clusterName, queueName):
+ """Delete a named queue using qpid-config."""
+ self._qpidConfig(nodeNumber, clusterName, "del queue %s" % queueName)
+
+ def bind(self, nodeNumber, clusterName, exchangeName, queueName, key):
+ """Create an exchange-queue binding using qpid-config."""
+ self._qpidConfig(nodeNumber, clusterName, "bind %s %s %s" % (exchangeName, queueName, key))
+
+ def unbind(self, nodeNumber, clusterName, exchangeName, queueName, key):
+ """Remove an exchange-queue binding using qpid-config."""
+ self._qpidConfig(nodeNumber, clusterName, "unbind %s %s %s" % (exchangeName, queueName, key))
+
+ # --- qpid-route functions (federation) ---
+
+ def brokerDict(self, nodeNumber, clusterName, host = "localhost", user = None, password = None):
+ """Returns a dictionary containing the broker info to be passed to route functions"""
+ port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+ return {"cluster": clusterName, "node": nodeNumber, "port": port, "host": host, "user": user, "password": password}
+
+ def _brokerStr(self, brokerDict):
+ """Set up a broker string in the format [user/password@]host:port"""
+ s = ""
+ if brokerDict["user"] != None and brokerDict["password"] != None:
+ s = "%s/%s@" % (brokerDict["user"], brokerDict["password"])
+ s += "%s:%d" % (brokerDict["host"], brokerDict["port"])
+ return s
+
+ def _qpidRoute(self, action):
+ """Set up a route using qpid-route"""
+ #print "%s %s" % (self._qpidRouteExec, action)
+ ret = os.spawnl(os.P_WAIT, self._qpidRouteExec, self._qpidRouteExec, *action.split())
+ if ret != 0:
+ raise Exception("_qpidRoute(): action=\"%s\" returned %d" % (action, ret))
+
+ def routeDynamicAdd(self, destBrokerDict, srcBrokerDict, exchangeName):
+ self._qpidRoute("dynamic add %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName))
+
+ def routeDynamicDelete(self, destBrokerDict, srcBrokerDict, exchangeName):
+ self._qpidRoute("dynamic del %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName))
+
+ def routeAdd(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey):
+ self._qpidRoute("route add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey))
+
+ def routeDelete(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey):
+ self._qpidRoute("route del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey))
+
+ def routeQueueAdd(self, destBrokerDict, srcBrokerDict, exchangeName, queueName):
+ self._qpidRoute("queue add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName))
+
+ def routeQueueDelete(self, destBrokerDict, srcBrokerDict, exchangeName, queueName):
+ self._qpidRoute("queue del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName))
+
+ def routeLinkAdd(self, destBrokerDict, srcBrokerDict):
+ self._qpidRoute("link add %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict)))
+
+ def routeLinkDelete(self, destBrokerDict, srcBrokerDict):
+ self._qpidRoute("link del %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict)))
+
+ # --- Message send and receive functions ---
+
+ def _receiver(self, action):
+ if self._receiverExec == None:
+ raise Exception("Environment variable RECEIVER is not set")
+ cmd = "%s %s" % (self._receiverExec, action)
+ #print cmd
+ return subprocess.Popen(cmd.split(), stdout = subprocess.PIPE)
+
+ def _sender(self, action):
+ if self._senderExec == None:
+ raise Exception("Environment variable SENDER is not set")
+ cmd = "%s %s" % (self._senderExec, action)
+ #print cmd
+ return subprocess.Popen(cmd.split(), stdin = subprocess.PIPE)
+
+ def createReciever(self, nodeNumber, clusterName, queueName, numMsgs = None, receiverArgs = None):
+ port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+ action = "--port %d --queue %s" % (port, queueName)
+ if numMsgs != None:
+ action += " --messages %d" % numMsgs
+ if receiverArgs != None:
+ action += " %s" % receiverArgs
+ return self._receiver(action)
+
+ def createSender(self, nodeNumber, clusterName, exchangeName, routingKey, senderArgs = None):
+ port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
+ action = "--port %d --exchange %s" % (port, exchangeName)
+ if routingKey != None and len(routingKey) > 0:
+ action += " --routing-key %s" % routingKey
+ if self._storeEnable:
+ action += " --durable yes"
+ if senderArgs != None:
+ action += " %s" % senderArgs
+ return self._sender(action)
+
+ def createBindDirectExchangeQueue(self, nodeNumber, clusterName, exchangeName, queueName):
+ self.addExchange(nodeNumber, clusterName, "direct", exchangeName)
+ self.addQueue(nodeNumber, clusterName, queueName)
+ self.bind(nodeNumber, clusterName, exchangeName, queueName, queueName)
+
+ def createBindTopicExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameKeyList):
+ self.addExchange(nodeNumber, clusterName, "topic", exchangeName)
+ for queueName, key in queueNameKeyList.iteritems():
+ self.addQueue(nodeNumber, clusterName, queueName)
+ self.bind(nodeNumber, clusterName, exchangeName, queueName, key)
+
+ def createBindFanoutExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameList):
+ self.addExchange(nodeNumber, clusterName, "fanout", exchangeName)
+ for queueName in queueNameList:
+ self.addQueue(nodeNumber, clusterName, queueName)
+ self.bind(nodeNumber, clusterName, exchangeName, queueName, "")
+
+ def sendMsgs(self, nodeNumber, clusterName, exchangeName, routingKey, numMsgs, msgSize = None, wait = True):
+ msgs = self._makeMessageList(numMsgs, msgSize)
+ sender = self.createSender(nodeNumber, clusterName, exchangeName, routingKey)
+ sender.stdin.write(msgs)
+ sender.stdin.close()
+ if wait:
+ sender.wait()
+ return msgs
+
+ def receiveMsgs(self, nodeNumber, clusterName, queueName, numMsgs, wait = True):
+ receiver = self.createReciever(nodeNumber, clusterName, queueName, numMsgs)
+ cnt = 0
+ msgs = ""
+ while cnt < numMsgs:
+ rx = receiver.stdout.readline()
+ if rx == "" and receiver.poll() != None: break
+ msgs += rx
+ cnt = cnt + 1
+ if wait:
+ receiver.wait()
+ return msgs
+
+ def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, queueName, numMsgs, wait = True, msgSize = None):
+ self.createBindDirectExchangeQueue(nodeNumber, clusterName, exchangeName, queueName)
+ txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, queueName, numMsgs, msgSize, wait)
+ rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, wait)
+ if txMsgs != rxMsgs:
+ self.fail("Send - receive message mismatch")
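As an aside, the fixed-width payloads produced by the `_makeMessage`/`_makeMessageList` helpers above can be sketched standalone. This is an illustrative re-implementation (the function names here are mine, and numbering restarts at zero rather than using the class's persistent `_msgCnt`), assuming the "Message-NNNN" stem used in the committed code:

```python
def make_message(count, msg_size=20):
    # Illustrative re-implementation of testlib's _makeMessage padding:
    # a "Message-NNNN" stem, then a "-" separator, then a cyclic a-z
    # fill keyed on the absolute character position in the message.
    msg = "Message-%04d" % count
    base_len = len(msg)  # 12 for counts below 10000
    for i in range(base_len, msg_size):
        if i == base_len:
            msg += "-"
        else:
            msg += chr(ord('a') + (i % 26))
    return msg

def make_message_list(num_msgs, msg_size=12):
    # Mirrors _makeMessageList: one newline-terminated message per line.
    # Unlike the original, numbering restarts at zero on each call.
    return "".join("%s\n" % make_message(n, msg_size) for n in range(num_msgs))
```

Note that when msg_size is no larger than the stem, the message is returned unpadded, matching the `msgSize > msgLen` guard in the original.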
Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=778751&r1=778750&r2=778751&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Tue May 26 15:30:47 2009
@@ -21,7 +21,7 @@
# Support library for qpid python tests.
#
-import sys, re, unittest, os, signal, random, logging, traceback
+import sys, re, unittest, os, random, logging, traceback
import qpid.client, qpid.spec, qmf.console
import Queue
from fnmatch import fnmatch
@@ -429,196 +429,3 @@
session.message_subscribe(**keys)
session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
-
-
-class TestBaseCluster(unittest.TestCase):
- """
- Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes.
- """
- _tempStoreDir = os.getenv("TMP_STORE_DIR")
- _qpidd = os.getenv("QPIDD")
- _storeLib = os.getenv("LIBSTORE")
- _clusterLib = os.getenv("LIBCLUSTER")
-
- # --- Cluster helper functions ---
-
- """
- _clusterDict is a dictionary of clusters:
- key = cluster name (string)
- val = dictionary of node numbers:
- key = node number (int)
- val = tuple containing (pid, port)
- For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows:
- {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}}
- where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively.
- """
- _clusterDict = {}
-
- """Index for (pid, port) tuple"""
- PID = 0
- PORT = 1
-
- def startBroker(self, qpiddArgs, logFile = None):
- """Start a single broker daemon, returns tuple (pid, port)"""
- if self._qpidd == None:
- raise Exception("Environment variable QPIDD is not set")
- cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs)
- portStr = os.popen(cmd).read()
- if len(portStr) == 0:
- err = "Broker daemon startup failed."
- if logFile != None:
- err += " See log file %s" % logFile
- raise Exception(err)
- port = int(portStr)
- pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read())
- #print "started broker: pid=%d, port=%d" % (pid, port)
- return (pid, port)
-
- def createClusterNode(self, nodeNumber, clusterName):
- """Create a node and add it to the named cluster"""
- if self._tempStoreDir == None:
- raise Exception("Environment variable TMP_STORE_DIR is not set")
- if self._storeLib == None:
- raise Exception("Environment variable LIBSTORE is not set")
- if self._clusterLib == None:
- raise Exception("Environment variable LIBCLUSTER is not set")
- name = "%s-%d" % (clusterName, nodeNumber)
- dataDir = os.path.join(self._tempStoreDir, "cluster", name)
- logFile = "%s.log" % dataDir
- args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \
- (self._storeLib, self._clusterLib, dataDir, clusterName, logFile)
- self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile)
-
- def createCluster(self, clusterName, numberNodes):
- """Create a cluster containing an initial number of nodes"""
- self._clusterDict[clusterName] = {}
- for n in range(0, numberNodes):
- self.createClusterNode(n, clusterName)
-
- def getTupleList(self):
- """Get list of (pid, port) tuples of all known cluster brokers"""
- tList = []
- for l in self._clusterDict.itervalues():
- for t in l.itervalues():
- tList.append(t)
- return tList
-
- def getNumBrokers(self):
- """Get total number of brokers in all known clusters"""
- return len(self.getTupleList())
-
- def checkNumBrokers(self, expected):
- """Check that the total number of brokers in all known clusters is the expected value"""
- if self.getNumBrokers() != expected:
- raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
-
- def getClusterTupleList(self, clusterName):
- """Get list of (pid, port) tuples of all nodes in named cluster"""
- return self._clusterDict[clusterName].values()
-
- def getNumClusterBrokers(self, clusterName):
- """Get total number of brokers in named cluster"""
- return len(self.getClusterTupleList(clusterName))
-
- def getNodeTuple(self, nodeNumber, clusterName):
- """Get the (pid, port) tuple for the given cluster node"""
- return self._clusterDict[clusterName][nodeNumber]
-
- def checkNumClusterBrokers(self, clusterName, expected):
- """Check that the total number of brokers in the named cluster is the expected value"""
- if self.getNumClusterBrokers(clusterName) != expected:
- raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
- (clusterName, expected, self.getNumClusterBrokers(clusterName)))
-
- def clusterExists(self, clusterName):
- """ Return True if clusterName exists, False otherwise"""
- return clusterName in self._clusterDict.keys()
-
- def clusterNodeExists(self, clusterName, nodeNumber):
- """ Return True if nodeNumber in clusterName exists, False otherwise"""
- if clusterName in self._clusterDict.keys():
- return nodeNumber in self._clusterDict[nodeName]
- return False
-
- def createCheckCluster(self, clusterName, size):
- """Create a cluster using the given name and size, then check the number of brokers"""
- self.createCluster(clusterName, size)
- self.checkNumClusterBrokers(clusterName, size)
-
- # Kill cluster nodes using signal 9
-
- def killNode(self, nodeNumber, clusterName, updateDict = True):
- """Kill the given node in the named cluster using kill -9"""
- pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID]
- os.kill(pid, signal.SIGTERM)
- #print "killed broker: pid=%d" % pid
- if updateDict:
- del(self._clusterDict[clusterName][nodeNumber])
-
- def killCluster(self, clusterName, updateDict = True):
- """Kill all nodes in the named cluster"""
- for n in self._clusterDict[clusterName].iterkeys():
- self.killNode(n, clusterName, False)
- if updateDict:
- del(self._clusterDict[clusterName])
-
- def killClusterCheck(self, clusterName):
- """Kill the named cluster and check that the name is removed from the cluster dictionary"""
- self.killCluster(clusterName)
- if self.clusterExists(clusterName):
- raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
- (clusterName, self.getNumClusterBrokers(clusterName)))
-
- def killAllClusters(self):
- """Kill all known clusters"""
- for n in self._clusterDict.iterkeys():
- self.killCluster(n, False)
- self._clusterDict.clear()
-
- def killAllClustersCheck(self):
- """Kill all known clusters and check that the cluster dictionary is empty"""
- self.killAllClusters()
- self.checkNumBrokers(0)
-
- # Stop cluster nodes using qpidd -q
-
- def stopNode(self, nodeNumber, clusterName, updateDict = True):
- """Stop the given node in the named cluster using qpidd -q"""
- port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT]
- ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q")
- if ret != 0:
- raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \
- (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret))
- #print "stopped broker: port=%d" % port
- if updateDict:
- del(self._clusterDict[clusterName][nodeNumber])
-
- def stopAllClusters(self):
- """Stop all known clusters"""
- for n in self._clusterDict.iterkeys():
- self.stopCluster(n, False)
- self._clusterDict.clear()
-
-
- def stopCluster(self, clusterName, updateDict = True):
- """Stop all nodes in the named cluster"""
- for n in self._clusterDict[clusterName].iterkeys():
- self.stopNode(n, clusterName, False)
- if updateDict:
- del(self._clusterDict[clusterName])
-
- def stopCheckCluster(self, clusterName):
- """Stop the named cluster and check that the name is removed from the cluster dictionary"""
- self.stopCluster(clusterName)
- if self.clusterExists(clusterName):
- raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
- def stopCheckAll(self):
- """Kill all known clusters and check that the cluster dictionary is empty"""
- self.stopAllClusters()
- self.checkNumBrokers(0)
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org
Re: [c++]: new cluster tests not working for me
Posted by Kim van der Riet <ki...@redhat.com>.
On Wed, 2009-05-27 at 14:27 +0100, Gordon Sim wrote:
> kpvdr@apache.org wrote:
> > Author: kpvdr
> > Date: Tue May 26 15:30:47 2009
> > New Revision: 778751
> >
> > URL: http://svn.apache.org/viewvc?rev=778751&view=rev
> > Log:
> > Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.
>
> Kim,
>
> I had to make the attached changes to get these new tests to run from
> svn (basically need to setup locations of qpid-config and qpid-route
> tools for an svn checkout and QPID_CONFIG is used to set the config file
> for qpidd so I changed the name of that variable).
>
> Are you happy with these changes or am I ignoring something important in
> how this has been designed to work?
>
This is indeed a problem - thanks. I had the python client package
installed which put some of these utils onto my path, so I did not pick
up the path deficiencies. Fixed in r.779262.
Kim
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org
[c++]: new cluster tests not working for me
Posted by Gordon Sim <gs...@redhat.com>.
kpvdr@apache.org wrote:
> Author: kpvdr
> Date: Tue May 26 15:30:47 2009
> New Revision: 778751
>
> URL: http://svn.apache.org/viewvc?rev=778751&view=rev
> Log:
> Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.
Kim,
I had to make the attached changes to get these new tests to run from
svn (basically need to setup locations of qpid-config and qpid-route
tools for an svn checkout and QPID_CONFIG is used to set the config file
for qpidd so I changed the name of that variable).
Are you happy with these changes or am I ignoring something important in
how this has been designed to work?
--Gordon.
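For anyone trying to reproduce the setup discussed in this thread, the environment variables read by the new testlib can be sketched as below. Only the defaults for QPID_CONFIG, QPID_ROUTE, RECEIVER and SENDER come from the committed code; every other path is an illustrative placeholder. Note also that, per the follow-up above, the QPID_CONFIG variable name was subsequently changed in r779262 because it clashed with qpidd's config-file variable.

```shell
# Illustrative environment for the installed cluster tests (r778751).
# Only the values marked "default" appear in the committed code;
# the other paths are placeholders for a typical install.
export RUN_CLUSTER_TESTS=1                            # tests skip unless set
export QPIDD=/usr/sbin/qpidd                          # placeholder path
export TMP_STORE_DIR=/tmp/qpid-cluster-test           # placeholder path
export LIBCLUSTER=/usr/lib64/qpid/daemon/cluster.so   # placeholder path
export LIBSTORE=/usr/lib64/qpid/daemon/msgstore.so    # placeholder; store tests only
export QPID_CONFIG=/usr/bin/qpid-config               # default
export QPID_ROUTE=/usr/bin/qpid-route                 # default
export RECEIVER=/usr/libexec/qpid/test/receiver       # default
export SENDER=/usr/libexec/qpid/test/sender           # default
echo "RUN_CLUSTER_TESTS=$RUN_CLUSTER_TESTS QPID_CONFIG=$QPID_CONFIG"
```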