You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/03/04 23:42:53 UTC

[09/58] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
deleted file mode 100644
index a70ef67..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ /dev/null
@@ -1,584 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-abstract public class MessagePriorityTest extends CombinationTestSupport {
-   private static final Logger LOG = LoggerFactory.getLogger(MessagePriorityTest.class);
-   BrokerService broker;
-   PersistenceAdapter adapter;
-   protected ActiveMQConnectionFactory factory;
-   protected Connection conn;
-   protected Session sess;
-   public boolean useCache = true;
-   public int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
-   public boolean dispatchAsync = true;
-   public boolean prioritizeMessages = true;
-   public boolean immediatePriorityDispatch = true;
-   public int prefetchVal = 500;
-   public int expireMessagePeriod = 30000;
-   public int MSG_NUM = 600;
-   public int HIGH_PRI = 7;
-   public int LOW_PRI = 3;
-   abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
-   @Override
-   protected void setUp() throws Exception {
-      broker = new BrokerService();
-      broker.setBrokerName("priorityTest");
-      broker.setAdvisorySupport(false);
-      adapter = createPersistenceAdapter(true);
-      broker.setPersistenceAdapter(adapter);
-      PolicyEntry policy = new PolicyEntry();
-      policy.setPrioritizedMessages(prioritizeMessages);
-      policy.setUseCache(useCache);
-      policy.setExpireMessagesPeriod(expireMessagePeriod);
-      StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = new StorePendingDurableSubscriberMessageStoragePolicy();
-      durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
-      durableSubPending.setUseCache(useCache);
-      policy.setPendingDurableSubscriberPolicy(durableSubPending);
-      PolicyMap policyMap = new PolicyMap();
-      policyMap.put(new ActiveMQQueue("TEST"), policy);
-      policyMap.put(new ActiveMQTopic("TEST"), policy);
-      // do not process expired for one test
-      PolicyEntry ignoreExpired = new PolicyEntry();
-      SharedDeadLetterStrategy ignoreExpiredStrategy = new SharedDeadLetterStrategy();
-      ignoreExpiredStrategy.setProcessExpired(false);
-      ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
-      policyMap.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), ignoreExpired);
-      broker.setDestinationPolicy(policyMap);
-      broker.start();
-      broker.waitUntilStarted();
-      factory = new ActiveMQConnectionFactory("vm://priorityTest");
-      ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-      prefetch.setAll(prefetchVal);
-      factory.setPrefetchPolicy(prefetch);
-      factory.setWatchTopicAdvisories(false);
-      factory.setDispatchAsync(dispatchAsync);
-      conn = factory.createConnection();
-      conn.setClientID("priority");
-      conn.start();
-      sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-   }
-   @Override
-   protected void tearDown() throws Exception {
-      try {
-         sess.close();
-         conn.close();
-      }
-      catch (Exception ignored) {
-      }
-      finally {
-         broker.stop();
-         broker.waitUntilStopped();
-      }
-   }
-   public void testStoreConfigured() throws Exception {
-      final Queue queue = sess.createQueue("TEST");
-      final Topic topic = sess.createTopic("TEST");
-      MessageProducer queueProducer = sess.createProducer(queue);
-      MessageProducer topicProducer = sess.createProducer(topic);
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker.getRegionBroker().getDestinationMap().get(queue) != null;
-         }
-      });
-      assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker.getRegionBroker().getDestinationMap().get(topic) != null;
-         }
-      });
-      assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
-      queueProducer.close();
-      topicProducer.close();
-   }
-   protected class ProducerThread extends Thread {
-      int priority;
-      int messageCount;
-      ActiveMQDestination dest;
-      public ProducerThread(ActiveMQDestination dest, int messageCount, int priority) {
-         this.messageCount = messageCount;
-         this.priority = priority;
-         this.dest = dest;
-      }
-      @Override
-      public void run() {
-         try {
-            MessageProducer producer = sess.createProducer(dest);
-            producer.setPriority(priority);
-            producer.setDeliveryMode(deliveryMode);
-            for (int i = 0; i < messageCount; i++) {
-               producer.send(sess.createTextMessage("message priority: " + priority));
-            }
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-         }
-      }
-      public void setMessagePriority(int priority) {
-         this.priority = priority;
-      }
-      public void setMessageCount(int messageCount) {
-         this.messageCount = messageCount;
-      }
-   }
-   public void initCombosForTestQueues() {
-      addCombinationValues("useCache", new Object[]{new Boolean(true), new Boolean(false)});
-      addCombinationValues("deliveryMode", new Object[]{new Integer(DeliveryMode.NON_PERSISTENT), new Integer(DeliveryMode.PERSISTENT)});
-   }
-   public void testQueues() throws Exception {
-      ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
-      ProducerThread lowPri = new ProducerThread(queue, MSG_NUM, LOW_PRI);
-      ProducerThread highPri = new ProducerThread(queue, MSG_NUM, HIGH_PRI);
-      lowPri.start();
-      highPri.start();
-      lowPri.join();
-      highPri.join();
-      MessageConsumer queueConsumer = sess.createConsumer(queue);
-      for (int i = 0; i < MSG_NUM * 2; i++) {
-         Message msg = queueConsumer.receive(5000);
-         LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null));
-         assertNotNull("Message " + i + " was null", msg);
-         assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
-      }
-   }
-   protected Message createMessage(int priority) throws Exception {
-      final String text = "priority " + priority;
-      Message msg = sess.createTextMessage(text);
-      LOG.info("Sending  " + text);
-      return msg;
-   }
-   public void initCombosForTestDurableSubs() {
-      addCombinationValues("prefetchVal", new Object[]{new Integer(1000), new Integer(MSG_NUM / 4)});
-   }
-   public void testDurableSubs() throws Exception {
-      ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-      TopicSubscriber sub = sess.createDurableSubscriber(topic, "priority");
-      sub.close();
-      ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
-      ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
-      lowPri.start();
-      highPri.start();
-      lowPri.join();
-      highPri.join();
-      sub = sess.createDurableSubscriber(topic, "priority");
-      for (int i = 0; i < MSG_NUM * 2; i++) {
-         Message msg = sub.receive(5000);
-         assertNotNull("Message " + i + " was null", msg);
-         assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
-      }
-      // verify that same broker/store can deal with non priority dest also
-      topic = (ActiveMQTopic) sess.createTopic("HAS_NO_PRIORITY");
-      sub = sess.createDurableSubscriber(topic, "no_priority");
-      sub.close();
-      lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
-      highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
-      lowPri.start();
-      highPri.start();
-      lowPri.join();
-      highPri.join();
-      sub = sess.createDurableSubscriber(topic, "no_priority");
-      // verify we got them all
-      for (int i = 0; i < MSG_NUM * 2; i++) {
-         Message msg = sub.receive(5000);
-         assertNotNull("Message " + i + " was null", msg);
-      }
-   }
-   public void initCombosForTestDurableSubsReconnect() {
-      addCombinationValues("prefetchVal", new Object[]{new Integer(1000), new Integer(MSG_NUM / 2)});
-      // REVISIT = is dispatchAsync = true a problem or is it just the test?
-      addCombinationValues("dispatchAsync", new Object[]{Boolean.FALSE});
-      addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE});
-   }
-   public void testDurableSubsReconnect() throws Exception {
-      ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-      final String subName = "priorityDisconnect";
-      TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-      sub.close();
-      ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
-      ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
-      lowPri.start();
-      highPri.start();
-      lowPri.join();
-      highPri.join();
-      final int closeFrequency = MSG_NUM / 4;
-      sub = sess.createDurableSubscriber(topic, subName);
-      for (int i = 0; i < MSG_NUM * 2; i++) {
-         Message msg = sub.receive(15000);
-         LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null));
-         assertNotNull("Message " + i + " was null", msg);
-         assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
-         if (i > 0 && i % closeFrequency == 0) {
-            LOG.info("Closing durable sub.. on: " + i);
-            sub.close();
-            sub = sess.createDurableSubscriber(topic, subName);
-         }
-      }
-   }
-   public void testHighPriorityDelivery() throws Exception {
-      // get zero prefetch
-      ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-      prefetch.setAll(0);
-      factory.setPrefetchPolicy(prefetch);
-      conn.close();
-      conn = factory.createConnection();
-      conn.setClientID("priority");
-      conn.start();
-      sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-      final String subName = "priorityDisconnect";
-      TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-      sub.close();
-      final int numToProduce = 2000;
-      final int[] dups = new int[numToProduce * 2];
-      ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI + 1);
-      producerThread.run();
-      LOG.info("Low priority messages sent");
-      sub = sess.createDurableSubscriber(topic, subName);
-      final int batchSize = 250;
-      int lowLowCount = 0;
-      for (int i = 0; i < numToProduce; i++) {
-         Message msg = sub.receive(15000);
-         LOG.info("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null));
-         assertNotNull("Message " + i + " was null", msg);
-         assertEquals("Message " + i + " has wrong priority", LOW_PRI + 1, msg.getJMSPriority());
-         assertTrue("not duplicate ", dups[i] == 0);
-         dups[i] = 1;
-         if (i % batchSize == 0) {
-            producerThread.setMessagePriority(HIGH_PRI);
-            producerThread.setMessageCount(1);
-            producerThread.run();
-            LOG.info("High priority message sent, should be able to receive immediately");
-            if (i % batchSize * 2 == 0) {
-               producerThread.setMessagePriority(HIGH_PRI - 1);
-               producerThread.setMessageCount(1);
-               producerThread.run();
-               LOG.info("High -1 priority message sent, should be able to receive immediately");
-            }
-            if (i % batchSize * 4 == 0) {
-               producerThread.setMessagePriority(LOW_PRI);
-               producerThread.setMessageCount(1);
-               producerThread.run();
-               lowLowCount++;
-               LOG.info("Low low priority message sent, should not be able to receive immediately");
-            }
-            msg = sub.receive(15000);
-            assertNotNull("Message was null", msg);
-            LOG.info("received hi? : " + msg);
-            assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
-            if (i % batchSize * 2 == 0) {
-               msg = sub.receive(15000);
-               assertNotNull("Message was null", msg);
-               LOG.info("received hi -1 ? i=" + i + ", " + msg);
-               assertEquals("high priority", HIGH_PRI - 1, msg.getJMSPriority());
-            }
-         }
-      }
-      for (int i = 0; i < lowLowCount; i++) {
-         Message msg = sub.receive(15000);
-         LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null));
-         assertNotNull("Message " + i + " was null", msg);
-         assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
-      }
-   }
-   public void initCombosForTestHighPriorityDeliveryInterleaved() {
-      addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE});
-   }
-   public void testHighPriorityDeliveryInterleaved() throws Exception {
-      // get zero prefetch
-      ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-      prefetch.setAll(0);
-      factory.setPrefetchPolicy(prefetch);
-      conn.close();
-      conn = factory.createConnection();
-      conn.setClientID("priority");
-      conn.start();
-      sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-      final String subName = "priorityDisconnect";
-      TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-      sub.close();
-      ProducerThread producerThread = new ProducerThread(topic, 1, HIGH_PRI);
-      producerThread.run();
-      producerThread.setMessagePriority(HIGH_PRI - 1);
-      producerThread.setMessageCount(1);
-      producerThread.run();
-      producerThread.setMessagePriority(LOW_PRI);
-      producerThread.setMessageCount(1);
-      producerThread.run();
-      LOG.info("Ordered priority messages sent");
-      sub = sess.createDurableSubscriber(topic, subName);
-      Message msg = sub.receive(15000);
-      assertNotNull("Message was null", msg);
-      LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
-      assertEquals("Message has wrong priority", HIGH_PRI, msg.getJMSPriority());
-      producerThread.setMessagePriority(LOW_PRI + 1);
-      producerThread.setMessageCount(1);
-      producerThread.run();
-      msg = sub.receive(15000);
-      assertNotNull("Message was null", msg);
-      LOG.info("received " + msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority());
-      assertEquals("high priority", HIGH_PRI - 1, msg.getJMSPriority());
-      msg = sub.receive(15000);
-      assertNotNull("Message was null", msg);
-      LOG.info("received hi? : " + msg);
-      assertEquals("high priority", LOW_PRI + 1, msg.getJMSPriority());
-      msg = sub.receive(15000);
-      assertNotNull("Message was null", msg);
-      LOG.info("received hi? : " + msg);
-      assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
-      msg = sub.receive(4000);
-      assertNull("Message was null", msg);
-   }
-   // immediatePriorityDispatch is only relevant when cache is exhausted
-   public void initCombosForTestHighPriorityDeliveryThroughBackLog() {
-      addCombinationValues("useCache", new Object[]{Boolean.FALSE});
-      addCombinationValues("immediatePriorityDispatch", new Object[]{Boolean.TRUE});
-   }
-   public void testHighPriorityDeliveryThroughBackLog() throws Exception {
-      // get zero prefetch
-      ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-      prefetch.setAll(0);
-      factory.setPrefetchPolicy(prefetch);
-      conn.close();
-      conn = factory.createConnection();
-      conn.setClientID("priority");
-      conn.start();
-      sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-      final String subName = "priorityDisconnect";
-      TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-      sub.close();
-      ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI);
-      producerThread.run();
-      sub = sess.createDurableSubscriber(topic, subName);
-      int count = 0;
-      for (; count < 300; count++) {
-         Message msg = sub.receive(15000);
-         assertNotNull("Message was null", msg);
-         assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
-      }
-      producerThread.setMessagePriority(HIGH_PRI);
-      producerThread.setMessageCount(1);
-      producerThread.run();
-      Message msg = sub.receive(15000);
-      assertNotNull("Message was null", msg);
-      assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
-      for (; count < 600; count++) {
-         msg = sub.receive(15000);
-         assertNotNull("Message was null", msg);
-         assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
-      }
-   }
-   public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() {
-      addCombinationValues("useCache", new Object[]{Boolean.FALSE});
-      addCombinationValues("immediatePriorityDispatch", new Object[]{Boolean.FALSE});
-   }
-   public void testHighPriorityNonDeliveryThroughBackLog() throws Exception {
-      // get zero prefetch
-      ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
-      prefetch.setAll(0);
-      factory.setPrefetchPolicy(prefetch);
-      conn.close();
-      conn = factory.createConnection();
-      conn.setClientID("priority");
-      conn.start();
-      sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
-      final String subName = "priorityDisconnect";
-      TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
-      sub.close();
-      ProducerThread producerThread = new ProducerThread(topic, 600, LOW_PRI);
-      producerThread.run();
-      sub = sess.createDurableSubscriber(topic, subName);
-      int count = 0;
-      for (; count < 300; count++) {
-         Message msg = sub.receive(15000);
-         assertNotNull("Message was null", msg);
-         assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
-      }
-      producerThread.setMessagePriority(HIGH_PRI);
-      producerThread.setMessageCount(1);
-      producerThread.run();
-      for (; count < 400; count++) {
-         Message msg = sub.receive(15000);
-         assertNotNull("Message was null", msg);
-         assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
-      }
-      Message msg = sub.receive(15000);
-      assertNotNull("Message was null", msg);
-      assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
-      for (; count < 600; count++) {
-         msg = sub.receive(15000);
-         assertNotNull("Message was null", msg);
-         assertEquals("high priority", LOW_PRI, msg.getJMSPriority());
-      }
-   }
-   public void initCombosForTestQueueBacklog() {
-      // the cache limits the priority ordering to available memory
-      addCombinationValues("useCache", new Object[]{new Boolean(false)});
-      // expiry processing can fill the cursor with a snapshot of the producer
-      // priority, before producers are complete
-      addCombinationValues("expireMessagePeriod", new Object[]{new Integer(0)});
-   }
-   public void testQueueBacklog() throws Exception {
-      final int backlog = 180000;
-      ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
-      ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI);
-      ProducerThread highPri = new ProducerThread(queue, 10, HIGH_PRI);
-      lowPri.start();
-      lowPri.join();
-      highPri.start();
-      highPri.join();
-      LOG.info("Starting consumer...");
-      MessageConsumer queueConsumer = sess.createConsumer(queue);
-      for (int i = 0; i < 500; i++) {
-         Message msg = queueConsumer.receive(20000);
-         LOG.debug("received i=" + i + ", " + (msg != null ? msg.getJMSMessageID() : null));
-         if (msg == null)
-            dumpAllThreads("backlog");
-         assertNotNull("Message " + i + " was null", msg);
-         assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
-      }
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
deleted file mode 100644
index f7ab98d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-public abstract class StoreOrderTest {
-   private static final Logger LOG = LoggerFactory.getLogger(StoreOrderTest.class);
-   protected BrokerService broker;
-   private ActiveMQConnection connection;
-   public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");
-   protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;
-   protected void dumpMessages() throws Exception {
-   }
-   public class TransactedSend implements Runnable {
-      private CountDownLatch readyForCommit;
-      private CountDownLatch firstDone;
-      private boolean first;
-      private Session session;
-      private MessageProducer producer;
-      public TransactedSend(CountDownLatch readyForCommit, CountDownLatch firstDone, boolean b) throws Exception {
-         this.readyForCommit = readyForCommit;
-         this.firstDone = firstDone;
-         this.first = b;
-         session = connection.createSession(true, Session.SESSION_TRANSACTED);
-         producer = session.createProducer(destination);
-      }
-      @Override
-      public void run() {
-         try {
-            if (!first) {
-               firstDone.await(30, TimeUnit.SECONDS);
-            }
-            producer.send(session.createTextMessage(first ? "first" : "second"));
-            if (first) {
-               firstDone.countDown();
-            }
-            readyForCommit.countDown();
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-            fail("unexpected ex on run " + e);
-         }
-      }
-      public void commit() throws Exception {
-         session.commit();
-         session.close();
-      }
-   }
-   @Before
-   public void setup() throws Exception {
-      broker = createBroker();
-      initConnection();
-   }
-   public void initConnection() throws Exception {
-      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
-      connection = (ActiveMQConnection) connectionFactory.createConnection();
-      connection.setWatchTopicAdvisories(false);
-      connection.start();
-   }
-   @After
-   public void stopBroker() throws Exception {
-      if (connection != null) {
-         connection.close();
-      }
-      if (broker != null) {
-         broker.stop();
-      }
-   }
-   @Test
-   public void testCompositeSendReceiveAfterRestart() throws Exception {
-      destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest");
-      enqueueOneMessage();
-      LOG.info("restart broker");
-      stopBroker();
-      broker = createRestartedBroker();
-      dumpMessages();
-      initConnection();
-      destination = new ActiveMQQueue("StoreOrderTest");
-      assertNotNull("got one message from first dest", receiveOne());
-      dumpMessages();
-      destination = new ActiveMQQueue("SecondStoreOrderTest");
-      assertNotNull("got one message from second dest", receiveOne());
-   }
-   @Test
-   public void validateUnorderedTxCommit() throws Exception {
-      Executor executor = Executors.newCachedThreadPool();
-      CountDownLatch readyForCommit = new CountDownLatch(2);
-      CountDownLatch firstDone = new CountDownLatch(1);
-      TransactedSend first = new TransactedSend(readyForCommit, firstDone, true);
-      TransactedSend second = new TransactedSend(readyForCommit, firstDone, false);
-      executor.execute(first);
-      executor.execute(second);
-      assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS));
-      LOG.info("commit out of order");
-      // send interleaved so sequence id at time of commit could be reversed
-      second.commit();
-      // force usage over the limit before second commit to flush cache
-      enqueueOneMessage();
-      // can get lost in the cursor as it is behind the last sequenceId that was cached
-      first.commit();
-      LOG.info("send/commit done..");
-      dumpMessages();
-      String received1, received2, received3 = null;
-      if (true) {
-         LOG.info("receive and rollback...");
-         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-         received1 = receive(session);
-         received2 = receive(session);
-         received3 = receive(session);
-         assertEquals("second", received1);
-         assertEquals("middle", received2);
-         assertEquals("first", received3);
-         session.rollback();
-         session.close();
-      }
-      LOG.info("restart broker");
-      stopBroker();
-      broker = createRestartedBroker();
-      initConnection();
-      if (true) {
-         LOG.info("receive and rollback after restart...");
-         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-         received1 = receive(session);
-         received2 = receive(session);
-         received3 = receive(session);
-         assertEquals("second", received1);
-         assertEquals("middle", received2);
-         assertEquals("first", received3);
-         session.rollback();
-         session.close();
-      }
-      LOG.info("receive and ack each message");
-      received1 = receiveOne();
-      received2 = receiveOne();
-      received3 = receiveOne();
-      assertEquals("second", received1);
-      assertEquals("middle", received2);
-      assertEquals("first", received3);
-   }
-   private void enqueueOneMessage() throws Exception {
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      MessageProducer producer = session.createProducer(destination);
-      producer.send(session.createTextMessage("middle"));
-      session.commit();
-      session.close();
-   }
-   private String receiveOne() throws Exception {
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      String received = receive(session);
-      session.commit();
-      session.close();
-      return received;
-   }
-   private String receive(Session session) throws Exception {
-      MessageConsumer consumer = session.createConsumer(destination);
-      String result = null;
-      TextMessage message = (TextMessage) consumer.receive(5000);
-      if (message != null) {
-         LOG.info("got message: " + message);
-         result = message.getText();
-      }
-      consumer.close();
-      return result;
-   }
-   protected BrokerService createBroker() throws Exception {
-      boolean deleteMessagesOnStartup = true;
-      return startBroker(deleteMessagesOnStartup);
-   }
-   protected BrokerService createRestartedBroker() throws Exception {
-      boolean deleteMessagesOnStartup = false;
-      return startBroker(deleteMessagesOnStartup);
-   }
-   protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception {
-      BrokerService newBroker = new BrokerService();
-      configureBroker(newBroker);
-      newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
-      newBroker.start();
-      return newBroker;
-   }
-   protected void configureBroker(BrokerService brokerService) throws Exception {
-      setPersistentAdapter(brokerService);
-      brokerService.setAdvisorySupport(false);
-      PolicyMap map = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-      defaultEntry.setMemoryLimit(1024 * 3);
-      defaultEntry.setCursorMemoryHighWaterMark(68);
-      defaultEntry.setExpireMessagesPeriod(0);
-      map.setDefaultEntry(defaultEntry);
-      brokerService.setDestinationPolicy(map);
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
deleted file mode 100644
index cc144d0..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
+++ /dev/null
@@ -1,314 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.ArrayList;
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-public class StorePerDestinationTest {
-   static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class);
-   final static int maxFileLength = 1024 * 100;
-   final static int numToSend = 5000;
-   final Vector<Throwable> exceptions = new Vector<>();
-   BrokerService brokerService;
-   protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setUseJmx(false);
-      broker.setPersistenceAdapter(kaha);
-      return broker;
-   }
-   protected PersistenceAdapter createStore(boolean delete) throws IOException {
-      KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-      kaha.setJournalMaxFileLength(maxFileLength);
-      kaha.setCleanupInterval(5000);
-      if (delete) {
-         kaha.deleteAllMessages();
-      }
-      return kaha;
-   }
-   @Before
-   public void prepareCleanBrokerWithMultiStore() throws Exception {
-      prepareBrokerWithMultiStore(true);
-   }
-   public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-      MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
-      if (deleteAllMessages) {
-         multiKahaDBPersistenceAdapter.deleteAllMessages();
-      }
-      ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-      FilteredKahaDBPersistenceAdapter theRest = new FilteredKahaDBPersistenceAdapter();
-      theRest.setPersistenceAdapter(createStore(deleteAllMessages));
-      // default destination when not set is a match for all
-      adapters.add(theRest);
-      // separate store for FastQ
-      FilteredKahaDBPersistenceAdapter fastQStore = new FilteredKahaDBPersistenceAdapter();
-      fastQStore.setPersistenceAdapter(createStore(deleteAllMessages));
-      fastQStore.setDestination(new ActiveMQQueue("FastQ"));
-      adapters.add(fastQStore);
-      multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-      brokerService = createBroker(multiKahaDBPersistenceAdapter);
-   }
-   @After
-   public void tearDown() throws Exception {
-      brokerService.stop();
-   }
-   @Test
-   public void testTransactedSendReceive() throws Exception {
-      brokerService.start();
-      sendMessages(true, "SlowQ", 1, 0);
-      assertEquals("got one", 1, receiveMessages(true, "SlowQ", 1));
-   }
-   @Test
-   public void testTransactedSendReceiveAcrossStores() throws Exception {
-      brokerService.start();
-      sendMessages(true, "SlowQ,FastQ", 1, 0);
-      assertEquals("got one", 2, receiveMessages(true, "SlowQ,FastQ", 2));
-   }
-   @Test
-   public void testCommitRecovery() throws Exception {
-      doTestRecovery(true);
-   }
-   @Test
-   public void testRollbackRecovery() throws Exception {
-      doTestRecovery(false);
-   }
-   public void doTestRecovery(final boolean haveOutcome) throws Exception {
-      final MultiKahaDBPersistenceAdapter persistenceAdapter = (MultiKahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-      MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(persistenceAdapter) {
-         @Override
-         public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
-            if (haveOutcome) {
-               super.persistOutcome(tx, txid);
-            }
-            try {
-               // IOExceptions will stop the broker
-               persistenceAdapter.stop();
-            }
-            catch (Exception e) {
-               LOG.error("ex on stop ", e);
-               exceptions.add(e);
-            }
-         }
-      };
-      persistenceAdapter.setTransactionStore(transactionStore);
-      brokerService.start();
-      ExecutorService executorService = Executors.newCachedThreadPool();
-      executorService.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               // commit will block
-               sendMessages(true, "SlowQ,FastQ", 1, 0);
-            }
-            catch (Exception expected) {
-               LOG.info("expected", expected);
-            }
-         }
-      });
-      brokerService.waitUntilStopped();
-      // interrupt the send thread
-      executorService.shutdownNow();
-      // verify auto recovery
-      prepareBrokerWithMultiStore(false);
-      brokerService.start();
-      assertEquals("expect to get the recovered message", haveOutcome ? 2 : 0, receiveMessages(false, "SlowQ,FastQ", 2));
-      assertEquals("all transactions are complete", 0, brokerService.getBroker().getPreparedTransactions(null).length);
-   }
-   @Test
-   public void testDirectoryDefault() throws Exception {
-      MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
-      ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-      FilteredKahaDBPersistenceAdapter otherFilteredKahaDBPersistenceAdapter = new FilteredKahaDBPersistenceAdapter();
-      PersistenceAdapter otherStore = createStore(false);
-      File someOtherDisk = new File("target" + File.separator + "someOtherDisk");
-      otherStore.setDirectory(someOtherDisk);
-      otherFilteredKahaDBPersistenceAdapter.setPersistenceAdapter(otherStore);
-      otherFilteredKahaDBPersistenceAdapter.setDestination(new ActiveMQQueue("Other"));
-      adapters.add(otherFilteredKahaDBPersistenceAdapter);
-      FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapterDefault = new FilteredKahaDBPersistenceAdapter();
-      PersistenceAdapter storeDefault = createStore(false);
-      filteredKahaDBPersistenceAdapterDefault.setPersistenceAdapter(storeDefault);
-      adapters.add(filteredKahaDBPersistenceAdapterDefault);
-      multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-      assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), storeDefault.getDirectory().getParentFile());
-      assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
-   }
-   @Test
-   public void testSlowFastDestinationsStoreUsage() throws Exception {
-      brokerService.start();
-      ExecutorService executorService = Executors.newCachedThreadPool();
-      executorService.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               sendMessages(false, "SlowQ", 50, 500);
-            }
-            catch (Exception e) {
-               exceptions.add(e);
-            }
-         }
-      });
-      executorService.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               sendMessages(false, "FastQ", numToSend, 0);
-            }
-            catch (Exception e) {
-               exceptions.add(e);
-            }
-         }
-      });
-      executorService.execute(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               assertEquals("Got all sent", numToSend, receiveMessages(false, "FastQ", numToSend));
-            }
-            catch (Exception e) {
-               exceptions.add(e);
-            }
-         }
-      });
-      executorService.shutdown();
-      assertTrue("consumers executor finished on time", executorService.awaitTermination(5 * 60, TimeUnit.SECONDS));
-      final SystemUsage usage = brokerService.getSystemUsage();
-      assertTrue("Store is not hogged", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            long storeUsage = usage.getStoreUsage().getUsage();
-            LOG.info("Store Usage: " + storeUsage);
-            return storeUsage < 5 * maxFileLength;
-         }
-      }));
-      assertTrue("no exceptions", exceptions.isEmpty());
-   }
-   private void sendMessages(boolean transacted, String destName, int count, long sleep) throws Exception {
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-      Connection connection = cf.createConnection();
-      try {
-         Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(new ActiveMQQueue(destName));
-         for (int i = 0; i < count; i++) {
-            if (sleep > 0) {
-               TimeUnit.MILLISECONDS.sleep(sleep);
-            }
-            producer.send(session.createTextMessage(createContent(i)));
-         }
-         if (transacted) {
-            session.commit();
-         }
-      }
-      finally {
-         connection.close();
-      }
-   }
-   private int receiveMessages(boolean transacted, String destName, int max) throws JMSException {
-      int rc = 0;
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-      Connection connection = cf.createConnection();
-      try {
-         connection.start();
-         Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(destName));
-         while (rc < max && messageConsumer.receive(4000) != null) {
-            rc++;
-            if (transacted && rc % 200 == 0) {
-               session.commit();
-            }
-         }
-         if (transacted) {
-            session.commit();
-         }
-         return rc;
-      }
-      finally {
-         connection.close();
-      }
-   }
-   private String createContent(int i) {
-      StringBuilder sb = new StringBuilder(i + ":");
-      while (sb.length() < 1024) {
-         sb.append("*");
-      }
-      return sb.toString();
-   }
\ No newline at end of file
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
deleted file mode 100644
index a484c64..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
+++ /dev/null
@@ -1,47 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-class BrokenPersistenceAdapter extends JDBCPersistenceAdapter {
-   private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class);
-   private boolean shouldBreak = false;
-   @Override
-   public void commitTransaction(ConnectionContext context) throws IOException {
-      if (shouldBreak) {
-         LOG.warn("Throwing exception on purpose");
-         throw new IOException("Breaking on purpose");
-      }
-      LOG.debug("in commitTransaction");
-      super.commitTransaction(context);
-   }
-   public void setShouldBreak(boolean shouldBreak) {
-      this.shouldBreak = shouldBreak;
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
deleted file mode 100644
index 5a4aae8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
+++ /dev/null
@@ -1,55 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import org.junit.Test;
-public class DatabaseLockerConfigTest {
-   @Test
-   public void testSleepConfig() throws Exception {
-      LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
-      underTest.setLockAcquireSleepInterval(50);
-      underTest.configure(null);
-      assertEquals("configured sleep value retained", 50, underTest.getLockAcquireSleepInterval());
-   }
-   @Test
-   public void testDefaultSleepConfig() throws Exception {
-      LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
-      underTest.configure(null);
-      assertEquals("configured sleep value retained", AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL, underTest.getLockAcquireSleepInterval());
-   }
-   @Test
-   public void testSleepConfigOrig() throws Exception {
-      DefaultDatabaseLocker underTest = new DefaultDatabaseLocker();
-      underTest.setLockAcquireSleepInterval(50);
-      underTest.configure(null);
-      assertEquals("configured sleep value retained", 50, underTest.getLockAcquireSleepInterval());
-   }
-   @Test
-   public void testDefaultSleepConfigOrig() throws Exception {
-      DefaultDatabaseLocker underTest = new DefaultDatabaseLocker();
-      underTest.configure(null);
-      assertEquals("configured sleep value retained", AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL, underTest.getLockAcquireSleepInterval());
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
deleted file mode 100644
index 00c501f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
+++ /dev/null
@@ -1,176 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class JDBCCommitExceptionTest extends TestCase {
-   private static final Logger LOG = LoggerFactory.getLogger(JDBCCommitExceptionTest.class);
-   protected static final int messagesExpected = 10;
-   protected ActiveMQConnectionFactory factory;
-   protected BrokerService broker;
-   protected String connectionUri;
-   protected EmbeddedDataSource dataSource;
-   protected java.sql.Connection dbConnection;
-   protected BrokenPersistenceAdapter jdbc;
-   @Override
-   public void setUp() throws Exception {
-      broker = createBroker();
-      broker.start();
-      factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
-   }
-   @Override
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-   public void testSqlException() throws Exception {
-      doTestSqlException();
-   }
-   public void doTestSqlException() throws Exception {
-      sendMessages(messagesExpected);
-      int messagesReceived = receiveMessages(messagesExpected);
-      dumpMessages();
-      assertEquals("Messages expected doesn't equal messages received", messagesExpected, messagesReceived);
-      broker.stop();
-   }
-   protected void dumpMessages() throws Exception {
-      WireFormat wireFormat = new OpenWireFormat();
-      java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
-      PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
-      ResultSet result = statement.executeQuery();
-      LOG.info("Messages left in broker after test");
-      while (result.next()) {
-         long id = result.getLong(1);
-         org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
-         LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
-      }
-      statement.close();
-      conn.close();
-   }
-   protected int receiveMessages(int messagesExpected) throws Exception {
-      javax.jms.Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      jdbc.setShouldBreak(true);
-      // first try and receive these messages, they'll continually fail
-      receiveMessages(messagesExpected, session);
-      jdbc.setShouldBreak(false);
-      // now that the store is sane, try and get all the messages sent
-      return receiveMessages(messagesExpected, session);
-   }
-   protected int receiveMessages(int messagesExpected, Session session) throws Exception {
-      int messagesReceived = 0;
-      for (int i = 0; i < messagesExpected; i++) {
-         Destination destination = session.createQueue("TEST");
-         MessageConsumer consumer = session.createConsumer(destination);
-         Message message = null;
-         try {
-            LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
-            message = consumer.receive(2000);
-            LOG.info("Received : " + message);
-            if (message != null) {
-               session.commit();
-               messagesReceived++;
-            }
-         }
-         catch (Exception e) {
-            LOG.debug("Caught exception " + e);
-            session.rollback();
-         }
-         finally {
-            if (consumer != null) {
-               consumer.close();
-            }
-         }
-      }
-      return messagesReceived;
-   }
-   protected void sendMessages(int messagesExpected) throws Exception {
-      javax.jms.Connection connection = factory.createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Destination destination = session.createQueue("TEST");
-      MessageProducer producer = session.createProducer(destination);
-      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-      for (int i = 0; i < messagesExpected; i++) {
-         LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected);
-         producer.send(session.createTextMessage("test message " + (i + 1)));
-      }
-   }
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = new BrokerService();
-      jdbc = new BrokenPersistenceAdapter();
-      dataSource = new EmbeddedDataSource();
-      dataSource.setDatabaseName("target/derbyDb");
-      dataSource.setCreateDatabase("create");
-      jdbc.setDataSource(dataSource);
-      jdbc.setUseLock(false);
-      jdbc.deleteAllMessages();
-      broker.setPersistenceAdapter(jdbc);
-      broker.setPersistent(true);
-      connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
-      return broker;
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
deleted file mode 100644
index fa7b848..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
+++ /dev/null
@@ -1,110 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.HashMap;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.Wait;
-import org.jmock.Expectations;
-import org.jmock.Mockery;
-import org.jmock.States;
-import org.jmock.lib.legacy.ClassImposteriser;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-public class JDBCIOExceptionHandlerMockeryTest {
-   private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerMockeryTest.class);
-   private HashMap<Thread, Throwable> exceptions = new HashMap<>();
-   @Test
-   public void testShutdownWithoutTransportRestart() throws Exception {
-      Mockery context = new Mockery() {{
-         setImposteriser(ClassImposteriser.INSTANCE);
-      }};
-      Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-         @Override
-         public void uncaughtException(Thread t, Throwable e) {
-            LOG.error("unexpected exception {} on thread {}", e, t);
-            exceptions.put(t, e);
-         }
-      });
-      final BrokerService brokerService = context.mock(BrokerService.class);
-      final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class);
-      final Locker locker = context.mock(Locker.class);
-      final States jdbcConn = context.states("jdbc").startsAs("down");
-      final States broker = context.states("broker").startsAs("started");
-      // simulate jdbc up between hasLock and checkpoint, so hasLock fails to verify
-      context.checking(new Expectations() {{
-         allowing(brokerService).isRestartAllowed();
-         will(returnValue(false));
-         allowing(brokerService).stopAllConnectors(with(any(ServiceStopper.class)));
-         allowing(brokerService).getPersistenceAdapter();
-         will(returnValue(jdbcPersistenceAdapter));
-         allowing(jdbcPersistenceAdapter).getLocker();
-         will(returnValue(locker));
-         allowing(locker).keepAlive();
-         when(jdbcConn.is("down"));
-         will(returnValue(true));
-         allowing(locker).keepAlive();
-         when(jdbcConn.is("up"));
-         will(returnValue(false));
-         allowing(jdbcPersistenceAdapter).checkpoint(with(true));
-         then(jdbcConn.is("up"));
-         allowing(brokerService).stop();
-         then(broker.is("stopped"));
-      }});
-      LeaseLockerIOExceptionHandler underTest = new LeaseLockerIOExceptionHandler();
-      underTest.setBrokerService(brokerService);
-      try {
-         underTest.handle(new IOException());
-         fail("except suppress reply ex");
-      }
-      catch (SuppressReplyException expected) {
-      }
-      assertTrue("broker stopped state triggered", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            LOG.info("broker state {}", broker);
-            return broker.is("stopped").isActive();
-         }
-      }));
-      context.assertIsSatisfied();
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
deleted file mode 100644
index 6c43646..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
+++ /dev/null
@@ -1,330 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.jms.Connection;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.activemq.util.Wait;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Tests that the JDBC IOException handler (a LeaseLockerIOExceptionHandler in this test)
- * restarts the transport connectors correctly after the underlying DB has been
- * stopped and restarted.
- *
- * See AMQ-4575.
- */
-public class JDBCIOExceptionHandlerTest extends TestCase {
-   // Class-wide logger used for the progress/debug messages of this test.
-   private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerTest.class);
-   // URI passed to broker.addConnector() in createBroker(...).
-   // NOTE(review): the host/port part appears to have been stripped from this archived copy - confirm against the original file.
-   private static final String TRANSPORT_URL = "tcp://";
-   // Derby database name used for the initial data source and for the one created in restartDB().
-   private static final String DATABASE_NAME = "DERBY_OVERRIDE";
-   // Client connection factory built from the connector's publishable URI; assigned in createBroker(...).
-   private ActiveMQConnectionFactory factory;
-   // Wrapper around the Derby data source that lets the tests stop/restart the DB; assigned in createBroker(...).
-   private ReconnectingEmbeddedDataSource dataSource;
-   // Broker used by recoverFromDisconnectDB(...), which stops it in its finally block.
-   private BrokerService broker;
-   protected BrokerService createBroker(boolean withJMX) throws Exception {
-      return createBroker("localhost", withJMX, true, true);
-   }
-   protected BrokerService createBroker(String name,
-                                        boolean withJMX,
-                                        boolean leaseLocker,
-                                        boolean startStopConnectors) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setBrokerName(name);
-      broker.setUseJmx(withJMX);
-      EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource();
-      embeddedDataSource.setDatabaseName(DATABASE_NAME);
-      embeddedDataSource.setCreateDatabase("create");
-      // create a wrapper to EmbeddedDataSource to allow the connection be
-      // reestablished to derby db
-      dataSource = new ReconnectingEmbeddedDataSource(embeddedDataSource);
-      JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
-      jdbc.setDataSource(dataSource);
-      jdbc.setLockKeepAlivePeriod(1000L);
-      if (leaseLocker) {
-         LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
-         leaseDatabaseLocker.setHandleStartException(true);
-         leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
-         jdbc.setLocker(leaseDatabaseLocker);
-      }
-      broker.setPersistenceAdapter(jdbc);
-      LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
-      ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
-      ioExceptionHandler.setStopStartConnectors(startStopConnectors);
-      broker.setIoExceptionHandler(ioExceptionHandler);
-      String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
-      factory = new ActiveMQConnectionFactory(connectionUri);
-      return broker;
-   }
-   /*
-    * run test without JMX enabled
-    */
-   public void testRecoverWithOutJMX() throws Exception {
-      // same scenario as testRecoverWithJMX, with the JMX switch off
-      final boolean withJmx = false;
-      recoverFromDisconnectDB(withJmx);
-   }
-   /*
-    * run test with JMX enabled
-    */
-   public void testRecoverWithJMX() throws Exception {
-      // same scenario as testRecoverWithOutJMX, with the JMX switch on
-      final boolean withJmx = true;
-      recoverFromDisconnectDB(withJmx);
-   }
-   /**
-    * Runs the master/slave shutdown scenario with the lease locker installed.
-    */
-   public void testSlaveStoppedLease() throws Exception {
-      final boolean useLeaseLocker = true;
-      testSlaveStopped(useLeaseLocker);
-   }
-   /**
-    * Runs the master/slave shutdown scenario without installing a lease locker.
-    */
-   public void testSlaveStoppedDefault() throws Exception {
-      final boolean useLeaseLocker = false;
-      testSlaveStopped(useLeaseLocker);
-   }
-   /**
-    * Starts a master broker and, on a separate thread, a slave broker that share the
-    * same data source, then shuts the DB down and waits for both brokers to report
-    * {@code isStopped()}.
-    *
-    * @param lease when true both brokers get a LeaseDatabaseLocker; when false no
-    *              locker is set explicitly on the persistence adapter
-    */
-   public void testSlaveStopped(final boolean lease) throws Exception {
-      final BrokerService master = createBroker("master", true, lease, false);
-      master.start();
-      master.waitUntilStarted();
-      final AtomicReference<BrokerService> slave = new AtomicReference<>();
-      Thread slaveThread = new Thread() {
-         @Override
-         public void run() {
-            try {
-               BrokerService broker = new BrokerService();
-               broker.setBrokerName("slave");
-               JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
-               jdbc.setDataSource(dataSource);
-               jdbc.setLockKeepAlivePeriod(1000L);
-               if (lease) {
-                  LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
-                  leaseDatabaseLocker.setHandleStartException(true);
-                  leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
-                  jdbc.setLocker(leaseDatabaseLocker);
-               }
-               broker.setPersistenceAdapter(jdbc);
-               LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
-               ioExceptionHandler.setResumeCheckSleepPeriod(1000L);
-               ioExceptionHandler.setStopStartConnectors(false);
-               broker.setIoExceptionHandler(ioExceptionHandler);
-               // publish the broker before start() so the test thread can observe its state
-               slave.set(broker);
-               broker.start();
-            }
-            catch (Exception e) {
-               // report through the test logger instead of stderr
-               LOG.warn("slave broker thread threw", e);
-            }
-         }
-      };
-      slaveThread.start();
-      // give the slave time to get going before the DB is pulled from under both brokers
-      Thread.sleep(5000);
-      dataSource.stopDB();
-      assertTrue("Master hasn't been stopped", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return master.isStopped();
-         }
-      }));
-      assertTrue("Slave hasn't been stopped", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            // the slave thread may not have published its broker yet; keep waiting
-            // instead of failing with a NullPointerException
-            final BrokerService slaveBroker = slave.get();
-            return slaveBroker != null && slaveBroker.isStopped();
-         }
-      }));
-   }
-   /**
-    * Starts a broker, stops the DB underneath it, checks that client connections are
-    * refused, restarts the DB and then requires that client connections succeed again.
-    *
-    * @param withJMX forwarded to {@code createBroker(boolean)}
-    */
-   public void recoverFromDisconnectDB(boolean withJMX) throws Exception {
-      try {
-         broker = createBroker(withJMX);
-         broker.start();
-         broker.waitUntilStarted();
-         // broker started - stop db underneath it
-         dataSource.stopDB();
-         // wait - allow the leaselocker to kick the JDBCIOExceptionHandler
-         TimeUnit.SECONDS.sleep(3);
-         // check connector has shutdown
-         checkTransportConnectorStopped();
-         // restart db underneath
-         dataSource.restartDB();
-         // the wait result must be asserted, otherwise the test passes even when the
-         // connector never comes back
-         assertTrue("Transport connector should have been restarted", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-               LOG.debug("*** checking connector to start...");
-               try {
-                  checkTransportConnectorStarted();
-                  return true;
-               }
-               catch (Throwable t) {
-                  // includes the assertion error raised by fail(); just retry
-                  LOG.debug(t.toString());
-               }
-               return false;
-            }
-         }));
-      }
-      finally {
-         LOG.debug("*** broker is stopping...");
-         // broker is still null when createBroker() itself failed; do not mask that failure
-         if (broker != null) {
-            broker.stop();
-         }
-      }
-   }
-   private void checkTransportConnectorStopped() {
-      // connection is expected to fail
-      try {
-         factory.createConnection();
-         fail("Transport connector should be stopped");
-      }
-      catch (Exception ex) {
-         // expected an exception
-         LOG.debug(" checkTransportConnectorStopped() threw", ex);
-      }
-   }
-   /**
-    * Verifies that a client connection can be created and closed; any exception from
-    * either step is logged and turned into a test failure.
-    */
-   private void checkTransportConnectorStarted() {
-      try {
-         // open and immediately close: success of both calls is all that is checked
-         factory.createConnection().close();
-      }
-      catch (Exception failure) {
-         LOG.debug("checkTransportConnectorStarted() threw", failure);
-         fail("Transport connector should have been started");
-      }
-   }
-   /*
-    * Wraps the Derby data source object to get DB reconnect functionality, as I did
-    * not manage to get that working directly on the EmbeddedDataSource.
-    *
-    * NOTE: Not thread-safe, but for this unit test it should be fine.
-    */
-   public class ReconnectingEmbeddedDataSource implements javax.sql.DataSource {
-      // Delegate that is replaced by restartDB(); volatile because the swap happens on the
-      // test thread while the data source has already been handed to a persistence adapter.
-      private volatile EmbeddedDataSource realDatasource;
-      /**
-       * @param datasource the initial Derby data source every call is delegated to
-       */
-      public ReconnectingEmbeddedDataSource(EmbeddedDataSource datasource) {
-         this.realDatasource = datasource;
-      }
-      @Override
-      public PrintWriter getLogWriter() throws SQLException {
-         return this.realDatasource.getLogWriter();
-      }
-      @Override
-      public void setLogWriter(PrintWriter out) throws SQLException {
-         this.realDatasource.setLogWriter(out);
-      }
-      @Override
-      public void setLoginTimeout(int seconds) throws SQLException {
-         this.realDatasource.setLoginTimeout(seconds);
-      }
-      @Override
-      public int getLoginTimeout() throws SQLException {
-         return this.realDatasource.getLoginTimeout();
-      }
-      /**
-       * Returns this wrapper or the current delegate, whichever is an instance of
-       * {@code iface}. (Previously this method called itself and could only overflow
-       * the stack.)
-       *
-       * @throws SQLException if neither object implements {@code iface}
-       */
-      @Override
-      public <T> T unwrap(Class<T> iface) throws SQLException {
-         final EmbeddedDataSource delegate = this.realDatasource;
-         if (iface.isInstance(this)) {
-            return iface.cast(this);
-         }
-         if (iface.isInstance(delegate)) {
-            return iface.cast(delegate);
-         }
-         throw new SQLException("Not a wrapper for " + iface.getName());
-      }
-      /**
-       * @return true when this wrapper or the current delegate is an instance of
-       *         {@code iface}. (Previously this method called itself recursively.)
-       */
-      @Override
-      public boolean isWrapperFor(Class<?> iface) throws SQLException {
-         return iface.isInstance(this) || iface.isInstance(this.realDatasource);
-      }
-      @Override
-      public java.sql.Connection getConnection() throws SQLException {
-         return this.realDatasource.getConnection();
-      }
-      @Override
-      public java.sql.Connection getConnection(String username, String password) throws SQLException {
-         // delegate to the wrapped data source; calling this.getConnection(username, password)
-         // here would recurse forever
-         return this.realDatasource.getConnection(username, password);
-      }
-      /**
-       * To simulate a db reconnect I just create a new EmbeddedDataSource .
-       *
-       * @throws SQLException if the new data source cannot hand out a connection
-       */
-      public void restartDB() throws SQLException {
-         EmbeddedDataSource newDatasource = new EmbeddedDataSource();
-         newDatasource.setDatabaseName(DATABASE_NAME);
-         // obtain one connection to prove the DB is reachable again, and close it
-         // straight away so it does not leak
-         try (java.sql.Connection probe = newDatasource.getConnection()) {
-            LOG.info("*** DB restarted now...");
-         }
-         this.realDatasource = newDatasource;
-      }
-      /**
-       * Flags the current delegate for shutdown and then requests a connection, which is
-       * expected to throw; that exception is printed and swallowed on purpose.
-       */
-      public void stopDB() {
-         try {
-            realDatasource.setShutdownDatabase("shutdown");
-            LOG.info("***DB is being shutdown...");
-            dataSource.getConnection();
-            fail("should have thrown a db closed exception");
-         }
-         catch (Exception ex) {
-            ex.printStackTrace(System.out);
-         }
-      }
-      @Override
-      public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
-         return null;
-      }
-   }
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
deleted file mode 100644
index ac70fa7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-        xmlns=""
-        xmlns:amq=""
-        xmlns:xsi=""
-        xsi:schemaLocation="
-    <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
-    <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
-                useLoggingForShutdownErrors="true" useJmx="false"
-                persistent="true" vmConnectorURI="vm://javacoola"
-                useShutdownHook="false" deleteAllMessagesOnStartup="true">
-      <amq:persistenceAdapter>
-        <amq:jdbcPersistenceAdapter dataDirectory="target/activemq-data" dataSource="#derby-ds" lockKeepAlivePeriod="5000" createTablesOnStartup="true">
-          <!-- test that we can define the locker before the statements,
-          but the locker will still pick up the statements -->
-          <amq:locker>
-            <amq:lease-database-locker lockAcquireSleepInterval="10000"/>
-          </amq:locker>
-          <amq:statements>
-            <amq:statements tablePrefix="TTT_" messageTableName="AMQ_MSGS2" durableSubAcksTableName="AMQ_ACKS2" lockTableName="AMQ_LOCK2"/>
-          </amq:statements>
-          <amq:adapter>
-            <amq:defaultJDBCAdapter/>
-          </amq:adapter>
-        </amq:jdbcPersistenceAdapter>
-      </amq:persistenceAdapter>
-        <amq:transportConnectors>
-            <amq:transportConnector uri="vm://brokerConfigTest"/>
-        </amq:transportConnectors>
-    </amq:broker>
-  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
-    <property name="databaseName" value="target/derbyDb"/>
-    <property name="connectionAttributes" value=";create=true"/>
-  </bean>
\ No newline at end of file
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
deleted file mode 100644
index 854dd7a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/
+++ /dev/null
@@ -1,43 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import junit.framework.TestCase;
-public class JDBCLockTablePrefixTest extends TestCase {
-   public void testLockTable() throws Exception {
-      BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/jdbc/JDBCLockTablePrefix.xml");
-      broker.waitUntilStarted();
-      PersistenceAdapter pa = broker.getPersistenceAdapter();
-      assertNotNull(pa);
-      JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter) pa;
-      assertEquals("TTT_", jpa.getStatements().getTablePrefix());
-      assertEquals("AMQ_MSGS2", jpa.getStatements().getMessageTableName());
-      assertEquals("AMQ_LOCK2", jpa.getStatements().getLockTableName());
-      broker.stop();
-      broker.waitUntilStopped();
-   }