You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/01 17:38:10 UTC

[14/52] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
deleted file mode 100644
index 98fc79b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ /dev/null
@@ -1,617 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Non transactional concurrent producer/consumer to single dest
- */
-@RunWith(Parameterized.class)
-public class AMQ5266SingleDestTest {
-
-   static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
-   String activemqURL;
-   BrokerService brokerService;
-
-   public int numDests = 1;
-   public int messageSize = 10 * 1000;
-
-   @Parameterized.Parameter(0)
-   public int publisherMessagesPerThread = 1000;
-
-   @Parameterized.Parameter(1)
-   public int publisherThreadCount = 20;
-
-   @Parameterized.Parameter(2)
-   public int consumerThreadsPerQueue = 5;
-
-   @Parameterized.Parameter(3)
-   public int destMemoryLimit = 50 * 1024;
-
-   @Parameterized.Parameter(4)
-   public boolean useCache = true;
-
-   @Parameterized.Parameter(5)
-   public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
-
-   @Parameterized.Parameter(6)
-   public boolean optimizeDispatch = false;
-
-   @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
-   public static Iterable<Object[]> parameters() {
-      return Arrays.asList(new Object[][]{{1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.JDBC, false},});
-   }
-
-   public int consumerBatchSize = 25;
-
-   @BeforeClass
-   public static void derbyTestMode() throws Exception {
-      System.setProperty("derby.system.durability", "test");
-   }
-
-   @Before
-   public void startBroker() throws Exception {
-      brokerService = new BrokerService();
-
-      TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.setUseJmx(false);
-      brokerService.setAdvisorySupport(false);
-
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-      defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
-      defaultEntry.setMaxProducersToAudit(publisherThreadCount);
-      defaultEntry.setEnableAudit(true);
-      defaultEntry.setUseCache(useCache);
-      defaultEntry.setMaxPageSize(1000);
-      defaultEntry.setOptimizedDispatch(optimizeDispatch);
-      defaultEntry.setMemoryLimit(destMemoryLimit);
-      defaultEntry.setExpireMessagesPeriod(0);
-      policyMap.setDefaultEntry(defaultEntry);
-      brokerService.setDestinationPolicy(policyMap);
-
-      brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024);
-
-      TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
-      brokerService.start();
-      activemqURL = transportConnector.getPublishableConnectString();
-      activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all messages are queue or dlq messages
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (brokerService != null) {
-         brokerService.stop();
-      }
-   }
-
-   @Test
-   public void test() throws Exception {
-
-      String activemqQueues = "activemq";
-      for (int i = 1; i < numDests; i++) {
-         activemqQueues += ",activemq" + i;
-      }
-
-      int consumerWaitForConsumption = 5 * 60 * 1000;
-
-      ExportQueuePublisher publisher = null;
-      ExportQueueConsumer consumer = null;
-
-      LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
-      LOG.info("\nBuilding Publisher...");
-
-      publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
-
-      LOG.info("Building Consumer...");
-
-      consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
-
-      long totalStart = System.currentTimeMillis();
-
-      LOG.info("Starting Publisher...");
-
-      publisher.start();
-
-      LOG.info("Starting Consumer...");
-
-      consumer.start();
-
-      int distinctPublishedCount = 0;
-
-      LOG.info("Waiting For Publisher Completion...");
-
-      publisher.waitForCompletion();
-
-      List<String> publishedIds = publisher.getIDs();
-      distinctPublishedCount = new TreeSet<>(publishedIds).size();
-
-      LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
-      LOG.info("Publisher duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
-
-      long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
-      while (!consumer.completed() && System.currentTimeMillis() < endWait) {
-         try {
-            int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
-            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
-            Thread.sleep(1000);
-         }
-         catch (Exception e) {
-         }
-      }
-
-      LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
-
-      LOG.info("Total duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
-
-      consumer.shutdown();
-
-      TimeUnit.SECONDS.sleep(2);
-
-      LOG.info("Consumer Stats:");
-
-      for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
-
-         List<String> idList = entry.getValue();
-
-         int distinctConsumed = new TreeSet<>(idList).size();
-
-         StringBuilder sb = new StringBuilder();
-         sb.append("   Queue: " + entry.getKey() +
-                      " -> Total Messages Consumed: " + idList.size() +
-                      ", Distinct IDs Consumed: " + distinctConsumed);
-
-         int diff = distinctPublishedCount - distinctConsumed;
-         sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
-         LOG.info(sb.toString());
-
-         assertEquals("expect to get all messages!", 0, diff);
-
-      }
-
-      // verify empty dlq
-      assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
-   }
-
-   public class ExportQueuePublisher {
-
-      private final String amqUser = ActiveMQConnection.DEFAULT_USER;
-      private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private String activemqURL = null;
-      private String activemqQueues = null;
-      // Collection of distinct IDs that the publisher has published.
-      // After a message is published, its UUID will be written to this list for tracking.
-      // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
-      //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
-      private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
-      private List<PublisherThread> threads;
-
-      public ExportQueuePublisher(String activemqURL,
-                                  String activemqQueues,
-                                  int messagesPerThread,
-                                  int threadCount) throws Exception {
-
-         this.activemqURL = activemqURL;
-         this.activemqQueues = activemqQueues;
-
-         threads = new ArrayList<>();
-
-         // Build the threads and tell them how many messages to publish
-         for (int i = 0; i < threadCount; i++) {
-            PublisherThread pt = new PublisherThread(messagesPerThread);
-            threads.add(pt);
-         }
-      }
-
-      public List<String> getIDs() {
-         return ids;
-      }
-
-      // Kick off threads
-      public void start() throws Exception {
-
-         for (PublisherThread pt : threads) {
-            pt.start();
-         }
-      }
-
-      // Wait for threads to complete. They will complete once they've published all of their messages.
-      public void waitForCompletion() throws Exception {
-
-         for (PublisherThread pt : threads) {
-            pt.join();
-            pt.close();
-         }
-      }
-
-      private Session newSession(QueueConnection queueConnection) throws Exception {
-         return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-
-      private synchronized QueueConnection newQueueConnection() throws Exception {
-
-         if (connectionFactory == null) {
-            connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
-         }
-
-         // Set the redelivery count to -1 (infinite), or else messages will start dropping
-         // after the queue has had a certain number of failures (default is 6)
-         RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
-         policy.setMaximumRedeliveries(-1);
-
-         QueueConnection amqConnection = connectionFactory.createQueueConnection();
-         amqConnection.start();
-         return amqConnection;
-      }
-
-      private class PublisherThread extends Thread {
-
-         private int count;
-         private QueueConnection qc;
-         private Session session;
-         private MessageProducer mp;
-
-         private PublisherThread(int count) throws Exception {
-
-            this.count = count;
-
-            // Each Thread has its own Connection and Session, so no sync worries
-            qc = newQueueConnection();
-            session = newSession(qc);
-
-            // In our code, when publishing to multiple queues,
-            // we're using composite destinations like below
-            Queue q = new ActiveMQQueue(activemqQueues);
-            mp = session.createProducer(q);
-         }
-
-         @Override
-         public void run() {
-
-            try {
-
-               // Loop until we've published enough messages
-               while (count-- > 0) {
-
-                  TextMessage tm = session.createTextMessage(getMessageText());
-                  String id = UUID.randomUUID().toString();
-                  tm.setStringProperty("KEY", id);
-                  ids.add(id);                            // keep track of the key to compare against consumer
-
-                  mp.send(tm);
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-         }
-
-         // Called by waitForCompletion
-         public void close() {
-
-            try {
-               mp.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               session.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               qc.close();
-            }
-            catch (Exception e) {
-            }
-         }
-      }
-
-   }
-
-   String messageText;
-
-   private String getMessageText() {
-
-      if (messageText == null) {
-
-         synchronized (this) {
-
-            if (messageText == null) {
-
-               StringBuilder sb = new StringBuilder();
-               for (int i = 0; i < messageSize; i++) {
-                  sb.append("X");
-               }
-               messageText = sb.toString();
-            }
-         }
-      }
-
-      return messageText;
-   }
-
-   public class ExportQueueConsumer {
-
-      private final String amqUser = ActiveMQConnection.DEFAULT_USER;
-      private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
-      private final int totalToExpect;
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private String activemqURL = null;
-      private String activemqQueues = null;
-      private String[] queues = null;
-      // Map of IDs that were consumed, keyed by queue name.
-      // We'll compare these against what was published to know if any got stuck or dropped.
-      private Map<String, List<String>> idsByQueue = new HashMap<>();
-      private Map<String, List<ConsumerThread>> threads;
-
-      public ExportQueueConsumer(String activemqURL,
-                                 String activemqQueues,
-                                 int threadsPerQueue,
-                                 int batchSize,
-                                 int totalToExpect) throws Exception {
-
-         this.activemqURL = activemqURL;
-         this.activemqQueues = activemqQueues;
-         this.totalToExpect = totalToExpect;
-
-         queues = this.activemqQueues.split(",");
-
-         for (int i = 0; i < queues.length; i++) {
-            queues[i] = queues[i].trim();
-         }
-
-         threads = new HashMap<>();
-
-         // For each queue, create a list of threads and set up the list of ids
-         for (String q : queues) {
-
-            List<ConsumerThread> list = new ArrayList<>();
-
-            idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
-
-            for (int i = 0; i < threadsPerQueue; i++) {
-               list.add(new ConsumerThread(q, batchSize));
-            }
-
-            threads.put(q, list);
-         }
-      }
-
-      public Map<String, List<String>> getIDs() {
-         return idsByQueue;
-      }
-
-      // Start the threads
-      public void start() throws Exception {
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.start();
-            }
-         }
-      }
-
-      // Tell the threads to stop
-      // Then wait for them to stop
-      public void shutdown() throws Exception {
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.shutdown();
-            }
-         }
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.join();
-            }
-         }
-      }
-
-      private Session newSession(QueueConnection queueConnection) throws Exception {
-         return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-
-      private synchronized QueueConnection newQueueConnection() throws Exception {
-
-         if (connectionFactory == null) {
-            connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
-         }
-
-         // Set the redelivery count to -1 (infinite), or else messages will start dropping
-         // after the queue has had a certain number of failures (default is 6)
-         RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
-         policy.setMaximumRedeliveries(-1);
-
-         QueueConnection amqConnection = connectionFactory.createQueueConnection();
-         amqConnection.start();
-         return amqConnection;
-      }
-
-      public boolean completed() {
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               if (ct.isAlive()) {
-                  LOG.info("thread for {} is still alive.", ct.qName);
-                  return false;
-               }
-            }
-         }
-         return true;
-      }
-
-      private class ConsumerThread extends Thread {
-
-         private int batchSize;
-         private QueueConnection qc;
-         private Session session;
-         private MessageConsumer mc;
-         private List<String> idList;
-         private boolean shutdown = false;
-         private String qName;
-
-         private ConsumerThread(String queueName, int batchSize) throws Exception {
-
-            this.batchSize = batchSize;
-
-            // Each thread has its own connection and session
-            qName = queueName;
-            qc = newQueueConnection();
-            session = newSession(qc);
-            Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
-            mc = session.createConsumer(q);
-
-            idList = idsByQueue.get(queueName);
-         }
-
-         @Override
-         public void run() {
-
-            try {
-
-               int count = 0;
-
-               // Keep reading as long as it hasn't been told to shutdown
-               while (!shutdown) {
-
-                  if (idList.size() >= totalToExpect) {
-                     LOG.info("Got {} for q: {}", +idList.size(), qName);
-                     break;
-                  }
-                  Message m = mc.receive(4000);
-
-                  if (m != null) {
-
-                     // We received a non-null message, add the ID to our list
-
-                     idList.add(m.getStringProperty("KEY"));
-
-                     count++;
-
-                     // If we've reached our batch size, commit the batch and reset the count
-
-                     if (count == batchSize) {
-                        count = 0;
-                     }
-                  }
-                  else {
-
-                     // We didn't receive anything this time, commit any current batch and reset the count
-
-                     count = 0;
-
-                     // Sleep a little before trying to read after not getting a message
-
-                     try {
-                        if (idList.size() < totalToExpect) {
-                           LOG.info("did not receive on {}, current count: {}", qName, idList.size());
-                        }
-                        //sleep(3000);
-                     }
-                     catch (Exception e) {
-                     }
-                  }
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-            finally {
-
-               // Once we exit, close everything
-               close();
-            }
-         }
-
-         public void shutdown() {
-            shutdown = true;
-         }
-
-         public void close() {
-
-            try {
-               mc.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               session.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               qc.close();
-            }
-            catch (Exception e) {
-
-            }
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
deleted file mode 100644
index c7bc6d2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-/*
- * pause producers if consumers stall and verify broker drained before resume
- */
-@RunWith(Parameterized.class)
-public class AMQ5266StarvedConsumerTest {
-
-   static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
-   String activemqURL;
-   BrokerService brokerService;
-
-   public int messageSize = 1000;
-
-   @Parameterized.Parameter(0)
-   public int publisherMessagesPerThread = 1000;
-
-   @Parameterized.Parameter(1)
-   public int publisherThreadCount = 20;
-
-   @Parameterized.Parameter(2)
-   public int consumerThreadsPerQueue = 5;
-
-   @Parameterized.Parameter(3)
-   public int destMemoryLimit = 50 * 1024;
-
-   @Parameterized.Parameter(4)
-   public boolean useCache = true;
-
-   @Parameterized.Parameter(5)
-   public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
-
-   @Parameterized.Parameter(6)
-   public boolean optimizeDispatch = false;
-   private AtomicBoolean didNotReceive = new AtomicBoolean(false);
-
-   @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
-   public static Iterable<Object[]> parameters() {
-      return Arrays.asList(new Object[][]{{1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
-
-         {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},});
-   }
-
-   public int consumerBatchSize = 5;
-
-   @Before
-   public void startBroker() throws Exception {
-      brokerService = new BrokerService();
-      TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.setUseJmx(false);
-      brokerService.setAdvisorySupport(false);
-
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-      defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
-      defaultEntry.setMaxAuditDepth(publisherThreadCount);
-      defaultEntry.setEnableAudit(true);
-      defaultEntry.setUseCache(useCache);
-      defaultEntry.setMaxPageSize(1000);
-      defaultEntry.setOptimizedDispatch(optimizeDispatch);
-      defaultEntry.setMemoryLimit(destMemoryLimit);
-      defaultEntry.setExpireMessagesPeriod(0);
-      policyMap.setDefaultEntry(defaultEntry);
-      brokerService.setDestinationPolicy(policyMap);
-
-      brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
-
-      TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
-      brokerService.start();
-      activemqURL = transportConnector.getPublishableConnectString();
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (brokerService != null) {
-         brokerService.stop();
-      }
-   }
-
-   CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
-      @Override
-      public void run() {
-         // wait for queue size to go to zero
-         try {
-            while (((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) {
-               LOG.info("Total messageCount: " + ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
-               TimeUnit.SECONDS.sleep(5);
-            }
-         }
-         catch (Exception ignored) {
-            ignored.printStackTrace();
-         }
-      }
-   });
-
-   @Test(timeout = 30 * 60 * 1000)
-   public void test() throws Exception {
-
-      String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9";
-
-      int consumerWaitForConsumption = 5 * 60 * 1000;
-
-      ExportQueuePublisher publisher = null;
-      ExportQueueConsumer consumer = null;
-
-      LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
-      LOG.info("\nBuilding Publisher...");
-
-      publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
-
-      LOG.info("Building Consumer...");
-
-      consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
-
-      LOG.info("Starting Publisher...");
-
-      publisher.start();
-
-      LOG.info("Starting Consumer...");
-
-      consumer.start();
-
-      int distinctPublishedCount = 0;
-
-      LOG.info("Waiting For Publisher Completion...");
-
-      publisher.waitForCompletion();
-
-      List<String> publishedIds = publisher.getIDs();
-      distinctPublishedCount = new TreeSet<>(publishedIds).size();
-
-      LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
-
-      long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
-      while (!consumer.completed() && System.currentTimeMillis() < endWait) {
-         try {
-            int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
-            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
-            Thread.sleep(10000);
-         }
-         catch (Exception e) {
-         }
-      }
-
-      LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
-
-      consumer.shutdown();
-
-      LOG.info("Consumer Stats:");
-
-      for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
-
-         List<String> idList = entry.getValue();
-
-         int distinctConsumed = new TreeSet<>(idList).size();
-
-         StringBuilder sb = new StringBuilder();
-         sb.append("   Queue: " + entry.getKey() +
-                      " -> Total Messages Consumed: " + idList.size() +
-                      ", Distinct IDs Consumed: " + distinctConsumed);
-
-         int diff = distinctPublishedCount - distinctConsumed;
-         sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
-         LOG.info(sb.toString());
-
-         assertEquals("expect to get all messages!", 0, diff);
-
-      }
-   }
-
-   public class ExportQueuePublisher {
-
-      private final String amqUser = ActiveMQConnection.DEFAULT_USER;
-      private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private String activemqURL = null;
-      private String activemqQueues = null;
-      // Collection of distinct IDs that the publisher has published.
-      // After a message is published, its UUID will be written to this list for tracking.
-      // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
-      //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
-      private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
-      private List<PublisherThread> threads;
-
-      public ExportQueuePublisher(String activemqURL,
-                                  String activemqQueues,
-                                  int messagesPerThread,
-                                  int threadCount) throws Exception {
-
-         this.activemqURL = activemqURL;
-         this.activemqQueues = activemqQueues;
-
-         threads = new ArrayList<>();
-
-         // Build the threads and tell them how many messages to publish
-         for (int i = 0; i < threadCount; i++) {
-            PublisherThread pt = new PublisherThread(messagesPerThread);
-            threads.add(pt);
-         }
-      }
-
-      public List<String> getIDs() {
-         return ids;
-      }
-
-      // Kick off threads
-      public void start() throws Exception {
-
-         for (PublisherThread pt : threads) {
-            pt.start();
-         }
-      }
-
-      // Wait for threads to complete. They will complete once they've published all of their messages.
-      public void waitForCompletion() throws Exception {
-
-         for (PublisherThread pt : threads) {
-            pt.join();
-            pt.close();
-         }
-      }
-
-      private Session newSession(QueueConnection queueConnection) throws Exception {
-         return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
-      }
-
-      private synchronized QueueConnection newQueueConnection() throws Exception {
-
-         if (connectionFactory == null) {
-            connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
-            connectionFactory.setWatchTopicAdvisories(false);
-         }
-
-         // Set the redelivery count to -1 (infinite), or else messages will start dropping
-         // after the queue has had a certain number of failures (default is 6)
-         RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
-         policy.setMaximumRedeliveries(-1);
-
-         QueueConnection amqConnection = connectionFactory.createQueueConnection();
-         amqConnection.start();
-         return amqConnection;
-      }
-
-      private class PublisherThread extends Thread {
-
-         private int count;
-         private QueueConnection qc;
-         private Session session;
-         private MessageProducer mp;
-         private Queue q;
-
-         private PublisherThread(int count) throws Exception {
-
-            this.count = count;
-
-            // Each Thread has its own Connection and Session, so no sync worries
-            qc = newQueueConnection();
-            session = newSession(qc);
-
-            // In our code, when publishing to multiple queues,
-            // we're using composite destinations like below
-            q = new ActiveMQQueue(activemqQueues);
-            mp = session.createProducer(null);
-         }
-
-         @Override
-         public void run() {
-
-            try {
-
-               // Loop until we've published enough messages
-               while (count-- > 0) {
-
-                  TextMessage tm = session.createTextMessage(getMessageText());
-                  String id = UUID.randomUUID().toString();
-                  tm.setStringProperty("KEY", id);
-                  ids.add(id);                            // keep track of the key to compare against consumer
-
-                  mp.send(q, tm);
-                  session.commit();
-
-                  if (didNotReceive.get()) {
-                     globalProducerHalt.await();
-                  }
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-         }
-
-         // Called by waitForCompletion
-         public void close() {
-
-            try {
-               mp.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               session.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               qc.close();
-            }
-            catch (Exception e) {
-            }
-         }
-      }
-
-   }
-
-   String messageText;
-
-   private String getMessageText() {
-
-      if (messageText == null) {
-
-         synchronized (this) {
-
-            if (messageText == null) {
-
-               StringBuilder sb = new StringBuilder();
-               for (int i = 0; i < messageSize; i++) {
-                  sb.append("X");
-               }
-               messageText = sb.toString();
-            }
-         }
-      }
-
-      return messageText;
-   }
-
-   public class ExportQueueConsumer {
-
-      private final String amqUser = ActiveMQConnection.DEFAULT_USER;
-      private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
-      private final int totalToExpect;
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private String activemqURL = null;
-      private String activemqQueues = null;
-      private String[] queues = null;
-      // Map of IDs that were consumed, keyed by queue name.
-      // We'll compare these against what was published to know if any got stuck or dropped.
-      private Map<String, List<String>> idsByQueue = new HashMap<>();
-      private Map<String, List<ConsumerThread>> threads;
-
-      public ExportQueueConsumer(String activemqURL,
-                                 String activemqQueues,
-                                 int threadsPerQueue,
-                                 int batchSize,
-                                 int totalToExpect) throws Exception {
-
-         this.activemqURL = activemqURL;
-         this.activemqQueues = activemqQueues;
-         this.totalToExpect = totalToExpect;
-
-         queues = this.activemqQueues.split(",");
-
-         for (int i = 0; i < queues.length; i++) {
-            queues[i] = queues[i].trim();
-         }
-
-         threads = new HashMap<>();
-
-         // For each queue, create a list of threads and set up the list of ids
-         for (String q : queues) {
-
-            List<ConsumerThread> list = new ArrayList<>();
-
-            idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
-
-            for (int i = 0; i < threadsPerQueue; i++) {
-               list.add(new ConsumerThread(q, batchSize));
-            }
-
-            threads.put(q, list);
-         }
-      }
-
-      public Map<String, List<String>> getIDs() {
-         return idsByQueue;
-      }
-
-      // Start the threads
-      public void start() throws Exception {
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.start();
-            }
-         }
-      }
-
-      // Tell the threads to stop
-      // Then wait for them to stop
-      public void shutdown() throws Exception {
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.shutdown();
-            }
-         }
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.join();
-            }
-         }
-      }
-
-      private Session newSession(QueueConnection queueConnection) throws Exception {
-         return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
-      }
-
-      private synchronized QueueConnection newQueueConnection() throws Exception {
-
-         if (connectionFactory == null) {
-            connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
-            connectionFactory.setWatchTopicAdvisories(false);
-         }
-
-         // Set the redelivery count to -1 (infinite), or else messages will start dropping
-         // after the queue has had a certain number of failures (default is 6)
-         RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
-         policy.setMaximumRedeliveries(-1);
-
-         QueueConnection amqConnection = connectionFactory.createQueueConnection();
-         amqConnection.start();
-         return amqConnection;
-      }
-
-      public boolean completed() {
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               if (ct.isAlive()) {
-                  LOG.info("thread for {} is still alive.", ct.qName);
-                  return false;
-               }
-            }
-         }
-         return true;
-      }
-
-      private class ConsumerThread extends Thread {
-
-         private int batchSize;
-         private QueueConnection qc;
-         private Session session;
-         private MessageConsumer mc;
-         private List<String> idList;
-         private boolean shutdown = false;
-         private String qName;
-
-         private ConsumerThread(String queueName, int batchSize) throws Exception {
-
-            this.batchSize = batchSize;
-
-            // Each thread has its own connection and session
-            qName = queueName;
-            qc = newQueueConnection();
-            session = newSession(qc);
-            Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
-            mc = session.createConsumer(q);
-
-            idList = idsByQueue.get(queueName);
-         }
-
-         @Override
-         public void run() {
-
-            try {
-
-               int count = 0;
-
-               // Keep reading as long as it hasn't been told to shutdown
-               while (!shutdown) {
-
-                  if (idList.size() >= totalToExpect) {
-                     LOG.info("Got {} for q: {}", +idList.size(), qName);
-                     session.commit();
-                     break;
-                  }
-                  Message m = mc.receive(4000);
-
-                  if (m != null) {
-
-                     // We received a non-null message, add the ID to our list
-
-                     idList.add(m.getStringProperty("KEY"));
-
-                     count++;
-
-                     // If we've reached our batch size, commit the batch and reset the count
-
-                     if (count == batchSize) {
-                        session.commit();
-                        count = 0;
-                     }
-                  }
-                  else {
-
-                     // We didn't receive anything this time, commit any current batch and reset the count
-
-                     session.commit();
-                     count = 0;
-
-                     // Sleep a little before trying to read after not getting a message
-
-                     try {
-                        if (idList.size() < totalToExpect) {
-                           LOG.info("did not receive on {}, current count: {}", qName, idList.size());
-                           didNotReceive.set(true);
-                        }
-                        //sleep(3000);
-                     }
-                     catch (Exception e) {
-                     }
-                  }
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-            finally {
-
-               // Once we exit, close everything
-               close();
-            }
-         }
-
-         public void shutdown() {
-            shutdown = true;
-         }
-
-         public void close() {
-
-            try {
-               mc.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               session.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               qc.close();
-            }
-            catch (Exception e) {
-
-            }
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
deleted file mode 100644
index c5712b8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ /dev/null
@@ -1,604 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Stuck messages test client.
- * <br>
- * Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue.
- */
-@RunWith(Parameterized.class)
-public class AMQ5266Test {
-
-   static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
-   String activemqURL = "tcp://localhost:61617";
-   BrokerService brokerService;
-
-   public int messageSize = 1000;
-
-   @Parameterized.Parameter(0)
-   public int publisherMessagesPerThread = 1000;
-
-   @Parameterized.Parameter(1)
-   public int publisherThreadCount = 20;
-
-   @Parameterized.Parameter(2)
-   public int consumerThreadsPerQueue = 5;
-
-   @Parameterized.Parameter(3)
-   public int destMemoryLimit = 50 * 1024;
-
-   @Parameterized.Parameter(4)
-   public boolean useCache = true;
-
-   @Parameterized.Parameter(5)
-   public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
-
-   @Parameterized.Parameter(6)
-   public boolean optimizeDispatch = false;
-
-   @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
-   public static Iterable<Object[]> parameters() {
-      return Arrays.asList(new Object[][]{{1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
-
-         {1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {100, 5, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
-
-         {1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {100, 5, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
-
-      });
-   }
-
-   public int consumerBatchSize = 5;
-
-   @Before
-   public void startBroker() throws Exception {
-      brokerService = new BrokerService();
-      TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      brokerService.setUseJmx(false);
-
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry defaultEntry = new PolicyEntry();
-      defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
-      defaultEntry.setMaxAuditDepth(publisherThreadCount);
-      defaultEntry.setEnableAudit(true);
-      defaultEntry.setUseCache(useCache);
-      defaultEntry.setMaxPageSize(1000);
-      defaultEntry.setOptimizedDispatch(optimizeDispatch);
-      defaultEntry.setMemoryLimit(destMemoryLimit);
-      defaultEntry.setExpireMessagesPeriod(0);
-      policyMap.setDefaultEntry(defaultEntry);
-      brokerService.setDestinationPolicy(policyMap);
-
-      brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
-
-      TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
-      brokerService.start();
-      activemqURL = transportConnector.getPublishableConnectString();
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (brokerService != null) {
-         brokerService.stop();
-      }
-   }
-
-   @Test
-   public void test() throws Exception {
-
-      String activemqQueues = "activemq,activemq2";//,activemq3,activemq4,activemq5,activemq6,activemq7,activemq8,activemq9";
-
-      int consumerWaitForConsumption = 5 * 60 * 1000;
-
-      ExportQueuePublisher publisher = null;
-      ExportQueueConsumer consumer = null;
-
-      LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
-      LOG.info("\nBuilding Publisher...");
-
-      publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
-
-      LOG.info("Building Consumer...");
-
-      consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
-
-      LOG.info("Starting Publisher...");
-
-      publisher.start();
-
-      LOG.info("Starting Consumer...");
-
-      consumer.start();
-
-      int distinctPublishedCount = 0;
-
-      LOG.info("Waiting For Publisher Completion...");
-
-      publisher.waitForCompletion();
-
-      List<String> publishedIds = publisher.getIDs();
-      distinctPublishedCount = new TreeSet<>(publishedIds).size();
-
-      LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
-
-      long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
-      while (!consumer.completed() && System.currentTimeMillis() < endWait) {
-         try {
-            int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
-            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
-            Thread.sleep(10000);
-         }
-         catch (Exception e) {
-         }
-      }
-
-      LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
-
-      consumer.shutdown();
-
-      LOG.info("Consumer Stats:");
-
-      for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
-
-         List<String> idList = entry.getValue();
-
-         int distinctConsumed = new TreeSet<>(idList).size();
-
-         StringBuilder sb = new StringBuilder();
-         sb.append("   Queue: " + entry.getKey() +
-                      " -> Total Messages Consumed: " + idList.size() +
-                      ", Distinct IDs Consumed: " + distinctConsumed);
-
-         int diff = distinctPublishedCount - distinctConsumed;
-         sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
-         LOG.info(sb.toString());
-
-         assertEquals("expect to get all messages!", 0, diff);
-
-      }
-   }
-
-   public class ExportQueuePublisher {
-
-      private final String amqUser = ActiveMQConnection.DEFAULT_USER;
-      private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private String activemqURL = null;
-      private String activemqQueues = null;
-      // Collection of distinct IDs that the publisher has published.
-      // After a message is published, its UUID will be written to this list for tracking.
-      // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
-      //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
-      private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
-      private List<PublisherThread> threads;
-
-      public ExportQueuePublisher(String activemqURL,
-                                  String activemqQueues,
-                                  int messagesPerThread,
-                                  int threadCount) throws Exception {
-
-         this.activemqURL = activemqURL;
-         this.activemqQueues = activemqQueues;
-
-         threads = new ArrayList<>();
-
-         // Build the threads and tell them how many messages to publish
-         for (int i = 0; i < threadCount; i++) {
-            PublisherThread pt = new PublisherThread(messagesPerThread);
-            threads.add(pt);
-         }
-      }
-
-      public List<String> getIDs() {
-         return ids;
-      }
-
-      // Kick off threads
-      public void start() throws Exception {
-
-         for (PublisherThread pt : threads) {
-            pt.start();
-         }
-      }
-
-      // Wait for threads to complete. They will complete once they've published all of their messages.
-      public void waitForCompletion() throws Exception {
-
-         for (PublisherThread pt : threads) {
-            pt.join();
-            pt.close();
-         }
-      }
-
-      private Session newSession(QueueConnection queueConnection) throws Exception {
-         return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
-      }
-
-      private synchronized QueueConnection newQueueConnection() throws Exception {
-
-         if (connectionFactory == null) {
-            connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
-         }
-
-         // Set the redelivery count to -1 (infinite), or else messages will start dropping
-         // after the queue has had a certain number of failures (default is 6)
-         RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
-         policy.setMaximumRedeliveries(-1);
-
-         QueueConnection amqConnection = connectionFactory.createQueueConnection();
-         amqConnection.start();
-         return amqConnection;
-      }
-
-      private class PublisherThread extends Thread {
-
-         private int count;
-         private QueueConnection qc;
-         private Session session;
-         private MessageProducer mp;
-
-         private PublisherThread(int count) throws Exception {
-
-            this.count = count;
-
-            // Each Thread has its own Connection and Session, so no sync worries
-            qc = newQueueConnection();
-            session = newSession(qc);
-
-            // In our code, when publishing to multiple queues,
-            // we're using composite destinations like below
-            Queue q = new ActiveMQQueue(activemqQueues);
-            mp = session.createProducer(q);
-         }
-
-         @Override
-         public void run() {
-
-            try {
-
-               // Loop until we've published enough messages
-               while (count-- > 0) {
-
-                  TextMessage tm = session.createTextMessage(getMessageText());
-                  String id = UUID.randomUUID().toString();
-                  tm.setStringProperty("KEY", id);
-                  ids.add(id);                            // keep track of the key to compare against consumer
-
-                  mp.send(tm);
-                  session.commit();
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-         }
-
-         // Called by waitForCompletion
-         public void close() {
-
-            try {
-               mp.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               session.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               qc.close();
-            }
-            catch (Exception e) {
-            }
-         }
-      }
-
-   }
-
-   String messageText;
-
-   private String getMessageText() {
-
-      if (messageText == null) {
-
-         synchronized (this) {
-
-            if (messageText == null) {
-
-               StringBuilder sb = new StringBuilder();
-               for (int i = 0; i < messageSize; i++) {
-                  sb.append("X");
-               }
-               messageText = sb.toString();
-            }
-         }
-      }
-
-      return messageText;
-   }
-
-   public class ExportQueueConsumer {
-
-      private final String amqUser = ActiveMQConnection.DEFAULT_USER;
-      private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
-      private final int totalToExpect;
-      private ActiveMQConnectionFactory connectionFactory = null;
-      private String activemqURL = null;
-      private String activemqQueues = null;
-      private String[] queues = null;
-      // Map of IDs that were consumed, keyed by queue name.
-      // We'll compare these against what was published to know if any got stuck or dropped.
-      private Map<String, List<String>> idsByQueue = new HashMap<>();
-      private Map<String, List<ConsumerThread>> threads;
-
-      public ExportQueueConsumer(String activemqURL,
-                                 String activemqQueues,
-                                 int threadsPerQueue,
-                                 int batchSize,
-                                 int totalToExpect) throws Exception {
-
-         this.activemqURL = activemqURL;
-         this.activemqQueues = activemqQueues;
-         this.totalToExpect = totalToExpect;
-
-         queues = this.activemqQueues.split(",");
-
-         for (int i = 0; i < queues.length; i++) {
-            queues[i] = queues[i].trim();
-         }
-
-         threads = new HashMap<>();
-
-         // For each queue, create a list of threads and set up the list of ids
-         for (String q : queues) {
-
-            List<ConsumerThread> list = new ArrayList<>();
-
-            idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
-
-            for (int i = 0; i < threadsPerQueue; i++) {
-               list.add(new ConsumerThread(q, batchSize));
-            }
-
-            threads.put(q, list);
-         }
-      }
-
-      public Map<String, List<String>> getIDs() {
-         return idsByQueue;
-      }
-
-      // Start the threads
-      public void start() throws Exception {
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.start();
-            }
-         }
-      }
-
-      // Tell the threads to stop
-      // Then wait for them to stop
-      public void shutdown() throws Exception {
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.shutdown();
-            }
-         }
-
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               ct.join();
-            }
-         }
-      }
-
-      private Session newSession(QueueConnection queueConnection) throws Exception {
-         return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
-      }
-
-      private synchronized QueueConnection newQueueConnection() throws Exception {
-
-         if (connectionFactory == null) {
-            connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
-         }
-
-         // Set the redelivery count to -1 (infinite), or else messages will start dropping
-         // after the queue has had a certain number of failures (default is 6)
-         RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
-         policy.setMaximumRedeliveries(-1);
-
-         QueueConnection amqConnection = connectionFactory.createQueueConnection();
-         amqConnection.start();
-         return amqConnection;
-      }
-
-      public boolean completed() {
-         for (List<ConsumerThread> list : threads.values()) {
-
-            for (ConsumerThread ct : list) {
-
-               if (ct.isAlive()) {
-                  LOG.info("thread for {} is still alive.", ct.qName);
-                  return false;
-               }
-            }
-         }
-         return true;
-      }
-
-      private class ConsumerThread extends Thread {
-
-         private int batchSize;
-         private QueueConnection qc;
-         private Session session;
-         private MessageConsumer mc;
-         private List<String> idList;
-         private boolean shutdown = false;
-         private String qName;
-
-         private ConsumerThread(String queueName, int batchSize) throws Exception {
-
-            this.batchSize = batchSize;
-
-            // Each thread has its own connection and session
-            qName = queueName;
-            qc = newQueueConnection();
-            session = newSession(qc);
-            Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
-            mc = session.createConsumer(q);
-
-            idList = idsByQueue.get(queueName);
-         }
-
-         @Override
-         public void run() {
-
-            try {
-
-               int count = 0;
-
-               // Keep reading as long as it hasn't been told to shutdown
-               while (!shutdown) {
-
-                  if (idList.size() >= totalToExpect) {
-                     LOG.info("Got {} for q: {}", +idList.size(), qName);
-                     session.commit();
-                     break;
-                  }
-                  Message m = mc.receive(4000);
-
-                  if (m != null) {
-
-                     // We received a non-null message, add the ID to our list
-
-                     idList.add(m.getStringProperty("KEY"));
-
-                     count++;
-
-                     // If we've reached our batch size, commit the batch and reset the count
-
-                     if (count == batchSize) {
-                        session.commit();
-                        count = 0;
-                     }
-                  }
-                  else {
-
-                     // We didn't receive anything this time, commit any current batch and reset the count
-
-                     session.commit();
-                     count = 0;
-
-                     // Sleep a little before trying to read after not getting a message
-
-                     try {
-                        if (idList.size() < totalToExpect) {
-                           LOG.info("did not receive on {}, current count: {}", qName, idList.size());
-                        }
-                        //sleep(3000);
-                     }
-                     catch (Exception e) {
-                     }
-                  }
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-            finally {
-
-               // Once we exit, close everything
-               close();
-            }
-         }
-
-         public void shutdown() {
-            shutdown = true;
-         }
-
-         public void close() {
-
-            try {
-               mc.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               session.close();
-            }
-            catch (Exception e) {
-            }
-
-            try {
-               qc.close();
-            }
-            catch (Exception e) {
-
-            }
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
deleted file mode 100644
index d4c02fb..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AMQ5274Test {
-
-   static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
-   String activemqURL;
-   BrokerService brokerService;
-   ActiveMQQueue dest = new ActiveMQQueue("TestQ");
-
-   @Before
-   public void startBroker() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setPersistent(false);
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry defaultPolicy = new PolicyEntry();
-      defaultPolicy.setExpireMessagesPeriod(1000);
-      policyMap.setDefaultEntry(defaultPolicy);
-      brokerService.setDestinationPolicy(policyMap);
-      activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
-      brokerService.start();
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (brokerService != null) {
-         brokerService.stop();
-      }
-   }
-
-   @Test
-   public void test() throws Exception {
-      LOG.info("Starting Test");
-      assertTrue(brokerService.isStarted());
-
-      produce();
-      consumeAndRollback();
-
-      // check reported queue size using JMX
-      long queueSize = getQueueSize();
-      assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize);
-   }
-
-   private void consumeAndRollback() throws JMSException, InterruptedException {
-      ActiveMQConnection connection = createConnection();
-      RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
-      noRedelivery.setMaximumRedeliveries(0);
-      connection.setRedeliveryPolicy(noRedelivery);
-      connection.start();
-      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-      MessageConsumer consumer = session.createConsumer(dest);
-      Message m;
-      while ((m = consumer.receive(4000)) != null) {
-         LOG.info("Got:" + m);
-         TimeUnit.SECONDS.sleep(1);
-         session.rollback();
-      }
-      connection.close();
-   }
-
-   private void produce() throws Exception {
-      Connection connection = createConnection();
-      connection.start();
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(dest);
-      producer.setTimeToLive(10000);
-      for (int i = 0; i < 20; i++) {
-         producer.send(session.createTextMessage("i=" + i));
-      }
-      connection.close();
-   }
-
-   private ActiveMQConnection createConnection() throws JMSException {
-      return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection();
-   }
-
-   public long getQueueSize() throws Exception {
-      long queueSize = 0;
-      try {
-         QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false);
-         queueSize = queueViewMBean.getQueueSize();
-         LOG.info("QueueSize for destination {} is {}", dest, queueSize);
-      }
-      catch (Exception ex) {
-         LOG.error("Error retrieving QueueSize from JMX ", ex);
-         throw ex;
-      }
-      return queueSize;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
deleted file mode 100644
index a05d56d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Arrays;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-public class AMQ5381Test {
-
-   public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
-   public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check";
-
-   private BrokerService brokerService;
-   private String brokerURI;
-
-   @Rule
-   public TestName name = new TestName();
-
-   @Before
-   public void startBroker() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setPersistent(false);
-      brokerService.setUseJmx(false);
-      brokerService.addConnector("tcp://localhost:0");
-      brokerService.start();
-      brokerService.waitUntilStarted();
-
-      brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
-   }
-
-   @After
-   public void stopBroker() throws Exception {
-      if (brokerService != null) {
-         brokerService.stop();
-      }
-   }
-
-   private ActiveMQConnection createConnection(boolean useCompression) throws Exception {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
-      factory.setUseCompression(useCompression);
-      Connection connection = factory.createConnection();
-      connection.start();
-      return (ActiveMQConnection) connection;
-   }
-
-   @Test
-   public void amq5381Test() throws Exception {
-
-      // Consumer Configured for (useCompression=true)
-      final ActiveMQConnection consumerConnection = createConnection(true);
-      final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final Queue consumerQueue = consumerSession.createQueue(name.getMethodName());
-      final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
-
-      // Producer Configured for (useCompression=false)
-      final ActiveMQConnection producerConnection = createConnection(false);
-      final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final Queue producerQueue = producerSession.createQueue(name.getMethodName());
-
-      try {
-
-         final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage();
-         messageProduced.writeBytes(ORIG_MSG_CONTENT);
-         Assert.assertFalse(messageProduced.isReadOnlyBody());
-
-         Assert.assertFalse("Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)", messageProduced.isCompressed());
-
-         final MessageProducer producer = producerSession.createProducer(null);
-         producer.send(producerQueue, messageProduced);
-
-         Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag", producerConnection.isUseCompression(), messageProduced.isCompressed());
-
-         final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive();
-         Assert.assertNotNull(messageConsumed);
-         Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody());
-         Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag", messageProduced.isCompressed(), messageConsumed.isCompressed());
-
-         // ensure consumed message content matches what was originally set
-         final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
-         messageConsumed.readBytes(consumedMsgContent);
-
-         Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent));
-
-         // make message writable so the consumer can modify and reuse it
-         makeWritable(messageConsumed);
-
-         // modify message, attempt to trigger DataFormatException due
-         // to old incorrect compression logic
-         try {
-            messageConsumed.setStringProperty(this.getClass().getName(), "test");
-         }
-         catch (JMSException jmsE) {
-            if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) {
-               StringWriter sw = new StringWriter();
-               PrintWriter pw = new PrintWriter(sw);
-               jmsE.printStackTrace(pw);
-
-               Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString());
-            }
-            else {
-               throw jmsE;
-            }
-         }
-
-         Assert.assertEquals("The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable", messageProduced.isCompressed(), messageConsumed.isCompressed());
-
-         // simulate re-publishing message
-         simulatePublish(messageConsumed);
-
-         // ensure consumed message content matches what was originally set
-         final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
-         messageConsumed.readBytes(modifiedMsgContent);
-
-         Assert.assertTrue("After the message properties are modified and it is re-published, its message content should still match the original message content", Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent));
-      }
-      finally {
-         producerSession.close();
-         producerConnection.close();
-         consumerSession.close();
-         consumerConnection.close();
-      }
-   }
-
-   protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128;
-
-   protected static byte[] randomByteArray() {
-      final Random random = new Random();
-      final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)];
-      random.nextBytes(byteArray);
-
-      return byteArray;
-   }
-
-   protected static void makeWritable(final ActiveMQMessage message) {
-      message.setReadOnlyBody(false);
-      message.setReadOnlyProperties(false);
-   }
-
-   protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException {
-      message.reset();
-      message.onSend();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
deleted file mode 100644
index 0e9e310..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ5421Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ5421Test.class);
-
-   private static final int DEST_COUNT = 1000;
-   private final Destination[] destination = new Destination[DEST_COUNT];
-   private final MessageProducer[] producer = new MessageProducer[DEST_COUNT];
-   private BrokerService brokerService;
-   private String connectionUri;
-
-   protected ConnectionFactory createConnectionFactory() throws Exception {
-      ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri);
-      conFactory.setWatchTopicAdvisories(false);
-      return conFactory;
-   }
-
-   protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
-      AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
-      strategy.setCheckPeriod(2000);
-      strategy.setMaxTimeSinceLastAck(5000);
-      strategy.setIgnoreIdleConsumers(false);
-
-      return strategy;
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
-      PolicyEntry policy = new PolicyEntry();
-
-      policy.setSlowConsumerStrategy(createSlowConsumerStrategy());
-      policy.setQueuePrefetch(10);
-      policy.setTopicPrefetch(10);
-      PolicyMap pMap = new PolicyMap();
-      pMap.setDefaultEntry(policy);
-      brokerService.setDestinationPolicy(pMap);
-      brokerService.addConnector("tcp://0.0.0.0:0");
-      brokerService.start();
-
-      connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
-   }
-
-   @Test
-   public void testManyTempDestinations() throws Exception {
-      Connection connection = createConnectionFactory().createConnection();
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      for (int i = 0; i < DEST_COUNT; i++) {
-         destination[i] = session.createTemporaryQueue();
-         LOG.debug("Created temp queue: [}", i);
-      }
-
-      for (int i = 0; i < DEST_COUNT; i++) {
-         producer[i] = session.createProducer(destination[i]);
-         LOG.debug("Created producer: {}", i);
-         TextMessage msg = session.createTextMessage(" testMessage " + i);
-         producer[i].send(msg);
-         LOG.debug("message sent: {}", i);
-         MessageConsumer consumer = session.createConsumer(destination[i]);
-         Message message = consumer.receive(1000);
-         Assert.assertTrue(message.equals(msg));
-      }
-
-      for (int i = 0; i < DEST_COUNT; i++) {
-         producer[i].close();
-      }
-
-      connection.close();
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-   }
-}