You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/03/16 16:21:40 UTC

[11/61] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
deleted file mode 100644
index b0e7bd3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Testing if the the broker "sends" the message as expected after the redeliveryPlugin has redelivered the
- * message previously.
- */
-public class RedeliveryPluginHeaderTest extends TestCase {
-   private static final String TEST_QUEUE_ONE = "TEST_QUEUE_ONE";
-   private static final String TEST_QUEUE_TWO = "TEST_QUEUE_TWO";
-   private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginHeaderTest.class);
-   private String transportURL;
-   private BrokerService broker;
-   /**
-    * Test
-    * - consumes message from Queue1
-    * - rolls back message to Queue1 and message is scheduled for redelivery to Queue1 by brokers plugin
-    * - consumes message from Queue1 again
-    * - sends same message to Queue2
-    * - expects to consume message from Queue2 immediately
-    */
-   public void testSendAfterRedelivery() throws Exception {
-      broker = this.createBroker(false);
-      broker.start();
-      broker.waitUntilStarted();
-      LOG.info("***Broker started...");
-      //pushed message to broker
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0");
-      Connection connection = factory.createConnection();
-      connection.start();
-      try {
-         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-         Destination destinationQ1 = session.createQueue(TEST_QUEUE_ONE);
-         Destination destinationQ2 = session.createQueue(TEST_QUEUE_TWO);
-         MessageProducer producerQ1 = session.createProducer(destinationQ1);
-         producerQ1.setDeliveryMode(DeliveryMode.PERSISTENT);
-         Message m = session.createTextMessage("testMessage");
-         LOG.info("*** send message to broker...");
-         producerQ1.send(m);
-         session.commit();
-         //consume message from Q1 and rollback to get it redelivered
-         MessageConsumer consumerQ1 = session.createConsumer(destinationQ1);
-         LOG.info("*** consume message from Q1 and rolled back..");
-         TextMessage textMessage = (TextMessage) consumerQ1.receive();
-         LOG.info("got redelivered: " + textMessage);
-         assertFalse("JMSRedelivered flag is not set", textMessage.getJMSRedelivered());
-         session.rollback();
-         LOG.info("*** consumed message from Q1 again and sending to Q2..");
-         TextMessage textMessage2 = (TextMessage) consumerQ1.receive();
-         LOG.info("got: " + textMessage2);
-         session.commit();
-         assertTrue("JMSRedelivered flag is set", textMessage2.getJMSRedelivered());
-         //send message to Q2 and consume from Q2
-         MessageConsumer consumerQ2 = session.createConsumer(destinationQ2);
-         MessageProducer producer_two = session.createProducer(destinationQ2);
-         producer_two.send(textMessage2);
-         session.commit();
-         //Message should be available straight away on the queue_two
-         Message textMessage3 = consumerQ2.receive(1000);
-         assertNotNull("should have consumed a message from TEST_QUEUE_TWO", textMessage3);
-         assertFalse("JMSRedelivered flag is not set", textMessage3.getJMSRedelivered());
-         session.commit();
-      }
-      finally {
-         connection.close();
-         if (broker != null) {
-            broker.stop();
-         }
-      }
-   }
-   protected BrokerService createBroker(boolean withJMX) throws Exception {
-      File schedulerDirectory = new File("target/scheduler");
-      IOHelper.mkdirs(schedulerDirectory);
-      IOHelper.deleteChildren(schedulerDirectory);
-      BrokerService answer = new BrokerService();
-      answer.setAdvisorySupport(false);
-      answer.setDataDirectory("target");
-      answer.setSchedulerDirectoryFile(schedulerDirectory);
-      answer.setSchedulerSupport(true);
-      answer.setPersistent(true);
-      answer.setDeleteAllMessagesOnStartup(true);
-      answer.setUseJmx(withJMX);
-      RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
-      RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
-      RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
-      defaultEntry.setInitialRedeliveryDelay(5000);
-      defaultEntry.setMaximumRedeliveries(5);
-      redeliveryPolicyMap.setDefaultEntry(defaultEntry);
-      redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
-      answer.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
-      TransportConnector transportConnector = answer.addConnector("tcp://localhost:0");
-      transportURL = transportConnector.getConnectUri().toASCIIString();
-      return answer;
-   }
\ No newline at end of file
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
deleted file mode 100644
index b4858c1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class SlowConsumerTest extends TestCase {
-   private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class);
-   private static final int MESSAGES_COUNT = 10000;
-   private final int messageLogFrequency = 2500;
-   private final long messageReceiveTimeout = 10000L;
-   private Socket stompSocket;
-   private ByteArrayOutputStream inputBuffer;
-   private int messagesCount;
-   /**
-    * @param args
-    * @throws Exception
-    */
-   public void testRemoveSubscriber() throws Exception {
-      final BrokerService broker = new BrokerService();
-      broker.setPersistent(true);
-      broker.setUseJmx(true);
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.addConnector("tcp://localhost:0").setName("Default");
-      broker.start();
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
-      final Connection connection = factory.createConnection();
-      connection.start();
-      Thread producingThread = new Thread("Producing thread") {
-         @Override
-         public void run() {
-            try {
-               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
-               for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
-                  Message message = session.createTextMessage("" + idx);
-                  producer.send(message);
-                  LOG.debug("Sending: " + idx);
-               }
-               producer.close();
-               session.close();
-            }
-            catch (Throwable ex) {
-               ex.printStackTrace();
-            }
-         }
-      };
-      producingThread.setPriority(Thread.MAX_PRIORITY);
-      producingThread.start();
-      Thread.sleep(1000);
-      Thread consumingThread = new Thread("Consuming thread") {
-         @Override
-         public void run() {
-            try {
-               Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-               MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationName()));
-               int diff = 0;
-               while (messagesCount != MESSAGES_COUNT) {
-                  Message msg = consumer.receive(messageReceiveTimeout);
-                  if (msg == null) {
-                     LOG.warn("Got null message at count: " + messagesCount + ". Continuing...");
-                     break;
-                  }
-                  String text = ((TextMessage) msg).getText();
-                  int currentMsgIdx = Integer.parseInt(text);
-                  LOG.debug("Received: " + text + " messageCount: " + messagesCount);
-                  msg.acknowledge();
-                  if ((messagesCount + diff) != currentMsgIdx) {
-                     LOG.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx);
-                     diff = currentMsgIdx - messagesCount;
-                  }
-                  ++messagesCount;
-                  if (messagesCount % messageLogFrequency == 0) {
-                     LOG.info("Received: " + messagesCount + " messages so far");
-                  }
-                  // Thread.sleep(70);
-               }
-            }
-            catch (Throwable ex) {
-               ex.printStackTrace();
-            }
-         }
-      };
-      consumingThread.start();
-      consumingThread.join();
-      assertEquals(MESSAGES_COUNT, messagesCount);
-   }
-   public void sendFrame(String data) throws Exception {
-      byte[] bytes = data.getBytes("UTF-8");
-      OutputStream outputStream = stompSocket.getOutputStream();
-      for (int i = 0; i < bytes.length; i++) {
-         outputStream.write(bytes[i]);
-      }
-      outputStream.flush();
-   }
-   public String receiveFrame(long timeOut) throws Exception {
-      stompSocket.setSoTimeout((int) timeOut);
-      InputStream is = stompSocket.getInputStream();
-      int c = 0;
-      for (;;) {
-         c = is.read();
-         if (c < 0) {
-            throw new IOException("socket closed.");
-         }
-         else if (c == 0) {
-            c = is.read();
-            byte[] ba = inputBuffer.toByteArray();
-            inputBuffer.reset();
-            return new String(ba, "UTF-8");
-         }
-         else {
-            inputBuffer.write(c);
-         }
-      }
-   }
-   protected String getDestinationName() {
-      return getClass().getName() + "." + getName();
-   }
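diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java
deleted file mode 100644
index 3e22dc2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java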
+++ /dev/null
@@ -1,30 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import org.apache.activemq.leveldb.LevelDBStore;
-public class SparseAckReplayAfterStoreCleanupLevelDBStoreTest extends AMQ2832Test {
-   @Override
-   protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
-      LevelDBStore store = new LevelDBStore();
-      store.setFlushDelay(0);
-      brokerService.setPersistenceAdapter(store);
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
deleted file mode 100644
index f521d40..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.junit.Test;
- * Demonstrates how unmarshalled VM advisory messages for temporary queues prevent other connections from being closed.
- */
-public class TempQueueDeleteOnCloseTest {
-   @Test
-   public void test() throws Exception {
-      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
-      // create a connection and session with a temporary queue
-      Connection connectionA = connectionFactory.createConnection();
-      connectionA.setClientID("ConnectionA");
-      Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Destination tempQueueA = sessionA.createTemporaryQueue();
-      MessageConsumer consumer = sessionA.createConsumer(tempQueueA);
-      connectionA.start();
-      // start and stop another connection
-      Connection connectionB = connectionFactory.createConnection();
-      connectionB.setClientID("ConnectionB");
-      connectionB.start();
-      connectionB.close();
-      consumer.close();
-      connectionA.close();
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
deleted file mode 100644
index dc15f87..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
+++ /dev/null
@@ -1,266 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.StoreUsage;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.TempUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class TempStorageBlockedBrokerTest extends TestSupport {
-   public int deliveryMode = DeliveryMode.PERSISTENT;
-   private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class);
-   private static final int MESSAGES_COUNT = 1000;
-   private static byte[] buf = new byte[4 * 1024];
-   private BrokerService broker;
-   AtomicInteger messagesSent = new AtomicInteger(0);
-   AtomicInteger messagesConsumed = new AtomicInteger(0);
-   protected long messageReceiveTimeout = 10000L;
-   Destination destination = new ActiveMQTopic("FooTwo");
-   private String connectionUri;
-   public void testRunProducerWithHungConsumer() throws Exception {
-      final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      // ensure messages are spooled to disk for this consumer
-      ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-      prefetch.setTopicPrefetch(10);
-      factory.setPrefetchPolicy(prefetch);
-      Connection consumerConnection = factory.createConnection();
-      consumerConnection.start();
-      Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = consumerSession.createConsumer(destination);
-      final Connection producerConnection = factory.createConnection();
-      producerConnection.start();
-      final CountDownLatch producerHasSentTenMessages = new CountDownLatch(10);
-      Thread producingThread = new Thread("Producing thread") {
-         @Override
-         public void run() {
-            try {
-               Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = session.createProducer(destination);
-               producer.setDeliveryMode(deliveryMode);
-               for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
-                  Message message = session.createTextMessage(new String(buf) + idx);
-                  producer.send(message);
-                  messagesSent.incrementAndGet();
-                  producerHasSentTenMessages.countDown();
-                  Thread.sleep(10);
-                  if (idx != 0 && idx % 100 == 0) {
-                     LOG.info("Sent Message " + idx);
-                     LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
-                  }
-               }
-               producer.close();
-               session.close();
-            }
-            catch (Throwable ex) {
-               ex.printStackTrace();
-            }
-         }
-      };
-      producingThread.start();
-      assertTrue("producer has sent 10 in a reasonable time", producerHasSentTenMessages.await(30, TimeUnit.SECONDS));
-      int count = 0;
-      Message m = null;
-      while ((m = consumer.receive(messageReceiveTimeout)) != null) {
-         count++;
-         if (count != 0 && count % 10 == 0) {
-            LOG.info("Received Message (" + count + "):" + m);
-         }
-         messagesConsumed.incrementAndGet();
-         try {
-            Thread.sleep(100);
-         }
-         catch (Exception e) {
-            LOG.info("error sleeping");
-         }
-      }
-      LOG.info("Connection Timeout: Retrying.. count: " + count);
-      while ((m = consumer.receive(messageReceiveTimeout)) != null) {
-         count++;
-         if (count != 0 && count % 100 == 0) {
-            LOG.info("Received Message (" + count + "):" + m);
-         }
-         messagesConsumed.incrementAndGet();
-         try {
-            Thread.sleep(100);
-         }
-         catch (Exception e) {
-            LOG.info("error sleeping");
-         }
-      }
-      LOG.info("consumer session closing: consumed count: " + count);
-      consumerSession.close();
-      producingThread.join();
-      final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
-      LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
-      producerConnection.close();
-      consumerConnection.close();
-      LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage());
-      // do a cleanup
-      ((PListStoreImpl) broker.getTempDataStore()).run();
-      LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage());
-      assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT);
-      assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), MESSAGES_COUNT);
-   }
-   public void testFillTempAndConsume() throws Exception {
-      broker.getSystemUsage().setSendFailIfNoSpace(true);
-      destination = new ActiveMQQueue("Foo");
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
-      // so we can easily catch the ResourceAllocationException on send
-      producerConnection.setAlwaysSyncSend(true);
-      producerConnection.start();
-      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(destination);
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      try {
-         while (true) {
-            Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
-            producer.send(message);
-            messagesSent.incrementAndGet();
-            if (messagesSent.get() % 100 == 0) {
-               LOG.info("Sent Message " + messagesSent.get());
-               LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
-            }
-         }
-      }
-      catch (ResourceAllocationException ex) {
-         LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
-      }
-      // consume all sent
-      Connection consumerConnection = factory.createConnection();
-      consumerConnection.start();
-      Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = consumerSession.createConsumer(destination);
-      while (consumer.receive(messageReceiveTimeout) != null) {
-         messagesConsumed.incrementAndGet();
-         if (messagesConsumed.get() % 1000 == 0) {
-            LOG.info("received Message " + messagesConsumed.get());
-            LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
-         }
-      }
-      assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), messagesSent.get());
-   }
-   @Override
-   public void setUp() throws Exception {
-      broker = new BrokerService();
-      broker.setDataDirectory("target" + File.separator + "activemq-data");
-      broker.setPersistent(true);
-      broker.setUseJmx(true);
-      broker.setAdvisorySupport(false);
-      broker.setDeleteAllMessagesOnStartup(true);
-      setDefaultPersistenceAdapter(broker);
-      SystemUsage sysUsage = broker.getSystemUsage();
-      MemoryUsage memUsage = new MemoryUsage();
-      memUsage.setLimit((1024 * 1024));
-      StoreUsage storeUsage = new StoreUsage();
-      storeUsage.setLimit((1024 * 1024) * 38);
-      TempUsage tmpUsage = new TempUsage();
-      tmpUsage.setLimit((1024 * 1024) * 38);
-      PolicyEntry defaultPolicy = new PolicyEntry();
-      // defaultPolicy.setTopic("FooTwo");
-      defaultPolicy.setProducerFlowControl(false);
-      defaultPolicy.setMemoryLimit(10 * 1024);
-      PolicyMap policyMap = new PolicyMap();
-      policyMap.setDefaultEntry(defaultPolicy);
-      sysUsage.setMemoryUsage(memUsage);
-      sysUsage.setStoreUsage(storeUsage);
-      sysUsage.setTempUsage(tmpUsage);
-      broker.setDestinationPolicy(policyMap);
-      broker.setSystemUsage(sysUsage);
-      broker.addConnector("tcp://localhost:0").setName("Default");
-      broker.start();
-      connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-   }
-   @Override
-   public void tearDown() throws Exception {
-      if (broker != null) {
-         broker.stop();
-      }
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
deleted file mode 100644
index d04cc3f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import static org.junit.Assert.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Test that when configuring small temp store limits the journal size must also
- * be smaller than the configured limit, but will still send a ResourceAllocationException
- * if its not when sendFailIfNoSpace is enabled.
- */
-public class TempStorageConfigBrokerTest {
-   public int deliveryMode = DeliveryMode.PERSISTENT;
-   private static final Logger LOG = LoggerFactory.getLogger(TempStorageConfigBrokerTest.class);
-   private static byte[] buf = new byte[4 * 1024];
-   private BrokerService broker;
-   private AtomicInteger messagesSent = new AtomicInteger(0);
-   private AtomicInteger messagesConsumed = new AtomicInteger(0);
-   private String brokerUri;
-   private long messageReceiveTimeout = 10000L;
-   private Destination destination = new ActiveMQTopic("FooTwo");
-   @Test(timeout = 360000)
-   @Ignore("blocks in hudson, needs investigation")
-   public void testFillTempAndConsumeWithBadTempStoreConfig() throws Exception {
-      createBrokerWithInvalidTempStoreConfig();
-      broker.getSystemUsage().setSendFailIfNoSpace(true);
-      destination = new ActiveMQQueue("Foo");
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
-      final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
-      // so we can easily catch the ResourceAllocationException on send
-      producerConnection.setAlwaysSyncSend(true);
-      producerConnection.start();
-      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(destination);
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      try {
-         while (true) {
-            Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
-            producer.send(message);
-            messagesSent.incrementAndGet();
-            if (messagesSent.get() % 100 == 0) {
-               LOG.info("Sent Message " + messagesSent.get());
-               LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
-            }
-         }
-      }
-      catch (ResourceAllocationException ex) {
-         assertTrue("Should not be able to send 100 messages: ", messagesSent.get() < 100);
-         LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
-      }
-   }
-   @Test(timeout = 360000)
-   @Ignore("blocks in hudson, needs investigation")
-   public void testFillTempAndConsumeWithGoodTempStoreConfig() throws Exception {
-      createBrokerWithValidTempStoreConfig();
-      broker.getSystemUsage().setSendFailIfNoSpace(true);
-      destination = new ActiveMQQueue("Foo");
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
-      final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
-      // so we can easily catch the ResourceAllocationException on send
-      producerConnection.setAlwaysSyncSend(true);
-      producerConnection.start();
-      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(destination);
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      try {
-         while (true) {
-            Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
-            producer.send(message);
-            messagesSent.incrementAndGet();
-            if (messagesSent.get() % 100 == 0) {
-               LOG.info("Sent Message " + messagesSent.get());
-               LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
-            }
-         }
-      }
-      catch (ResourceAllocationException ex) {
-         assertTrue("Should be able to send at least 200 messages but was: " + messagesSent.get(), messagesSent.get() > 200);
-         LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
-      }
-      // consume all sent
-      Connection consumerConnection = factory.createConnection();
-      consumerConnection.start();
-      Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = consumerSession.createConsumer(destination);
-      while (consumer.receive(messageReceiveTimeout) != null) {
-         messagesConsumed.incrementAndGet();
-         if (messagesConsumed.get() % 1000 == 0) {
-            LOG.info("received Message " + messagesConsumed.get());
-            LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
-         }
-      }
-      assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), messagesSent.get());
-   }
-   private void createBrokerWithValidTempStoreConfig() throws Exception {
-      broker = new BrokerService();
-      broker.setDataDirectory("target" + File.separator + "activemq-data");
-      broker.setPersistent(true);
-      broker.setUseJmx(true);
-      broker.setAdvisorySupport(false);
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
-      broker.getSystemUsage().setSendFailIfNoSpace(true);
-      broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
-      broker.getSystemUsage().getTempUsage().setLimit(2 * 1048576);
-      ((PListStoreImpl) broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576);
-      broker.getSystemUsage().getStoreUsage().setLimit(20 * 1048576);
-      PolicyEntry defaultPolicy = new PolicyEntry();
-      defaultPolicy.setProducerFlowControl(false);
-      defaultPolicy.setMemoryLimit(10 * 1024);
-      PolicyMap policyMap = new PolicyMap();
-      policyMap.setDefaultEntry(defaultPolicy);
-      broker.setDestinationPolicy(policyMap);
-      broker.addConnector("tcp://localhost:0").setName("Default");
-      broker.start();
-      brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-   }
-   private void createBrokerWithInvalidTempStoreConfig() throws Exception {
-      broker = new BrokerService();
-      broker.setDataDirectory("target" + File.separator + "activemq-data");
-      broker.setPersistent(true);
-      broker.setUseJmx(true);
-      broker.setAdvisorySupport(false);
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setPersistenceAdapter(new KahaDBPersistenceAdapter());
-      broker.getSystemUsage().setSendFailIfNoSpace(true);
-      broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
-      broker.getSystemUsage().getTempUsage().setLimit(2 * 1048576);
-      broker.getSystemUsage().getStoreUsage().setLimit(2 * 1048576);
-      PolicyEntry defaultPolicy = new PolicyEntry();
-      defaultPolicy.setProducerFlowControl(false);
-      defaultPolicy.setMemoryLimit(10 * 1024);
-      PolicyMap policyMap = new PolicyMap();
-      policyMap.setDefaultEntry(defaultPolicy);
-      broker.setDestinationPolicy(policyMap);
-      broker.addConnector("tcp://localhost:0").setName("Default");
-      broker.start();
-      brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-   }
-   @After
-   public void tearDown() throws Exception {
-      if (broker != null) {
-         broker.stop();
-      }
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
deleted file mode 100644
index 8051a59..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class TempStoreDataCleanupTest {
-   private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class);
-   private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue";
-   private final String str = new String("QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
-   private BrokerService broker;
-   private String connectionUri;
-   private ExecutorService pool;
-   private String queueName;
-   private Random r = new Random();
-   @Before
-   public void setUp() throws Exception {
-      broker = new BrokerService();
-      broker.setDataDirectory("target" + File.separator + "activemq-data");
-      broker.setPersistent(true);
-      broker.setUseJmx(true);
-      broker.setDedicatedTaskRunner(false);
-      broker.setAdvisorySupport(false);
-      broker.setDeleteAllMessagesOnStartup(true);
-      SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
-      strategy.setProcessExpired(false);
-      strategy.setProcessNonPersistent(false);
-      PolicyEntry defaultPolicy = new PolicyEntry();
-      defaultPolicy.setQueue(">");
-      defaultPolicy.setOptimizedDispatch(true);
-      defaultPolicy.setDeadLetterStrategy(strategy);
-      defaultPolicy.setMemoryLimit(9000000);
-      PolicyMap policyMap = new PolicyMap();
-      policyMap.setDefaultEntry(defaultPolicy);
-      broker.setDestinationPolicy(policyMap);
-      broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
-      broker.addConnector("tcp://localhost:0").setName("Default");
-      broker.start();
-      broker.waitUntilStarted();
-      connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-      pool = Executors.newFixedThreadPool(10);
-   }
-   @After
-   public void tearDown() throws Exception {
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-      }
-      if (pool != null) {
-         pool.shutdown();
-      }
-   }
-   @Test
-   public void testIt() throws Exception {
-      int startPercentage = broker.getAdminView().getMemoryPercentUsage();
-      LOG.info("MemoryUsage at test start = " + startPercentage);
-      for (int i = 0; i < 2; i++) {
-         LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
-         queueName = QUEUE_NAME + i;
-         final CountDownLatch latch = new CountDownLatch(11);
-         pool.execute(new Runnable() {
-            @Override
-            public void run() {
-               receiveAndDiscard100messages(latch);
-            }
-         });
-         for (int j = 0; j < 10; j++) {
-            pool.execute(new Runnable() {
-               @Override
-               public void run() {
-                  send10000messages(latch);
-               }
-            });
-         }
-         LOG.info("Waiting on the send / receive latch");
-         latch.await(5, TimeUnit.MINUTES);
-         destroyQueue();
-         TimeUnit.SECONDS.sleep(2);
-      }
-      LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
-      final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
-      assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
-                    @Override
-                    public boolean isSatisified() throws Exception {
-                       return pa.getJournal().getFileMap().size() == 1;
-                    }
-                 }, TimeUnit.MINUTES.toMillis(3)));
-      int endPercentage = broker.getAdminView().getMemoryPercentUsage();
-      LOG.info("MemoryUsage at test end = " + endPercentage);
-      assertEquals(startPercentage, endPercentage);
-   }
-   public void destroyQueue() {
-      try {
-         Broker broker = this.broker.getBroker();
-         if (!broker.isStopped()) {
-            LOG.info("Removing: " + queueName);
-            broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
-         }
-      }
-      catch (Exception e) {
-         LOG.warn("Got an error while removing the test queue", e);
-      }
-   }
-   private void send10000messages(CountDownLatch latch) {
-      ActiveMQConnection activeMQConnection = null;
-      try {
-         activeMQConnection = createConnection(null);
-         Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(session.createQueue(queueName));
-         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         activeMQConnection.start();
-         for (int i = 0; i < 10000; i++) {
-            TextMessage textMessage = session.createTextMessage();
-            textMessage.setText(generateBody(1000));
-            textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
-            producer.send(textMessage);
-            try {
-               Thread.sleep(10);
-            }
-            catch (InterruptedException e) {
-            }
-         }
-         producer.close();
-      }
-      catch (JMSException e) {
-         LOG.warn("Got an error while sending the messages", e);
-      }
-      finally {
-         if (activeMQConnection != null) {
-            try {
-               activeMQConnection.close();
-            }
-            catch (JMSException e) {
-            }
-         }
-      }
-      latch.countDown();
-   }
-   private void receiveAndDiscard100messages(CountDownLatch latch) {
-      ActiveMQConnection activeMQConnection = null;
-      try {
-         activeMQConnection = createConnection(null);
-         Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
-         activeMQConnection.start();
-         for (int i = 0; i < 100; i++) {
-            messageConsumer.receive();
-         }
-         messageConsumer.close();
-         LOG.info("Created and disconnected");
-      }
-      catch (JMSException e) {
-         LOG.warn("Got an error while receiving the messages", e);
-      }
-      finally {
-         if (activeMQConnection != null) {
-            try {
-               activeMQConnection.close();
-            }
-            catch (JMSException e) {
-            }
-         }
-      }
-      latch.countDown();
-   }
-   private ActiveMQConnection createConnection(String id) throws JMSException {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      if (id != null) {
-         factory.setClientID(id);
-      }
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      return connection;
-   }
-   private String generateBody(int length) {
-      StringBuilder sb = new StringBuilder();
-      int te = 0;
-      for (int i = 1; i <= length; i++) {
-         te = r.nextInt(62);
-         sb.append(str.charAt(te));
-      }
-      return sb.toString();
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/
deleted file mode 100644
index db3888a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/
+++ /dev/null
@@ -1,196 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertTrue;
-public class TransactedStoreUsageSuspendResumeTest {
-   private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);
-   private static final int MAX_MESSAGES = 10000;
-   private static final String QUEUE_NAME = "test.queue";
-   private BrokerService broker;
-   private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES);
-   private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES);
-   private final CountDownLatch consumerStartLatch = new CountDownLatch(1);
-   private class ConsumerThread extends Thread {
-      @Override
-      public void run() {
-         try {
-            consumerStartLatch.await(30, TimeUnit.SECONDS);
-            ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-            Connection connection = factory.createConnection();
-            connection.start();
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            // wait for producer to stop
-            long currentSendCount;
-            do {
-               currentSendCount = messagesSentCountDown.getCount();
-               TimeUnit.SECONDS.sleep(5);
-            } while (currentSendCount != messagesSentCountDown.getCount());
-            LOG.info("Starting consumer at: " + currentSendCount);
-            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-            do {
-               Message message = consumer.receive(5000);
-               if (message != null) {
-                  session.commit();
-                  messagesReceivedCountDown.countDown();
-               }
-               if (messagesReceivedCountDown.getCount() % 500 == 0) {
-                  LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
-               }
-            } while (messagesReceivedCountDown.getCount() != 0);
-            consumer.close();
-            session.close();
-            connection.close();
-         }
-         catch (Exception e) {
-            Assert.fail(e.getMessage());
-         }
-      }
-   }
-   @Before
-   public void setup() throws Exception {
-      broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setPersistent(true);
-      KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
-      kahaDB.setJournalMaxFileLength(500 * 1024);
-      kahaDB.setCleanupInterval(10 * 1000);
-      broker.setPersistenceAdapter(kahaDB);
-      broker.getSystemUsage().getStoreUsage().setLimit(7 * 1024 * 1024);
-      broker.start();
-      broker.waitUntilStarted();
-   }
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-   @Test
-   public void testTransactedStoreUsageSuspendResume() throws Exception {
-      ConsumerThread thread = new ConsumerThread();
-      thread.start();
-      ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
-      sendExecutor.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               sendMessages();
-            }
-            catch (Exception ignored) {
-            }
-         }
-      });
-      sendExecutor.shutdown();
-      sendExecutor.awaitTermination(5, TimeUnit.MINUTES);
-      boolean allMessagesReceived = messagesReceivedCountDown.await(10, TimeUnit.MINUTES);
-      if (!allMessagesReceived) {
-         AutoFailTestSupport.dumpAllThreads("StuckConsumer!");
-      }
-      assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived);
-      // give consumers a chance to exit gracefully
-      TimeUnit.SECONDS.sleep(2);
-   }
-   private void sendMessages() throws Exception {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-      factory.setAlwaysSyncSend(true);
-      Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      Destination queue = session.createQueue(QUEUE_NAME);
-      Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain");
-      MessageProducer producer = session.createProducer(null);
-      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      BytesMessage message = session.createBytesMessage();
-      message.writeBytes(new byte[10]);
-      for (int i = 0; i < 4240; i++) {
-         // mostly fill the store with retained messages
-         // so consumer only has a small bit of store usage to work with
-         producer.send(retainQueue, message);
-         session.commit();
-      }
-      consumerStartLatch.countDown();
-      for (int i = 0; i < MAX_MESSAGES; i++) {
-         producer.send(queue, message);
-         if (i > 0 && i % 20 == 0) {
-            session.commit();
-         }
-         messagesSentCountDown.countDown();
-         if (i > 0 && i % 500 == 0) {
-            LOG.info("Sent : " + i);
-         }
-      }
-      session.commit();
-      producer.close();
-      session.close();
-      connection.close();
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/
deleted file mode 100644
index 2038279..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/
+++ /dev/null
@@ -1,298 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * simulate message flow which cause the following exception in the broker
- * (exception logged by client) <br> 2007-07-24 13:51:23,624
- * com.easynet.halo.Halo ERROR ( 23) JMS failure
- * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344'
- * has not been started. at
- *
- * This appears to be consistent in a MacBook. Haven't been able to replicate it
- * on Windows though
- */
-public class TransactionNotStartedErrorTest extends TestCase {
-   private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class);
-   private static final int counter = 500;
-   private static int hectorToHaloCtr;
-   private static int xenaToHaloCtr;
-   private static int troyToHaloCtr;
-   private static int haloToHectorCtr;
-   private static int haloToXenaCtr;
-   private static int haloToTroyCtr;
-   private final String hectorToHalo = "hectorToHalo";
-   private final String xenaToHalo = "xenaToHalo";
-   private final String troyToHalo = "troyToHalo";
-   private final String haloToHector = "haloToHector";
-   private final String haloToXena = "haloToXena";
-   private final String haloToTroy = "haloToTroy";
-   private BrokerService broker;
-   private Connection hectorConnection;
-   private Connection xenaConnection;
-   private Connection troyConnection;
-   private Connection haloConnection;
-   private final Object lock = new Object();
-   public Connection createConnection() throws Exception {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
-      return factory.createConnection();
-   }
-   public Session createSession(Connection connection, boolean transacted) throws JMSException {
-      return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
-   }
-   public void startBroker() throws Exception {
-      broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.setPersistent(true);
-      broker.setUseJmx(true);
-      broker.addConnector("tcp://localhost:0").setName("Default");
-      broker.start();
-      LOG.info("Starting broker..");
-   }
-   @Override
-   public void tearDown() throws Exception {
-      hectorConnection.close();
-      xenaConnection.close();
-      troyConnection.close();
-      haloConnection.close();
-      broker.stop();
-   }
-   public void testTransactionNotStartedError() throws Exception {
-      startBroker();
-      hectorConnection = createConnection();
-      Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
-      Receiver hHectorReceiver = new Receiver() {
-         @Override
-         public void receive(String s) throws Exception {
-            haloToHectorCtr++;
-            if (haloToHectorCtr >= counter) {
-               synchronized (lock) {
-                  lock.notifyAll();
-               }
-            }
-         }
-      };
-      buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
-      troyConnection = createConnection();
-      Thread troyThread = buildProducer(troyConnection, troyToHalo);
-      Receiver hTroyReceiver = new Receiver() {
-         @Override
-         public void receive(String s) throws Exception {
-            haloToTroyCtr++;
-            if (haloToTroyCtr >= counter) {
-               synchronized (lock) {
-                  lock.notifyAll();
-               }
-            }
-         }
-      };
-      buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
-      xenaConnection = createConnection();
-      Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
-      Receiver hXenaReceiver = new Receiver() {
-         @Override
-         public void receive(String s) throws Exception {
-            haloToXenaCtr++;
-            if (haloToXenaCtr >= counter) {
-               synchronized (lock) {
-                  lock.notifyAll();
-               }
-            }
-         }
-      };
-      buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
-      haloConnection = createConnection();
-      final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection);
-      final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection);
-      final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection);
-      Receiver hectorReceiver = new Receiver() {
-         @Override
-         public void receive(String s) throws Exception {
-            hectorToHaloCtr++;
-            troySender.send("halo to troy because of hector");
-            if (hectorToHaloCtr >= counter) {
-               synchronized (lock) {
-                  lock.notifyAll();
-               }
-            }
-         }
-      };
-      Receiver xenaReceiver = new Receiver() {
-         @Override
-         public void receive(String s) throws Exception {
-            xenaToHaloCtr++;
-            hectorSender.send("halo to hector because of xena");
-            if (xenaToHaloCtr >= counter) {
-               synchronized (lock) {
-                  lock.notifyAll();
-               }
-            }
-         }
-      };
-      Receiver troyReceiver = new Receiver() {
-         @Override
-         public void receive(String s) throws Exception {
-            troyToHaloCtr++;
-            xenaSender.send("halo to xena because of troy");
-            if (troyToHaloCtr >= counter) {
-               synchronized (lock) {
-                  lock.notifyAll();
-               }
-            }
-         }
-      };
-      buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
-      buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
-      buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
-      haloConnection.start();
-      troyConnection.start();
-      troyThread.start();
-      xenaConnection.start();
-      xenaThread.start();
-      hectorConnection.start();
-      hectorThread.start();
-      waitForMessagesToBeDelivered();
-      // number of messages received should match messages sent
-      assertEquals(hectorToHaloCtr, counter);
-      LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
-      assertEquals(xenaToHaloCtr, counter);
-      LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
-      assertEquals(troyToHaloCtr, counter);
-      LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
-      assertEquals(haloToHectorCtr, counter);
-      LOG.info("haloToHector received " + haloToHectorCtr + " messages");
-      assertEquals(haloToXenaCtr, counter);
-      LOG.info("haloToXena received " + haloToXenaCtr + " messages");
-      assertEquals(haloToTroyCtr, counter);
-      LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
-   }
-   protected void waitForMessagesToBeDelivered() {
-      // let's give the listeners enough time to read all messages
-      long maxWaitTime = counter * 3000;
-      long waitTime = maxWaitTime;
-      long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
-      synchronized (lock) {
-         boolean hasMessages = true;
-         while (hasMessages && waitTime >= 0) {
-            try {
-               lock.wait(200);
-            }
-            catch (InterruptedException e) {
-               LOG.error(e.toString());
-            }
-            // check if all messages have been received
-            hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter || haloToTroyCtr < counter;
-            waitTime = maxWaitTime - (System.currentTimeMillis() - start);
-         }
-      }
-   }
-   public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception {
-      return new MessageSender(queueName, connection, true, false);
-   }
-   public Thread buildProducer(Connection connection, final String queueName) throws Exception {
-      final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final MessageSender producer = new MessageSender(queueName, connection, false, false);
-      Thread thread = new Thread() {
-         @Override
-         public synchronized void run() {
-            for (int i = 0; i < counter; i++) {
-               try {
-                  producer.send(queueName);
-                  if (session.getTransacted()) {
-                     session.commit();
-                  }
-               }
-               catch (Exception e) {
-                  throw new RuntimeException("on " + queueName + " send", e);
-               }
-            }
-         }
-      };
-      return thread;
-   }
-   public void buildReceiver(Connection connection,
-                             final String queueName,
-                             boolean transacted,
-                             final Receiver receiver) throws Exception {
-      final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName));
-      MessageListener messageListener = new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               ObjectMessage objectMessage = (ObjectMessage) message;
-               String s = (String) objectMessage.getObject();
-               receiver.receive(s);
-               if (session.getTransacted()) {
-                  session.commit();
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-         }
-      };
-      inputMessageConsumer.setMessageListener(messageListener);
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/
deleted file mode 100644
index 67b284f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/
+++ /dev/null
@@ -1,277 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Test to demonstrate a message trapped in the JDBC store and not
- * delivered to the consumer.
- *
- * The test issues the commit to the DB but then throws
- * an exception back to the broker. This scenario could happen when a network
- * cable is disconnected - the message is committed to the DB but the broker does not know.
- */
-public class TrapMessageInJDBCStoreTest extends TestCase {
-   // Name of the queue the test sends to and consumes from.
-   private static final String MY_TEST_Q = "MY_TEST_Q";
-   private static final Logger LOG = LoggerFactory.getLogger(TrapMessageInJDBCStoreTest.class);
-   // Connector URI passed to addConnector(); createBroker() overwrites it with the publishable connect string.
-   // NOTE(review): the literal looks truncated (no host/port) - confirm against the original file.
-   private String transportUrl = "tcp://";
-   // Broker under test, assigned in testDBCommitException().
-   private BrokerService broker;
-   // Transaction context returned by TestJDBCPersistenceAdapter.getTransactionContext().
-   private TestTransactionContext testTransactionContext;
-   // Persistence adapter created and configured in createBroker().
-   private TestJDBCPersistenceAdapter jdbc;
-   /**
-    * Builds, without starting, a broker whose JDBC persistence adapter uses an embedded data source
-    * and hands out the fault-injecting {@link TestTransactionContext}.
-    *
-    * As side effects the {@code jdbc}, {@code testTransactionContext} and {@code transportUrl}
-    * fields are (re)assigned.
-    *
-    * @param withJMX value passed to {@code BrokerService.setUseJmx}
-    * @return the configured, not yet started broker
-    * @throws Exception if the data source, adapter or connector cannot be set up
-    */
-   protected BrokerService createBroker(boolean withJMX) throws Exception {
-      BrokerService service = new BrokerService();
-      service.setUseJmx(withJMX);
-      EmbeddedDataSource dataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
-      dataSource.setCreateDatabase("create");
-      // The adapter returns a TestTransactionContext (a TransactionContext subclass) whose
-      // executeBatch() throws a SQL exception after the real batch has been executed.
-      jdbc = new TestJDBCPersistenceAdapter();
-      jdbc.setDataSource(dataSource);
-      jdbc.setCleanupPeriod(0);
-      testTransactionContext = new TestTransactionContext(jdbc);
-      jdbc.setLockKeepAlivePeriod(1000L);
-      LeaseDatabaseLocker locker = new LeaseDatabaseLocker();
-      locker.setLockAcquireSleepInterval(2000L);
-      jdbc.setLocker(locker);
-      service.setPersistenceAdapter(jdbc);
-      service.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());
-      // Remember the address the connector actually publishes so clients can connect to it.
-      transportUrl = service.addConnector(transportUrl).getPublishableConnectString();
-      return service;
-   }
-   /**
-    * Sends 3 messages to the queue. While the second message is being committed to the JDBC store,
-    * a dummy SQL exception is thrown - after the message has already been committed to the embedded DB.
-    *
-    * Expected correct outcome: receive 3 messages and the DB should contain no messages.
-    *
-    * @throws Exception if the broker cannot be started or a JMS/JDBC call fails
-    */
-   public void testDBCommitException() throws Exception {
-      broker = this.createBroker(false);
-      broker.deleteAllMessages();
-      broker.start();
-      // Stop the broker even when an assertion fails so that it is not left running.
-      try {
-         broker.waitUntilStarted();
-         LOG.info("***Broker started...");
-         // failover but timeout in 5 seconds so the test does not hang
-         String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=5000";
-         sendMessage(MY_TEST_Q, failoverTransportURL);
-         //check db contents
-         ArrayList<Long> dbSeq = dbMessageCount();
-         LOG.info("*** after send: db contains message seq " + dbSeq);
-         List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q, failoverTransportURL);
-         assertEquals("number of consumed messages", 3, consumedMessages.size());
-         //check db contents
-         dbSeq = dbMessageCount();
-         LOG.info("*** after consume - db contains message seq " + dbSeq);
-         assertEquals("number of messages in DB after test", 0, dbSeq.size());
-      }
-      finally {
-         broker.stop();
-         broker.waitUntilStopped();
-      }
-   }
-   /**
-    * Drains the given queue, returning every message received until a receive times out.
-    *
-    * @param queue        name of the queue to consume from
-    * @param transportURL broker URL to connect to
-    * @return the messages consumed, in the order received
-    * @throws JMSException if connecting or receiving fails
-    */
-   public List<TextMessage> consumeMessages(String queue, String transportURL) throws JMSException {
-      Connection connection = null;
-      LOG.debug("*** consumeMessages() called ...");
-      try {
-         connection = new ActiveMQConnectionFactory(transportURL).createConnection();
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Destination destination = session.createQueue(queue);
-         ArrayList<TextMessage> received = new ArrayList<>();
-         MessageConsumer consumer = session.createConsumer(destination);
-         TextMessage next;
-         do {
-            // A null result means nothing arrived within 4 seconds; treat the queue as drained.
-            next = (TextMessage) consumer.receive(4000);
-            LOG.debug("*** consumed Messages :" + next);
-            if (next != null) {
-               received.add(next);
-            }
-         } while (next != null);
-         return received;
-      }
-      finally {
-         if (connection != null) {
-            connection.close();
-         }
-      }
-   }
-   /**
-    * Sends three persistent text messages ("1", "2", "3") to the given queue and, after the second
-    * send, asserts that the store contains two messages.
-    *
-    * @param queue        name of the queue to send to
-    * @param transportURL broker URL to connect to
-    * @throws Exception if connecting, sending or querying the store fails
-    */
-   public void sendMessage(String queue, String transportURL) throws Exception {
-      Connection connection = null;
-      try {
-         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
-         connection = factory.createConnection();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Destination destination = session.createQueue(queue);
-         MessageProducer producer = session.createProducer(destination);
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-         TextMessage m = session.createTextMessage("1");
-         LOG.debug("*** send message 1 to broker...");
-         producer.send(m);
-         // trigger SQL exception in transactionContext
-         LOG.debug("***  send message 2 to broker");
-         m.setText("2");
-         producer.send(m);
-         //check db contents
-         ArrayList<Long> dbSeq = dbMessageCount();
-         LOG.info("*** after send 2 - db contains message seq " + dbSeq);
-         assertEquals("number of messages in DB after send 2", 2, dbSeq.size());
-         LOG.debug("***  send  message 3 to broker");
-         m.setText("3");
-         producer.send(m);
-         LOG.debug("*** Finished sending messages to broker");
-      }
-      finally {
-         if (connection != null) {
-            connection.close();
-         }
-      }
-   }
-   /**
-    * Queries the DB to see what messages are left in the store.
-    *
-    * @return the MSGID_SEQ value of every row in ACTIVEMQ_MSGS; empty when the table has no rows
-    * @throws SQLException if the connection cannot be obtained or the query fails
-    * @throws IOException  if the data source cannot be obtained from the persistence adapter
-    */
-   private ArrayList<Long> dbMessageCount() throws SQLException, IOException {
-      ArrayList<Long> dbSeq = new ArrayList<>();
-      // try-with-resources closes the result set, statement and connection in reverse order,
-      // even when preparing or executing the statement fails.
-      try (java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
-           PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
-           ResultSet result = statement.executeQuery()) {
-         while (result.next()) {
-            dbSeq.add(result.getLong(1));
-         }
-      }
-      return dbSeq;
-   }
-   /*
-    * Mock classes used for testing
-    */
-   /**
-    * Persistence adapter that always hands out the enclosing test's {@code testTransactionContext}.
-    */
-   public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
-      /**
-       * @return the transaction context currently held by the enclosing test instance
-       */
-      @Override
-      public TransactionContext getTransactionContext() throws IOException {
-         return TrapMessageInJDBCStoreTest.this.testTransactionContext;
-      }
-   }
-   /**
-    * Transaction context that counts executeBatch() invocations and fails the 16th one after the
-    * real batch has already been executed.
-    */
-   public class TestTransactionContext extends TransactionContext {
-      // Number of executeBatch() invocations that have completed the delegated call so far.
-      private int count;
-      public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException {
-         super(jdbcPersistenceAdapter);
-      }
-      /**
-       * Runs the real batch, then throws on the 16th invocation.
-       *
-       * @throws SQLException from the delegated call, or the injected test exception
-       */
-      @Override
-      public void executeBatch() throws SQLException {
-         super.executeBatch();
-         count += 1;
-         // The RuntimeException is never thrown; it is passed to the logger alongside the message.
-         LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch"));
-         if (count != 16) {
-            return;
-         }
-         // throw on second add message
-         throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count);
-      }
-   }
\ No newline at end of file
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
deleted file mode 100644
index 84c1765..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-public class VMTransportClosureTest extends EmbeddedBrokerTestSupport {
-   private static final Log LOG = LogFactory.getLog(VMTransportClosureTest.class);
-   // Passed to setMaxTestTime() in setUp().
-   private static final long MAX_TEST_TIME_MILLIS = 300000; // 5min
-   // Number of open/close cycles of the local transport performed by testPrematureClosure().
-   private static final int NUM_ATTEMPTS = 100000;
-   /**
-    * Turns on auto-fail with a limit of MAX_TEST_TIME_MILLIS, then delegates to the parent set-up.
-    * NOTE(review): presumably both settings must be in place before super.setUp() runs - confirm
-    * against the base test class.
-    */
-   @Override
-   public void setUp() throws Exception {
-      setAutoFail(true);
-      setMaxTestTime(MAX_TEST_TIME_MILLIS);
-      super.setUp();
-   }
-   /**
-    * Creates a broker WITHOUT binding a connector.
-    * <p>
-    * EmbeddedBrokerTestSupport.createBroker() binds the broker to a VM
-    * transport address, which ends up in VMTransportFactory.doBind(location):
-    * </p>
-    * <p>
-    * <code>
-    * public TransportServer doBind(URI location) throws IOException {
-    * return bind(location, false);
-    * }
-    * </code>
-    * </p>
-    * That leaves VMTransportServer.disposeOnDisconnect set to <code>false</code>.
-    * The bug only shows when VMTransportServer.disposeOnDisconnect is
-    * <code>true</code>, which is the case when the VMTransportServer is not
-    * already bound at the time the first connection is made.
-    */
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService unboundBroker = new BrokerService();
-      unboundBroker.setPersistent(isPersistent());
-      // Deliberately no addConnector(bindAddress) call here - see the Javadoc above.
-      return unboundBroker;
-   }
-   /**
-    * This test demonstrates how the "disposeOnDisconnect" feature of
-    * VMTransportServer can incorrectly close all VM connections to the local
-    * broker.
-    *
-    * @throws Exception if the persistent connection stops being usable or a transport call fails
-    */
-   public void testPrematureClosure() throws Exception {
-      // Open a persistent connection to the local broker. The persistent
-      // connection is maintained through the test and should prevent the
-      // VMTransportServer from stopping itself when the local transport is
-      // closed.
-      ActiveMQConnection persistentConn = (ActiveMQConnection) createConnection();
-      // Close the persistent connection even when an iteration fails so it is not leaked.
-      try {
-         persistentConn.start();
-         Session session = persistentConn.createSession(true, Session.SESSION_TRANSACTED);
-         MessageProducer producer = session.createProducer(destination);
-         for (int i = 0; i < NUM_ATTEMPTS; i++) {
-            LOG.info("Attempt: " + i);
-            // Open and close a local transport connection. As is done by
-            // most users of the transport, ensure that the transport is stopped
-            // when closed by the peer (via ShutdownInfo). Closing the local
-            // transport should not affect the persistent connection.
-            final Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
-            localTransport.setTransportListener(new TransportListener() {
-               @Override
-               public void onCommand(Object command) {
-                  if (command instanceof ShutdownInfo) {
-                     try {
-                        localTransport.stop();
-                     }
-                     catch (Exception ex) {
-                        throw new RuntimeException(ex);
-                     }
-                  }
-               }
-               @Override
-               public void onException(IOException error) {
-                  // ignore
-               }
-               @Override
-               public void transportInterupted() {
-                  // ignore
-               }
-               @Override
-               public void transportResumed() {
-                  // ignore
-               }
-            });
-            localTransport.start();
-            localTransport.stop();
-            // Ensure that the persistent connection is still usable.
-            producer.send(session.createMessage());
-            session.rollback();
-         }
-      }
-      finally {
-         persistentConn.close();
-      }
-   }