You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC

svn commit: r1299257 [23/26] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/ broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/ broker-plugins/access-control/src/main/java/org/apache/qpid/serve...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid.management.jmx;
 
-import javax.jms.Connection;
-
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
 import org.apache.qpid.management.common.mbeans.ManagedConnection;
 
+import javax.jms.Connection;
+
 /**
  * Test generation of message statistics.
  */
@@ -162,29 +162,7 @@ public class MessageStatisticsTest exten
         assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived());
         assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived());
     }
-    
-    /**
-     * Test message peak rate generation.
-     */
-    public void testMessagePeakRates() throws Exception
-    {
-        sendUsing(_test, 2, 10);
-        Thread.sleep(10000);
-        sendUsing(_dev, 4, 10);
-        Thread.sleep(10000);
-        
-        ManagedBroker test = _jmxUtils.getManagedBroker("test");
-        ManagedBroker dev = _jmxUtils.getManagedBroker("development");
-        
-        assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate());
-        assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate());
-        assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate());
-        assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate());
 
-        assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate());
-        assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate());
-    }
-    
     /**
      * Test message totals when a vhost has its statistics reset
      */

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java Sat Mar 10 19:22:10 2012
@@ -20,13 +20,6 @@
  */
 package org.apache.qpid.management.jmx;
 
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
@@ -37,6 +30,13 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
 /**
  * Test generation of message statistics.
  */

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java Sat Mar 10 19:22:10 2012
@@ -20,18 +20,17 @@
  */
 package org.apache.qpid.server;
 
-import org.apache.qpid.server.logging.AbstractTestLogging;
-import org.apache.qpid.util.LogMonitor;
-import org.apache.log4j.Logger;
+import junit.framework.AssertionFailedError;
 import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
-import java.util.List;
-
-import junit.framework.AssertionFailedError;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.util.LogMonitor;
 
 import javax.jms.Connection;
-import javax.jms.Session;
 import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.List;
 
 /**
  * Series of tests to validate the external Java broker starts up as expected.

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.configuration;
 
 import org.apache.commons.configuration.ConfigurationException;
+
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
@@ -32,7 +33,7 @@ import org.apache.qpid.test.utils.QpidBr
  */
 public class ServerConfigurationFileTest extends QpidBrokerTestCase
 {
-    ServerConfiguration _serverConfig;
+    private ServerConfiguration _serverConfig;
 
     public void setUp() throws ConfigurationException
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Sat Mar 10 19:22:10 2012
@@ -21,20 +21,8 @@
 
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.client.AMQHeadersExchange;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.client.AMQQueue;
@@ -47,6 +35,18 @@ import org.apache.qpid.test.utils.QpidBr
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class ReturnUnroutableMandatoryMessageTest extends QpidBrokerTestCase implements ExceptionListener
 {
     private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
@@ -241,7 +241,7 @@ public class ReturnUnroutableMandatoryMe
         con2.start();
 
         MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false);
-        MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic);
+        MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic, false, true);
 
         // First test - should be routed
         _logger.info("Sending non-mandatory message");

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java Sat Mar 10 19:22:10 2012
@@ -20,11 +20,8 @@
  */
 package org.apache.qpid.server.failover;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
@@ -32,15 +29,17 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionListener
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverMethodTest.class);
     private CountDownLatch _failoverComplete = new CountDownLatch(1);
-    protected static final Logger _logger = LoggerFactory.getLogger(FailoverMethodTest.class);
-
-
+    private final int _freePortWithNoBroker = findFreePort();
 
     /**
      * Test that the round robin method has the correct delays.
@@ -53,8 +52,8 @@ public class FailoverMethodTest extends 
         //note: The first broker has no connect delay and the default 1 retry
         //        while the tcp:localhost broker has 3 retries with a 2s connect delay
         String connectionString = "amqp://guest:guest@/test?brokerlist=" +
-                                  "'tcp://:" + getPort() +
-                                  ";tcp://localhost:5670?connectdelay='2000',retries='3''";
+                                  "'tcp://localhost:" + getPort() +
+                                  ";tcp://localhost:" + _freePortWithNoBroker + "?connectdelay='2000',retries='3''";
 
         AMQConnectionURL url = new AMQConnectionURL(connectionString);
 
@@ -65,7 +64,9 @@ public class FailoverMethodTest extends 
 
             connection.setExceptionListener(this);
 
+            LOGGER.debug("Stopping broker");
             stopBroker();
+            LOGGER.debug("Stopped broker");
 
             _failoverComplete.await(30, TimeUnit.SECONDS);
             assertEquals("failoverLatch was not decremented in given timeframe",
@@ -109,7 +110,9 @@ public class FailoverMethodTest extends 
 
             connection.setExceptionListener(this);
 
+            LOGGER.debug("Stopping broker");
             stopBroker();
+            LOGGER.debug("Stopped broker");
 
             _failoverComplete.await(30, TimeUnit.SECONDS);
             assertEquals("failoverLatch was not decremented in given timeframe",
@@ -138,18 +141,6 @@ public class FailoverMethodTest extends 
         }
     }
 
-    public void onException(JMSException e)
-    {
-        if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
-        {
-            _logger.debug("Received AMQDisconnectedException");
-            _failoverComplete.countDown();
-        }
-        else
-        {
-            _logger.error("Unexpected underlying exception", e.getLinkedException());
-        }
-    }
 
     /**
      * Test that setting 'nofailover' as the failover policy does not result in
@@ -200,13 +191,11 @@ public class FailoverMethodTest extends 
                     }
                     catch (Exception e)
                     {
-                        System.err.println(e.getMessage());
-                        e.printStackTrace();
+                        LOGGER.error("Exception whilst starting broker", e);
                     }
                 }
             });
 
-
             brokerStart.start();
             long start = System.currentTimeMillis();
 
@@ -260,11 +249,17 @@ public class FailoverMethodTest extends 
         }
     }
 
-    public void stopBroker(int port) throws Exception
+    @Override
+    public void onException(JMSException e)
     {
-        if (isBrokerPresent(port))
+        if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
+        {
+            LOGGER.debug("Received AMQDisconnectedException");
+            _failoverComplete.countDown();
+        }
+        else
         {
-            super.stopBroker(port);
+            LOGGER.error("Unexpected underlying exception", e.getLinkedException());
         }
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Sat Mar 10 19:22:10 2012
@@ -22,14 +22,15 @@
 package org.apache.qpid.server.failure;
 
 import junit.framework.TestCase;
-import org.apache.qpid.test.utils.QpidClientConnectionHelper;
-import org.apache.qpid.client.failover.FailoverException;
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidClientConnectionHelper;
 
-import javax.jms.JMSException;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import java.io.IOException;
 
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java Sat Mar 10 19:22:10 2012
@@ -20,14 +20,8 @@
  */
 package org.apache.qpid.server.logging;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.commons.configuration.ConfigurationException;
+
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -35,6 +29,13 @@ import org.apache.qpid.server.util.Inter
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.util.LogMonitor;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * Abstract superclass for logging test set up and utility methods.
  *
@@ -46,7 +47,7 @@ public class AbstractTestLogging extends
     public static final String TEST_LOG_PREFIX = "MESSAGE";
     protected LogMonitor _monitor;
 
-    InternalBrokerBaseCase _configLoader;
+    private InternalBrokerBaseCase _configLoader;
 
     @Override
     public void setUp() throws Exception

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -18,18 +18,17 @@
  */
 package org.apache.qpid.server.logging;
 
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.security.acl.AbstractACLTestCase;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.util.List;
+
 /**
  * ACL version 2/3 file testing to verify that ACL actor logging works correctly.
  * 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java Sat Mar 10 19:22:10 2012
@@ -20,10 +20,6 @@
 */
 package org.apache.qpid.server.logging;
 
-import javax.jms.Connection;
-import javax.jms.Queue;
-import javax.jms.Session;
-
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.AMQShortString;
@@ -31,6 +27,10 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.util.FileUtils;
 
+import javax.jms.Connection;
+import javax.jms.Queue;
+import javax.jms.Session;
+
 public class AlertingTest extends AbstractTestLogging
 {
     private String VIRTUALHOST = "test";

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -45,10 +45,10 @@ public class BindingLoggingTest extends 
 
     static final String BND_PREFIX = "BND-";
 
-    Connection _connection;
-    Session _session;
-    Queue _queue;
-    Topic _topic;
+    private Connection _connection;
+    private Session _session;
+    private Queue _queue;
+    private Topic _topic;
 
     @Override
     public void setUp() throws Exception

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -23,11 +23,9 @@ package org.apache.qpid.server.logging;
 import junit.framework.AssertionFailedError;
 
 import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.Main;
 import org.apache.qpid.transport.ConnectionException;
 import org.apache.qpid.util.LogMonitor;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.Socket;
 import java.util.List;
@@ -39,7 +37,7 @@ import java.util.List;
  *
  * BRK-1001 : Startup : Version: <Version> Build: <Build>
  * BRK-1002 : Starting : Listening on <Transport> port <Port>
- * BRK-1003 : Shuting down : <Transport> port <Port>
+ * BRK-1003 : Shutting down : <Transport> port <Port>
  * BRK-1004 : Ready
  * BRK-1005 : Stopped
  * BRK-1006 : Using configuration : <path>
@@ -809,7 +807,7 @@ public class BrokerLoggingTest extends A
             setConfigurationProperty("connector.ssl.keyStorePath", getConfigurationStringProperty("management.ssl.keyStorePath"));
             setConfigurationProperty("connector.ssl.keyStorePassword", getConfigurationStringProperty("management.ssl.keyStorePassword"));
 
-            Integer sslPort = Integer.parseInt(getConfigurationStringProperty("connector.sslport"));
+            Integer sslPort = Integer.parseInt(getConfigurationStringProperty("connector.ssl.port"));
 
             startBroker();
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -21,8 +21,11 @@
 package org.apache.qpid.server.logging;
 
 import javax.jms.Connection;
-import java.util.List;
+
+import org.apache.qpid.common.QpidProperties;
+
 import java.util.HashMap;
+import java.util.List;
 import java.util.TreeSet;
 
 public class ConnectionLoggingTest extends AbstractTestLogging
@@ -58,7 +61,7 @@ public class ConnectionLoggingTest exten
         // Wait until opened
         waitForMessage("CON-1001");
         
-        // Close the conneciton
+        // Close the connection
         connection.close();
 
         // Wait to ensure that the desired message is logged
@@ -66,18 +69,10 @@ public class ConnectionLoggingTest exten
 
         List<String> results = waitAndFindMatches("CON-1001");
 
-        // Validation
-        // We should have at least three messages when running InVM but when running External
-        // we will get 0-10 negotiation on con:0 whcih may close at some random point
-        // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open
-        // MESSAGE [con:0(/127.0.0.1:46926)] CON-1001 : Open : Protocol Version : 0-10
         // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open
         // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Protocol Version : 0-9
-        // MESSAGE [con:0(/127.0.0.1:46926)] CON-1002 : Close
-        // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
-
-        //So check how many connections we have in the result set and extract the last one.
-        // When running InVM we will have con:0 and externally con:1
+        // MESSAGE [con:1(/127.0.0.1:46927)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 : Client Version : 1.2.3_4
+        // MESSAGE [con:0(/127.0.0.1:46927)] CON-1002 : Close
 
         HashMap<Integer, List<String>> connectionData = splitResultsOnConnectionID(results);
 
@@ -87,31 +82,26 @@ public class ConnectionLoggingTest exten
         //Use just the data from the last connection for the test
         results = connectionData.get(connectionID);
 
-        // If we are running inVM or with 0-10 we will get three open messagse
-	    // if running externally with 0-8/0-9 we will also have open and close messages from the failed 0-10 negotiation 
-	    assertTrue("CON messages not logged:" + results.size(), results.size() >= 3);
+	    assertEquals("Unexpected CON-1001 messages count", 3, results.size());
 
         String log = getLogMessage(results, 0);
         //  MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open
         //1 & 2
         validateMessageID("CON-1001",log);
 
-        // validate the last three CON- messages.
-        // This is because when running externally we may also have logged the failed
-        // 0-10 negotiation messages if using 0-8/0-9 on the broker.
-
-        // 3 - Assert the options are correct
-        //  MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9
-        validateConnectionOpen(results, 0, true, true, clientid);
+        // validate the last three CON-1001 messages.
+        //  MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Client ID : clientid : Protocol Version : 0-9 : Client Version : 1.2.3_4
+        validateConnectionOpen(results, 0, true, true, clientid, true, QpidProperties.getReleaseVersion());
 
         //  MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open : Protocol Version : 0-9
-        validateConnectionOpen(results, 1, true, false, null);
+        validateConnectionOpen(results, 1, true, false, null, false, null);
 
-        validateConnectionOpen(results, 2, false, false, null);
+        //  MESSAGE [con:1(/127.0.0.1:52540)] CON-1001 : Open
+        validateConnectionOpen(results, 2, false, false, null, false, null);
     }
     
     private void validateConnectionOpen(List<String> results, int positionFromEnd,
-                 boolean protocolVersionPresent, boolean clientIdOptionPresent, String clientIdValue)
+                 boolean protocolVersionPresent, boolean clientIdOptionPresent, String clientIdValue, boolean clientVersionPresent, String clientVersionValue)
     {
         String log = getLogMessageFromEnd(results, positionFromEnd);
         
@@ -128,6 +118,13 @@ public class ConnectionLoggingTest exten
                 protocolVersionPresent, fromMessage(log).contains("Protocol Version :"));
         //fixme there is no way currently to find out the negotiated protocol version
         // The delegate is the versioned class ((AMQConnection)connection)._delegate
+
+        assertEquals("unexpected Client ID option state", clientVersionPresent, fromMessage(log).contains("Client Version :"));
+
+        if(clientVersionPresent && clientVersionValue != null)
+        {
+            assertTrue("Client version value is not present: " + clientVersionValue, fromMessage(log).contains(clientVersionValue));
+        }
     }
 
     /**

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/DerbyMessageStoreLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,15 +20,14 @@
  */
 package org.apache.qpid.server.logging;
 
-import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
 
 import javax.jms.Connection;
 import javax.jms.Queue;
 import javax.jms.Session;
-import java.util.List;
 import java.io.File;
+import java.util.List;
 
 /**
  * The MessageStore test suite validates that the follow log messages as

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,16 +20,6 @@
  */
 package org.apache.qpid.server.logging;
 
-import java.io.IOException;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
@@ -39,6 +29,15 @@ import org.apache.qpid.framing.ExchangeD
 import org.apache.qpid.framing.ExchangeDeleteOkBody;
 import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Exchange
  *
@@ -54,11 +53,11 @@ public class ExchangeLoggingTest extends
 
     static final String EXH_PREFIX = "EXH-";
 
-    Connection _connection;
-    Session _session;
-    Queue _queue;
-    String _name;
-    String _type;
+    private Connection _connection;
+    private Session _session;
+    private Queue _queue;
+    private String _name;
+    private String _type;
 
     @Override
     public void setUp() throws Exception

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -25,8 +25,8 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.util.LogMonitor;
 
-import java.util.List;
 import java.io.File;
+import java.util.List;
 
 /**
  * Management Console Test Suite

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/MemoryMessageStoreLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,13 +20,13 @@
 */
 package org.apache.qpid.server.logging;
 
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
 import org.apache.qpid.util.LogMonitor;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * The MessageStore test suite validates that the follow log messages as
  * specified in the Functional Specification.

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/QueueLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,19 +20,19 @@
  */
 package org.apache.qpid.server.logging;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
 
 import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.Queue;
 import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
 import javax.naming.NamingException;
-import java.util.List;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * The Queue test suite validates that the follow log messages as specified in

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -20,16 +20,18 @@
  */
 package org.apache.qpid.server.logging;
 
+import javax.jms.QueueBrowser;
 import junit.framework.AssertionFailedError;
+
 import org.apache.qpid.client.AMQConnection;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.Message;
 import java.io.IOException;
 import java.util.List;
 
@@ -48,10 +50,10 @@ public class SubscriptionLoggingTest ext
 {
     static final String SUB_PREFIX = "SUB-";
 
-    Connection _connection;
-    Session _session;
-    Queue _queue;
-    Topic _topic;
+    private Connection _connection;
+    private Session _session;
+    private Queue _queue;
+    private Topic _topic;
 
     @Override
     public void setUp() throws Exception
@@ -165,8 +167,10 @@ public class SubscriptionLoggingTest ext
      */
     public void testSubscriptionCreateQueueBrowser() throws JMSException, IOException
     {
-        _session.createBrowser(_queue);
+        _connection.start();
+        QueueBrowser browser = _session.createBrowser(_queue);
 
+        browser.getEnumeration();
         //Validate
         //Ensure that we wait for the SUB log message
         waitAndFindMatches("SUB-1001");

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/VirtualHostLoggingTest.java Sat Mar 10 19:22:10 2012
@@ -21,13 +21,13 @@
 
 package org.apache.qpid.server.logging;
 
-import java.util.Arrays;
-import java.util.List;
-
 import junit.framework.AssertionFailedError;
 
 import org.apache.qpid.server.configuration.ServerConfiguration;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * Virtualhost Test Cases
  * The virtualhost test suite validates that the follow log messages as specified in the Functional Specification.

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java Sat Mar 10 19:22:10 2012
@@ -23,10 +23,8 @@ package org.apache.qpid.server.message;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.AMQBindingURL;
 
 import javax.jms.*;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java Sat Mar 10 19:22:10 2012
@@ -20,16 +20,8 @@
  */
 package org.apache.qpid.server.persistent;
 
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.store.DerbyMessageStore;
-import org.apache.commons.configuration.XMLConfiguration;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -38,61 +30,28 @@ import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
-import java.util.concurrent.CountDownLatch;
-import java.io.File;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
- * QPID-1813 : We do not store the client id with a message so on store restart
- * that information is lost and we are unable to perform no local checks.
- *
- * QPID-1813 highlights the lack of testing here as the broker will NPE as it
- * assumes that the client id of the publisher will always exist
+ * Verifies that after recovery, a new Connection with no-local in use is
+ * able to receive messages sent prior to the broker restart.
  */
-public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements ConnectionListener
+public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
 {
     protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
     protected static final int SEND_COUNT = 10;
-    private CountDownLatch _failoverComplete = new CountDownLatch(1);
-
-    protected ConnectionURL _connectionURL;
-
-    @Override
-    protected void setUp() throws Exception
-    {
-
-        XMLConfiguration configuration = new XMLConfiguration(_configFile);
-        configuration.setProperty("virtualhosts.virtualhost.test.store.class", "org.apache.qpid.server.store.DerbyMessageStore");
-        configuration.setProperty("virtualhosts.virtualhost.test.store."+ DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
-                                  System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + File.separator + "derbyDB-NoLocalAfterRecoveryTest");
-
-        File tmpFile = File.createTempFile("configFile", "test");
-        tmpFile.deleteOnExit();
-        configuration.save(tmpFile);
-
-        _configFile = tmpFile;
-        _connectionURL = getConnectionURL();
-
-        BrokerDetails details = _connectionURL.getBrokerDetails(0);
-
-        // This will attempt to failover for 3 seconds.
-        // Local testing suggests failover takes 2 seconds
-        details.setProperty(BrokerDetails.OPTIONS_RETRY, "10");
-        details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500");
-
-        super.setUp();        
-    }
 
-    public void test() throws Exception
+    public void testNoLocalNotQueued() throws Exception
     {
+        if(!isBrokerStorePersistent())
+        {
+            fail("This test requires a broker with a persistent store");
+        }
 
-        Connection connection = getConnection(_connectionURL);
+        Connection connection = getConnection();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
-        Topic topic = (Topic) getInitialContext().lookup("topic");
+        Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
 
         TopicSubscriber noLocalSubscriber = session.
                 createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
@@ -102,88 +61,104 @@ public class NoLocalAfterRecoveryTest ex
                 createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal",
                                         null, false);
 
-        List<Message> sent = sendMessage(session, topic, SEND_COUNT);
-
-        session.commit();
-
-        assertEquals("Incorrect number of messages sent",
-                     SEND_COUNT, sent.size());
-
+        sendMessage(session, topic, SEND_COUNT);
 
         // Check messages can be received as expected.
         connection.start();
 
-        assertTrue("No Local Subscriber is not a no-local subscriber",
-                   noLocalSubscriber.getNoLocal());
-
-        assertFalse("Normal Subscriber is a no-local subscriber",
-                    normalSubscriber.getNoLocal());
-
-
+        //As the no-local subscriber was on the same connection the messages were
+        //published on, tit will receive no messages as they will be discarded on the broker
         List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
         assertEquals("No Local Subscriber Received messages", 0, received.size());
 
         received = receiveMessage(normalSubscriber, SEND_COUNT);
         assertEquals("Normal Subscriber Received no messages",
                      SEND_COUNT, received.size());
+        session.commit();
 
+        normalSubscriber.close();
+        connection.close();
 
-        ((AMQConnection)connection).setConnectionListener(this);
-
+        //Ensure the no-local subscribers messages were discarded by restarting the broker
+        //and reconnecting to the subscription to ensure they were not recovered.
         restartBroker();
 
+        Connection connection2 = getConnection();
+        connection2.start();
+
+        Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+        Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+        TopicSubscriber noLocalSubscriber2 = session2.
+                createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
+                                        null, true);
+
+        // The NO-local subscriber should not get any messages
+        received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+        session2.commit();
+        assertEquals("No Local Subscriber Received messages", 0, received.size());
+
+        noLocalSubscriber2.close();
+
+
+    }
 
-        //Await
-        if (!_failoverComplete.await(4000L, TimeUnit.MILLISECONDS))
+
+    public void testNonNoLocalQueued() throws Exception
+    {
+        if(!isBrokerStorePersistent())
         {
-            fail("Failover Failed to compelete");
+            fail("This test requires a broker with a persistent store");
         }
 
-        session.rollback();
+        Connection connection = getConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
 
-        //Failover will restablish our clients
-        assertTrue("No Local Subscriber is not a no-local subscriber",
-                   noLocalSubscriber.getNoLocal());
+        TopicSubscriber noLocalSubscriber =
+                session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true);
 
-        assertFalse("Normal Subscriber is a no-local subscriber",
-                    normalSubscriber.getNoLocal());
 
+        sendMessage(session, topic, SEND_COUNT);
+
+        // Check messages can be received as expected.
+        connection.start();
+
+        List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+        assertEquals("No Local Subscriber Received messages", 0, received.size());
 
-        // NOTE : here that the NO-local subscriber actually now gets ALL the
-        // messages as the connection has failed and they are consuming on a
-        // different connnection to the one that was published on.
-        received = receiveMessage(noLocalSubscriber, SEND_COUNT);
-        assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size());
 
-        received = receiveMessage(normalSubscriber, SEND_COUNT);
-        assertEquals("Normal Subscriber Received no messages",
-                     SEND_COUNT, received.size());
 
-        //leave the store in a clean state.
         session.commit();
-    }
 
-    protected List<Message> assertReceiveMessage(MessageConsumer messageConsumer,
-                                                 int count) throws JMSException
-    {
+        Connection connection3 = getConnection();
+        Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED);
+        sendMessage(session3, topic, SEND_COUNT);
 
-        List<Message> receivedMessages = new ArrayList<Message>(count);
-        for (int i = 0; i < count; i++)
-        {
-            Message received = messageConsumer.receive(1000);
 
-            if (received != null)
-            {
-                receivedMessages.add(received);
-            }
-            else
-            {
-                fail("Only "
-                     + receivedMessages.size() + "/" + count + " received.");
-            }
-        }
+        connection.close();
+
+        //We didn't receive the messages on the durable queue for the no-local subscriber
+        //so they are still on the broker. Restart the broker, prompting their recovery.
+        restartBroker();
+
+        Connection connection2 = getConnection();
+        connection2.start();
+
+        Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+        Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+        TopicSubscriber noLocalSubscriber2 =
+                session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true);
+
+        // The NO-local subscriber should receive messages sent from connection3
+        received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+        session2.commit();
+        assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size());
+
+        noLocalSubscriber2.close();
+
 
-        return receivedMessages;
     }
 
     protected List<Message> receiveMessage(MessageConsumer messageConsumer,
@@ -207,29 +182,4 @@ public class NoLocalAfterRecoveryTest ex
 
         return receivedMessages;
     }
-
-    public void bytesSent(long count)
-    {
-
-    }
-
-    public void bytesReceived(long count)
-    {
-
-    }
-
-    public boolean preFailover(boolean redirect)
-    {
-        return true;
-    }
-
-    public boolean preResubscribe()
-    {
-        return true;
-    }
-
-    public void failoverComplete()
-    {
-        _failoverComplete.countDown();
-    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java Sat Mar 10 19:22:10 2012
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java Sat Mar 10 19:22:10 2012
@@ -20,9 +20,9 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -30,9 +30,9 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQException;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,12 @@
 */
 package org.apache.qpid.server.queue;
 
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -30,11 +34,8 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.naming.NamingException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.HashMap;
+import java.util.Map;
 
 public class PriorityQueueTest extends QpidBrokerTestCase
 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Sat Mar 10 19:22:10 2012
@@ -20,38 +20,44 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQDestination;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.server.logging.AbstractTestLogging;
 import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.*;
-import javax.naming.NamingException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.io.IOException;
 
 public class ProducerFlowControlTest extends AbstractTestLogging
 {
     private static final int TIMEOUT = 10000;
 
-    private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
-
     private Connection producerConnection;
-    private MessageProducer producer;
-    private Session producerSession;
-    private Queue queue;
     private Connection consumerConnection;
+    private Session producerSession;
     private Session consumerSession;
-
+    private MessageProducer producer;
     private MessageConsumer consumer;
-    private final AtomicInteger _sentMessages = new AtomicInteger();
+    private Queue queue;
+
+    private final AtomicInteger _sentMessages = new AtomicInteger(0);
 
     private JMXTestUtils _jmxUtils;
     private boolean _jmxUtilConnected;
@@ -77,37 +83,34 @@ public class ProducerFlowControlTest ext
 
     public void tearDown() throws Exception
     {
-        if(_jmxUtilConnected)
+        try
         {
-            try
+            if(_jmxUtilConnected)
             {
-                _jmxUtils.close();
-            }
-            catch (IOException e)
-            {
-                e.printStackTrace();
+                try
+                {
+                    _jmxUtils.close();
+                }
+                catch (IOException e)
+                {
+                    e.printStackTrace();
+                }
             }
+            producerConnection.close();
+            consumerConnection.close();
+        }
+        finally
+        {
+            super.tearDown();
         }
-        producerConnection.close();
-        consumerConnection.close();
-        super.tearDown();
     }
 
-    public void testCapacityExceededCausesBlock()
-            throws JMSException, NamingException, AMQException, InterruptedException
+    public void testCapacityExceededCausesBlock() throws Exception
     {
         String queueName = getTestQueueName();
-        
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
-
-        _sentMessages.set(0);
 
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+        producer = producerSession.createProducer(queue);
 
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -135,22 +138,14 @@ public class ProducerFlowControlTest ext
 
     }
 
-    public void testBrokerLogMessages()
-            throws JMSException, NamingException, AMQException, InterruptedException, IOException
+
+    public void testBrokerLogMessages() throws Exception
     {
         String queueName = getTestQueueName();
         
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
         producer = producerSession.createProducer(queue);
 
-        _sentMessages.set(0);
-
-
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
 
@@ -162,41 +157,28 @@ public class ProducerFlowControlTest ext
         consumerConnection.start();
 
 
-        while(consumer.receive(1000) != null);
+        while(consumer.receive(1000) != null) {};
 
         results = waitAndFindMatches("QUE-1004");
 
         assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
-
-
-        
     }
 
 
-    public void testClientLogMessages()
-            throws JMSException, NamingException, AMQException, InterruptedException, IOException
+    public void testClientLogMessages() throws Exception
     {
         String queueName = getTestQueueName();
-        
+
         setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
         setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
 
         Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) session).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800);
         producer = session.createProducer(queue);
 
-        _sentMessages.set(0);
-
-
         // try to send 5 messages (should block after 4)
-        MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+        MessageSender sender = sendMessagesAsync(producer, session, 5, 50L);
 
         List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
         assertTrue("No delay messages logged by client",results.size()!=0);
@@ -205,26 +187,16 @@ public class ProducerFlowControlTest ext
                                                   + " flow control", TIMEOUT);
         assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
                      + "messages)",1,failedMessages.size());
-
-
-
     }
 
 
-    public void testFlowControlOnCapacityResumeEqual()
-            throws JMSException, NamingException, AMQException, InterruptedException
+    public void testFlowControlOnCapacityResumeEqual() throws Exception
     {
         String queueName = getTestQueueName();
         
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",1000);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 1000);
         producer = producerSession.createProducer(queue);
 
-        _sentMessages.set(0);
 
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -236,7 +208,6 @@ public class ProducerFlowControlTest ext
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
 
-
         consumer.receive();
 
         Thread.sleep(1000);
@@ -247,23 +218,16 @@ public class ProducerFlowControlTest ext
     }
 
 
-    public void testFlowControlSoak()
-            throws Exception, NamingException, AMQException, InterruptedException
+    public void testFlowControlSoak() throws Exception
     {
         String queueName = getTestQueueName();
         
-        _sentMessages.set(0);
+
         final int numProducers = 10;
         final int numMessages = 100;
 
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",6000);
-        arguments.put("x-qpid-flow-resume-capacity",3000);
-
-        ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 6000, 3000);
 
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
-        ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
         consumerConnection.start();
 
         Connection[] producers = new Connection[numProducers];
@@ -303,58 +267,38 @@ public class ProducerFlowControlTest ext
 
     }
 
-
-
-    public void testSendTimeout()
-            throws JMSException, NamingException, AMQException, InterruptedException
+    public void testSendTimeout() throws Exception
     {
         String queueName = getTestQueueName();
-        
+        final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
+                : "Unable to send message for 3 seconds due to broker enforced flow control";
+
         setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
         Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) session).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
         producer = session.createProducer(queue);
 
-        _sentMessages.set(0);
-
-
         // try to send 5 messages (should block after 4)
-        MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L);
+        MessageSender sender = sendMessagesAsync(producer, session, 5, 100L);
 
-        
-        Thread.sleep(10000);
-
-        Exception e = sender.getException();
+        Exception e = sender.awaitSenderException(10000);
 
         assertNotNull("No timeout exception on sending", e);
 
+
+        assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+
     }
-    
-    
-    public void testFlowControlAttributeModificationViaJMX()
-    throws JMSException, NamingException, AMQException, InterruptedException, Exception
+
+    public void testFlowControlAttributeModificationViaJMX() throws Exception
     {
         _jmxUtils.open();
         _jmxUtilConnected = true;
         
         String queueName = getTestQueueName();
-        
-        //create queue
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",0);
-        arguments.put("x-qpid-flow-resume-capacity",0);
-        ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-
-        queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
 
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 0, 0);
         producer = producerSession.createProducer(queue);
         
         Thread.sleep(1000);
@@ -375,7 +319,7 @@ public class ProducerFlowControlTest ext
         assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
         
         // try to send 2 messages (should block after 1)
-        _sentMessages.set(0);
+
         sendMessagesAsync(producer, producerSession, 2, 50L);
 
         Thread.sleep(2000);
@@ -406,13 +350,23 @@ public class ProducerFlowControlTest ext
         consumer.receive();
         
         //perform a synchronous op on the connection
-        ((AMQSession) consumerSession).sync();
+        ((AMQSession<?,?>) consumerSession).sync();
         
         assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
         
         consumer.receive();
     }
 
+    private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",capacity);
+        arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
+        ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+        queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+        ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+    }
+
     private MessageSender sendMessagesAsync(final MessageProducer producer,
                                             final Session producerSession,
                                             final int numMessages,
@@ -435,7 +389,7 @@ public class ProducerFlowControlTest ext
 
             try
             {
-                ((AMQSession)producerSession).sync();
+                ((AMQSession<?,?>)producerSession).sync();
             }
             catch (AMQException e)
             {
@@ -456,7 +410,6 @@ public class ProducerFlowControlTest ext
 
     private static final byte[] BYTE_300 = new byte[300];
 
-
     private Message nextMessage(int msg, Session producerSession) throws JMSException
     {
         BytesMessage send = producerSession.createBytesMessage();
@@ -466,22 +419,19 @@ public class ProducerFlowControlTest ext
         return send;
     }
 
-
     private class MessageSender implements Runnable
     {
-        private final MessageProducer _producer;
-        private final Session _producerSession;
+        private final MessageProducer _senderProducer;
+        private final Session _senderSession;
         private final int _numMessages;
-
-
-
-        private JMSException _exception;
+        private volatile JMSException _exception;
+        private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
         private long _sleepPeriod;
 
         public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
         {
-            _producer = producer;
-            _producerSession = producerSession;
+            _senderProducer = producer;
+            _senderSession = producerSession;
             _numMessages = numMessages;
             _sleepPeriod = sleepPeriod;
         }
@@ -490,16 +440,18 @@ public class ProducerFlowControlTest ext
         {
             try
             {
-                sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+                sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod);
             }
             catch (JMSException e)
             {
                 _exception = e;
+                _exceptionThrownLatch.countDown();
             }
         }
 
-        public JMSException getException()
+        public Exception awaitSenderException(long timeout) throws InterruptedException
         {
+            _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
             return _exception;
         }
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,11 @@
 
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -29,11 +34,6 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
 /**
  * Test Case to ensure that messages are correctly returned.
  * This includes checking:

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java Sat Mar 10 19:22:10 2012
@@ -19,11 +19,15 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -34,14 +38,12 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.naming.NamingException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class SortedQueueTest extends QpidBrokerTestCase
 {
@@ -54,6 +56,7 @@ public class SortedQueueTest extends Qpi
     private Connection _producerConnection;
     private Session _producerSession;
     private Connection _consumerConnection;
+    private long _receiveInterval;
 
     protected void setUp() throws Exception
     {
@@ -64,6 +67,7 @@ public class SortedQueueTest extends Qpi
         _producerConnection = getConnection();
         _consumerConnection = getConnection();
         _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+        _receiveInterval = isBrokerStorePersistent() ? 3000l : 1500l;
     }
 
     protected void tearDown() throws Exception
@@ -94,7 +98,7 @@ public class SortedQueueTest extends Qpi
         _consumerConnection.start();
         TextMessage received;
         int messageCount = 0;
-        while((received = (TextMessage) consumer.receive(1000)) != null)
+        while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
         {
             assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount],
                             received.getStringProperty(TEST_SORT_KEY));
@@ -138,17 +142,15 @@ public class SortedQueueTest extends Qpi
             _producerSession.commit();
         }
 
-        synchronized(consumerThread)
+        try
         {
-            try
-            {
-                consumerThread.join(5000L);
-            }
-            catch(InterruptedException e)
-            {
-                fail("Test failed waiting for consumer to complete");
-            }
+            consumerThread.join(getConsumerThreadJoinInterval());
         }
+        catch(InterruptedException e)
+        {
+            fail("Test failed waiting for consumer to complete");
+        }
+
         assertTrue("Consumer timed out", consumerThread.isStopped());
         assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed());
 
@@ -172,23 +174,26 @@ public class SortedQueueTest extends Qpi
             _producerSession.commit();
         }
 
-        synchronized(consumerThread)
+        try
         {
-            try
-            {
-                consumerThread.join(5000L);
-            }
-            catch(InterruptedException e)
-            {
-                fail("Test failed waiting for consumer to complete");
-            }
+            consumerThread.join(getConsumerThreadJoinInterval());
         }
+        catch(InterruptedException e)
+        {
+            fail("Test failed waiting for consumer to complete");
+        }
+
         assertTrue("Consumer timed out", consumerThread.isStopped());
         assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed());
 
         producer.close();
     }
 
+    private long getConsumerThreadJoinInterval()
+    {
+        return isBrokerStorePersistent() ? 50000L: 5000L;
+    }
+
     public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException
     {
         final Queue queue = createQueue();
@@ -211,7 +216,7 @@ public class SortedQueueTest extends Qpi
         TextMessage received = null;
         int messageCount = 0;
 
-        while((received = (TextMessage) consumer.receive(1000)) != null)
+        while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
         {
             assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue",
                             received.getStringProperty(TEST_SORT_KEY));
@@ -247,7 +252,7 @@ public class SortedQueueTest extends Qpi
         TextMessage received;
         int messageCount = 0;
 
-        while((received = (TextMessage) consumer.receive(1000)) != null)
+        while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
         {
             assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10],
                             received.getStringProperty(TEST_SORT_KEY));
@@ -362,16 +367,16 @@ public class SortedQueueTest extends Qpi
     private Message assertReceiveMessage(final MessageConsumer consumer)
             throws JMSException
     {
-        final Message received = (TextMessage) consumer.receive(10000);
+        final Message received = (TextMessage) consumer.receive(_receiveInterval);
         assertNotNull("Received message is unexpectedly null", received);
         return received;
     }
 
     private class TestConsumerThread extends Thread
     {
-        private boolean _stopped = false;
+        private final AtomicInteger _consumed = new AtomicInteger(0);
+        private volatile boolean _stopped = false;
         private int _count = 0;
-        private int _consumed = 0;
         private int _sessionType = Session.AUTO_ACKNOWLEDGE;
         private Queue _queue;
 
@@ -402,7 +407,7 @@ public class SortedQueueTest extends Qpi
                 conn.start();
 
                 Message msg;
-                while((msg = consumer.receive(1000)) != null)
+                while((msg = consumer.receive(_receiveInterval)) != null)
                 {
                     if(_sessionType == Session.SESSION_TRANSACTED)
                     {
@@ -415,7 +420,7 @@ public class SortedQueueTest extends Qpi
                          {
                              LOGGER.debug("transacted session commit");
                             session.commit();
-                            _consumed++;
+                            _consumed.incrementAndGet();
                          }
                     }
                     else if(_sessionType == Session.CLIENT_ACKNOWLEDGE)
@@ -429,18 +434,18 @@ public class SortedQueueTest extends Qpi
                          {
                              LOGGER.debug("client ack session acknowledge");
                              msg.acknowledge();
-                             _consumed++;
+                             _consumed.incrementAndGet();
                          }
                     }
                     else
                     {
                         LOGGER.debug("auto ack session");
-                        _consumed++;
+                        _consumed.incrementAndGet();
                     }
 
                     _count++;
                     LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY));
-                    LOGGER.debug("Message consumed with consumed index: " + _consumed);
+                    LOGGER.debug("Message consumed with consumed index: " + _consumed.get());
                 }
 
                 _stopped = true;
@@ -453,14 +458,14 @@ public class SortedQueueTest extends Qpi
            }
         }
 
-        public synchronized boolean isStopped()
+        public boolean isStopped()
         {
             return _stopped;
         }
 
-        public synchronized int getConsumed()
+        public int getConsumed()
         {
-            return _consumed;
+            return _consumed.get();
         }
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Sat Mar 10 19:22:10 2012
@@ -21,8 +21,15 @@
 
 package org.apache.qpid.server.queue;
 
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -32,16 +39,8 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TopicSubscriber;
-
-import junit.framework.Assert;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class TimeToLiveTest extends QpidBrokerTestCase
 {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org