You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2014/01/29 14:58:10 UTC

git commit: Converted to JUnit4 and added @Ignore tags for failing tests. See AMQ-4286 and AMQ-5001

Updated Branches:
  refs/heads/trunk 907660d2c -> bec711c7d


Converted to JUnit4 and added @Ignore tags for failing tests.  See AMQ-4286 and AMQ-5001


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bec711c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bec711c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bec711c7

Branch: refs/heads/trunk
Commit: bec711c7db420ab23b9eb284bd57b3c4ec0fce36
Parents: 907660d
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Wed Jan 29 14:57:52 2014 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Wed Jan 29 14:57:52 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/activemq/ExpiryHogTest.java |   9 +-
 .../activemq/JmsMultipleClientsTestSupport.java |  53 +++-
 .../broker/NioQueueSubscriptionTest.java        |  25 +-
 .../activemq/broker/QueueSubscriptionTest.java  |  25 +-
 .../activemq/broker/TopicSubscriptionTest.java  |  25 +-
 .../policy/AbortSlowAckConsumer0Test.java       | 164 +++++++++++
 .../policy/AbortSlowAckConsumer1Test.java       |  77 +++++
 .../policy/AbortSlowAckConsumer2Test.java       |  77 +++++
 .../broker/policy/AbortSlowAckConsumerTest.java | 152 ----------
 .../broker/policy/AbortSlowConsumer0Test.java   | 175 ++++++++++++
 .../broker/policy/AbortSlowConsumer1Test.java   | 112 ++++++++
 .../broker/policy/AbortSlowConsumer2Test.java   |  68 +++++
 .../broker/policy/AbortSlowConsumerBase.java    |  96 +++++++
 .../broker/policy/AbortSlowConsumerTest.java    | 283 -------------------
 .../policy/RoundRobinDispatchPolicyTest.java    |  12 +
 .../broker/policy/SimpleDispatchPolicyTest.java |   8 +
 .../policy/StrictOrderDispatchPolicyTest.java   |  15 +
 .../org/apache/activemq/bugs/AMQ2910Test.java   |  18 +-
 18 files changed, 940 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
index aa4f2e8..eff6efa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
@@ -24,15 +24,21 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 
 /**
  * User: gtully
  */
+@RunWith(BlockJUnit4ClassRunner.class)
 public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
     boolean sleep = false;
 
     int numMessages = 4;
 
+    @Test(timeout = 2 * 60 * 1000)
     public void testImmediateDispatchWhenCacheDisabled() throws Exception {
         ConnectionFactory f = createConnectionFactory();
         destination = createDestination();
@@ -67,7 +73,8 @@ public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
     }
 
     @Override
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         autoFail = false;
         persistent = true;
         super.setUp();

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
index aa77de5..aa92926 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -43,6 +43,14 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
 
 /**
  * Test case support used to test multiple message comsumers and message
@@ -50,7 +58,12 @@ import org.apache.activemq.util.MessageIdList;
  * 
  * 
  */
-public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
+public class JmsMultipleClientsTestSupport {
+
+    @Rule
+    public TestName testName = new TestName();
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsMultipleClientsTestSupport.class);
 
     protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages
                                                 // received
@@ -217,14 +230,14 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
         return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
     }
 
-    protected void setUp() throws Exception {
-        super.setAutoFail(autoFail);
-        super.setUp();
+    @Before
+    public void setUp() throws Exception {
         broker = createBroker();
         broker.start();
     }
 
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
             Connection conn = iter.next();
             try {
@@ -232,10 +245,11 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
             } catch (Throwable e) {
             }
         }
+        if (broker !=null ) { // FIXME remove
         broker.stop();
         allMessagesList.flushMessages();
         consumers.clear();
-        super.tearDown();
+        }
     }
 
     /*
@@ -285,4 +299,31 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
         }
         assertEquals("Total of consumers message count", msgCount, totalMsg);
     }
+
+
+    public String getName() {
+        return getName(false);
+    }
+
+    public String getName(boolean original) {
+        String currentTestName = testName.getMethodName();
+        currentTestName = currentTestName.replace("[","");
+        currentTestName = currentTestName.replace("]","");
+        return currentTestName;
+    }
+
+
+    /*
+     * This is copied from AutoFailTestSupport.  We may want to move it to someplace where more
+     * tests can use it.
+     */
+    public static void dumpAllThreads(String prefix) {
+        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
+        for (Map.Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
+            System.err.println(prefix + " " + stackEntry.getKey());
+            for(StackTraceElement element : stackEntry.getValue()) {
+                System.err.println("     " + element);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
index 7a8932a..a71ebce 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
@@ -35,9 +35,19 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.*;
+
+@RunWith(BlockJUnit4ClassRunner.class)
 public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
 
     protected static final Logger LOG = LoggerFactory.getLogger(NioQueueSubscriptionTest.class);
@@ -49,12 +59,18 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
         return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
     }
 
-    @Override
-    protected void setUp() throws Exception {
-        // setMaxTestTime(20*60*1000);
+    /*
+    @Before
+    public void setUp() throws Exception {
         super.setUp();
     }
 
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+    */
+
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService answer = BrokerFactory.createBroker(new URI(
@@ -74,6 +90,9 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
         return answer;
     }
 
+
+    @Ignore("See AMQ-4286")
+    @Test(timeout = 60 * 1000)
     public void testLotsOfConcurrentConnections() throws Exception {
         ExecutorService executor = Executors.newCachedThreadPool();
         final ConnectionFactory factory = createConnectionFactory();

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
index c210353..0177b7b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
@@ -19,17 +19,33 @@ package org.apache.activemq.broker;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsMultipleClientsTestSupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 
+@RunWith(BlockJUnit4ClassRunner.class)
 public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
     protected int messageCount = 1000; // 1000 Messages per producer
     protected int prefetchCount = 10;
 
-    protected void setUp() throws Exception {
+    @Before
+    @Override
+    public void setUp() throws Exception {
         super.setUp();
         durable = false;
         topic = false;
     }
 
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+
+    @Test(timeout = 60 * 1000)
     public void testManyProducersOneConsumer() throws Exception {
         consumerCount = 1;
         producerCount = 10;
@@ -42,6 +58,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -54,6 +71,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -66,6 +84,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -78,6 +97,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -90,6 +110,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerManyConsumersFewMessages() throws Exception {
         consumerCount = 50;
         producerCount = 1;
@@ -102,6 +123,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerManyConsumersManyMessages() throws Exception {
         consumerCount = 50;
         producerCount = 1;
@@ -114,6 +136,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
         assertTotalMessagesReceived(messageCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testManyProducersManyConsumers() throws Exception {
         consumerCount = 200;
         producerCount = 50;

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
index 5dc1871..2c530a7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
@@ -19,20 +19,32 @@ package org.apache.activemq.broker;
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.ThreadTracker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 
+import static org.junit.Assert.*;
+
+
+@RunWith(BlockJUnit4ClassRunner.class)
 public class TopicSubscriptionTest extends QueueSubscriptionTest {
 
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         super.setUp();
         durable = true;
         topic = true;
     }
-    
-    protected void tearDown() throws Exception {
+
+    @After
+    public void tearDown() throws Exception {
         super.tearDown();
         ThreadTracker.result();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testManyProducersManyConsumers() throws Exception {
         consumerCount = 40;
         producerCount = 20;
@@ -46,6 +58,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertDestinationMemoryUsageGoesToZero();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -59,6 +72,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertDestinationMemoryUsageGoesToZero();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -72,6 +86,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertDestinationMemoryUsageGoesToZero();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -84,6 +99,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
@@ -97,6 +113,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertDestinationMemoryUsageGoesToZero();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerManyConsumersFewMessages() throws Exception {
         consumerCount = 50;
         producerCount = 1;
@@ -110,6 +127,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
         assertDestinationMemoryUsageGoesToZero();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerManyConsumersManyMessages() throws Exception {
         consumerCount = 50;
         producerCount = 1;
@@ -124,6 +142,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
     }
 
 
+    @Test(timeout = 60 * 1000)
     public void testManyProducersOneConsumer() throws Exception {
         consumerCount = 1;
         producerCount = 20;

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
new file mode 100644
index 0000000..d682926
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@RunWith(value = BlockJUnit4ClassRunner.class)
+public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    @Override
+    protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        return strategy;
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+
+    @Ignore("AMQ-5001")
+    @Override
+    @Test
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+        strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
+        super.testSlowConsumerIsAbortedViaJmx();
+    }
+
+    @Ignore("AMQ-5001")
+    @Test
+    public void testZeroPrefetchConsumerIsAborted() throws Exception {
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+
+        try {
+            consumer.receive(20000);
+            fail("Slow consumer not aborted.");
+        } catch(Exception ex) {
+        }
+    }
+
+    @Ignore("AMQ-5001")
+    @Test
+    public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
+        AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+        strategy.setIgnoreIdleConsumers(false);
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        try {
+            consumer.receive(20000);
+            fail("Idle consumer not aborted.");
+        } catch(Exception ex) {
+        }
+    }
+
+    @Ignore("AMQ-5001")
+    @Test
+    public void testIdleConsumerCanBeAborted() throws Exception {
+        AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+        strategy.setIgnoreIdleConsumers(false);
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+        message.acknowledge();
+
+        try {
+            consumer.receive(20000);
+            fail("Slow consumer not aborted.");
+        } catch(Exception ex) {
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
new file mode 100644
index 0000000..8812a6b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer1Test.class);
+
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) {
+        super(abortConnection, topic);
+    }
+
+    @Override
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
new file mode 100644
index 0000000..63c9773
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer2Test.class);
+
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    public AbortSlowAckConsumer2Test(Boolean topic) {
+        super(topic);
+    }
+
+    @Override
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
deleted file mode 100644
index 3ed88e2..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.policy;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
-import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class);
-
-    protected long maxTimeSinceLastAck = 5 * 1000;
-
-    @Override
-    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
-        return new AbortSlowConsumerStrategy();
-    }
-
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        BrokerService broker = super.createBroker();
-        PolicyEntry policy = new PolicyEntry();
-
-        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
-        strategy.setAbortConnection(abortConnection);
-        strategy.setCheckPeriod(checkPeriod);
-        strategy.setMaxSlowDuration(maxSlowDuration);
-        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
-
-        policy.setSlowConsumerStrategy(strategy);
-        policy.setQueuePrefetch(10);
-        policy.setTopicPrefetch(10);
-        PolicyMap pMap = new PolicyMap();
-        pMap.setDefaultEntry(policy);
-        broker.setDestinationPolicy(pMap);
-        return broker;
-    }
-
-    @Override
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-        factory.getPrefetchPolicy().setAll(1);
-        return factory;
-    }
-
-    @Override
-    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
-        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
-        strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
-        super.testSlowConsumerIsAbortedViaJmx();
-    }
-
-    @Override
-    public void initCombosForTestSlowConsumerIsAborted() {
-        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
-        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testZeroPrefetchConsumerIsAborted() throws Exception {
-        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
-        conn.setExceptionListener(this);
-        connections.add(conn);
-
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final MessageConsumer consumer = sess.createConsumer(destination);
-        assertNotNull(consumer);
-        conn.start();
-        startProducers(destination, 20);
-
-        Message message = consumer.receive(5000);
-        assertNotNull(message);
-
-        try {
-            consumer.receive(20000);
-            fail("Slow consumer not aborted.");
-        } catch(Exception ex) {
-        }
-    }
-
-    public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
-        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
-        strategy.setIgnoreIdleConsumers(false);
-
-        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
-        conn.setExceptionListener(this);
-        connections.add(conn);
-
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final MessageConsumer consumer = sess.createConsumer(destination);
-        assertNotNull(consumer);
-        conn.start();
-        startProducers(destination, 20);
-
-        try {
-            consumer.receive(20000);
-            fail("Idle consumer not aborted.");
-        } catch(Exception ex) {
-        }
-    }
-
-    public void testIdleConsumerCanBeAborted() throws Exception {
-        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
-        strategy.setIgnoreIdleConsumers(false);
-
-        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
-        conn.setExceptionListener(this);
-        connections.add(conn);
-
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final MessageConsumer consumer = sess.createConsumer(destination);
-        assertNotNull(consumer);
-        conn.start();
-        startProducers(destination, 20);
-
-        Message message = consumer.receive(5000);
-        assertNotNull(message);
-        message.acknowledge();
-
-        try {
-            consumer.receive(20000);
-            fail("Slow consumer not aborted.");
-        } catch(Exception ex) {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
new file mode 100644
index 0000000..373b155
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(BlockJUnit4ClassRunner.class)
+public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class);
+
+    @Test
+    public void testRegularConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+
+    @Test
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+
+        ActiveMQDestination amqDest = (ActiveMQDestination)destination;
+        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
+                (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
+                + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
+
+        DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
+        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+
+        assertNotNull(slowConsumerPolicyMBeanName);
+
+        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+                broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TabularData slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("one slow consumers", 1, slowOnes.size());
+
+        LOG.info("slow ones:"  + slowOnes);
+
+        CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
+        LOG.info("Slow one: " + slowOne);
+
+        assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
+        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+
+        slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("no slow consumers left", 0, slowOnes.size());
+
+        // verify mbean gone with destination
+        broker.getAdminView().removeTopic(amqDest.getPhysicalName());
+
+        try {
+            abortPolicy.getSlowConsumers();
+            fail("expect not found post destination removal");
+        } catch(UndeclaredThrowableException expected) {
+            assertTrue("correct exception: " + expected.getCause(),
+                    expected.getCause() instanceof InstanceNotFoundException);
+        }
+    }
+
+    @Test
+    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
+        consumerCount = 10;
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        allMessagesList.waitForMessagesToArrive(99);
+        allMessagesList.assertAtLeastMessagesReceived(99);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+        TimeUnit.SECONDS.sleep(5);
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+    @Test
+    public void testAbortAlreadyClosingConsumers() throws Exception {
+        consumerCount = 1;
+        startConsumers(destination);
+        for (MessageIdList list : consumers.values()) {
+            list.setProcessingDelay(6 * 1000);
+        }
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(consumerCount);
+
+        for (MessageConsumer consumer : consumers.keySet()) {
+            LOG.info("closing consumer: " + consumer);
+            /// will block waiting for on message till 6secs expire
+            consumer.close();
+        }
+    }
+
+    @Test
+    public void testAbortConsumerOnDeadConnection() throws Exception {
+        // socket proxy on pause, close could hang??
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        exceptions.add(exception);
+        exception.printStackTrace();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
new file mode 100644
index 0000000..6fe1e47
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class);
+
+    @Parameterized.Parameters(name = "{0}-{1}")
+    public static Collection<Object[]> getTestParameters() {
+
+        List<Object[]> testParameters = new ArrayList<Object[]>();
+        Boolean[] booleanValues = {Boolean.TRUE, Boolean.TRUE};
+        for (Boolean abortConnection : booleanValues) {
+            for (Boolean topic : booleanValues) {
+                Boolean[] pair = {abortConnection, topic};
+                LOG.info(">>>>> in getTestparameters, adding {}, {}", abortConnection, topic);
+                testParameters.add(pair);
+            }
+        }
+
+        return testParameters;
+    }
+
+    public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) {
+        this.abortConnection = abortConnection;
+        this.topic = topic;
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSlowConsumerIsAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+        TimeUnit.SECONDS.sleep(5);
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testAbortAlreadyClosedConsumers() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing consumer: " + consumer);
+        consumer.close();
+
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testAbortAlreadyClosedConnection() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing connection: " + conn);
+        conn.close();
+
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
new file mode 100644
index 0000000..2cbea5b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer2Test extends AbortSlowConsumerBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer2Test.class);
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> getTestParameters() {
+
+        List<Object[]> testParameters = new ArrayList<Object[]>();
+        Boolean[] booleanValues = {Boolean.TRUE, Boolean.FALSE};
+        for (Boolean topic : booleanValues) {
+            Boolean[] value = {topic};
+            testParameters.add(value);
+        }
+
+        return testParameters;
+    }
+
+    public AbortSlowConsumer2Test(Boolean isTopic) {
+        this.topic = isTopic;
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    public void testLittleSlowConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(500);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 12);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
new file mode 100644
index 0000000..ee28112
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import junit.framework.Test;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+
+public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
+
+    protected AbortSlowConsumerStrategy underTest;
+    protected boolean abortConnection = false;
+    protected long checkPeriod = 2 * 1000;
+    protected long maxSlowDuration = 5 * 1000;
+    protected final List<Throwable> exceptions = new ArrayList<Throwable>();
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        exceptions.clear();
+        topic = true;
+        underTest = createSlowConsumerStrategy();
+        super.setUp();
+        createDestination();
+    }
+
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+        underTest.setAbortConnection(abortConnection);
+        underTest.setCheckPeriod(checkPeriod);
+        underTest.setMaxSlowDuration(maxSlowDuration);
+
+        policy.setSlowConsumerStrategy(underTest);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        exceptions.add(exception);
+        exception.printStackTrace();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
deleted file mode 100644
index a263ebc..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.policy;
-
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import junit.framework.Test;
-
-import org.apache.activemq.JmsMultipleClientsTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.MessageIdList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
-
-    protected AbortSlowConsumerStrategy underTest;
-    protected boolean abortConnection = false;
-    protected long checkPeriod = 2 * 1000;
-    protected long maxSlowDuration = 5 * 1000;
-    protected final List<Throwable> exceptions = new ArrayList<Throwable>();
-
-    @Override
-    protected void setUp() throws Exception {
-        exceptions.clear();
-        topic = true;
-        underTest = createSlowConsumerStrategy();
-        super.setUp();
-        createDestination();
-    }
-
-    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
-        return new AbortSlowConsumerStrategy();
-    }
-
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        BrokerService broker = super.createBroker();
-        PolicyEntry policy = new PolicyEntry();
-        underTest.setAbortConnection(abortConnection);
-        underTest.setCheckPeriod(checkPeriod);
-        underTest.setMaxSlowDuration(maxSlowDuration);
-
-        policy.setSlowConsumerStrategy(underTest);
-        policy.setQueuePrefetch(10);
-        policy.setTopicPrefetch(10);
-        PolicyMap pMap = new PolicyMap();
-        pMap.setDefaultEntry(policy);
-        broker.setDestinationPolicy(pMap);
-        return broker;
-    }
-
-    public void testRegularConsumerIsNotAborted() throws Exception {
-        startConsumers(destination);
-        for (Connection c : connections) {
-            c.setExceptionListener(this);
-        }
-        startProducers(destination, 100);
-        allMessagesList.waitForMessagesToArrive(10);
-        allMessagesList.assertAtLeastMessagesReceived(10);
-    }
-
-    public void initCombosForTestLittleSlowConsumerIsNotAborted() {
-        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testLittleSlowConsumerIsNotAborted() throws Exception {
-        startConsumers(destination);
-        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
-        consumertoAbort.getValue().setProcessingDelay(500);
-        for (Connection c : connections) {
-            c.setExceptionListener(this);
-        }
-        startProducers(destination, 12);
-        allMessagesList.waitForMessagesToArrive(10);
-        allMessagesList.assertAtLeastMessagesReceived(10);
-    }
-
-    public void initCombosForTestSlowConsumerIsAborted() {
-        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
-        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testSlowConsumerIsAborted() throws Exception {
-        startConsumers(destination);
-        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
-        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
-        for (Connection c : connections) {
-            c.setExceptionListener(this);
-        }
-        startProducers(destination, 100);
-
-        consumertoAbort.getValue().assertMessagesReceived(1);
-        TimeUnit.SECONDS.sleep(5);
-        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
-    }
-
-    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
-        underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
-        startConsumers(destination);
-        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
-        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
-        for (Connection c : connections) {
-            c.setExceptionListener(this);
-        }
-        startProducers(destination, 100);
-
-        consumertoAbort.getValue().assertMessagesReceived(1);
-
-        ActiveMQDestination amqDest = (ActiveMQDestination)destination;
-        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
-                (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
-                + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
-
-        DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
-        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
-
-        assertNotNull(slowConsumerPolicyMBeanName);
-
-        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
-                broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
-
-        TimeUnit.SECONDS.sleep(3);
-
-        TabularData slowOnes = abortPolicy.getSlowConsumers();
-        assertEquals("one slow consumers", 1, slowOnes.size());
-
-        LOG.info("slow ones:"  + slowOnes);
-
-        CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
-        LOG.info("Slow one: " + slowOne);
-
-        assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
-        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
-
-        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
-
-        slowOnes = abortPolicy.getSlowConsumers();
-        assertEquals("no slow consumers left", 0, slowOnes.size());
-
-        // verify mbean gone with destination
-        broker.getAdminView().removeTopic(amqDest.getPhysicalName());
-
-        try {
-            abortPolicy.getSlowConsumers();
-            fail("expect not found post destination removal");
-        } catch(UndeclaredThrowableException expected) {
-            assertTrue("correct exception: " + expected.getCause(),
-                    expected.getCause() instanceof InstanceNotFoundException);
-        }
-    }
-
-    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
-        consumerCount = 10;
-        startConsumers(destination);
-        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
-        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
-        for (Connection c : connections) {
-            c.setExceptionListener(this);
-        }
-        startProducers(destination, 100);
-
-        allMessagesList.waitForMessagesToArrive(99);
-        allMessagesList.assertAtLeastMessagesReceived(99);
-
-        consumertoAbort.getValue().assertMessagesReceived(1);
-        TimeUnit.SECONDS.sleep(5);
-        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
-    }
-
-    public void testAbortAlreadyClosingConsumers() throws Exception {
-        consumerCount = 1;
-        startConsumers(destination);
-        for (MessageIdList list : consumers.values()) {
-            list.setProcessingDelay(6 * 1000);
-        }
-        for (Connection c : connections) {
-            c.setExceptionListener(this);
-        }
-        startProducers(destination, 100);
-        allMessagesList.waitForMessagesToArrive(consumerCount);
-
-        for (MessageConsumer consumer : consumers.keySet()) {
-            LOG.info("closing consumer: " + consumer);
-            /// will block waiting for on message till 6secs expire
-            consumer.close();
-        }
-    }
-
-    public void initCombosForTestAbortAlreadyClosedConsumers() {
-        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
-        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testAbortAlreadyClosedConsumers() throws Exception {
-        Connection conn = createConnectionFactory().createConnection();
-        conn.setExceptionListener(this);
-        connections.add(conn);
-
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final MessageConsumer consumer = sess.createConsumer(destination);
-        conn.start();
-        startProducers(destination, 20);
-        TimeUnit.SECONDS.sleep(1);
-        LOG.info("closing consumer: " + consumer);
-        consumer.close();
-
-        TimeUnit.SECONDS.sleep(5);
-        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
-    }
-
-    public void initCombosForTestAbortAlreadyClosedConnection() {
-        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
-        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testAbortAlreadyClosedConnection() throws Exception {
-        Connection conn = createConnectionFactory().createConnection();
-        conn.setExceptionListener(this);
-
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        sess.createConsumer(destination);
-        conn.start();
-        startProducers(destination, 20);
-        TimeUnit.SECONDS.sleep(1);
-        LOG.info("closing connection: " + conn);
-        conn.close();
-
-        TimeUnit.SECONDS.sleep(5);
-        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
-    }
-
-    public void testAbortConsumerOnDeadConnection() throws Exception {
-        // socket proxy on pause, close could hang??
-    }
-
-    @Override
-    public void onException(JMSException exception) {
-        exceptions.add(exception);
-        exception.printStackTrace();
-    }
-
-    public static Test suite() {
-        return suite(AbortSlowConsumerTest.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
index e966f74..77ca11f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
@@ -26,7 +26,11 @@ import org.apache.activemq.broker.QueueSubscriptionTest;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 
+@RunWith(BlockJUnit4ClassRunner.class)
 public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
 
     protected BrokerService createBroker() throws Exception {
@@ -43,6 +47,7 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
         return broker;
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
         super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
 
@@ -52,11 +57,13 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
         assertEachConsumerReceivedAtLeastXMessages(1);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
         super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
         assertMessagesDividedAmongConsumers();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
         super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
 
@@ -66,11 +73,13 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
         assertEachConsumerReceivedAtLeastXMessages(1);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
         super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
         assertMessagesDividedAmongConsumers();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerManyConsumersFewMessages() throws Exception {
         super.testOneProducerManyConsumersFewMessages();
 
@@ -79,16 +88,19 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
         assertMessagesDividedAmongConsumers();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerManyConsumersManyMessages() throws Exception {
         super.testOneProducerManyConsumersManyMessages();
         assertMessagesDividedAmongConsumers();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testManyProducersManyConsumers() throws Exception {
         super.testManyProducersManyConsumers();
         assertMessagesDividedAmongConsumers();
     }
 
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
         // Create consumer that won't consume any message
         createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1");

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
index 0455950..e181fe2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
@@ -27,7 +27,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 
+import static org.junit.Assert.*;
+
+@RunWith(BlockJUnit4ClassRunner.class)
 public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
 
     @Override
@@ -46,6 +52,7 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
     }
 
     @Override
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
         super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
 
@@ -54,6 +61,7 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
     }
 
     @Override
+    @Test(timeout = 60 * 1000)
     public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
         super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
index 91ef89c..dbeabfe 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
@@ -26,7 +26,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
 import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
 
+import static org.junit.Assert.*;
+
+@RunWith(BlockJUnit4ClassRunner.class)
 public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
 
     @Override
@@ -44,6 +50,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         return broker;
     }
 
+    @Test
     @Override
     public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
         super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
@@ -51,6 +58,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+    @Test
     @Override
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
         super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
@@ -58,6 +66,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+    @Test
     @Override
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
         super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
@@ -65,6 +74,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+    @Test
     @Override
     public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
         super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
@@ -72,6 +82,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+    @Test
     @Override
     public void testOneProducerManyConsumersFewMessages() throws Exception {
         super.testOneProducerManyConsumersFewMessages();
@@ -79,6 +90,8 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+
+    @Test
     @Override
     public void testOneProducerManyConsumersManyMessages() throws Exception {
         super.testOneProducerManyConsumersManyMessages();
@@ -86,6 +99,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+    @Test
     @Override
     public void testManyProducersOneConsumer() throws Exception {
         super.testManyProducersOneConsumer();
@@ -93,6 +107,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
         assertReceivedMessagesAreOrdered();
     }
 
+    @Test
     @Override
     public void testManyProducersManyConsumers() throws Exception {
         super.testManyProducersManyConsumers();

http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
index 66cc516..f665431 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
@@ -16,11 +16,6 @@
  */
 package org.apache.activemq.bugs;
 
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsMultipleClientsTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -28,7 +23,18 @@ import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePo
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
 
+@RunWith(BlockJUnit4ClassRunner.class)
 public class AMQ2910Test extends JmsMultipleClientsTestSupport {
 
     final int maxConcurrency = 60;
@@ -55,6 +61,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
         return broker;
     }
 
+    @Test(timeout = 30 * 1000)
     public void testConcurrentSendToPendingCursor() throws Exception {
         final ActiveMQConnectionFactory factory =
                 new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
@@ -103,6 +110,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
 
         if (allMessagesList.getMessageCount() != numExpected) {
             dumpAllThreads(getName());
+
         }
         allMessagesList.assertMessagesReceivedNoWait(numExpected);