You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/06/10 20:54:31 UTC
svn commit: r783451 - in /qpid/trunk/qpid/cpp/src/tests: cluster.py cluster_test.cpp federated_cluster_test run_cluster_tests testlib.py
Author: kpvdr
Date: Wed Jun 10 18:54:30 2009
New Revision: 783451
URL: http://svn.apache.org/viewvc?rev=783451&view=rev
Log:
Updates to python cluster tests and associated scripts
Modified:
qpid/trunk/qpid/cpp/src/tests/cluster.py
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
qpid/trunk/qpid/cpp/src/tests/testlib.py
Modified: qpid/trunk/qpid/cpp/src/tests/cluster.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.py?rev=783451&r1=783450&r2=783451&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.py Wed Jun 10 18:54:30 2009
@@ -43,7 +43,7 @@
self.checkNumBrokers(25)
self.killCluster("cluster-02.2")
self.checkNumBrokers(20)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -53,17 +53,17 @@
try:
clusterName = "cluster-03"
self.createCheckCluster(clusterName, 3)
- for i in range(4,9):
+ for i in range(3,8):
self.createClusterNode(i, clusterName)
self.checkNumClusterBrokers(clusterName, 8)
self.killNode(2, clusterName)
self.killNode(5, clusterName)
self.killNode(6, clusterName)
self.checkNumClusterBrokers(clusterName, 5)
+ self.createClusterNode(8, clusterName)
self.createClusterNode(9, clusterName)
- self.createClusterNode(10, clusterName)
self.checkNumClusterBrokers(clusterName, 7)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -90,7 +90,7 @@
self.createClusterNode(3, clusterName)
self.createClusterNode(4, clusterName)
self.checkNumClusterBrokers(clusterName, 6)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -102,7 +102,7 @@
self.createCheckCluster(clusterName, 6)
self.killClusterCheck(clusterName)
self.createCheckCluster(clusterName, 6)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -245,6 +245,7 @@
dh.sendMsgs(20, 5) # 60 30 *
dh.receiveMsgs(20, 4) # 60 50 *
dh.killCluster() # cluster does not exist
+ self.checkNumClusterBrokers("cluster-12", 0)
dh.restoreCluster() # 60 50 . . . . . .
dh.restoreNodes() # 0 1 2 3 4 5
dh.finalizeTest()
@@ -325,7 +326,7 @@
# Start the test here
if __name__ == '__main__':
- if os.getenv("STORE_ENABLE") != None:
+ if os.getenv("STORE_LIB") != None:
print "NOTE: Store enabled for the following tests:"
if not unittest.main(): sys.exit(1)
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=783451&r1=783450&r2=783451&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jun 10 18:54:30 2009
@@ -70,14 +70,14 @@
using broker::Broker;
using boost::shared_ptr;
-bool durableFlag = std::getenv("STORE_ENABLE") != 0;
+bool durableFlag = std::getenv("STORE_LIB") != 0;
void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
ostringstream clusterLib;
clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
if (durableFlag)
- args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR";
+ args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
else
args += "--no-data-dir";
}
@@ -697,7 +697,7 @@
QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
{
/*
- Start with a single broker.
+ Start with a single broker.
Set up two queues: one durable, and one not.
Add a new broker to the cluster.
Make sure it has one durable and one non-durable queue.
@@ -714,7 +714,7 @@
QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
-
+
BOOST_CHECK_EQUAL ( durable_query.getDurable(), true );
BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
}
@@ -869,9 +869,9 @@
Subscription sub = client.subs.subscribe(q, queue, settings);
client.session.messageFlush(sub.getName());
return sub;
-}
+}
-/**
+/**
* check that the specified queue contains the expected set of
* messages (matched on content) for all nodes in the cluster
*/
@@ -884,7 +884,7 @@
}
}
-void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
{
for (int i = 0; i < count; i++) {
client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag));
Modified: qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federated_cluster_test?rev=783451&r1=783450&r2=783451&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federated_cluster_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/federated_cluster_test Wed Jun 10 18:54:30 2009
@@ -44,7 +44,9 @@
../qpidd -q --port $NODE_2
unset NODE_2
fi
- rm cluster.ports
+ if [ -f cluster.ports ]; then
+ rm cluster.ports
+ fi
}
start_brokers() {
Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=783451&r1=783450&r2=783451&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Wed Jun 10 18:54:30 2009
@@ -19,6 +19,15 @@
# under the License.
#
+
+# Check that top_builddir and srcdir are set
+# If not, assume local run from test dir
+if [ -z ${top_builddir} -o -z ${srcdir} ]; then
+ srcdir=`pwd`
+ top_builddir=${srcdir}/../../
+fi
+
+
# Run the cluster tests.
TEST_DIR=${top_builddir}/src/tests
@@ -64,8 +73,8 @@
mkdir -p ${TMP_STORE_DIR}/cluster
else
# Delete old cluster test dirs
- rm -rf "${TMP_STORE_DIR}/cluster"
- mkdir -p "${TMP_STORE_DIR}/cluster"
+ rm -rf ${TMP_STORE_DIR}/cluster
+ mkdir -p ${TMP_STORE_DIR}/cluster
fi
export TMP_STORE_DIR
@@ -75,3 +84,8 @@
if test x${RETCODE} != x0; then
exit 1;
fi
+
+# Delete cluster store dir if test was successful.
+rm -rf ${TMP_STORE_DIR}
+
+exit 0
\ No newline at end of file
Modified: qpid/trunk/qpid/cpp/src/tests/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testlib.py?rev=783451&r1=783450&r2=783451&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testlib.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/testlib.py Wed Jun 10 18:54:30 2009
@@ -21,7 +21,7 @@
# Support library for qpid python tests.
#
-import os, re, signal, subprocess, unittest
+import os, re, signal, subprocess, time, unittest
class TestBase(unittest.TestCase):
"""
@@ -210,6 +210,12 @@
for n in range(0, numberNodes):
self.createClusterNode(n, clusterName)
+ def waitForNodes(self, clusterName):
+ """Wait for all nodes to become active (ie finish cluster sync)"""
+ # TODO - connect to each known node in cluster
+ # Until this is done, wait a bit (hack)
+ time.sleep(1)
+
# --- Cluster and node status ---
def getTupleList(self, clusterName = None):
@@ -246,13 +252,15 @@
"""Get the (pid, port) tuple for the given cluster node"""
return self._clusterDict[clusterName][nodeNumber]
- def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True):
+ def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True, waitForNodes = True):
"""Check that the total number of brokers in the named cluster is the expected value"""
if expected != None and self.getNumClusterBrokers(clusterName) != expected:
raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
(clusterName, expected, self.getNumClusterBrokers(clusterName)))
if checkPids:
self._checkPids(clusterName)
+ if waitForNodes:
+ self.waitForNodes(clusterName)
def clusterExists(self, clusterName):
""" Return True if clusterName exists, False otherwise"""
@@ -330,7 +338,7 @@
if self.clusterExists(clusterName):
raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
- def stopCheckAll(self, ignoreFailures = False):
+ def stopAllCheck(self, ignoreFailures = False):
"""Kill all known clusters and check that the cluster dictionary is empty"""
self.stopAllClusters()
self.checkNumBrokers(0)
@@ -580,6 +588,7 @@
self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName)
self._nodes.append(nodeNumber)
self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes))
+ self._testBaseCluster.waitForNodes(self._clusterName)
def restoreNode(self, nodeNumber):
"""Restore a cluster node that has been previously killed"""
@@ -598,6 +607,7 @@
self.restoreNode(lastNode)
while len(self._deadNodes) > 0:
self.restoreNode(self._deadNodes[0])
+ self._testBaseCluster.waitForNodes(self._clusterName)
def killNode(self, nodeNumber):
"""Kill a cluster node (if it is in the _nodes list)."""
@@ -635,15 +645,20 @@
if nm == None:
nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages
if nm > 0:
- receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
- cnt = 0
- while cnt < nm:
- rx = receiver.stdout.readline().strip()
- if rx == "" and receiver.poll() != None: break
- self._rxMsgs[qn].append(rx)
- cnt = cnt + 1
+ while nm > 0:
+ receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
+ cnt = 0
+ while cnt < nm:
+ rx = receiver.stdout.readline().strip()
+ if rx == "":
+ if receiver.poll() != None: break
+ elif rx not in self._rxMsgs[qn]:
+ self._rxMsgs[qn].append(rx)
+ cnt = cnt + 1
+ nm = nm - cnt
if wait:
receiver.wait()
+ self._rxMsgs[qn].sort()
self._lastNode = nodeNumber
def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True):
@@ -670,10 +685,10 @@
def finalizeTest(self):
"""Recover all the remaining messages on all queues, then check that all expected messages were received."""
self.receiveRemainingMsgs()
- self._testBaseCluster.stopCheckAll()
+ self._testBaseCluster.stopAllCheck()
if not self.checkMsgs():
- self._testBaseCluster.fail("Send - receive message mismatch")
self.printMsgs()
+ self._testBaseCluster.fail("Send - receive message mismatch")
def printMsgs(self, txMsgs = True, rxMsgs = True):
"""Print all messages transmitted and received."""
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org