You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/27 01:38:16 UTC

svn commit: r916887 [3/4] - in /qpid/branches/qmf-devel0.7: ./ qpid/cpp/include/qmf/engine/ qpid/cpp/include/qpid/ qpid/cpp/include/qpid/sys/posix/ qpid/cpp/include/qpid/sys/windows/ qpid/cpp/rubygen/framing.0-10/ qpid/cpp/src/ qpid/cpp/src/qmf/engine/...

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-821809
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:911618-912022
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-821809
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:911618-912022
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/module.xml?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/module.xml (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/module.xml Sat Feb 27 00:38:13 2010
@@ -273,6 +273,7 @@
       <sysproperty key="broker" value="${broker}"/>
       <sysproperty key="broker.clean" value="${broker.clean}"/>
       <sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
+      <sysproperty key="broker.persistent" value="${broker.persistent}"/>
       <sysproperty key="broker.version" value="${broker.version}"/>
       <sysproperty key="broker.ready" value="${broker.ready}" />
       <sysproperty key="broker.stopped" value="${broker.stopped}" />

Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java Sat Feb 27 00:38:13 2010
@@ -133,8 +133,11 @@
     {
         // _logger.debug("public void testAsyncPingOk(int numPings): called");
 
+        // get prefill count to update the expected count
+        int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+
         // Ensure that at least one ping was requeusted.
-        if (numPings == 0)
+        if (numPings + preFill == 0)
         {
             _logger.error("Number of pings requested was zero.");
             fail("Number of pings requested was zero.");
@@ -149,16 +152,24 @@
         // String messageCorrelationId = perThreadSetup._correlationId;
         // _logger.debug("messageCorrelationId = " + messageCorrelationId);
 
+
         // Initialize the count and timing controller for the new correlation id.
         PerCorrelationId perCorrelationId = new PerCorrelationId();
         TimingController tc = getTimingController().getControllerForCurrentThread();
         perCorrelationId._tc = tc;
-        perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
+        perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
         perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
 
+        // Start the client that will have been paused due to preFill requirement.
+        // or if we have not yet started client because messages are sitting on broker. 
+        if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+        {
+            pingClient.start();
+        }
+
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
-        int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
+        int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId);
 
         // Check that all the replies were received and log a fail if they were not.
         if (numReplies < perCorrelationId._expectedCount)

Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Sat Feb 27 00:38:13 2010
@@ -104,4 +104,9 @@
             return _pingClientCount * _noOfConsumers;
         }
     }
+
+    public int getClientCount()
+    {
+        return _pingClientCount;
+    }
 }

Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java Sat Feb 27 00:38:13 2010
@@ -141,9 +141,65 @@
                 perThreadSetup._pingClient = new PingClient(testParameters);
                 perThreadSetup._pingClient.establishConnection(true, true);
             }
-            // Start the client connection
-            perThreadSetup._pingClient.start();
 
+            // Prefill the broker unless we are in consume only mode. 
+            int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+            if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+            {
+                // Manually set the correlation ID to 1. This is not ideal but it is the
+                // value that the main test loop will use.
+                perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, String.valueOf(perThreadSetup._pingClient.getClientCount()));
+
+                // Note with a large preFill and non-tx session the messages will be
+                // rapidly pushed in to the mina buffers. OOM's are a real risk here.
+                // Should perhaps consider using a TX session for the prefill.
+
+                long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
+
+                //  Only delay if we have consumers and a delayBeforeConsume
+                if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+                    && delayBeforeConsume > 0)
+                {
+
+                    boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+                    // Only do logging if in verbose mode.
+                    if (verbose)
+                    {
+                        if (delayBeforeConsume > 60000)
+                        {
+                            long minutes = delayBeforeConsume / 60000;
+                            long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
+                            long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
+                                _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+                        }
+                        else
+                        {
+                                _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+                        }
+                    }
+
+                    Thread.sleep(delayBeforeConsume);
+
+                    if (verbose)
+                    {
+                        _logger.info("Starting Test.");
+                    }
+                }
+
+                // We can't start the client's here as the test client has not yet been configured to receieve messages.
+                // only when the test method is executed will the correlationID map be set up and ready to consume
+                // the messages we have sent here.
+            }
+            else //Only start the consumer if we are not preFilling.
+            {
+                // Only start the consumer if we don't have messages waiting to be received.
+                // we need to set up the correlationID mapping first
+                if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+                {
+                    // Start the client connection
+                    perThreadSetup._pingClient.start();
+                }
+            }
             // Attach the per-thread set to the thread.
             threadSetup.set(perThreadSetup);
         }
@@ -157,7 +213,7 @@
      * Performs test fixture clean
      */
     public void threadTearDown()
-    {
+    {                                                                                       
         _logger.debug("public void threadTearDown(): called");
 
         try

Modified: qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Sat Feb 27 00:38:13 2010
@@ -324,6 +324,25 @@
     /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
     public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
 
+    /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */
+    public static final String PREFILL_PROPNAME = "preFill";
+
+    /** Defines the default value for the number of messages to prefill. 0,default, no messages. */
+    public static final int PREFILL_DEFAULT = 0;
+
+    /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */
+    public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume";
+
+    /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */
+    public static final long  DELAY_BEFORE_CONSUME = 0;
+
+    /** Holds the name of the property to get when no messasges should be sent. */
+    public static final String CONSUME_ONLY_PROPNAME = "consumeOnly";
+
+    /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+    public static final boolean CONSUME_ONLY_DEFAULT = false;
+
+
     /** Holds the default configuration properties. */
     public static ParsedProperties defaults = new ParsedProperties();
 
@@ -360,6 +379,9 @@
         defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
         defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
         defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+        defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
+        defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
+        defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);        
     }
 
     /** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -455,6 +477,24 @@
      */
     protected int _maxPendingSize;
 
+    /**
+     * Holds the number of messages to send during the setup phase, before the clients start consuming.
+     */
+    private Integer _preFill;
+
+    /**
+     * Holds the time in ms to wait after preFilling before starting thet test.
+     */
+    private Long _delayBeforeConsume;
+
+    /**
+     * Holds a boolean value of wither this test should just consume, i.e. skips
+     * sending messages, but still expects to receive the specified number.
+     * Use in conjuction with numConsumers=0 to fill the broker.
+     */
+    private boolean _consumeOnly;
+
+
     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
@@ -588,6 +628,9 @@
         _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
         _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
         _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
+        _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
+        _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
+        _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
 
         // Check that one or more destinations were specified.
         if (_noOfDestinations < 1)
@@ -638,7 +681,10 @@
         }
 
         // Create the destinations to send pings to and receive replies from.
-        _replyDestination = _consumerSession[0].createTemporaryQueue();
+        if (_noOfConsumers > 0)
+        {
+            _replyDestination = _consumerSession[0].createTemporaryQueue();
+        }
         createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
 
         // Create the message producer only if instructed to.
@@ -871,6 +917,14 @@
         {
             _consumer = new MessageConsumer[_noOfConsumers];
 
+            // If we don't have consumers then ensure we have created the
+            // destination.   
+            if (_noOfConsumers == 0)
+            {
+                _producerSession.createConsumer(destination, selector,
+                                                NO_LOCAL_DEFAULT).close();
+            }
+
             for (int i = 0; i < _noOfConsumers; i++)
             {
                 // Create a consumer for the destination and set this pinger to listen to its messages.
@@ -980,6 +1034,11 @@
                         // When running in client ack mode, an ack is done instead of a commit, on the commit batch
                         // size boundaries.
                         long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+                        // _noOfConsumers can be set to 0 on the command line but we will not get here to
+                        // divide by 0 as this is executed by the onMessage code when a message is recevied.
+                        // no consumers means no message reception.
+
+
                         // log.debug("commitCount = " + commitCount);
 
                         if ((commitCount % _txBatchSize) == 0)
@@ -1014,6 +1073,7 @@
                 else
                 {
                     log.warn("Got unexpected message with correlationId: " + correlationID);
+                    log.warn("Map contains:" + perCorrelationIds.entrySet());
                 }
             }
             else
@@ -1037,13 +1097,18 @@
      * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
      * the correlation id.
      *
+     * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination
+     * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings.
+     *
+     * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur.
+     *
      * @param message              The message to send. If this is null, one is generated.
      * @param numPings             The number of ping messages to send.
      * @param timeout              The timeout in milliseconds.
      * @param messageCorrelationId The message correlation id. If this is null, one is generated.
      *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
-     *         for all prematurely.
+     *         for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent.
      *
      * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
      * @throws InterruptedException When interrupted by a timeout
@@ -1051,6 +1116,16 @@
     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
         throws JMSException, InterruptedException
     {
+        return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId);
+    }
+
+    public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
+        throws JMSException, InterruptedException
+    {
+
+        // If we are runnning a consumeOnly test then don't send any messages
+
+
         /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
             + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
@@ -1071,29 +1146,41 @@
             // countdown needs to be done before the chained listener can be called.
             PerCorrelationId perCorrelationId = new PerCorrelationId();
 
-            perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
+            int totalPingsRequested = numPings + preFill;
+            perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
             perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
             // Set up the current time as the start time for pinging on the correlation id. This is used to determine
             // timeouts.
             perCorrelationId.timeOutStart = System.nanoTime();
 
-            // Send the specifed number of messages.
+            // Send the specifed number of messages for this test            
             pingNoWaitForReply(message, numPings, messageCorrelationId);
 
             boolean timedOut;
             boolean allMessagesReceived;
             int numReplies;
 
+            // We don't have a consumer so don't try and wait for the messages.
+            // this does mean that if the producerSession is !TXed then we may
+            // get to exit before all msgs have been received.
+            //
+            // Return the number of requested messages, this will let the test
+            // report a pass.
+            if (_noOfConsumers == 0)
+            {
+                return totalPingsRequested;
+            }
+
             do
             {
                 // Block the current thread until replies to all the messages are received, or it times out.
                 perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
                 // Work out how many replies were receieved.
-                numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+                numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount();
 
-                allMessagesReceived = numReplies == getExpectedNumPings(numPings);
+                allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested);
 
                 // log.debug("numReplies = " + numReplies);
                 // log.debug("allMessagesReceived = " + allMessagesReceived);
@@ -1108,7 +1195,7 @@
             }
             while (!timedOut && !allMessagesReceived);
 
-            if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
+            if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose)
             {
                 log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
             }
@@ -1146,6 +1233,12 @@
         /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
             + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
+        // If we are runnning a consumeOnly test then don't send any messages
+        if (_consumeOnly)
+        {
+            return;
+        }
+        
         if (message == null)
         {
             message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
@@ -1667,6 +1760,10 @@
     /**
      * Calculates how many pings are expected to be received for the given number sent.
      *
+     * Note : that if you have set noConsumers to 0 then this will also return 0
+     * in the case of PubSub testing. This is correct as without consumers there
+     * will be no-one to receive the sent messages so they will be unable to respond. 
+     *
      * @param numpings The number of pings that will be sent.
      *
      * @return The number that should be received, for the test to pass.

Propchange: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:443187-707694
 /qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:805429-821809
-/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:911618-912022
+/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Sat Feb 27 00:38:13 2010
@@ -28,12 +28,16 @@
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TopicSubscriber;
 
 import junit.framework.Assert;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 import java.util.concurrent.locks.ReentrantLock;
@@ -154,6 +158,7 @@
     {
         Message send = producerSession.createTextMessage("Message " + msg);
         send.setBooleanProperty("first", first);
+        send.setStringProperty("testprop", "TimeToLiveTest");
         send.setLongProperty("TTL", producer.getTimeToLive());
         return send;
     }
@@ -206,5 +211,160 @@
         producerSession.close();
         producerConnection.close();
     }
+    
+    public void testPassiveTTLwithDurableSubscription() throws Exception
+    {
+        //Create Client 1
+        Connection clientConnection = getConnection();
+        
+        Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        // Create and close the durable subscriber
+        AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+        TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
+        durableSubscriber.close();
+        
+        //Create Producer
+        Connection producerConnection = getConnection();
+
+        producerConnection.start();
+
+        // Move to a Transacted session to ensure that all messages have been delivered to broker before
+        // we start waiting for TTL
+        Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+        MessageProducer producer = producerSession.createProducer(topic);
+
+        //Set TTL
+        int msg = 0;
+        producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
+
+        producer.setTimeToLive(TIME_TO_LIVE);
+
+        for (; msg < MSG_COUNT - 2; msg++)
+        {
+            producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+        }
+
+        //Reset TTL
+        producer.setTimeToLive(0L);
+        producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+
+        producerSession.commit();
+        
+        //resubscribe
+        durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName());
+
+        // Ensure we sleep the required amount of time.
+        ReentrantLock waitLock = new ReentrantLock();
+        Condition wait = waitLock.newCondition();
+        final long MILLIS = 1000000L;
+
+        long waitTime = TIME_TO_LIVE * MILLIS;
+        while (waitTime > 0)
+        {
+            try
+            {
+                waitLock.lock();
+
+                waitTime = wait.awaitNanos(waitTime);
+            }
+            catch (InterruptedException e)
+            {
+                //Stop if we are interrupted
+                fail(e.getMessage());
+            }
+            finally
+            {
+                waitLock.unlock();
+            }
+
+        }
+
+        clientConnection.start();
+
+        //Receive Message 0
+        // Set 5s receive time for messages we expect to receive.
+        Message receivedFirst = durableSubscriber.receive(5000);
+        Message receivedSecond = durableSubscriber.receive(5000);
+        Message receivedThird = durableSubscriber.receive(1000);
+        
+        // Log the messages to help diagnosis incase of failure
+        _logger.info("First:"+receivedFirst);
+        _logger.info("Second:"+receivedSecond);
+        _logger.info("Third:"+receivedThird);
+
+        // Only first and last messages sent should survive expiry
+        Assert.assertNull("More messages received", receivedThird); 
+
+        Assert.assertNotNull("First message not received", receivedFirst);
+        Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
+        Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
+
+        Assert.assertNotNull("Final message not received", receivedSecond);
+        Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
+        Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
+
+        clientSession.unsubscribe(getTestQueueName());
+        clientConnection.close();
+
+        producerConnection.close();
+    }
+
+    public void testActiveTTLwithDurableSubscription() throws Exception
+    {
+        //Create Client 1
+        Connection clientConnection = getConnection();
+        Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        // Create and close the durable subscriber
+        AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+        TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false);
+        durableSubscriber.close();
+        
+        //Create Producer
+        Connection producerConnection = getConnection();
+        AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(topic);
+        producer.setTimeToLive(1000L);
+
+        // send Messages
+        for(int i = 0; i < MSG_COUNT; i++)
+        {
+            producer.send(producerSession.createTextMessage("Message: "+i));
+        }
+        long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+
+        // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
+        long messageCount = MSG_COUNT;
+        long lastPass;
+        AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription");
+        do
+        {
+            lastPass = messageCount;
+            Thread.sleep(100);
+            messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue);
+
+            // If we have received messages in the last loop then extend the timeout time.
+            // if we get messages stuck that are not expiring then the failureTime will occur
+            // failing the test. This will help with the scenario when the broker does not
+            // have enough CPU cycles to process the TTLs.
+            if (lastPass != messageCount)
+            {
+                failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+            }
+        }
+        while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+        assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+        
+        clientSession.unsubscribe("MyDurableTTLSubscription");
+        clientSession.close();
+        clientConnection.close();
+    }
 
 }

Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Sat Feb 27 00:38:13 2010
@@ -38,7 +38,7 @@
      */
     public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
     {
-        if (!isBroker08())
+        if (isBrokerStorePersistent() || !isBroker08())
         {
             TopicConnectionFactory factory = getConnectionFactory();
             Topic topic = (Topic) getInitialContext().lookup(_topicName);
@@ -102,7 +102,7 @@
      */
     public void testDurSubRestoresMessageSelector() throws Exception
     {
-        if (!isBroker08())
+        if (isBrokerStorePersistent() || !isBroker08())
         {
             TopicConnectionFactory factory = getConnectionFactory();
             Topic topic = (Topic) getInitialContext().lookup(_topicName);

Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Sat Feb 27 00:38:13 2010
@@ -115,10 +115,26 @@
         _logger.info("Close connection");
         con.close();
     }
+    
+    public void testDurabilityNOACK() throws Exception
+    {
+        durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false);
+    }
 
     public void testDurabilityAUTOACK() throws Exception
     {
-        durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+        durabilityImpl(Session.AUTO_ACKNOWLEDGE, false);
+    }
+    
+    public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception
+    {
+        if(!isBrokerStorePersistent())
+        {
+            System.out.println("The broker store is not persistent, skipping this test.");
+            return;
+        }
+        
+        durabilityImpl(Session.AUTO_ACKNOWLEDGE, true);
     }
 
     public void testDurabilityNOACKSessionPerConnection() throws Exception
@@ -131,56 +147,85 @@
         durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
     }
 
-    private void durabilityImpl(int ackMode) throws Exception
-    {
+    private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception
+    {        
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con, "MyTopic");
         Session session1 = con.createSession(false, ackMode);
         MessageConsumer consumer1 = session1.createConsumer(topic);
 
-        Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session sessionProd = con.createSession(false, ackMode);
         MessageProducer producer = sessionProd.createProducer(topic);
 
-        Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        Session session2 = con.createSession(false, ackMode);
         TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
 
         con.start();
 
+        //send message A and check both consumers receive
         producer.send(session1.createTextMessage("A"));
 
         Message msg;
+        _logger.info("Receive message on consumer 1 :expecting A");
         msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
+        _logger.info("Receive message on consumer 2 :expecting A");
         msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        consumer2.close();
-        session2.close();
-
+        //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
         msg = consumer1.receive(500);
         assertNotNull("Consumer 1 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+        assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(500);
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
+        consumer2.close();
+        session2.close();
+        
+        //Send message C, then connect consumer 3 to durable subscription and get
+        //message B if not using NO_ACK, then receive C with consumer 1 and 3
+        producer.send(session1.createTextMessage("C"));
+
         Session session3 = con.createSession(false, ackMode);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        _logger.info("Receive message on consumer 3 :expecting B");
+        if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+        {
+            //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+            //when we didn't call receive() to get it before closing consumer 2
+        }
+        else
+        {
+            _logger.info("Receive message on consumer 3 :expecting B");
+            msg = consumer3.receive(500);
+            assertNotNull("Consumer 3 should get message 'B'.", msg);
+            assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
+        }
+
+        _logger.info("Receive message on consumer 1 :expecting C");
+        msg = consumer1.receive(500);
+        assertNotNull("Consumer 1 should get message 'C'.", msg);
+        assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+        _logger.info("Receive message on consumer 3 :expecting C");
         msg = consumer3.receive(500);
-        assertNotNull("Consumer 3 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        assertNotNull("Consumer 3 should get message 'C'.", msg);
+        assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
         msg = consumer3.receive(500);
         assertNull("There should be no more messages for consumption on consumer3.", msg);
@@ -191,6 +236,18 @@
         session3.unsubscribe("MySubscription");
 
         con.close();
+        
+        if(restartBroker)
+        {
+            try
+            {
+                restartBroker();
+            }
+            catch (Exception e)
+            {
+                fail("Error restarting the broker");
+            }
+        }
     }
 
     private void durabilityImplSessionPerConnection(int ackMode) throws Exception
@@ -211,7 +268,7 @@
         con1.start();
         Session session1 = con1.createSession(false, ackMode);
 
-        MessageConsumer consumer1 = session0.createConsumer(topic);
+        MessageConsumer consumer1 = session1.createConsumer(topic);
 
         // Create consumer 2.
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
@@ -232,37 +289,60 @@
         msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should have been received",msg);
         assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(500);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull("There should be no more messages for consumption on consumer2.", msg);
 
+        // Send message and receive on consumer 1.
+        producer.send(session0.createTextMessage("B"));
+
+        _logger.info("Receive message on consumer 1 :expecting B");
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertEquals("B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
+        assertEquals(null, msg);
+        
         // Detach the durable subscriber.
         consumer2.close();
         session2.close();
         con2.close();
+        
+        // Send message C and receive on consumer 1
+        producer.send(session0.createTextMessage("C"));
 
-        // Send message and receive on open consumer.
-        producer.send(session0.createTextMessage("B"));
-
-        _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
-        assertEquals("B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting C");
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertEquals("C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+        // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
+        // and also gets message C sent after it was disconnected.
         AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
         con3.start();
         Session session3 = con3.createSession(false, ackMode);
 
         TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        _logger.info("Receive message on consumer 3 :expecting B");
-        msg = consumer3.receive(500);
-        assertNotNull("Consumer 3 should get message 'B'.", msg);
-        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+        {
+            //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+            //when we didn't call receive() to get it before closing consumer 2
+        }
+        else
+        {
+            _logger.info("Receive message on consumer 3 :expecting B");
+            msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+            assertEquals("B", ((TextMessage) msg).getText());
+        }
+        
+        _logger.info("Receive message on consumer 3 :expecting C");
+        msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Consumer 3 should get message 'C'.", msg);
+        assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
-        msg = consumer3.receive(500);
+        msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull("There should be no more messages for consumption on consumer3.", msg);
 
         consumer1.close();

Modified: qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Sat Feb 27 00:38:13 2010
@@ -166,6 +166,7 @@
     private static final String TEST_OUTPUT = "test.output";
     private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
     private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
+    private static final String BROKER_PERSITENT = "broker.persistent";
 
     // values
     protected static final String JAVA = "java";
@@ -187,6 +188,7 @@
     private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
     private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
     private String _output = System.getProperty(TEST_OUTPUT);
+    protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
 
     private static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
     protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
@@ -929,6 +931,11 @@
     {
         return !_broker.equals("vm");
     }
+    
+    protected boolean isBrokerStorePersistent()
+    {
+        return _brokerPersistent;
+    }
 
     public void restartBroker() throws Exception
     {

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/CPPExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/CPPExcludes:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/Excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/Excludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/Excludes:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes Sat Feb 27 00:38:13 2010
@@ -1,4 +1,3 @@
-org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
 // Those tests are not finished
 org.apache.qpid.test.testcases.TTLTest#*
 org.apache.qpid.test.testcases.FailoverTest#*
@@ -18,3 +17,5 @@
 org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
 org.apache.qpid.server.queue.ModelTest#*
 
+//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#*

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/JavaExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/JavaExcludes:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes Sat Feb 27 00:38:13 2010
@@ -1,4 +1,3 @@
-org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
 // Those tests are not finished
 org.apache.qpid.test.testcases.TTLTest#*
 org.apache.qpid.test.testcases.FailoverTest#*

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaStandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
-/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:911618-912022
+/qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/JavaTransientExcludes Sat Feb 27 00:38:13 2010
@@ -1 +1,3 @@
+//These tests require a persistent store
 org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#*

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/clean-dir
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/clean-dir:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir:805429-821809
-/qpid/trunk/qpid/java/test-profiles/clean-dir:911618-912022
+/qpid/trunk/qpid/java/test-profiles/clean-dir:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/default.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/default.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/default.testprofile:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile Sat Feb 27 00:38:13 2010
@@ -7,3 +7,4 @@
 qpid.amqp.version=0-9
 profile.excludes=JavaStandaloneExcludes
 broker.clean.between.tests=true
+broker.persistent=true

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java-derby.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/java.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
-/qpid/trunk/qpid/java/test-profiles/java.testprofile:911618-912022
+/qpid/trunk/qpid/java/test-profiles/java.testprofile:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/log4j-test.xml:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml:805429-821809
-/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:911618-912022
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:443187-726139
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/test-provider.properties:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties:805429-821809
-/qpid/trunk/qpid/java/test-profiles/test-provider.properties:911618-912022
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:911618-916854

Propchange: qpid/branches/qmf-devel0.7/qpid/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
 /qpid/branches/0.5.x-dev/qpid/python:892761,894875
 /qpid/branches/java-network-refactor/qpid/python:805429-825319
 /qpid/branches/qmfv2/qpid/python:902858,902894
-/qpid/trunk/qpid/python:911618-912022
+/qpid/trunk/qpid/python:911618-916854

Modified: qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test (original)
+++ qpid/branches/qmf-devel0.7/qpid/python/qpid-python-test Sat Feb 27 00:38:13 2010
@@ -20,7 +20,7 @@
 
 # TODO: summarize, test harness preconditions (e.g. broker is alive)
 
-import logging, optparse, os, struct, sys, traceback, types
+import logging, optparse, os, struct, sys, time, traceback, types
 from fnmatch import fnmatchcase as match
 from getopt import GetoptError
 from logging import getLogger, StreamHandler, Formatter, Filter, \
@@ -60,6 +60,8 @@
                   help="ignore tests matching patterns in IFILE")
 parser.add_option("-H", "--halt-on-error", action="store_true", default=False,
                   dest="hoe", help="halt if an error is encountered")
+parser.add_option("-t", "--time", action="store_true", default=False,
+                  help="report timing information on test run")
 parser.add_option("-D", "--define", metavar="DEFINE", dest="defines",
                   action="append", default=[], help="define test parameters")
 
@@ -165,7 +167,9 @@
             "start": (34,),
             "total": (34,),
             "ignored": (33,),
-            "selected": (34,)}
+            "selected": (34,),
+            "elapsed": (34,),
+            "average": (34,)}
 
 COLORIZE = is_smart()
 
@@ -525,6 +529,7 @@
 passed = 0
 failed = 0
 skipped = 0
+start = time.time()
 for t in filtered:
   if list_only:
     print t.name()
@@ -538,6 +543,7 @@
       failed += 1
       if opts.hoe:
         break
+end = time.time()
 
 run = passed + failed
 
@@ -558,16 +564,22 @@
     skip = "skip"
   else:
     skip = "pass"
-  print colorize("Totals:", 1), \
-      colorize_word("total", "%s tests" % total) + ",", \
-      colorize_word(_pass, "%s passed" % passed) + ",", \
-      colorize_word(skip, "%s skipped" % skipped) + ",", \
-      colorize_word(ign, "%s ignored" % len(ignored)) + ",", \
-      colorize_word(outcome, "%s failed" % failed),
+  print colorize("Totals:", 1),
+  totals = [colorize_word("total", "%s tests" % total),
+            colorize_word(_pass, "%s passed" % passed),
+            colorize_word(skip, "%s skipped" % skipped),
+            colorize_word(ign, "%s ignored" % len(ignored)),
+            colorize_word(outcome, "%s failed" % failed)]
+  print ", ".join(totals),
   if opts.hoe and failed > 0:
     print " -- (halted after %s)" % run
   else:
     print
+  if opts.time and run > 0:
+    print colorize("Timing:", 1),
+    timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)),
+              colorize_word("average", "%.2fs average" % ((end - start)/run))]
+    print ", ".join(timing)
 
 if failed or skipped:
   sys.exit(1)

Modified: qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/python/qpid/messaging/driver.py Sat Feb 27 00:38:13 2010
@@ -283,6 +283,11 @@
 SUBJECT = "qpid.subject"
 TO = "qpid.to"
 
+CLOSED = "CLOSED"
+READ_ONLY = "READ_ONLY"
+WRITE_ONLY = "WRITE_ONLY"
+OPEN = "OPEN"
+
 class Driver:
 
   def __init__(self, connection):
@@ -290,24 +295,158 @@
     self.log_id = "%x" % id(self.connection)
     self._lock = self.connection._lock
 
-    self._in = LinkIn()
-    self._out = LinkOut()
-
     self._selector = Selector.default()
     self._attempts = 0
     self._hosts = [(self.connection.host, self.connection.port)] + \
         self.connection.backups
     self._host = 0
     self._retrying = False
+    self._socket = None
+
+    self._timeout = None
+
+    self.engine = None
+
+  @synchronized
+  def wakeup(self):
+    self.dispatch()
+    self._selector.wakeup()
+
+  def start(self):
+    self._selector.register(self)
+
+  def fileno(self):
+    return self._socket.fileno()
+
+  @synchronized
+  def reading(self):
+    return self._socket is not None
+
+  @synchronized
+  def writing(self):
+    return self._socket is not None and self.engine.pending()
+
+  @synchronized
+  def timing(self):
+    return self._timeout
+
+  @synchronized
+  def readable(self):
+    try:
+      data = self._socket.recv(64*1024)
+      if data:
+        rawlog.debug("READ[%s]: %r", self.log_id, data)
+        self.engine.write(data)
+      else:
+        self.close_engine()
+    except socket.error, e:
+      self.close_engine(e)
+
+    self.update_status()
+
+    self.connection._waiter.notifyAll()
+
+  def close_engine(self, e=None):
+    if e is None:
+      e = "connection aborted"
+
+    if (self.connection.reconnect and
+        (self.connection.reconnect_limit is None or
+         self.connection.reconnect_limit <= 0 or
+         self._attempts <= self.connection.reconnect_limit)):
+      if self._host > 0:
+        delay = 0
+      else:
+        delay = self.connection.reconnect_delay
+      self._timeout = time.time() + delay
+      log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
+      if delay > 0:
+        log.warn("sleeping %s seconds" % delay)
+      self._retrying = True
+      self.engine.close()
+    else:
+      self.engine.close(e)
+
+  def update_status(self):
+    status = self.engine.status()
+    return getattr(self, "st_%s" % status.lower())()
+
+  def st_closed(self):
+    # XXX: this log statement seems to sometimes hit when the socket is not connected
+    # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
+    self._socket.close()
+    self._socket = None
+    self.engine = None
+    return True
+
+  def st_open(self):
+    return False
+
+  @synchronized
+  def writeable(self):
+    notify = False
+    try:
+      n = self._socket.send(self.engine.peek())
+      sent = self.engine.read(n)
+      rawlog.debug("SENT[%s]: %r", self.log_id, sent)
+    except socket.error, e:
+      self.close_engine(e)
+      notify = True
+
+    if self.update_status() or notify:
+      self.connection._waiter.notifyAll()
+
+  @synchronized
+  def timeout(self):
+    self.dispatch()
+    self.connection._waiter.notifyAll()
 
-    self.reset()
+  def dispatch(self):
+    try:
+      if self._socket is None:
+        if self.connection._connected:
+          self.connect()
+      else:
+        self.engine.dispatch()
+    except:
+      # XXX: Does socket get leaked if this occurs?
+      msg = compat.format_exc()
+      self.connection.error = (msg,)
 
-  def reset(self):
-    self._opening = False
+  def connect(self):
+    try:
+      # XXX: should make this non blocking
+      if self._host == 0:
+        self._attempts += 1
+      host, port = self._hosts[self._host]
+      if self._retrying:
+        log.warn("trying: %s:%s", host, port)
+      self.engine = Engine(self.connection)
+      self.engine.open()
+      rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
+      self._socket = connect(host, port)
+      if self._retrying:
+        log.warn("reconnect succeeded: %s:%s", host, port)
+      self._timeout = None
+      self._attempts = 0
+      self._host = 0
+      self._retrying = False
+    except socket.error, e:
+      self._host = (self._host + 1) % len(self._hosts)
+      self.close_engine(e)
+
+class Engine:
+
+  def __init__(self, connection):
+    self.connection = connection
+    self.log_id = "%x" % id(self.connection)
     self._closing = False
     self._connected = False
     self._attachments = {}
 
+    self._in = LinkIn()
+    self._out = LinkOut()
+
     self._channel_max = 65536
     self._channels = 0
     self._sessions = {}
@@ -316,7 +455,7 @@
 
     self.address_cache = Cache(options.get("address_ttl", 60))
 
-    self._socket = None
+    self._status = CLOSED
     self._buf = ""
     self._hdr = ""
     self._op_enc = OpEncoder()
@@ -325,7 +464,6 @@
     self._frame_dec = FrameDecoder()
     self._seg_dec = SegmentDecoder()
     self._op_dec = OpDecoder()
-    self._timeout = None
 
     self._sasl = sasl.Client()
     if self.connection.username:
@@ -343,6 +481,9 @@
     self._sasl_encode = False
     self._sasl_decode = False
 
+  def _reset(self):
+    self.connection._transport_connected = False
+
     for ssn in self.connection.sessions.values():
       for m in ssn.acked + ssn.unacked + ssn.incoming:
         m._transfer_id = None
@@ -352,76 +493,40 @@
         rcv.impending = rcv.received
         rcv.linked = False
 
-  @synchronized
-  def wakeup(self):
-    self.dispatch()
-    self._selector.wakeup()
-
-  def start(self):
-    self._selector.register(self)
-
-  def fileno(self):
-    return self._socket.fileno()
-
-  @synchronized
-  def reading(self):
-    return self._socket is not None
-
-  @synchronized
-  def writing(self):
-    return self._socket is not None and self._buf
-
-  @synchronized
-  def timing(self):
-    return self._timeout
+  def status(self):
+    return self._status
 
-  @synchronized
-  def readable(self):
-    error = None
-    recoverable = False
+  def write(self, data):
     try:
-      data = self._socket.recv(64*1024)
-      if data:
-        rawlog.debug("READ[%s]: %r", self.log_id, data)
-        if self._sasl_decode:
-          data = self._sasl.decode(data)
-      else:
-        rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
-        error = "connection aborted"
-        recoverable = True
-    except socket.error, e:
-      error = e
-      recoverable = True
-
-    if not error:
-      try:
-        if len(self._hdr) < 8:
-          r = 8 - len(self._hdr)
-          self._hdr += data[:r]
-          data = data[r:]
-
-          if len(self._hdr) == 8:
-            self.do_header(self._hdr)
-
-        self._frame_dec.write(data)
-        self._seg_dec.write(*self._frame_dec.read())
-        self._op_dec.write(*self._seg_dec.read())
-        for op in self._op_dec.read():
-          self.assign_id(op)
-          opslog.debug("RCVD[%s]: %r", self.log_id, op)
-          op.dispatch(self)
-      except VersionError, e:
-        error = e
-      except:
-        msg = compat.format_exc()
-        error = msg
+      if self._sasl_decode:
+        data = self._sasl.decode(data)
 
-    if error:
-      self._error(error, recoverable)
-    else:
+      if len(self._hdr) < 8:
+        r = 8 - len(self._hdr)
+        self._hdr += data[:r]
+        data = data[r:]
+
+        if len(self._hdr) == 8:
+          self.do_header(self._hdr)
+
+      self._frame_dec.write(data)
+      self._seg_dec.write(*self._frame_dec.read())
+      self._op_dec.write(*self._seg_dec.read())
+      for op in self._op_dec.read():
+        self.assign_id(op)
+        opslog.debug("RCVD[%s]: %r", self.log_id, op)
+        op.dispatch(self)
       self.dispatch()
+    except VersionError, e:
+      self.close(e)
+    except:
+      self.close(compat.format_exc())
 
-    self.connection._waiter.notifyAll()
+  def close(self, e=None):
+    self._reset()
+    if e:
+      self.connection.error = (e,)
+    self._status = CLOSED
 
   def assign_id(self, op):
     if isinstance(op, Command):
@@ -429,40 +534,16 @@
       op.id = sst.received
       sst.received += 1
 
-  @synchronized
-  def writeable(self):
-    try:
-      n = self._socket.send(self._buf)
-      rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
-      self._buf = self._buf[n:]
-    except socket.error, e:
-      self._error(e, True)
-      self.connection._waiter.notifyAll()
+  def pending(self):
+    return len(self._buf)
 
-  @synchronized
-  def timeout(self):
-    self.dispatch()
-    self.connection._waiter.notifyAll()
+  def read(self, n):
+    result = self._buf[:n]
+    self._buf = self._buf[n:]
+    return result
 
-  def _error(self, err, recoverable):
-    if self._socket is not None:
-      self._socket.close()
-    self.reset()
-    if (recoverable and self.connection.reconnect and
-        (self.connection.reconnect_limit is None or
-         self.connection.reconnect_limit <= 0 or
-         self._attempts <= self.connection.reconnect_limit)):
-      if self._host > 0:
-        delay = 0
-      else:
-        delay = self.connection.reconnect_delay
-      self._timeout = time.time() + delay
-      log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
-      if delay > 0:
-        log.warn("sleeping %s seconds" % delay)
-      self._retrying = True
-    else:
-      self.connection.error = (err,)
+  def peek(self):
+    return self._buf
 
   def write_op(self, op):
     opslog.debug("SENT[%s]: %r", self.log_id, op)
@@ -507,6 +588,7 @@
   def do_connection_open_ok(self, open_ok):
     self._connected = True
     self._sasl_decode = True
+    self.connection._transport_connected = True
 
   def connection_heartbeat(self, hrt):
     self.write_op(ConnectionHeartbeat())
@@ -522,8 +604,7 @@
     # probably the right thing to do
 
   def do_connection_close_ok(self, close_ok):
-    self._socket.close()
-    self.reset()
+    self.close()
 
   def do_session_attached(self, atc):
     pass
@@ -576,40 +657,18 @@
     sst.session.error = (ex,)
 
   def dispatch(self):
-    try:
-      if self._socket is None and self.connection._connected and not self._opening:
-        self.connect()
-      elif self._socket is not None and not self.connection._connected and not self._closing:
-        self.disconnect()
-
-      if self._connected and not self._closing:
-        for ssn in self.connection.sessions.values():
-          self.attach(ssn)
-          self.process(ssn)
-    except:
-      msg = compat.format_exc()
-      self.connection.error = (msg,)
+    if not self.connection._connected and not self._closing and self._status != CLOSED:
+      self.disconnect()
 
-  def connect(self):
-    try:
-      # XXX: should make this non blocking
-      if self._host == 0:
-        self._attempts += 1
-      host, port = self._hosts[self._host]
-      if self._retrying:
-        log.warn("trying: %s:%s", host, port)
-      self._socket = connect(host, port)
-      if self._retrying:
-        log.warn("reconnect succeeded: %s:%s", host, port)
-      self._timeout = None
-      self._attempts = 0
-      self._host = 0
-      self._retrying = False
-      self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
-      self._opening = True
-    except socket.error, e:
-      self._host = (self._host + 1) % len(self._hosts)
-      self._error(e, True)
+    if self._connected and not self._closing:
+      for ssn in self.connection.sessions.values():
+        self.attach(ssn)
+        self.process(ssn)
+
+  def open(self):
+    self._reset()
+    self._status = OPEN
+    self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
 
   def disconnect(self):
     self.write_op(ConnectionClose(close_code.normal))
@@ -1023,7 +1082,6 @@
     rcv.received += 1
     log.debug("RCVD[%s]: %s", ssn.log_id, msg)
     ssn.incoming.append(msg)
-    self.connection._waiter.notifyAll()
 
   def _decode(self, xfr):
     dp = EMPTY_DP



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org