You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/27 01:38:16 UTC
svn commit: r916887 [3/4] - in /qpid/branches/qmf-devel0.7: ./
qpid/cpp/include/qmf/engine/ qpid/cpp/include/qpid/
qpid/cpp/include/qpid/sys/posix/ qpid/cpp/include/qpid/sys/windows/
qpid/cpp/rubygen/framing.0-10/ qpid/cpp/src/ qpid/cpp/src/qmf/engine/...
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/module.xml?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/module.xml (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/module.xml Sat Feb 27 00:38:13 2010
@@ -273,6 +273,7 @@
<sysproperty key="broker" value="${broker}"/>
<sysproperty key="broker.clean" value="${broker.clean}"/>
<sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
+ <sysproperty key="broker.persistent" value="${broker.persistent}"/>
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="broker.ready" value="${broker.ready}" />
<sysproperty key="broker.stopped" value="${broker.stopped}" />
Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java Sat Feb 27 00:38:13 2010
@@ -133,8 +133,11 @@
{
// _logger.debug("public void testAsyncPingOk(int numPings): called");
+ // get prefill count to update the expected count
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+
// Ensure that at least one ping was requeusted.
- if (numPings == 0)
+ if (numPings + preFill == 0)
{
_logger.error("Number of pings requested was zero.");
fail("Number of pings requested was zero.");
@@ -149,16 +152,24 @@
// String messageCorrelationId = perThreadSetup._correlationId;
// _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
// Initialize the count and timing controller for the new correlation id.
PerCorrelationId perCorrelationId = new PerCorrelationId();
TimingController tc = getTimingController().getControllerForCurrentThread();
perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
+ perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
+ // Start the client that will have been paused due to preFill requirement.
+ // or if we have not yet started client because messages are sitting on broker.
+ if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ {
+ pingClient.start();
+ }
+
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Sat Feb 27 00:38:13 2010
@@ -104,4 +104,9 @@
return _pingClientCount * _noOfConsumers;
}
}
+
+ public int getClientCount()
+ {
+ return _pingClientCount;
+ }
}
Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java Sat Feb 27 00:38:13 2010
@@ -141,9 +141,65 @@
perThreadSetup._pingClient = new PingClient(testParameters);
perThreadSetup._pingClient.establishConnection(true, true);
}
- // Start the client connection
- perThreadSetup._pingClient.start();
+ // Prefill the broker unless we are in consume only mode.
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+ {
+ // Manually set the correlation ID to 1. This is not ideal but it is the
+ // value that the main test loop will use.
+ perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, String.valueOf(perThreadSetup._pingClient.getClientCount()));
+
+ // Note with a large preFill and non-tx session the messages will be
+ // rapidly pushed in to the mina buffers. OOM's are a real risk here.
+ // Should perhaps consider using a TX session for the prefill.
+
+ long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
+
+ // Only delay if we have consumers and a delayBeforeConsume
+ if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+ && delayBeforeConsume > 0)
+ {
+
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+ // Only do logging if in verbose mode.
+ if (verbose)
+ {
+ if (delayBeforeConsume > 60000)
+ {
+ long minutes = delayBeforeConsume / 60000;
+ long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
+ long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
+ _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+ }
+ else
+ {
+ _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+ }
+ }
+
+ Thread.sleep(delayBeforeConsume);
+
+ if (verbose)
+ {
+ _logger.info("Starting Test.");
+ }
+ }
+
+ // We can't start the client's here as the test client has not yet been configured to receieve messages.
+ // only when the test method is executed will the correlationID map be set up and ready to consume
+ // the messages we have sent here.
+ }
+ else //Only start the consumer if we are not preFilling.
+ {
+ // Only start the consumer if we don't have messages waiting to be received.
+ // we need to set up the correlationID mapping first
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ {
+ // Start the client connection
+ perThreadSetup._pingClient.start();
+ }
+ }
// Attach the per-thread set to the thread.
threadSetup.set(perThreadSetup);
}
@@ -157,7 +213,7 @@
* Performs test fixture clean
*/
public void threadTearDown()
- {
+ {
_logger.debug("public void threadTearDown(): called");
try
Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Sat Feb 27 00:38:13 2010
@@ -324,6 +324,25 @@
/** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+ /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */
+ public static final String PREFILL_PROPNAME = "preFill";
+
+ /** Defines the default value for the number of messages to prefill. 0,default, no messages. */
+ public static final int PREFILL_DEFAULT = 0;
+
+ /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */
+ public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume";
+
+ /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */
+ public static final long DELAY_BEFORE_CONSUME = 0;
+
+ /** Holds the name of the property to get when no messasges should be sent. */
+ public static final String CONSUME_ONLY_PROPNAME = "consumeOnly";
+
+ /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+ public static final boolean CONSUME_ONLY_DEFAULT = false;
+
+
/** Holds the default configuration properties. */
public static ParsedProperties defaults = new ParsedProperties();
@@ -360,6 +379,9 @@
defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+ defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
+ defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
+ defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
}
/** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -455,6 +477,24 @@
*/
protected int _maxPendingSize;
+ /**
+ * Holds the number of messages to send during the setup phase, before the clients start consuming.
+ */
+ private Integer _preFill;
+
+ /**
+ * Holds the time in ms to wait after preFilling before starting thet test.
+ */
+ private Long _delayBeforeConsume;
+
+ /**
+ * Holds a boolean value of wither this test should just consume, i.e. skips
+ * sending messages, but still expects to receive the specified number.
+ * Use in conjuction with numConsumers=0 to fill the broker.
+ */
+ private boolean _consumeOnly;
+
+
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -588,6 +628,9 @@
_ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
_consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
+ _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
+ _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
+ _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
// Check that one or more destinations were specified.
if (_noOfDestinations < 1)
@@ -638,7 +681,10 @@
}
// Create the destinations to send pings to and receive replies from.
- _replyDestination = _consumerSession[0].createTemporaryQueue();
+ if (_noOfConsumers > 0)
+ {
+ _replyDestination = _consumerSession[0].createTemporaryQueue();
+ }
createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
@@ -871,6 +917,14 @@
{
_consumer = new MessageConsumer[_noOfConsumers];
+ // If we don't have consumers then ensure we have created the
+ // destination.
+ if (_noOfConsumers == 0)
+ {
+ _producerSession.createConsumer(destination, selector,
+ NO_LOCAL_DEFAULT).close();
+ }
+
for (int i = 0; i < _noOfConsumers; i++)
{
// Create a consumer for the destination and set this pinger to listen to its messages.
@@ -980,6 +1034,11 @@
// When running in client ack mode, an ack is done instead of a commit, on the commit batch
// size boundaries.
long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+ // _noOfConsumers can be set to 0 on the command line but we will not get here to
+ // divide by 0 as this is executed by the onMessage code when a message is recevied.
+ // no consumers means no message reception.
+
+
// log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
@@ -1014,6 +1073,7 @@
else
{
log.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Map contains:" + perCorrelationIds.entrySet());
}
}
else
@@ -1037,13 +1097,18 @@
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
* the correlation id.
*
+ * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination
+ * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings.
+ *
+ * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur.
+ *
* @param message The message to send. If this is null, one is generated.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
- * for all prematurely.
+ * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent.
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
* @throws InterruptedException When interrupted by a timeout
@@ -1051,6 +1116,16 @@
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
throws JMSException, InterruptedException
{
+ return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId);
+ }
+
+ public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+
+ // If we are runnning a consumeOnly test then don't send any messages
+
+
/*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
@@ -1071,29 +1146,41 @@
// countdown needs to be done before the chained listener can be called.
PerCorrelationId perCorrelationId = new PerCorrelationId();
- perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
+ int totalPingsRequested = numPings + preFill;
+ perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
// timeouts.
perCorrelationId.timeOutStart = System.nanoTime();
- // Send the specifed number of messages.
+ // Send the specifed number of messages for this test
pingNoWaitForReply(message, numPings, messageCorrelationId);
boolean timedOut;
boolean allMessagesReceived;
int numReplies;
+ // We don't have a consumer so don't try and wait for the messages.
+ // this does mean that if the producerSession is !TXed then we may
+ // get to exit before all msgs have been received.
+ //
+ // Return the number of requested messages, this will let the test
+ // report a pass.
+ if (_noOfConsumers == 0)
+ {
+ return totalPingsRequested;
+ }
+
do
{
// Block the current thread until replies to all the messages are received, or it times out.
perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
- numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+ numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount();
- allMessagesReceived = numReplies == getExpectedNumPings(numPings);
+ allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested);
// log.debug("numReplies = " + numReplies);
// log.debug("allMessagesReceived = " + allMessagesReceived);
@@ -1108,7 +1195,7 @@
}
while (!timedOut && !allMessagesReceived);
- if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
+ if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose)
{
log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
@@ -1146,6 +1233,12 @@
/*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
+ // If we are runnning a consumeOnly test then don't send any messages
+ if (_consumeOnly)
+ {
+ return;
+ }
+
if (message == null)
{
message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
@@ -1667,6 +1760,10 @@
/**
* Calculates how many pings are expected to be received for the given number sent.
*
+ * Note : that if you have set noConsumers to 0 then this will also return 0
+ * in the case of PubSub testing. This is correct as without consumers there
+ * will be no-one to receive the sent messages so they will be unable to respond.
+ *
* @param numpings The number of pings that will be sent.
*
* @return The number that should be received, for the test to pass.
Propchange: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:443187-707694
/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:805429-821809
-/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:911618-912022
+/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Sat Feb 27 00:38:13 2010
@@ -28,12 +28,16 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TopicSubscriber;
import junit.framework.Assert;
import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.concurrent.locks.ReentrantLock;
@@ -154,6 +158,7 @@
{
Message send = producerSession.createTextMessage("Message " + msg);
send.setBooleanProperty("first", first);
+ send.setStringProperty("testprop", "TimeToLiveTest");
send.setLongProperty("TTL", producer.getTimeToLive());
return send;
}
@@ -206,5 +211,160 @@
producerSession.close();
producerConnection.close();
}
+
+ public void testPassiveTTLwithDurableSubscription() throws Exception
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create and close the durable subscriber
+ AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+ TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
+ durableSubscriber.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ // Move to a Transacted session to ensure that all messages have been delivered to broker before
+ // we start waiting for TTL
+ Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ //Set TTL
+ int msg = 0;
+ producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
+
+ producer.setTimeToLive(TIME_TO_LIVE);
+
+ for (; msg < MSG_COUNT - 2; msg++)
+ {
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+ }
+
+ //Reset TTL
+ producer.setTimeToLive(0L);
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+
+ producerSession.commit();
+
+ //resubscribe
+ durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName());
+
+ // Ensure we sleep the required amount of time.
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ final long MILLIS = 1000000L;
+
+ long waitTime = TIME_TO_LIVE * MILLIS;
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ clientConnection.start();
+
+ //Receive Message 0
+ // Set 5s receive time for messages we expect to receive.
+ Message receivedFirst = durableSubscriber.receive(5000);
+ Message receivedSecond = durableSubscriber.receive(5000);
+ Message receivedThird = durableSubscriber.receive(1000);
+
+ // Log the messages to help diagnosis incase of failure
+ _logger.info("First:"+receivedFirst);
+ _logger.info("Second:"+receivedSecond);
+ _logger.info("Third:"+receivedThird);
+
+ // Only first and last messages sent should survive expiry
+ Assert.assertNull("More messages received", receivedThird);
+
+ Assert.assertNotNull("First message not received", receivedFirst);
+ Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
+ Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
+
+ Assert.assertNotNull("Final message not received", receivedSecond);
+ Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
+ Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
+
+ clientSession.unsubscribe(getTestQueueName());
+ clientConnection.close();
+
+ producerConnection.close();
+ }
+
+ public void testActiveTTLwithDurableSubscription() throws Exception
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create and close the durable subscriber
+ AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+ TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false);
+ durableSubscriber.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+ AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(topic);
+ producer.setTimeToLive(1000L);
+
+ // send Messages
+ for(int i = 0; i < MSG_COUNT; i++)
+ {
+ producer.send(producerSession.createTextMessage("Message: "+i));
+ }
+ long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+
+ // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
+ long messageCount = MSG_COUNT;
+ long lastPass;
+ AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription");
+ do
+ {
+ lastPass = messageCount;
+ Thread.sleep(100);
+ messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue);
+
+ // If we have received messages in the last loop then extend the timeout time.
+ // if we get messages stuck that are not expiring then the failureTime will occur
+ // failing the test. This will help with the scenario when the broker does not
+ // have enough CPU cycles to process the TTLs.
+ if (lastPass != messageCount)
+ {
+ failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+ }
+ }
+ while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+ assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ clientSession.unsubscribe("MyDurableTTLSubscription");
+ clientSession.close();
+ clientConnection.close();
+ }
}
Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Sat Feb 27 00:38:13 2010
@@ -38,7 +38,7 @@
*/
public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
{
- if (!isBroker08())
+ if (isBrokerStorePersistent() || !isBroker08())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
@@ -102,7 +102,7 @@
*/
public void testDurSubRestoresMessageSelector() throws Exception
{
- if (!isBroker08())
+ if (isBrokerStorePersistent() || !isBroker08())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Sat Feb 27 00:38:13 2010
@@ -115,10 +115,26 @@
_logger.info("Close connection");
con.close();
}
+
+ public void testDurabilityNOACK() throws Exception
+ {
+ durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false);
+ }
public void testDurabilityAUTOACK() throws Exception
{
- durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception
+ {
+ if(!isBrokerStorePersistent())
+ {
+ System.out.println("The broker store is not persistent, skipping this test.");
+ return;
+ }
+
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE, true);
}
public void testDurabilityNOACKSessionPerConnection() throws Exception
@@ -131,56 +147,85 @@
durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
}
- private void durabilityImpl(int ackMode) throws Exception
- {
+ private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception
+ {
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "MyTopic");
Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session sessionProd = con.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con.createSession(false, ackMode);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
con.start();
+ //send message A and check both consumers receive
producer.send(session1.createTextMessage("A"));
Message msg;
+ _logger.info("Receive message on consumer 1 :expecting A");
msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 2 :expecting A");
msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- consumer2.close();
- session2.close();
-
+ //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(500);
assertNotNull("Consumer 1 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+ assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
+ consumer2.close();
+ session2.close();
+
+ //Send message C, then connect consumer 3 to durable subscription and get
+ //message B if not using NO_ACK, then receive C with consumer 1 and 3
+ producer.send(session1.createTextMessage("C"));
+
Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- _logger.info("Receive message on consumer 3 :expecting B");
+ if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+ {
+ //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+ //when we didn't call receive() to get it before closing consumer 2
+ }
+ else
+ {
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
+ }
+
+ _logger.info("Receive message on consumer 1 :expecting C");
+ msg = consumer1.receive(500);
+ assertNotNull("Consumer 1 should get message 'C'.", msg);
+ assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ _logger.info("Receive message on consumer 3 :expecting C");
msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 3 should get message 'C'.", msg);
+ assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(500);
assertNull("There should be no more messages for consumption on consumer3.", msg);
@@ -191,6 +236,18 @@
session3.unsubscribe("MySubscription");
con.close();
+
+ if(restartBroker)
+ {
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ fail("Error restarting the broker");
+ }
+ }
}
private void durabilityImplSessionPerConnection(int ackMode) throws Exception
@@ -211,7 +268,7 @@
con1.start();
Session session1 = con1.createSession(false, ackMode);
- MessageConsumer consumer1 = session0.createConsumer(topic);
+ MessageConsumer consumer1 = session1.createConsumer(topic);
// Create consumer 2.
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
@@ -232,37 +289,60 @@
msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
- msg = consumer2.receive(500);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("There should be no more messages for consumption on consumer2.", msg);
+ // Send message and receive on consumer 1.
+ producer.send(session0.createTextMessage("B"));
+
+ _logger.info("Receive message on consumer 1 :expecting B");
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertEquals(null, msg);
+
// Detach the durable subscriber.
consumer2.close();
session2.close();
con2.close();
+
+ // Send message C and receive on consumer 1
+ producer.send(session0.createTextMessage("C"));
- // Send message and receive on open consumer.
- producer.send(session0.createTextMessage("B"));
-
- _logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting C");
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+ // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
+ // and also gets message C sent after it was disconnected.
AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
con3.start();
Session session3 = con3.createSession(false, ackMode);
TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- _logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+ {
+ //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+ //when we didn't call receive() to get it before closing consumer 2
+ }
+ else
+ {
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("B", ((TextMessage) msg).getText());
+ }
+
+ _logger.info("Receive message on consumer 3 :expecting C");
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Consumer 3 should get message 'C'.", msg);
+ assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("There should be no more messages for consumption on consumer3.", msg);
consumer1.close();
Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Sat Feb 27 00:38:13 2010
@@ -166,6 +166,7 @@
private static final String TEST_OUTPUT = "test.output";
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
+ private static final String BROKER_PERSITENT = "broker.persistent";
// values
protected static final String JAVA = "java";
@@ -187,6 +188,7 @@
private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
+ protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
private static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
@@ -929,6 +931,11 @@
{
return !_broker.equals("vm");
}
+
+ protected boolean isBrokerStorePersistent()
+ {
+ return _brokerPersistent;
+ }
public void restartBroker() throws Exception
{
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/CPPExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/CPPExcludes:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/Excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/Excludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/Excludes:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes Sat Feb 27 00:38:13 2010
@@ -1,4 +1,3 @@
-org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
// Those tests are not finished
org.apache.qpid.test.testcases.TTLTest#*
org.apache.qpid.test.testcases.FailoverTest#*
@@ -18,3 +17,5 @@
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
+//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/JavaExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/JavaExcludes:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes Sat Feb 27 00:38:13 2010
@@ -1,4 +1,3 @@
-org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
// Those tests are not finished
org.apache.qpid.test.testcases.TTLTest#*
org.apache.qpid.test.testcases.FailoverTest#*
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes Sat Feb 27 00:38:13 2010
@@ -1 +1,3 @@
+//These tests require a persistent store
org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/clean-dir
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/clean-dir:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir:805429-821809
-/qpid/trunk/qpid/java/test-profiles/clean-dir:911618-912022
+/qpid/trunk/qpid/java/test-profiles/clean-dir:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/default.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/default.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/default.testprofile:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile Sat Feb 27 00:38:13 2010
@@ -7,3 +7,4 @@
qpid.amqp.version=0-9
profile.excludes=JavaStandaloneExcludes
broker.clean.between.tests=true
+broker.persistent=true
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/java.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/java.testprofile:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/log4j-test.xml:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml:805429-821809
-/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:911618-912022
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:443187-726139
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/test-provider.properties:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties:805429-821809
-/qpid/trunk/qpid/java/test-profiles/test-provider.properties:911618-912022
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/qpid/branches/0.5.x-dev/qpid/python:892761,894875
/qpid/branches/java-network-refactor/qpid/python:805429-825319
/qpid/branches/qmfv2/qpid/python:902858,902894
-/qpid/trunk/qpid/python:911618-912022
+/qpid/trunk/qpid/python:911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test (original)
+++ qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test Sat Feb 27 00:38:13 2010
@@ -20,7 +20,7 @@
# TODO: summarize, test harness preconditions (e.g. broker is alive)
-import logging, optparse, os, struct, sys, traceback, types
+import logging, optparse, os, struct, sys, time, traceback, types
from fnmatch import fnmatchcase as match
from getopt import GetoptError
from logging import getLogger, StreamHandler, Formatter, Filter, \
@@ -60,6 +60,8 @@
help="ignore tests matching patterns in IFILE")
parser.add_option("-H", "--halt-on-error", action="store_true", default=False,
dest="hoe", help="halt if an error is encountered")
+parser.add_option("-t", "--time", action="store_true", default=False,
+ help="report timing information on test run")
parser.add_option("-D", "--define", metavar="DEFINE", dest="defines",
action="append", default=[], help="define test parameters")
@@ -165,7 +167,9 @@
"start": (34,),
"total": (34,),
"ignored": (33,),
- "selected": (34,)}
+ "selected": (34,),
+ "elapsed": (34,),
+ "average": (34,)}
COLORIZE = is_smart()
@@ -525,6 +529,7 @@
passed = 0
failed = 0
skipped = 0
+start = time.time()
for t in filtered:
if list_only:
print t.name()
@@ -538,6 +543,7 @@
failed += 1
if opts.hoe:
break
+end = time.time()
run = passed + failed
@@ -558,16 +564,22 @@
skip = "skip"
else:
skip = "pass"
- print colorize("Totals:", 1), \
- colorize_word("total", "%s tests" % total) + ",", \
- colorize_word(_pass, "%s passed" % passed) + ",", \
- colorize_word(skip, "%s skipped" % skipped) + ",", \
- colorize_word(ign, "%s ignored" % len(ignored)) + ",", \
- colorize_word(outcome, "%s failed" % failed),
+ print colorize("Totals:", 1),
+ totals = [colorize_word("total", "%s tests" % total),
+ colorize_word(_pass, "%s passed" % passed),
+ colorize_word(skip, "%s skipped" % skipped),
+ colorize_word(ign, "%s ignored" % len(ignored)),
+ colorize_word(outcome, "%s failed" % failed)]
+ print ", ".join(totals),
if opts.hoe and failed > 0:
print " -- (halted after %s)" % run
else:
print
+ if opts.time and run > 0:
+ print colorize("Timing:", 1),
+ timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)),
+ colorize_word("average", "%.2fs average" % ((end - start)/run))]
+ print ", ".join(timing)
if failed or skipped:
sys.exit(1)
Modified: qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py Sat Feb 27 00:38:13 2010
@@ -283,6 +283,11 @@
SUBJECT = "qpid.subject"
TO = "qpid.to"
+CLOSED = "CLOSED"
+READ_ONLY = "READ_ONLY"
+WRITE_ONLY = "WRITE_ONLY"
+OPEN = "OPEN"
+
class Driver:
def __init__(self, connection):
@@ -290,24 +295,158 @@
self.log_id = "%x" % id(self.connection)
self._lock = self.connection._lock
- self._in = LinkIn()
- self._out = LinkOut()
-
self._selector = Selector.default()
self._attempts = 0
self._hosts = [(self.connection.host, self.connection.port)] + \
self.connection.backups
self._host = 0
self._retrying = False
+ self._socket = None
+
+ self._timeout = None
+
+ self.engine = None
+
+ @synchronized
+ def wakeup(self):
+ self.dispatch()
+ self._selector.wakeup()
+
+ def start(self):
+ self._selector.register(self)
+
+ def fileno(self):
+ return self._socket.fileno()
+
+ @synchronized
+ def reading(self):
+ return self._socket is not None
+
+ @synchronized
+ def writing(self):
+ return self._socket is not None and self.engine.pending()
+
+ @synchronized
+ def timing(self):
+ return self._timeout
+
+ @synchronized
+ def readable(self):
+ try:
+ data = self._socket.recv(64*1024)
+ if data:
+ rawlog.debug("READ[%s]: %r", self.log_id, data)
+ self.engine.write(data)
+ else:
+ self.close_engine()
+ except socket.error, e:
+ self.close_engine(e)
+
+ self.update_status()
+
+ self.connection._waiter.notifyAll()
+
+ def close_engine(self, e=None):
+ if e is None:
+ e = "connection aborted"
+
+ if (self.connection.reconnect and
+ (self.connection.reconnect_limit is None or
+ self.connection.reconnect_limit <= 0 or
+ self._attempts <= self.connection.reconnect_limit)):
+ if self._host > 0:
+ delay = 0
+ else:
+ delay = self.connection.reconnect_delay
+ self._timeout = time.time() + delay
+ log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
+ if delay > 0:
+ log.warn("sleeping %s seconds" % delay)
+ self._retrying = True
+ self.engine.close()
+ else:
+ self.engine.close(e)
+
+ def update_status(self):
+ status = self.engine.status()
+ return getattr(self, "st_%s" % status.lower())()
+
+ def st_closed(self):
+ # XXX: this log statement seems to sometimes hit when the socket is not connected
+ # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
+ self._socket.close()
+ self._socket = None
+ self.engine = None
+ return True
+
+ def st_open(self):
+ return False
+
+ @synchronized
+ def writeable(self):
+ notify = False
+ try:
+ n = self._socket.send(self.engine.peek())
+ sent = self.engine.read(n)
+ rawlog.debug("SENT[%s]: %r", self.log_id, sent)
+ except socket.error, e:
+ self.close_engine(e)
+ notify = True
+
+ if self.update_status() or notify:
+ self.connection._waiter.notifyAll()
+
+ @synchronized
+ def timeout(self):
+ self.dispatch()
+ self.connection._waiter.notifyAll()
- self.reset()
+ def dispatch(self):
+ try:
+ if self._socket is None:
+ if self.connection._connected:
+ self.connect()
+ else:
+ self.engine.dispatch()
+ except:
+ # XXX: Does socket get leaked if this occurs?
+ msg = compat.format_exc()
+ self.connection.error = (msg,)
- def reset(self):
- self._opening = False
+ def connect(self):
+ try:
+ # XXX: should make this non blocking
+ if self._host == 0:
+ self._attempts += 1
+ host, port = self._hosts[self._host]
+ if self._retrying:
+ log.warn("trying: %s:%s", host, port)
+ self.engine = Engine(self.connection)
+ self.engine.open()
+ rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
+ self._socket = connect(host, port)
+ if self._retrying:
+ log.warn("reconnect succeeded: %s:%s", host, port)
+ self._timeout = None
+ self._attempts = 0
+ self._host = 0
+ self._retrying = False
+ except socket.error, e:
+ self._host = (self._host + 1) % len(self._hosts)
+ self.close_engine(e)
+
+class Engine:
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.log_id = "%x" % id(self.connection)
self._closing = False
self._connected = False
self._attachments = {}
+ self._in = LinkIn()
+ self._out = LinkOut()
+
self._channel_max = 65536
self._channels = 0
self._sessions = {}
@@ -316,7 +455,7 @@
self.address_cache = Cache(options.get("address_ttl", 60))
- self._socket = None
+ self._status = CLOSED
self._buf = ""
self._hdr = ""
self._op_enc = OpEncoder()
@@ -325,7 +464,6 @@
self._frame_dec = FrameDecoder()
self._seg_dec = SegmentDecoder()
self._op_dec = OpDecoder()
- self._timeout = None
self._sasl = sasl.Client()
if self.connection.username:
@@ -343,6 +481,9 @@
self._sasl_encode = False
self._sasl_decode = False
+ def _reset(self):
+ self.connection._transport_connected = False
+
for ssn in self.connection.sessions.values():
for m in ssn.acked + ssn.unacked + ssn.incoming:
m._transfer_id = None
@@ -352,76 +493,40 @@
rcv.impending = rcv.received
rcv.linked = False
- @synchronized
- def wakeup(self):
- self.dispatch()
- self._selector.wakeup()
-
- def start(self):
- self._selector.register(self)
-
- def fileno(self):
- return self._socket.fileno()
-
- @synchronized
- def reading(self):
- return self._socket is not None
-
- @synchronized
- def writing(self):
- return self._socket is not None and self._buf
-
- @synchronized
- def timing(self):
- return self._timeout
+ def status(self):
+ return self._status
- @synchronized
- def readable(self):
- error = None
- recoverable = False
+ def write(self, data):
try:
- data = self._socket.recv(64*1024)
- if data:
- rawlog.debug("READ[%s]: %r", self.log_id, data)
- if self._sasl_decode:
- data = self._sasl.decode(data)
- else:
- rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
- error = "connection aborted"
- recoverable = True
- except socket.error, e:
- error = e
- recoverable = True
-
- if not error:
- try:
- if len(self._hdr) < 8:
- r = 8 - len(self._hdr)
- self._hdr += data[:r]
- data = data[r:]
-
- if len(self._hdr) == 8:
- self.do_header(self._hdr)
-
- self._frame_dec.write(data)
- self._seg_dec.write(*self._frame_dec.read())
- self._op_dec.write(*self._seg_dec.read())
- for op in self._op_dec.read():
- self.assign_id(op)
- opslog.debug("RCVD[%s]: %r", self.log_id, op)
- op.dispatch(self)
- except VersionError, e:
- error = e
- except:
- msg = compat.format_exc()
- error = msg
+ if self._sasl_decode:
+ data = self._sasl.decode(data)
- if error:
- self._error(error, recoverable)
- else:
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ opslog.debug("RCVD[%s]: %r", self.log_id, op)
+ op.dispatch(self)
self.dispatch()
+ except VersionError, e:
+ self.close(e)
+ except:
+ self.close(compat.format_exc())
- self.connection._waiter.notifyAll()
+ def close(self, e=None):
+ self._reset()
+ if e:
+ self.connection.error = (e,)
+ self._status = CLOSED
def assign_id(self, op):
if isinstance(op, Command):
@@ -429,40 +534,16 @@
op.id = sst.received
sst.received += 1
- @synchronized
- def writeable(self):
- try:
- n = self._socket.send(self._buf)
- rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
- self._buf = self._buf[n:]
- except socket.error, e:
- self._error(e, True)
- self.connection._waiter.notifyAll()
+ def pending(self):
+ return len(self._buf)
- @synchronized
- def timeout(self):
- self.dispatch()
- self.connection._waiter.notifyAll()
+ def read(self, n):
+ result = self._buf[:n]
+ self._buf = self._buf[n:]
+ return result
- def _error(self, err, recoverable):
- if self._socket is not None:
- self._socket.close()
- self.reset()
- if (recoverable and self.connection.reconnect and
- (self.connection.reconnect_limit is None or
- self.connection.reconnect_limit <= 0 or
- self._attempts <= self.connection.reconnect_limit)):
- if self._host > 0:
- delay = 0
- else:
- delay = self.connection.reconnect_delay
- self._timeout = time.time() + delay
- log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
- if delay > 0:
- log.warn("sleeping %s seconds" % delay)
- self._retrying = True
- else:
- self.connection.error = (err,)
+ def peek(self):
+ return self._buf
def write_op(self, op):
opslog.debug("SENT[%s]: %r", self.log_id, op)
@@ -507,6 +588,7 @@
def do_connection_open_ok(self, open_ok):
self._connected = True
self._sasl_decode = True
+ self.connection._transport_connected = True
def connection_heartbeat(self, hrt):
self.write_op(ConnectionHeartbeat())
@@ -522,8 +604,7 @@
# probably the right thing to do
def do_connection_close_ok(self, close_ok):
- self._socket.close()
- self.reset()
+ self.close()
def do_session_attached(self, atc):
pass
@@ -576,40 +657,18 @@
sst.session.error = (ex,)
def dispatch(self):
- try:
- if self._socket is None and self.connection._connected and not self._opening:
- self.connect()
- elif self._socket is not None and not self.connection._connected and not self._closing:
- self.disconnect()
-
- if self._connected and not self._closing:
- for ssn in self.connection.sessions.values():
- self.attach(ssn)
- self.process(ssn)
- except:
- msg = compat.format_exc()
- self.connection.error = (msg,)
+ if not self.connection._connected and not self._closing and self._status != CLOSED:
+ self.disconnect()
- def connect(self):
- try:
- # XXX: should make this non blocking
- if self._host == 0:
- self._attempts += 1
- host, port = self._hosts[self._host]
- if self._retrying:
- log.warn("trying: %s:%s", host, port)
- self._socket = connect(host, port)
- if self._retrying:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._timeout = None
- self._attempts = 0
- self._host = 0
- self._retrying = False
- self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
- self._opening = True
- except socket.error, e:
- self._host = (self._host + 1) % len(self._hosts)
- self._error(e, True)
+ if self._connected and not self._closing:
+ for ssn in self.connection.sessions.values():
+ self.attach(ssn)
+ self.process(ssn)
+
+ def open(self):
+ self._reset()
+ self._status = OPEN
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
def disconnect(self):
self.write_op(ConnectionClose(close_code.normal))
@@ -1023,7 +1082,6 @@
rcv.received += 1
log.debug("RCVD[%s]: %s", ssn.log_id, msg)
ssn.incoming.append(msg)
- self.connection._waiter.notifyAll()
def _decode(self, xfr):
dp = EMPTY_DP
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org