You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 16:21:42 UTC

[13/61] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
deleted file mode 100644
index d0066a3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-public class AMQ5450Test {
-
-   static final Logger LOG = LoggerFactory.getLogger(AMQ5450Test.class);
-   private final static int maxFileLength = 1024 * 1024 * 32;
-
-   private final static String POSTFIX_DESTINATION_NAME = ".dlq";
-
-   private final static String DESTINATION_NAME = "test" + POSTFIX_DESTINATION_NAME;
-   private final static String DESTINATION_NAME_2 = "2.test" + POSTFIX_DESTINATION_NAME;
-   private final static String DESTINATION_NAME_3 = "3.2.test" + POSTFIX_DESTINATION_NAME;
-
-   private final static String[] DESTS = new String[]{DESTINATION_NAME, DESTINATION_NAME_2, DESTINATION_NAME_3, DESTINATION_NAME, DESTINATION_NAME};
-
-   BrokerService broker;
-   private HashMap<Object, PersistenceAdapter> adapters = new HashMap<>();
-
-   @After
-   public void tearDown() throws Exception {
-      broker.stop();
-   }
-
-   protected BrokerService createAndStartBroker(PersistenceAdapter persistenceAdapter) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setUseJmx(false);
-      broker.setBrokerName("localhost");
-      broker.setPersistenceAdapter(persistenceAdapter);
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.start();
-      broker.waitUntilStarted();
-      return broker;
-   }
-
-   @Test
-   public void testPostFixMatch() throws Exception {
-      doTestPostFixMatch(false);
-   }
-
-   @Test
-   public void testPostFixCompositeMatch() throws Exception {
-      doTestPostFixMatch(true);
-   }
-
-   private void doTestPostFixMatch(boolean useComposite) throws Exception {
-      prepareBrokerWithMultiStore(useComposite);
-
-      sendMessage(DESTINATION_NAME, "test 1");
-      sendMessage(DESTINATION_NAME_2, "test 1");
-      sendMessage(DESTINATION_NAME_3, "test 1");
-
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3)));
-
-      for (String dest : DESTS) {
-         Destination destination2 = broker.getDestination(new ActiveMQQueue(dest));
-         assertNotNull(destination2);
-         assertEquals(1, destination2.getMessageStore().getMessageCount());
-      }
-
-      HashMap<Integer, PersistenceAdapter> numDests = new HashMap<>();
-      for (PersistenceAdapter pa : adapters.values()) {
-         numDests.put(pa.getDestinations().size(), pa);
-      }
-
-      // ensure wildcard does not match any
-      assertTrue("0 in wildcard matcher", adapters.get(null).getDestinations().isEmpty());
-
-      assertEquals("only two values", 2, numDests.size());
-      assertTrue("0 in others", numDests.containsKey(0));
-
-      if (useComposite) {
-         assertTrue("3 in one", numDests.containsKey(3));
-      }
-      else {
-         assertTrue("1 in some", numDests.containsKey(1));
-      }
-
-   }
-
-   protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
-      KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-      kaha.setJournalMaxFileLength(maxFileLength);
-      kaha.setCleanupInterval(5000);
-      if (delete) {
-         kaha.deleteAllMessages();
-      }
-      return kaha;
-   }
-
-   public void prepareBrokerWithMultiStore(boolean compositeMatch) throws Exception {
-
-      MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
-      multiKahaDBPersistenceAdapter.deleteAllMessages();
-      ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
-      if (compositeMatch) {
-         StringBuffer compositeDestBuf = new StringBuffer();
-         for (int i = 1; i <= DESTS.length; i++) {
-            for (int j = 0; j < i; j++) {
-               compositeDestBuf.append("*");
-               if ((j + 1 == i)) {
-                  compositeDestBuf.append(POSTFIX_DESTINATION_NAME);
-               }
-               else {
-                  compositeDestBuf.append(".");
-               }
-            }
-            if (!(i + 1 > DESTS.length)) {
-               compositeDestBuf.append(",");
-            }
-         }
-         adapters.add(createFilteredKahaDBByDestinationPrefix(compositeDestBuf.toString(), true));
-
-      }
-      else {
-         // destination map does not do post fix wild card matches on paths, so we need to cover
-         // each path length
-         adapters.add(createFilteredKahaDBByDestinationPrefix("*" + POSTFIX_DESTINATION_NAME, true));
-         adapters.add(createFilteredKahaDBByDestinationPrefix("*.*" + POSTFIX_DESTINATION_NAME, true));
-         adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*" + POSTFIX_DESTINATION_NAME, true));
-         adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*.*" + POSTFIX_DESTINATION_NAME, true));
-      }
-
-      // ensure wildcard matcher is there for other dests
-      adapters.add(createFilteredKahaDBByDestinationPrefix(null, true));
-
-      multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-      broker = createAndStartBroker(multiKahaDBPersistenceAdapter);
-   }
-
-   private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix,
-                                                                                    boolean deleteAllMessages) throws IOException {
-      FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
-      template.setPersistenceAdapter(createStore(deleteAllMessages));
-      if (destinationPrefix != null) {
-         template.setQueue(destinationPrefix);
-      }
-      adapters.put(destinationPrefix, template.getPersistenceAdapter());
-      return template;
-   }
-
-   private void sendMessage(String destinationName, String message) throws JMSException {
-      ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
-      f.setAlwaysSyncSend(true);
-      Connection c = f.createConnection();
-      c.start();
-      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName));
-      producer.send(s.createTextMessage(message));
-      producer.close();
-      s.close();
-      c.stop();
-   }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
deleted file mode 100644
index 5ed211b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerRestartTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ5567Test extends BrokerRestartTestSupport {
-
-   protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class);
-   ActiveMQQueue destination = new ActiveMQQueue("Q");
-
-   @Override
-   protected void configureBroker(BrokerService broker) throws Exception {
-      super.configureBroker(broker);
-      broker.setPersistenceAdapter(persistenceAdapter);
-   }
-
-   @Override
-   protected PolicyEntry getDefaultPolicy() {
-      PolicyEntry policy = new PolicyEntry();
-      policy.setMemoryLimit(60 * 1024);
-      return policy;
-   }
-
-   public void initCombosForTestPreparedTransactionNotDispatched() throws Exception {
-      PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{new KahaDBPersistenceAdapter(), new LevelDBPersistenceAdapter(), new JDBCPersistenceAdapter(DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())};
-      for (PersistenceAdapter adapter : persistenceAdapters) {
-         adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
-      }
-      addCombinationValues("persistenceAdapter", persistenceAdapters);
-   }
-
-   public void testPreparedTransactionNotDispatched() throws Exception {
-
-      ActiveMQDestination destination = new ActiveMQQueue("Q");
-
-      StubConnection connection = createConnection();
-      ConnectionInfo connectionInfo = createConnectionInfo();
-      SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-      ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-      connection.send(connectionInfo);
-      connection.send(sessionInfo);
-      connection.send(producerInfo);
-
-      XATransactionId txid = createXATransaction(sessionInfo);
-      connection.send(createBeginTransaction(connectionInfo, txid));
-      Message message = createMessage(producerInfo, destination);
-      message.setPersistent(true);
-      message.setTransactionId(txid);
-      connection.send(message);
-
-      connection.send(createPrepareTransaction(connectionInfo, txid));
-
-      // send another non tx, will poke dispatch
-      message = createMessage(producerInfo, destination);
-      message.setPersistent(true);
-      connection.send(message);
-
-      // Since prepared but not committed.. only one should get delivered
-      StubConnection connectionC = createConnection();
-      ConnectionInfo connectionInfoC = createConnectionInfo();
-      SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
-      ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
-      connectionC.send(connectionInfoC);
-      connectionC.send(sessionInfoC);
-      connectionC.send(consumerInfo);
-
-      Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
-      LOG.info("received: " + m);
-      assertNotNull("Got message", m);
-      assertNull("Got non tx message", m.getTransactionId());
-
-      // cannot get the prepared message till commit
-      assertNull(receiveMessage(connectionC));
-      assertNoMessagesLeft(connectionC);
-
-      LOG.info("commit: " + txid);
-      connection.request(createCommitTransaction2Phase(connectionInfo, txid));
-
-      m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
-      LOG.info("received: " + m);
-      assertNotNull("Got non null message", m);
-
-   }
-
-   public void initCombosForTestCursorStoreSync() throws Exception {
-      PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{new KahaDBPersistenceAdapter(), new LevelDBPersistenceAdapter(), new JDBCPersistenceAdapter(DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())};
-      for (PersistenceAdapter adapter : persistenceAdapters) {
-         adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
-      }
-      addCombinationValues("persistenceAdapter", persistenceAdapters);
-   }
-
-   public void testCursorStoreSync() throws Exception {
-
-      StubConnection connection = createConnection();
-      ConnectionInfo connectionInfo = createConnectionInfo();
-      SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-      ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-      connection.send(connectionInfo);
-      connection.send(sessionInfo);
-      connection.send(producerInfo);
-
-      XATransactionId txid = createXATransaction(sessionInfo);
-      connection.send(createBeginTransaction(connectionInfo, txid));
-      Message message = createMessage(producerInfo, destination);
-      message.setPersistent(true);
-      message.setTransactionId(txid);
-      connection.request(message);
-
-      connection.request(createPrepareTransaction(connectionInfo, txid));
-
-      QueueViewMBean proxy = getProxyToQueueViewMBean();
-      assertTrue("cache is enabled", proxy.isCacheEnabled());
-
-      // send another non tx, will fill cursor
-      String payload = new String(new byte[10 * 1024]);
-      for (int i = 0; i < 6; i++) {
-         message = createMessage(producerInfo, destination);
-         message.setPersistent(true);
-         ((TextMessage) message).setText(payload);
-         connection.request(message);
-      }
-
-      assertTrue("cache is disabled", !proxy.isCacheEnabled());
-
-      StubConnection connectionC = createConnection();
-      ConnectionInfo connectionInfoC = createConnectionInfo();
-      SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
-      ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
-      connectionC.send(connectionInfoC);
-      connectionC.send(sessionInfoC);
-      connectionC.send(consumerInfo);
-
-      Message m = null;
-      for (int i = 0; i < 3; i++) {
-         m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
-         LOG.info("received: " + m);
-         assertNotNull("Got message", m);
-         assertNull("Got non tx message", m.getTransactionId());
-         connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
-      }
-
-      LOG.info("commit: " + txid);
-      connection.request(createCommitTransaction2Phase(connectionInfo, txid));
-      // consume the rest including the 2pc send in TX
-
-      for (int i = 0; i < 4; i++) {
-         m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
-         LOG.info("received[" + i + "] " + m);
-         assertNotNull("Got message", m);
-         if (i == 3) {
-            assertNotNull("Got  tx message", m.getTransactionId());
-         }
-         else {
-            assertNull("Got non tx message", m.getTransactionId());
-         }
-         connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
-      }
-   }
-
-   private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException {
-      ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + destination.getQueueName() + ",type=Broker,brokerName=localhost");
-      QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
-      return proxy;
-   }
-
-   public static Test suite() {
-      return suite(AMQ5567Test.class);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
deleted file mode 100644
index e8414d5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
-import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * @author James Furness
- *         https://issues.apache.org/jira/browse/AMQ-3607
- */
-public class ActiveMQSlowConsumerManualTest {
-
-   private static final int PORT = 12345;
-   private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
-   private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";
-
-   @Test(timeout = 60000)
-   public void testDefaultSettings() throws Exception {
-      runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
-   }
-
-   @Test(timeout = 60000)
-   public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
-      runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
-   }
-
-   @Test(timeout = 60000)
-   public void testBounded() throws Exception {
-      runTest("testBounded", 30, 5, 25, false, false, false, false);
-   }
-
-   @Test(timeout = 60000)
-   public void testBoundedWithOptimiseAcknowledge() throws Exception {
-      runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false);
-   }
-
-   public void runTest(String name,
-                       int sendMessageCount,
-                       int prefetchLimit,
-                       int messageLimit,
-                       boolean evictOldestMessage,
-                       boolean disableFlowControl,
-                       boolean optimizeAcknowledge,
-                       boolean persistent) throws Exception {
-      BrokerService broker = createBroker(persistent);
-      broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
-      broker.start();
-
-      // Slow consumer
-      Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
-      final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
-      final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
-      final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
-      MessageConsumer slowConsumer = createSubscriber(slowConsumerSession, new MessageListener() {
-                                                         @Override
-                                                         public void onMessage(Message message) {
-                                                            try {
-                                                               slowConsumerReceiveCount.incrementAndGet();
-                                                               int count = Integer.parseInt(((TextMessage) message).getText());
-                                                               if (slowConsumerReceived != null)
-                                                                  slowConsumerReceived.add(count);
-                                                               if (count % 10000 == 0)
-                                                                  System.out.println("SlowConsumer: Receive " + count);
-                                                               blockSlowConsumer.await();
-                                                            }
-                                                            catch (Exception ignored) {
-                                                            }
-                                                         }
-                                                      });
-
-      // Fast consumer
-      Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
-      final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
-      final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
-      MessageConsumer fastConsumer = createSubscriber(fastConsumerSession, new MessageListener() {
-                                                         @Override
-                                                         public void onMessage(Message message) {
-                                                            try {
-                                                               fastConsumerReceiveCount.incrementAndGet();
-                                                               TimeUnit.MILLISECONDS.sleep(5);
-                                                               int count = Integer.parseInt(((TextMessage) message).getText());
-                                                               if (fastConsumerReceived != null)
-                                                                  fastConsumerReceived.add(count);
-                                                               if (count % 10000 == 0)
-                                                                  System.out.println("FastConsumer: Receive " + count);
-                                                            }
-                                                            catch (Exception ignored) {
-                                                            }
-                                                         }
-                                                      });
-
-      // Wait for consumers to connect
-      Thread.sleep(500);
-
-      // Publisher
-      AtomicInteger sentCount = new AtomicInteger();
-      List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
-      Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
-      MessageProducer publisher = createPublisher(publisherSession);
-      for (int i = 0; i < sendMessageCount; i++) {
-         sentCount.incrementAndGet();
-         if (sent != null)
-            sent.add(i);
-         if (i % 10000 == 0)
-            System.out.println("Publisher: Send " + i);
-         publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
-      }
-
-      // Wait for messages to arrive
-      Thread.sleep(500);
-
-      System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
-      System.out.println(name + ": Whilst slow consumer blocked:");
-      System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
-      System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
-
-      // Unblock slow consumer
-      blockSlowConsumer.countDown();
-
-      // Wait for messages to arrive
-      Thread.sleep(500);
-
-      System.out.println(name + ": After slow consumer unblocked:");
-      System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
-      System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
-      System.out.println();
-
-      publisher.close();
-      publisherSession.close();
-      slowConsumer.close();
-      slowConsumerSession.close();
-      fastConsumer.close();
-      fastConsumerSession.close();
-      broker.stop();
-
-      Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
-      // this is too timine dependent  as sometimes there is message eviction, would need to check the dlq
-      //Assert.assertEquals("Slow consumer received incorrect message count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
-   }
-
-   private static BrokerService createBroker(boolean persistent) throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setBrokerName("TestBroker");
-      broker.setPersistent(persistent);
-      broker.addConnector(URL);
-      return broker;
-   }
-
-   private static MessageConsumer createSubscriber(Session session,
-                                                   MessageListener messageListener) throws JMSException {
-      MessageConsumer consumer = session.createConsumer(TOPIC);
-      consumer.setMessageListener(messageListener);
-      return consumer;
-   }
-
-   private static MessageProducer createPublisher(Session session) throws JMSException {
-      MessageProducer producer = session.createProducer(TOPIC);
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      return producer;
-   }
-
-   private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
-      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
-
-      connectionFactory.setCopyMessageOnSend(false);
-      connectionFactory.setDisableTimeStampsByDefault(true);
-      connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
-      if (optimizeAcknowledge) {
-         connectionFactory.setOptimizeAcknowledgeTimeOut(1);
-      }
-
-      Connection connection = connectionFactory.createConnection();
-      connection.setClientID(clientId);
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      connection.start();
-
-      return session;
-   }
-
-   private static PolicyMap buildPolicy(ActiveMQTopic topic,
-                                        int prefetchLimit,
-                                        int messageLimit,
-                                        boolean evictOldestMessage,
-                                        boolean disableFlowControl) {
-      PolicyMap policyMap = new PolicyMap();
-
-      PolicyEntry policyEntry = new PolicyEntry();
-
-      if (evictOldestMessage) {
-         policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
-      }
-
-      if (disableFlowControl) {
-         policyEntry.setProducerFlowControl(false);
-      }
-
-      if (prefetchLimit > 0) {
-         policyEntry.setTopicPrefetch(prefetchLimit);
-      }
-
-      if (messageLimit > 0) {
-         ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
-         messageLimitStrategy.setLimit(messageLimit);
-         policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
-      }
-
-      policyMap.put(topic, policyEntry);
-
-      return policyMap;
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
deleted file mode 100644
index 2d6a48c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class);
-   private static final int COUNT = 2000;
-   protected String bindAddress;
-
-   public void testConnectionPerMessage() throws Exception {
-      final String topicName = "test.topic";
-
-      LOG.info("Initializing connection factory for JMS to URL: " + bindAddress);
-      final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory();
-      normalFactory.setBrokerURL(bindAddress);
-      for (int i = 0; i < COUNT; i++) {
-
-         if (i % 100 == 0) {
-            LOG.info(new Integer(i).toString());
-         }
-
-         Connection conn = null;
-         try {
-
-            conn = normalFactory.createConnection();
-            final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final Topic topic = session.createTopic(topicName);
-            final MessageProducer producer = session.createProducer(topic);
-            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-            final MapMessage m = session.createMapMessage();
-            m.setInt("hey", i);
-
-            producer.send(m);
-
-         }
-         catch (JMSException e) {
-            LOG.warn(e.getMessage(), e);
-         }
-         finally {
-            if (conn != null)
-               try {
-                  conn.close();
-               }
-               catch (JMSException e) {
-                  LOG.warn(e.getMessage(), e);
-               }
-         }
-      }
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      bindAddress = "vm://localhost";
-      super.setUp();
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setDeleteAllMessagesOnStartup(true);
-      answer.setUseJmx(false);
-      answer.setPersistent(isPersistent());
-      answer.addConnector(bindAddress);
-      return answer;
-   }
-
-   @Override
-   protected boolean isPersistent() {
-      return true;
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
deleted file mode 100644
index 35da06c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class CraigsBugTest extends EmbeddedBrokerTestSupport {
-
-   private String connectionUri;
-
-   public void testConnectionFactory() throws Exception {
-      final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
-      final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
-      final Connection conn = cf.createConnection();
-
-      Runnable r = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               Session session = conn.createSession(false, 1);
-               MessageConsumer consumer = session.createConsumer(queue, null);
-               consumer.receive(1000);
-            }
-            catch (JMSException e) {
-               e.printStackTrace();
-            }
-         }
-      };
-      new Thread(r).start();
-      conn.start();
-
-      try {
-         new CountDownLatch(1).await(3, TimeUnit.SECONDS);
-      }
-      catch (InterruptedException e) {
-         e.printStackTrace();
-      }
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      bindAddress = "tcp://localhost:0";
-      super.setUp();
-
-      connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
deleted file mode 100644
index a79ca58..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.TimeoutException;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.Assert;
-
-public class DoubleExpireTest extends EmbeddedBrokerTestSupport {
-
-   private static final long MESSAGE_TTL_MILLIS = 1000;
-   private static final long MAX_TEST_TIME_MILLIS = 60000;
-
-   @Override
-   public void setUp() throws Exception {
-      setAutoFail(true);
-      setMaxTestTime(MAX_TEST_TIME_MILLIS);
-      super.setUp();
-   }
-
-   /**
-    * This test verifies that a message that expires can be be resent to queue
-    * with a new expiration and that it will be processed as a new message and
-    * allowed to re-expire.
-    * <p>
-    * <b>NOTE:</b> This test fails on AMQ 5.4.2 because the originalExpiration
-    * timestamp is not cleared when the message is resent.
-    */
-   public void testDoubleExpireWithoutMove() throws Exception {
-      // Create the default dead letter queue.
-      final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ");
-
-      Connection conn = createConnection();
-      try {
-         conn.start();
-         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         // Verify that the test queue and DLQ are empty.
-         Assert.assertEquals(0, getSize(destination));
-         Assert.assertEquals(0, getSize(DLQ));
-
-         // Enqueue a message to the test queue that will expire after 1s.
-         MessageProducer producer = session.createProducer(destination);
-         Message testMessage = session.createTextMessage("test message");
-         producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
-         Assert.assertEquals(1, getSize(destination));
-
-         // Wait for the message to expire.
-         waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
-         Assert.assertEquals(1, getSize(DLQ));
-
-         // Consume the message from the DLQ and re-enqueue it to the test
-         // queue so that it expires after 1s.
-         MessageConsumer consumer = session.createConsumer(DLQ);
-         Message expiredMessage = consumer.receive();
-         Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage.getJMSMessageID());
-
-         producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
-         Assert.assertEquals(1, getSize(destination));
-         Assert.assertEquals(0, getSize(DLQ));
-
-         // Verify that the resent message is "different" in that it has
-         // another ID.
-         Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage.getJMSMessageID());
-
-         // Wait for the message to re-expire.
-         waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
-         Assert.assertEquals(1, getSize(DLQ));
-
-         // Re-consume the message from the DLQ.
-         Message reexpiredMessage = consumer.receive();
-         Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage.getJMSMessageID());
-      }
-      finally {
-         conn.close();
-      }
-   }
-
-   /**
-    * A helper method that returns the embedded broker's implementation of a
-    * JMS queue.
-    */
-   private Queue getPhysicalDestination(ActiveMQDestination destination) throws Exception {
-      return (Queue) broker.getAdminView().getBroker().getDestinationMap().get(destination);
-   }
-
-   /**
-    * A helper method that returns the size of the specified queue/topic.
-    */
-   private long getSize(ActiveMQDestination destination) throws Exception {
-      return getPhysicalDestination(destination) != null ? getPhysicalDestination(destination).getDestinationStatistics().getMessages().getCount() : 0;
-   }
-
-   /**
-    * A helper method that waits for a destination to reach a certain size.
-    */
-   private void waitForSize(ActiveMQDestination destination,
-                            int size,
-                            long timeoutMillis) throws Exception, TimeoutException {
-      long startTimeMillis = System.currentTimeMillis();
-
-      while (getSize(destination) != size && System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) {
-         Thread.sleep(250);
-      }
-
-      if (getSize(destination) != size) {
-         throw new TimeoutException("Destination " + destination.getPhysicalName() + " did not reach size " + size + " within " + timeoutMillis + "ms.");
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
deleted file mode 100644
index 3046423..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Test case for AMQ-1479
- */
-public class DurableConsumerTest extends CombinationTestSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
-   private static int COUNT = 1024;
-   private static String CONSUMER_NAME = "DURABLE_TEST";
-   protected BrokerService broker;
-
-   protected String bindAddress = "tcp://localhost:61616";
-
-   protected byte[] payload = new byte[1024 * 32];
-   protected ConnectionFactory factory;
-   protected Vector<Exception> exceptions = new Vector<>();
-
-   private static final String TOPIC_NAME = "failoverTopic";
-   private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
-   public boolean useDedicatedTaskRunner = false;
-
-   private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
-
-      private TopicConnection topicConnection = null;
-
-      public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
-
-         ActiveMQConnectionFactory topicConnectionFactory = null;
-         TopicSession topicSession = null;
-         Topic topic = null;
-         TopicSubscriber topicSubscriber = null;
-
-         topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
-         try {
-
-            topic = new ActiveMQTopic(topicName);
-            topicConnection = topicConnectionFactory.createTopicConnection();
-            topicConnection.setClientID((clientId));
-            topicConnection.start();
-
-            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
-            topicSubscriber.setMessageListener(this);
-
-         }
-         catch (JMSException e) {
-            e.printStackTrace();
-         }
-      }
-
-      @Override
-      public void onMessage(Message arg0) {
-      }
-
-      public void closeConnection() {
-         if (topicConnection != null) {
-            try {
-               topicConnection.close();
-            }
-            catch (JMSException e) {
-            }
-         }
-      }
-
-      @Override
-      public void onException(JMSException exception) {
-         exceptions.add(exception);
-      }
-   }
-
-   private class MessagePublisher implements Runnable {
-
-      private final boolean shouldPublish = true;
-
-      @Override
-      public void run() {
-         TopicConnectionFactory topicConnectionFactory = null;
-         TopicConnection topicConnection = null;
-         TopicSession topicSession = null;
-         Topic topic = null;
-         TopicPublisher topicPublisher = null;
-         Message message = null;
-
-         topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
-         try {
-            topic = new ActiveMQTopic(TOPIC_NAME);
-            topicConnection = topicConnectionFactory.createTopicConnection();
-            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            topicPublisher = topicSession.createPublisher(topic);
-            message = topicSession.createMessage();
-         }
-         catch (Exception ex) {
-            exceptions.add(ex);
-         }
-         while (shouldPublish) {
-            try {
-               topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
-            }
-            catch (JMSException ex) {
-               exceptions.add(ex);
-            }
-            try {
-               Thread.sleep(1);
-            }
-            catch (Exception ex) {
-            }
-         }
-      }
-   }
-
-   private void configurePersistence(BrokerService broker) throws Exception {
-      File dataDirFile = new File("target/" + getName());
-      KahaDBPersistenceAdapter kahaDBAdapter = new KahaDBPersistenceAdapter();
-      kahaDBAdapter.setDirectory(dataDirFile);
-      broker.setPersistenceAdapter(kahaDBAdapter);
-   }
-
-   public void testFailover() throws Exception {
-
-      configurePersistence(broker);
-      broker.start();
-
-      Thread publisherThread = new Thread(new MessagePublisher());
-      publisherThread.start();
-      final int numSubs = 100;
-      final List<SimpleTopicSubscriber> list = new ArrayList<>(numSubs);
-      for (int i = 0; i < numSubs; i++) {
-
-         final int id = i;
-         Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-               SimpleTopicSubscriber s = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
-               list.add(s);
-            }
-         });
-         thread.start();
-
-      }
-
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return numSubs == list.size();
-         }
-      });
-
-      broker.stop();
-      broker = createBroker(false);
-      configurePersistence(broker);
-      broker.start();
-      Thread.sleep(10000);
-      for (SimpleTopicSubscriber s : list) {
-         s.closeConnection();
-      }
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-   }
-
-   // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
-   // with use dedicatedTaskRunner=true and produce OOM
-   public void initCombosForTestConcurrentDurableConsumer() {
-      addCombinationValues("useDedicatedTaskRunner", new Object[]{Boolean.TRUE, Boolean.FALSE});
-   }
-
-   public void testConcurrentDurableConsumer() throws Exception {
-
-      broker.start();
-      broker.waitUntilStarted();
-
-      factory = createConnectionFactory();
-      final String topicName = getName();
-      final int numMessages = 500;
-      int numConsumers = 1;
-      final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers);
-      final AtomicInteger receivedCount = new AtomicInteger();
-      Runnable consumer = new Runnable() {
-         @Override
-         public void run() {
-            final String consumerName = Thread.currentThread().getName();
-            int acked = 0;
-            int received = 0;
-
-            try {
-               while (acked < numMessages / 2) {
-                  // take one message and close, ack on occasion
-                  Connection consumerConnection = factory.createConnection();
-                  ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false);
-                  consumerConnection.setClientID(consumerName);
-                  Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-                  Topic topic = consumerSession.createTopic(topicName);
-                  consumerConnection.start();
-
-                  MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName);
-
-                  counsumerStarted.countDown();
-                  Message msg = null;
-                  do {
-                     msg = consumer.receive(5000);
-                     if (msg != null) {
-                        receivedCount.incrementAndGet();
-                        if (received != 0 && received % 100 == 0) {
-                           LOG.info("Received msg: " + msg.getJMSMessageID());
-                        }
-                        if (++received % 2 == 0) {
-                           msg.acknowledge();
-                           acked++;
-                        }
-                     }
-                  } while (msg == null);
-
-                  consumerConnection.close();
-               }
-               assertTrue(received >= acked);
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               exceptions.add(e);
-            }
-         }
-      };
-
-      ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
-
-      for (int i = 0; i < numConsumers; i++) {
-         executor.execute(consumer);
-      }
-
-      assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
-
-      Connection producerConnection = factory.createConnection();
-      ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false);
-      Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Topic topic = producerSession.createTopic(topicName);
-      MessageProducer producer = producerSession.createProducer(topic);
-      producerConnection.start();
-      for (int i = 0; i < numMessages; i++) {
-         BytesMessage msg = producerSession.createBytesMessage();
-         msg.writeBytes(payload);
-         producer.send(msg);
-         if (i != 0 && i % 100 == 0) {
-            LOG.info("Sent msg " + i);
-         }
-      }
-
-      executor.shutdown();
-      executor.awaitTermination(30, TimeUnit.SECONDS);
-
-      Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            LOG.info("receivedCount: " + receivedCount.get());
-            return receivedCount.get() == numMessages;
-         }
-      }, 360 * 1000);
-      assertEquals("got required some messages", numMessages, receivedCount.get());
-      assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
-   }
-
-   public void testConsumerRecover() throws Exception {
-      doTestConsumer(true);
-   }
-
-   public void testConsumer() throws Exception {
-      doTestConsumer(false);
-   }
-
-   public void testPrefetchViaBrokerConfig() throws Exception {
-
-      Integer prefetchVal = new Integer(150);
-      PolicyEntry policyEntry = new PolicyEntry();
-      policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
-      policyEntry.setPrioritizedMessages(true);
-      PolicyMap policyMap = new PolicyMap();
-      policyMap.setDefaultEntry(policyEntry);
-      broker.setDestinationPolicy(policyMap);
-      broker.start();
-
-      factory = createConnectionFactory();
-      Connection consumerConnection = factory.createConnection();
-      consumerConnection.setClientID(CONSUMER_NAME);
-      Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Topic topic = consumerSession.createTopic(getClass().getName());
-      MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
-      consumerConnection.start();
-
-      ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
-      Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize");
-      assertEquals(prefetchVal, prefetchFromSubView);
-   }
-
-   public void doTestConsumer(boolean forceRecover) throws Exception {
-
-      if (forceRecover) {
-         configurePersistence(broker);
-      }
-      broker.start();
-
-      factory = createConnectionFactory();
-      Connection consumerConnection = factory.createConnection();
-      consumerConnection.setClientID(CONSUMER_NAME);
-      Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Topic topic = consumerSession.createTopic(getClass().getName());
-      MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
-      consumerConnection.start();
-      consumerConnection.close();
-      broker.stop();
-      broker = createBroker(false);
-      if (forceRecover) {
-         configurePersistence(broker);
-      }
-      broker.start();
-
-      Connection producerConnection = factory.createConnection();
-
-      Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageProducer producer = producerSession.createProducer(topic);
-      producerConnection.start();
-      for (int i = 0; i < COUNT; i++) {
-         BytesMessage msg = producerSession.createBytesMessage();
-         msg.writeBytes(payload);
-         producer.send(msg);
-         if (i != 0 && i % 1000 == 0) {
-            LOG.info("Sent msg " + i);
-         }
-      }
-      producerConnection.close();
-      broker.stop();
-      broker = createBroker(false);
-      if (forceRecover) {
-         configurePersistence(broker);
-      }
-      broker.start();
-
-      consumerConnection = factory.createConnection();
-      consumerConnection.setClientID(CONSUMER_NAME);
-      consumerConnection.start();
-      consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
-      for (int i = 0; i < COUNT; i++) {
-         Message msg = consumer.receive(10000);
-         assertNotNull("Missing message: " + i, msg);
-         if (i != 0 && i % 1000 == 0) {
-            LOG.info("Received msg " + i);
-         }
-
-      }
-      consumerConnection.close();
-
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      if (broker == null) {
-         broker = createBroker(true);
-      }
-
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-         broker = null;
-      }
-   }
-
-   protected Topic creatTopic(Session s, String destinationName) throws JMSException {
-      return s.createTopic(destinationName);
-   }
-
-   /**
-    * Factory method to create a new broker
-    *
-    * @throws Exception
-    */
-   protected BrokerService createBroker(boolean deleteStore) throws Exception {
-      BrokerService answer = new BrokerService();
-      configureBroker(answer, deleteStore);
-      return answer;
-   }
-
-   protected void configureBroker(BrokerService answer, boolean deleteStore) throws Exception {
-      answer.setDeleteAllMessagesOnStartup(deleteStore);
-      KahaDBStore kaha = new KahaDBStore();
-      //kaha.setConcurrentStoreAndDispatchTopics(false);
-      File directory = new File("target/activemq-data/kahadb");
-      if (deleteStore) {
-         IOHelper.deleteChildren(directory);
-      }
-      kaha.setDirectory(directory);
-      //kaha.setMaxAsyncJobs(10);
-
-      answer.setPersistenceAdapter(kaha);
-      answer.addConnector(bindAddress);
-      answer.setUseShutdownHook(false);
-      answer.setAdvisorySupport(false);
-      answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
-   }
-
-   protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
-      factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
-      return factory;
-   }
-
-   public static Test suite() {
-      return suite(DurableConsumerTest.class);
-   }
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
deleted file mode 100644
index ef24795..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-
-/**
- *
- */
-public class JMSDurableTopicNoLocalTest extends EmbeddedBrokerTestSupport {
-
-   protected String bindAddress;
-
-   public void testConsumeNoLocal() throws Exception {
-      final String TEST_NAME = getClass().getName();
-      Connection connection = createConnection();
-      connection.setClientID(TEST_NAME);
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      TopicSubscriber subscriber = session.createDurableSubscriber((Topic) destination, "topicUser2", null, true);
-
-      final CountDownLatch latch = new CountDownLatch(1);
-      subscriber.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            System.out.println("Receive a message " + message);
-            latch.countDown();
-         }
-      });
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(destination);
-      TextMessage message = session.createTextMessage("THIS IS A TEST");
-      producer.send(message);
-      producer.close();
-      latch.await(5, TimeUnit.SECONDS);
-      assertEquals(latch.getCount(), 1);
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      bindAddress = "vm://localhost";
-      useTopic = true;
-      super.setUp();
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      answer.setUseJmx(false);
-      answer.setPersistent(true);
-      answer.setDeleteAllMessagesOnStartup(true);
-      answer.addConnector(bindAddress);
-      return answer;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
deleted file mode 100644
index 137caa3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.Properties;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.test.JmsTopicSendReceiveTest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
-
-   static final int NMSG = 200;
-   static final int MSIZE = 256000;
-   private static final transient Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class);
-   private static final String COUNT_PROPERY_NAME = "count";
-
-   protected Connection connection2;
-   protected Session session2;
-   protected Session consumeSession2;
-   protected MessageConsumer consumer2;
-   protected MessageProducer producer2;
-   protected Destination consumerDestination2;
-   BrokerService broker;
-   private Connection connection3;
-   private Session consumeSession3;
-   private TopicSubscriber consumer3;
-
-   /**
-    * Set up a durable suscriber test.
-    *
-    * @see junit.framework.TestCase#setUp()
-    */
-   @Override
-   protected void setUp() throws Exception {
-      this.durable = true;
-      broker = createBroker();
-      super.setUp();
-   }
-
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-      broker.stop();
-   }
-
-   @Override
-   protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-      ActiveMQConnectionFactory result = new ActiveMQConnectionFactory("vm://localhost?async=false");
-      Properties props = new Properties();
-      props.put("prefetchPolicy.durableTopicPrefetch", "5");
-      props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5");
-      result.setProperties(props);
-      return result;
-   }
-
-   protected BrokerService createBroker() throws Exception {
-      BrokerService answer = new BrokerService();
-      configureBroker(answer);
-      answer.start();
-      return answer;
-   }
-
-   protected void configureBroker(BrokerService answer) throws Exception {
-      answer.setDeleteAllMessagesOnStartup(true);
-   }
-
-   /**
-    * Test if all the messages sent are being received.
-    *
-    * @throws Exception
-    */
-   public void testSlowReceiver() throws Exception {
-      connection2 = createConnection();
-      connection2.setClientID("test");
-      connection2.start();
-      consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
-      consumer2 = consumeSession2.createDurableSubscriber((Topic) consumerDestination2, getName());
-
-      consumer2.close();
-      connection2.close();
-      new Thread(new Runnable() {
-
-         @Override
-         public void run() {
-            try {
-               int count = 0;
-               for (int loop = 0; loop < 4; loop++) {
-                  connection2 = createConnection();
-                  connection2.start();
-                  session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                  producer2 = session2.createProducer(null);
-                  producer2.setDeliveryMode(deliveryMode);
-                  Thread.sleep(1000);
-                  for (int i = 0; i < NMSG / 4; i++) {
-                     BytesMessage message = session2.createBytesMessage();
-                     message.writeBytes(new byte[MSIZE]);
-                     message.setStringProperty("test", "test");
-                     message.setIntProperty(COUNT_PROPERY_NAME, count);
-                     message.setJMSType("test");
-                     producer2.send(consumerDestination2, message);
-                     Thread.sleep(50);
-                     if (verbose) {
-                        LOG.debug("Sent(" + loop + "): " + i);
-                     }
-                     count++;
-                  }
-                  producer2.close();
-                  connection2.stop();
-                  connection2.close();
-               }
-            }
-            catch (Throwable e) {
-               e.printStackTrace();
-            }
-         }
-      }, "SENDER Thread").start();
-      connection3 = createConnection();
-      connection3.setClientID("test");
-      connection3.start();
-      consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      consumer3 = consumeSession3.createDurableSubscriber((Topic) consumerDestination2, getName());
-      connection3.close();
-      int count = 0;
-      for (int loop = 0; loop < 4; ++loop) {
-         connection3 = createConnection();
-         connection3.setClientID("test");
-         connection3.start();
-         consumeSession3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         consumer3 = consumeSession3.createDurableSubscriber((Topic) consumerDestination2, getName());
-         Message msg = null;
-         int i;
-         for (i = 0; i < NMSG / 4; i++) {
-            msg = consumer3.receive(10000);
-            if (msg == null) {
-               break;
-            }
-            if (verbose) {
-               LOG.debug("Received(" + loop + "): " + i + " count = " + msg.getIntProperty(COUNT_PROPERY_NAME));
-            }
-            assertNotNull(msg);
-            assertEquals(msg.getJMSType(), "test");
-            assertEquals(msg.getStringProperty("test"), "test");
-            assertEquals("Messages received out of order", count, msg.getIntProperty(COUNT_PROPERY_NAME));
-            Thread.sleep(500);
-            msg.acknowledge();
-            count++;
-         }
-         consumer3.close();
-         assertEquals("Receiver " + loop, NMSG / 4, i);
-         connection3.close();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
deleted file mode 100644
index 81987be..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.transport.RequestTimedOutIOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
-
-   static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
-
-   private final int messageSize = 1024 * 64;
-   private final int messageCount = 10000;
-   private final AtomicInteger exceptionCount = new AtomicInteger(0);
-
-   /**
-    * Test the case where the broker is blocked due to a memory limit
-    * and a producer timeout is set on the connection.
-    *
-    * @throws Exception
-    */
-   public void testBlockedProducerConnectionTimeout() throws Exception {
-      final ActiveMQConnection cx = (ActiveMQConnection) createConnection();
-      final ActiveMQDestination queue = createDestination("testqueue");
-
-      // we should not take longer than 10 seconds to return from send
-      cx.setSendTimeout(10000);
-
-      Runnable r = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               LOG.info("Sender thread starting");
-               Session session = cx.createSession(false, 1);
-               MessageProducer producer = session.createProducer(queue);
-               producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-               TextMessage message = session.createTextMessage(createMessageText());
-               for (int count = 0; count < messageCount; count++) {
-                  producer.send(message);
-               }
-               LOG.info("Done sending..");
-            }
-            catch (JMSException e) {
-               if (e.getCause() instanceof RequestTimedOutIOException) {
-                  exceptionCount.incrementAndGet();
-               }
-               else {
-                  e.printStackTrace();
-               }
-               return;
-            }
-
-         }
-      };
-      cx.start();
-      Thread producerThread = new Thread(r);
-      producerThread.start();
-      producerThread.join(30000);
-      cx.close();
-      // We should have a few timeout exceptions as memory store will fill up
-      assertTrue("No exception from the broker", exceptionCount.get() > 0);
-   }
-
-   /**
-    * Test the case where the broker is blocked due to a memory limit
-    * with a fail timeout
-    *
-    * @throws Exception
-    */
-   public void testBlockedProducerUsageSendFailTimeout() throws Exception {
-      final ActiveMQConnection cx = (ActiveMQConnection) createConnection();
-      final ActiveMQDestination queue = createDestination("testqueue");
-
-      broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
-      Runnable r = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               LOG.info("Sender thread starting");
-               Session session = cx.createSession(false, 1);
-               MessageProducer producer = session.createProducer(queue);
-               producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-               TextMessage message = session.createTextMessage(createMessageText());
-               for (int count = 0; count < messageCount; count++) {
-                  producer.send(message);
-               }
-               LOG.info("Done sending..");
-            }
-            catch (JMSException e) {
-               if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
-                  exceptionCount.incrementAndGet();
-               }
-               else {
-                  e.printStackTrace();
-               }
-               return;
-            }
-         }
-      };
-      cx.start();
-      Thread producerThread = new Thread(r);
-      producerThread.start();
-      producerThread.join(30000);
-      cx.close();
-      // We should have a few timeout exceptions as memory store will fill up
-      assertTrue("No exception from the broker", exceptionCount.get() > 0);
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      exceptionCount.set(0);
-      bindAddress = "tcp://localhost:0";
-      broker = createBroker();
-      broker.setDeleteAllMessagesOnStartup(true);
-      broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
-
-      super.setUp();
-   }
-
-   @Override
-   protected ConnectionFactory createConnectionFactory() throws Exception {
-      return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
-   }
-
-   private String createMessageText() {
-      StringBuffer buffer = new StringBuffer();
-      buffer.append("<filler>");
-      for (int i = buffer.length(); i < messageSize; i++) {
-         buffer.append('X');
-      }
-      buffer.append("</filler>");
-      return buffer.toString();
-   }
-
-}