You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2014/01/29 14:58:10 UTC
git commit: Converted to JUnit4 and added @Ignore tags for failing tests. See AMQ-4286 and AMQ-5001
Updated Branches:
refs/heads/trunk 907660d2c -> bec711c7d
Converted to JUnit4 and added @Ignore tags for failing tests. See AMQ-4286 and AMQ-5001
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bec711c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bec711c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bec711c7
Branch: refs/heads/trunk
Commit: bec711c7db420ab23b9eb284bd57b3c4ec0fce36
Parents: 907660d
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Wed Jan 29 14:57:52 2014 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Wed Jan 29 14:57:52 2014 +0100
----------------------------------------------------------------------
.../java/org/apache/activemq/ExpiryHogTest.java | 9 +-
.../activemq/JmsMultipleClientsTestSupport.java | 53 +++-
.../broker/NioQueueSubscriptionTest.java | 25 +-
.../activemq/broker/QueueSubscriptionTest.java | 25 +-
.../activemq/broker/TopicSubscriptionTest.java | 25 +-
.../policy/AbortSlowAckConsumer0Test.java | 164 +++++++++++
.../policy/AbortSlowAckConsumer1Test.java | 77 +++++
.../policy/AbortSlowAckConsumer2Test.java | 77 +++++
.../broker/policy/AbortSlowAckConsumerTest.java | 152 ----------
.../broker/policy/AbortSlowConsumer0Test.java | 175 ++++++++++++
.../broker/policy/AbortSlowConsumer1Test.java | 112 ++++++++
.../broker/policy/AbortSlowConsumer2Test.java | 68 +++++
.../broker/policy/AbortSlowConsumerBase.java | 96 +++++++
.../broker/policy/AbortSlowConsumerTest.java | 283 -------------------
.../policy/RoundRobinDispatchPolicyTest.java | 12 +
.../broker/policy/SimpleDispatchPolicyTest.java | 8 +
.../policy/StrictOrderDispatchPolicyTest.java | 15 +
.../org/apache/activemq/bugs/AMQ2910Test.java | 18 +-
18 files changed, 940 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
index aa4f2e8..eff6efa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java
@@ -24,15 +24,21 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
/**
* User: gtully
*/
+@RunWith(BlockJUnit4ClassRunner.class)
public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
boolean sleep = false;
int numMessages = 4;
+ @Test(timeout = 2 * 60 * 1000)
public void testImmediateDispatchWhenCacheDisabled() throws Exception {
ConnectionFactory f = createConnectionFactory();
destination = createDestination();
@@ -67,7 +73,8 @@ public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
}
@Override
- protected void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
autoFail = false;
persistent = true;
super.setUp();
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
index aa77de5..aa92926 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -43,6 +43,14 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
/**
* Test case support used to test multiple message comsumers and message
@@ -50,7 +58,12 @@ import org.apache.activemq.util.MessageIdList;
*
*
*/
-public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
+public class JmsMultipleClientsTestSupport {
+
+ @Rule
+ public TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(JmsMultipleClientsTestSupport.class);
protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages
// received
@@ -217,14 +230,14 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
}
- protected void setUp() throws Exception {
- super.setAutoFail(autoFail);
- super.setUp();
+ @Before
+ public void setUp() throws Exception {
broker = createBroker();
broker.start();
}
- protected void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
Connection conn = iter.next();
try {
@@ -232,10 +245,11 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
} catch (Throwable e) {
}
}
+ if (broker !=null ) { // FIXME remove
broker.stop();
allMessagesList.flushMessages();
consumers.clear();
- super.tearDown();
+ }
}
/*
@@ -285,4 +299,31 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
}
assertEquals("Total of consumers message count", msgCount, totalMsg);
}
+
+
+ public String getName() {
+ return getName(false);
+ }
+
+ public String getName(boolean original) {
+ String currentTestName = testName.getMethodName();
+ currentTestName = currentTestName.replace("[","");
+ currentTestName = currentTestName.replace("]","");
+ return currentTestName;
+ }
+
+
+ /*
+ * This is copied from AutoFailTestSupport. We may want to move it to someplace where more
+ * tests can use it.
+ */
+ public static void dumpAllThreads(String prefix) {
+ Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
+ System.err.println(prefix + " " + stackEntry.getKey());
+ for(StackTraceElement element : stackEntry.getValue()) {
+ System.err.println(" " + element);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
index 7a8932a..a71ebce 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
@@ -35,9 +35,19 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+@RunWith(BlockJUnit4ClassRunner.class)
public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
protected static final Logger LOG = LoggerFactory.getLogger(NioQueueSubscriptionTest.class);
@@ -49,12 +59,18 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
}
- @Override
- protected void setUp() throws Exception {
- // setMaxTestTime(20*60*1000);
+ /*
+ @Before
+ public void setUp() throws Exception {
super.setUp();
}
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+ */
+
@Override
protected BrokerService createBroker() throws Exception {
BrokerService answer = BrokerFactory.createBroker(new URI(
@@ -74,6 +90,9 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
return answer;
}
+
+ @Ignore("See AMQ-4286")
+ @Test(timeout = 60 * 1000)
public void testLotsOfConcurrentConnections() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
final ConnectionFactory factory = createConnectionFactory();
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
index c210353..0177b7b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
@@ -19,17 +19,33 @@ package org.apache.activemq.broker;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+@RunWith(BlockJUnit4ClassRunner.class)
public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
protected int messageCount = 1000; // 1000 Messages per producer
protected int prefetchCount = 10;
- protected void setUp() throws Exception {
+ @Before
+ @Override
+ public void setUp() throws Exception {
super.setUp();
durable = false;
topic = false;
}
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+
+ @Test(timeout = 60 * 1000)
public void testManyProducersOneConsumer() throws Exception {
consumerCount = 1;
producerCount = 10;
@@ -42,6 +58,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -54,6 +71,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -66,6 +84,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -78,6 +97,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -90,6 +110,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerManyConsumersFewMessages() throws Exception {
consumerCount = 50;
producerCount = 1;
@@ -102,6 +123,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerManyConsumersManyMessages() throws Exception {
consumerCount = 50;
producerCount = 1;
@@ -114,6 +136,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testManyProducersManyConsumers() throws Exception {
consumerCount = 200;
producerCount = 50;
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
index 5dc1871..2c530a7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
@@ -19,20 +19,32 @@ package org.apache.activemq.broker;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ThreadTracker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import static org.junit.Assert.*;
+
+
+@RunWith(BlockJUnit4ClassRunner.class)
public class TopicSubscriptionTest extends QueueSubscriptionTest {
- protected void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
super.setUp();
durable = true;
topic = true;
}
-
- protected void tearDown() throws Exception {
+
+ @After
+ public void tearDown() throws Exception {
super.tearDown();
ThreadTracker.result();
}
+ @Test(timeout = 60 * 1000)
public void testManyProducersManyConsumers() throws Exception {
consumerCount = 40;
producerCount = 20;
@@ -46,6 +58,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertDestinationMemoryUsageGoesToZero();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -59,6 +72,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertDestinationMemoryUsageGoesToZero();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -72,6 +86,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertDestinationMemoryUsageGoesToZero();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -84,6 +99,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@@ -97,6 +113,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertDestinationMemoryUsageGoesToZero();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerManyConsumersFewMessages() throws Exception {
consumerCount = 50;
producerCount = 1;
@@ -110,6 +127,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertDestinationMemoryUsageGoesToZero();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerManyConsumersManyMessages() throws Exception {
consumerCount = 50;
producerCount = 1;
@@ -124,6 +142,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
}
+ @Test(timeout = 60 * 1000)
public void testManyProducersOneConsumer() throws Exception {
consumerCount = 1;
producerCount = 20;
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
new file mode 100644
index 0000000..d682926
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@RunWith(value = BlockJUnit4ClassRunner.class)
+public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
+ protected long maxTimeSinceLastAck = 5 * 1000;
+
+ @Override
+ protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
+ AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+ strategy.setAbortConnection(abortConnection);
+ strategy.setCheckPeriod(checkPeriod);
+ strategy.setMaxSlowDuration(maxSlowDuration);
+ strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+ return strategy;
+ }
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ PolicyEntry policy = new PolicyEntry();
+
+ AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+
+ policy.setSlowConsumerStrategy(strategy);
+ policy.setQueuePrefetch(10);
+ policy.setTopicPrefetch(10);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
+ return broker;
+ }
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ factory.getPrefetchPolicy().setAll(1);
+ return factory;
+ }
+
+
+ @Ignore("AMQ-5001")
+ @Override
+ @Test
+ public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+ AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+ strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
+ super.testSlowConsumerIsAbortedViaJmx();
+ }
+
+ @Ignore("AMQ-5001")
+ @Test
+ public void testZeroPrefetchConsumerIsAborted() throws Exception {
+ ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+ conn.setExceptionListener(this);
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer consumer = sess.createConsumer(destination);
+ assertNotNull(consumer);
+ conn.start();
+ startProducers(destination, 20);
+
+ Message message = consumer.receive(5000);
+ assertNotNull(message);
+
+ try {
+ consumer.receive(20000);
+ fail("Slow consumer not aborted.");
+ } catch(Exception ex) {
+ }
+ }
+
+ @Ignore("AMQ-5001")
+ @Test
+ public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
+ AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+ strategy.setIgnoreIdleConsumers(false);
+
+ ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+ conn.setExceptionListener(this);
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer consumer = sess.createConsumer(destination);
+ assertNotNull(consumer);
+ conn.start();
+ startProducers(destination, 20);
+
+ try {
+ consumer.receive(20000);
+ fail("Idle consumer not aborted.");
+ } catch(Exception ex) {
+ }
+ }
+
+ @Ignore("AMQ-5001")
+ @Test
+ public void testIdleConsumerCanBeAborted() throws Exception {
+ AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
+ strategy.setIgnoreIdleConsumers(false);
+
+ ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+ conn.setExceptionListener(this);
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer consumer = sess.createConsumer(destination);
+ assertNotNull(consumer);
+ conn.start();
+ startProducers(destination, 20);
+
+ Message message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ try {
+ consumer.receive(20000);
+ fail("Slow consumer not aborted.");
+ } catch(Exception ex) {
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
new file mode 100644
index 0000000..8812a6b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer1Test.class);
+
+ protected long maxTimeSinceLastAck = 5 * 1000;
+
+ public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) {
+ super(abortConnection, topic);
+ }
+
+ @Override
+ protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+ return new AbortSlowConsumerStrategy();
+ }
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ PolicyEntry policy = new PolicyEntry();
+
+ AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+ strategy.setAbortConnection(abortConnection);
+ strategy.setCheckPeriod(checkPeriod);
+ strategy.setMaxSlowDuration(maxSlowDuration);
+ strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+ policy.setSlowConsumerStrategy(strategy);
+ policy.setQueuePrefetch(10);
+ policy.setTopicPrefetch(10);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
+ return broker;
+ }
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ factory.getPrefetchPolicy().setAll(1);
+ return factory;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
new file mode 100644
index 0000000..63c9773
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer2Test.class);
+
+ protected long maxTimeSinceLastAck = 5 * 1000;
+
+ public AbortSlowAckConsumer2Test(Boolean topic) {
+ super(topic);
+ }
+
+ @Override
+ protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+ return new AbortSlowConsumerStrategy();
+ }
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ PolicyEntry policy = new PolicyEntry();
+
+ AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+ strategy.setAbortConnection(abortConnection);
+ strategy.setCheckPeriod(checkPeriod);
+ strategy.setMaxSlowDuration(maxSlowDuration);
+ strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+ policy.setSlowConsumerStrategy(strategy);
+ policy.setQueuePrefetch(10);
+ policy.setTopicPrefetch(10);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
+ return broker;
+ }
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ factory.getPrefetchPolicy().setAll(1);
+ return factory;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
deleted file mode 100644
index 3ed88e2..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.policy;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
-import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class);
-
- protected long maxTimeSinceLastAck = 5 * 1000;
-
- @Override
- protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
- return new AbortSlowConsumerStrategy();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = super.createBroker();
- PolicyEntry policy = new PolicyEntry();
-
- AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
- strategy.setAbortConnection(abortConnection);
- strategy.setCheckPeriod(checkPeriod);
- strategy.setMaxSlowDuration(maxSlowDuration);
- strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
-
- policy.setSlowConsumerStrategy(strategy);
- policy.setQueuePrefetch(10);
- policy.setTopicPrefetch(10);
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
- return broker;
- }
-
- @Override
- protected ConnectionFactory createConnectionFactory() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- factory.getPrefetchPolicy().setAll(1);
- return factory;
- }
-
- @Override
- public void testSlowConsumerIsAbortedViaJmx() throws Exception {
- AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
- strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
- super.testSlowConsumerIsAbortedViaJmx();
- }
-
- @Override
- public void initCombosForTestSlowConsumerIsAborted() {
- addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
- addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testZeroPrefetchConsumerIsAborted() throws Exception {
- ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
- conn.setExceptionListener(this);
- connections.add(conn);
-
- Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final MessageConsumer consumer = sess.createConsumer(destination);
- assertNotNull(consumer);
- conn.start();
- startProducers(destination, 20);
-
- Message message = consumer.receive(5000);
- assertNotNull(message);
-
- try {
- consumer.receive(20000);
- fail("Slow consumer not aborted.");
- } catch(Exception ex) {
- }
- }
-
- public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
- AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
- strategy.setIgnoreIdleConsumers(false);
-
- ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
- conn.setExceptionListener(this);
- connections.add(conn);
-
- Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final MessageConsumer consumer = sess.createConsumer(destination);
- assertNotNull(consumer);
- conn.start();
- startProducers(destination, 20);
-
- try {
- consumer.receive(20000);
- fail("Idle consumer not aborted.");
- } catch(Exception ex) {
- }
- }
-
- public void testIdleConsumerCanBeAborted() throws Exception {
- AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
- strategy.setIgnoreIdleConsumers(false);
-
- ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
- conn.setExceptionListener(this);
- connections.add(conn);
-
- Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final MessageConsumer consumer = sess.createConsumer(destination);
- assertNotNull(consumer);
- conn.start();
- startProducers(destination, 20);
-
- Message message = consumer.receive(5000);
- assertNotNull(message);
- message.acknowledge();
-
- try {
- consumer.receive(20000);
- fail("Slow consumer not aborted.");
- } catch(Exception ex) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
new file mode 100644
index 0000000..373b155
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Non-parameterized slow-consumer tests, run with the plain JUnit 4 block runner on top of the
+ * broker and strategy that {@link AbortSlowConsumerBase} sets up.
+ */
+@RunWith(BlockJUnit4ClassRunner.class)
+public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class);
+
+ /**
+ * Starts consumers without setting any processing delay, produces 100 messages and then waits
+ * for and asserts at least 10 messages on the shared message list.
+ */
+ @Test
+ public void testRegularConsumerIsNotAborted() throws Exception {
+ startConsumers(destination);
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 100);
+ allMessagesList.waitForMessagesToArrive(10);
+ allMessagesList.assertAtLeastMessagesReceived(10);
+ }
+
+ /**
+ * Delays one consumer, finds it through the strategy's JMX view, aborts it via
+ * abortConsumer(), and finally checks that the strategy MBean is gone once the destination
+ * has been removed.
+ */
+ @Test
+ public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+ underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
+ startConsumers(destination);
+ // The first consumer gets a processing delay of 8 * 1000 (presumably ms - TODO confirm).
+ Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+ consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 100);
+
+ consumertoAbort.getValue().assertMessagesReceived(1);
+
+ // Build the destination MBean name, then follow it to the slow-consumer strategy MBean.
+ ActiveMQDestination amqDest = (ActiveMQDestination)destination;
+ ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
+ (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
+ + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
+
+ DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
+ ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+
+ assertNotNull(slowConsumerPolicyMBeanName);
+
+ AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+ broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+
+ // Presumably gives the strategy's periodic check time to flag the consumer - TODO confirm.
+ TimeUnit.SECONDS.sleep(3);
+
+ TabularData slowOnes = abortPolicy.getSlowConsumers();
+ assertEquals("one slow consumers", 1, slowOnes.size());
+
+ LOG.info("slow ones:" + slowOnes);
+
+ CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
+ LOG.info("Slow one: " + slowOne);
+
+ assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
+ abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
+
+ consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+
+ slowOnes = abortPolicy.getSlowConsumers();
+ assertEquals("no slow consumers left", 0, slowOnes.size());
+
+ // verify mbean gone with destination
+ // NOTE(review): removeTopic() is called regardless of amqDest.isTopic(); this relies on
+ // AbortSlowConsumerBase.setUp() setting topic = true.
+ broker.getAdminView().removeTopic(amqDest.getPhysicalName());
+
+ try {
+ abortPolicy.getSlowConsumers();
+ fail("expect not found post destination removal");
+ } catch(UndeclaredThrowableException expected) {
+ assertTrue("correct exception: " + expected.getCause(),
+ expected.getCause() instanceof InstanceNotFoundException);
+ }
+ }
+
+ /**
+ * With ten consumers and only the first one delayed, expects at least 99 of 100 messages to
+ * arrive overall while the delayed consumer's list stays at one message after a 5 second wait.
+ */
+ @Test
+ public void testOnlyOneSlowConsumerIsAborted() throws Exception {
+ consumerCount = 10;
+ startConsumers(destination);
+ Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+ consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 100);
+
+ allMessagesList.waitForMessagesToArrive(99);
+ allMessagesList.assertAtLeastMessagesReceived(99);
+
+ consumertoAbort.getValue().assertMessagesReceived(1);
+ TimeUnit.SECONDS.sleep(5);
+ consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+ }
+
+ /**
+ * Closes a single delayed consumer after its first message has arrived. The test contains no
+ * explicit assertion; it passes when close() returns without throwing.
+ */
+ @Test
+ public void testAbortAlreadyClosingConsumers() throws Exception {
+ consumerCount = 1;
+ startConsumers(destination);
+ for (MessageIdList list : consumers.values()) {
+ list.setProcessingDelay(6 * 1000);
+ }
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 100);
+ allMessagesList.waitForMessagesToArrive(consumerCount);
+
+ for (MessageConsumer consumer : consumers.keySet()) {
+ LOG.info("closing consumer: " + consumer);
+ /// will block waiting for on message till 6secs expire
+ consumer.close();
+ }
+ }
+
+ // Placeholder: the body is empty, so this test always passes without exercising anything.
+ @Test
+ public void testAbortConsumerOnDeadConnection() throws Exception {
+ // socket proxy on pause, close could hang??
+ }
+
+ // Records the exception and prints its stack trace; the body is the same as the
+ // onException implementation in AbortSlowConsumerBase.
+ @Override
+ public void onException(JMSException exception) {
+ exceptions.add(exception);
+ exception.printStackTrace();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
new file mode 100644
index 0000000..6fe1e47
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class);
+
+ /**
+ * Builds the parameter sets for the Parameterized runner: every combination of
+ * abortConnection and topic, i.e. (true, true), (true, false), (false, true), (false, false).
+ *
+ * @return one {abortConnection, topic} pair per parameterized run, in constructor order
+ */
+ @Parameterized.Parameters(name = "{0}-{1}")
+ public static Collection<Object[]> getTestParameters() {
+
+ List<Object[]> testParameters = new ArrayList<Object[]>();
+ // Both values are required: listing TRUE twice yields four identical (true, true) runs and
+ // never exercises abortConnection = false or topic = false.
+ Boolean[] booleanValues = {Boolean.TRUE, Boolean.FALSE};
+ for (Boolean abortConnection : booleanValues) {
+ for (Boolean topic : booleanValues) {
+ Boolean[] pair = {abortConnection, topic};
+ LOG.info(">>>>> in getTestparameters, adding {}, {}", abortConnection, topic);
+ testParameters.add(pair);
+ }
+ }
+
+ return testParameters;
+ }
+
+ /**
+ * Stores one parameter pair produced by getTestParameters() in the inherited fields.
+ *
+ * @param abortConnection assigned to the abortConnection field, which
+ * AbortSlowConsumerBase.createBroker() passes to the strategy's setAbortConnection()
+ * @param topic assigned to the topic field
+ */
+ // NOTE(review): AbortSlowConsumerBase.setUp() assigns topic = true when it runs as @Before,
+ // which looks like it overwrites the value stored here - confirm.
+ public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) {
+ this.abortConnection = abortConnection;
+ this.topic = topic;
+ }
+
+ /**
+ * Delays the first consumer, produces 100 messages and checks that this consumer's list holds
+ * one message both before and after a 5 second wait.
+ */
+ @Test(timeout = 60 * 1000)
+ public void testSlowConsumerIsAborted() throws Exception {
+ startConsumers(destination);
+ // Only the first registered consumer gets the processing delay (8 * 1000, presumably ms).
+ Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+ consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 100);
+
+ consumertoAbort.getValue().assertMessagesReceived(1);
+ TimeUnit.SECONDS.sleep(5);
+ consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+ }
+
+ /**
+ * Closes a CLIENT_ACKNOWLEDGE consumer one second after producers start, waits another five
+ * seconds, and asserts that the exception listener recorded nothing in the meantime.
+ */
+ @Test(timeout = 60 * 1000)
+ public void testAbortAlreadyClosedConsumers() throws Exception {
+ Connection conn = createConnectionFactory().createConnection();
+ conn.setExceptionListener(this);
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer consumer = sess.createConsumer(destination);
+ conn.start();
+ startProducers(destination, 20);
+ TimeUnit.SECONDS.sleep(1);
+ LOG.info("closing consumer: " + consumer);
+ consumer.close();
+
+ TimeUnit.SECONDS.sleep(5);
+ // Concatenate the list itself: an Object[] would print only its type and identity hash,
+ // hiding the recorded exceptions in the failure message.
+ assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
+ }
+
+ /**
+ * Closes the whole connection one second after producers start, waits another five seconds,
+ * and asserts that the exception listener recorded nothing in the meantime.
+ */
+ @Test(timeout = 60 * 1000)
+ public void testAbortAlreadyClosedConnection() throws Exception {
+ Connection conn = createConnectionFactory().createConnection();
+ conn.setExceptionListener(this);
+
+ Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ sess.createConsumer(destination);
+ conn.start();
+ startProducers(destination, 20);
+ TimeUnit.SECONDS.sleep(1);
+ LOG.info("closing connection: " + conn);
+ conn.close();
+
+ TimeUnit.SECONDS.sleep(5);
+ // Concatenate the list itself: an Object[] would print only its type and identity hash,
+ // hiding the recorded exceptions in the failure message.
+ assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
new file mode 100644
index 0000000..2cbea5b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * Parameterized test, run once with topic = true and once with topic = false, checking that a
+ * consumer with a short processing delay still receives messages.
+ */
+@RunWith(value = Parameterized.class)
+public class AbortSlowConsumer2Test extends AbortSlowConsumerBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer2Test.class);
+
+ /**
+ * @return two single-element parameter sets: {true} and {false}
+ */
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> getTestParameters() {
+
+ List<Object[]> testParameters = new ArrayList<Object[]>();
+ Boolean[] booleanValues = {Boolean.TRUE, Boolean.FALSE};
+ for (Boolean topic : booleanValues) {
+ Boolean[] value = {topic};
+ testParameters.add(value);
+ }
+
+ return testParameters;
+ }
+
+ /**
+ * @param isTopic assigned to the inherited topic field
+ */
+ // NOTE(review): AbortSlowConsumerBase.setUp() assigns topic = true when it runs as @Before,
+ // which looks like it overwrites the value stored here - confirm.
+ public AbortSlowConsumer2Test(Boolean isTopic) {
+ this.topic = isTopic;
+ }
+
+
+ /**
+ * Gives the first consumer a processing delay of 500, produces 12 messages and then waits for
+ * and asserts at least 10 messages on the shared message list.
+ */
+ @Test(timeout = 60 * 1000)
+ public void testLittleSlowConsumerIsNotAborted() throws Exception {
+ startConsumers(destination);
+ Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+ consumertoAbort.getValue().setProcessingDelay(500);
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 12);
+ allMessagesList.waitForMessagesToArrive(10);
+ allMessagesList.assertAtLeastMessagesReceived(10);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
new file mode 100644
index 0000000..ee28112
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import junit.framework.Test;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Shared fixture for the AbortSlowConsumer tests: creates the strategy under test, installs it
+ * as the broker's default destination policy, and collects JMS exceptions reported to it as an
+ * {@link ExceptionListener}.
+ */
+public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
+
+ // Strategy instance created in setUp() and configured and installed in createBroker().
+ protected AbortSlowConsumerStrategy underTest;
+ protected boolean abortConnection = false;
+ // Passed to setCheckPeriod() / setMaxSlowDuration(); presumably milliseconds - TODO confirm.
+ protected long checkPeriod = 2 * 1000;
+ protected long maxSlowDuration = 5 * 1000;
+ // Exceptions delivered to onException(); cleared at the start of setUp().
+ protected final List<Throwable> exceptions = new ArrayList<Throwable>();
+
+ /**
+ * Clears recorded exceptions, creates the strategy under test, runs the superclass set-up and
+ * then creates the destination.
+ */
+ // NOTE(review): topic is unconditionally set to true here, after the parameterized subclasses
+ // (AbortSlowConsumer1Test, AbortSlowConsumer2Test) have assigned it in their constructors.
+ // That appears to make their topic = false runs use a topic as well - confirm.
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ exceptions.clear();
+ topic = true;
+ underTest = createSlowConsumerStrategy();
+ super.setUp();
+ createDestination();
+ }
+
+ /**
+ * Factory hook for the strategy stored in underTest.
+ *
+ * @return a new AbortSlowConsumerStrategy
+ */
+ protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+ return new AbortSlowConsumerStrategy();
+ }
+
+ /**
+ * Takes the broker built by the superclass and gives it a default policy entry that uses
+ * underTest (configured from abortConnection, checkPeriod and maxSlowDuration) and a prefetch
+ * of 10 for queues and topics.
+ */
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ PolicyEntry policy = new PolicyEntry();
+ underTest.setAbortConnection(abortConnection);
+ underTest.setCheckPeriod(checkPeriod);
+ underTest.setMaxSlowDuration(maxSlowDuration);
+
+ policy.setSlowConsumerStrategy(underTest);
+ policy.setQueuePrefetch(10);
+ policy.setTopicPrefetch(10);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
+ return broker;
+ }
+
+ // Records the exception in the exceptions list and prints its stack trace.
+ @Override
+ public void onException(JMSException exception) {
+ exceptions.add(exception);
+ exception.printStackTrace();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
deleted file mode 100644
index a263ebc..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.policy;
-
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import junit.framework.Test;
-
-import org.apache.activemq.JmsMultipleClientsTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.MessageIdList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
-
- protected AbortSlowConsumerStrategy underTest;
- protected boolean abortConnection = false;
- protected long checkPeriod = 2 * 1000;
- protected long maxSlowDuration = 5 * 1000;
- protected final List<Throwable> exceptions = new ArrayList<Throwable>();
-
- @Override
- protected void setUp() throws Exception {
- exceptions.clear();
- topic = true;
- underTest = createSlowConsumerStrategy();
- super.setUp();
- createDestination();
- }
-
- protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
- return new AbortSlowConsumerStrategy();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = super.createBroker();
- PolicyEntry policy = new PolicyEntry();
- underTest.setAbortConnection(abortConnection);
- underTest.setCheckPeriod(checkPeriod);
- underTest.setMaxSlowDuration(maxSlowDuration);
-
- policy.setSlowConsumerStrategy(underTest);
- policy.setQueuePrefetch(10);
- policy.setTopicPrefetch(10);
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
- return broker;
- }
-
- public void testRegularConsumerIsNotAborted() throws Exception {
- startConsumers(destination);
- for (Connection c : connections) {
- c.setExceptionListener(this);
- }
- startProducers(destination, 100);
- allMessagesList.waitForMessagesToArrive(10);
- allMessagesList.assertAtLeastMessagesReceived(10);
- }
-
- public void initCombosForTestLittleSlowConsumerIsNotAborted() {
- addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testLittleSlowConsumerIsNotAborted() throws Exception {
- startConsumers(destination);
- Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
- consumertoAbort.getValue().setProcessingDelay(500);
- for (Connection c : connections) {
- c.setExceptionListener(this);
- }
- startProducers(destination, 12);
- allMessagesList.waitForMessagesToArrive(10);
- allMessagesList.assertAtLeastMessagesReceived(10);
- }
-
- public void initCombosForTestSlowConsumerIsAborted() {
- addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
- addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testSlowConsumerIsAborted() throws Exception {
- startConsumers(destination);
- Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
- consumertoAbort.getValue().setProcessingDelay(8 * 1000);
- for (Connection c : connections) {
- c.setExceptionListener(this);
- }
- startProducers(destination, 100);
-
- consumertoAbort.getValue().assertMessagesReceived(1);
- TimeUnit.SECONDS.sleep(5);
- consumertoAbort.getValue().assertAtMostMessagesReceived(1);
- }
-
- public void testSlowConsumerIsAbortedViaJmx() throws Exception {
- underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
- startConsumers(destination);
- Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
- consumertoAbort.getValue().setProcessingDelay(8 * 1000);
- for (Connection c : connections) {
- c.setExceptionListener(this);
- }
- startProducers(destination, 100);
-
- consumertoAbort.getValue().assertMessagesReceived(1);
-
- ActiveMQDestination amqDest = (ActiveMQDestination)destination;
- ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
- (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
- + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
-
- DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
- ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
-
- assertNotNull(slowConsumerPolicyMBeanName);
-
- AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
- broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
-
- TimeUnit.SECONDS.sleep(3);
-
- TabularData slowOnes = abortPolicy.getSlowConsumers();
- assertEquals("one slow consumers", 1, slowOnes.size());
-
- LOG.info("slow ones:" + slowOnes);
-
- CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
- LOG.info("Slow one: " + slowOne);
-
- assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
- abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
-
- consumertoAbort.getValue().assertAtMostMessagesReceived(1);
-
- slowOnes = abortPolicy.getSlowConsumers();
- assertEquals("no slow consumers left", 0, slowOnes.size());
-
- // verify mbean gone with destination
- broker.getAdminView().removeTopic(amqDest.getPhysicalName());
-
- try {
- abortPolicy.getSlowConsumers();
- fail("expect not found post destination removal");
- } catch(UndeclaredThrowableException expected) {
- assertTrue("correct exception: " + expected.getCause(),
- expected.getCause() instanceof InstanceNotFoundException);
- }
- }
-
- public void testOnlyOneSlowConsumerIsAborted() throws Exception {
- consumerCount = 10;
- startConsumers(destination);
- Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
- consumertoAbort.getValue().setProcessingDelay(8 * 1000);
- for (Connection c : connections) {
- c.setExceptionListener(this);
- }
- startProducers(destination, 100);
-
- allMessagesList.waitForMessagesToArrive(99);
- allMessagesList.assertAtLeastMessagesReceived(99);
-
- consumertoAbort.getValue().assertMessagesReceived(1);
- TimeUnit.SECONDS.sleep(5);
- consumertoAbort.getValue().assertAtMostMessagesReceived(1);
- }
-
- public void testAbortAlreadyClosingConsumers() throws Exception {
- consumerCount = 1;
- startConsumers(destination);
- for (MessageIdList list : consumers.values()) {
- list.setProcessingDelay(6 * 1000);
- }
- for (Connection c : connections) {
- c.setExceptionListener(this);
- }
- startProducers(destination, 100);
- allMessagesList.waitForMessagesToArrive(consumerCount);
-
- for (MessageConsumer consumer : consumers.keySet()) {
- LOG.info("closing consumer: " + consumer);
- /// will block waiting for on message till 6secs expire
- consumer.close();
- }
- }
-
- public void initCombosForTestAbortAlreadyClosedConsumers() {
- addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
- addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testAbortAlreadyClosedConsumers() throws Exception {
- Connection conn = createConnectionFactory().createConnection();
- conn.setExceptionListener(this);
- connections.add(conn);
-
- Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final MessageConsumer consumer = sess.createConsumer(destination);
- conn.start();
- startProducers(destination, 20);
- TimeUnit.SECONDS.sleep(1);
- LOG.info("closing consumer: " + consumer);
- consumer.close();
-
- TimeUnit.SECONDS.sleep(5);
- assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
- }
-
- public void initCombosForTestAbortAlreadyClosedConnection() {
- addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
- addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testAbortAlreadyClosedConnection() throws Exception {
- Connection conn = createConnectionFactory().createConnection();
- conn.setExceptionListener(this);
-
- Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- sess.createConsumer(destination);
- conn.start();
- startProducers(destination, 20);
- TimeUnit.SECONDS.sleep(1);
- LOG.info("closing connection: " + conn);
- conn.close();
-
- TimeUnit.SECONDS.sleep(5);
- assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
- }
-
- public void testAbortConsumerOnDeadConnection() throws Exception {
- // socket proxy on pause, close could hang??
- }
-
- @Override
- public void onException(JMSException exception) {
- exceptions.add(exception);
- exception.printStackTrace();
- }
-
- public static Test suite() {
- return suite(AbortSlowConsumerTest.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
index e966f74..77ca11f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
@@ -26,7 +26,11 @@ import org.apache.activemq.broker.QueueSubscriptionTest;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+@RunWith(BlockJUnit4ClassRunner.class)
public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
protected BrokerService createBroker() throws Exception {
@@ -43,6 +47,7 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
return broker;
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
@@ -52,11 +57,13 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
assertEachConsumerReceivedAtLeastXMessages(1);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
assertMessagesDividedAmongConsumers();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
@@ -66,11 +73,13 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
assertEachConsumerReceivedAtLeastXMessages(1);
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
assertMessagesDividedAmongConsumers();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerManyConsumersFewMessages() throws Exception {
super.testOneProducerManyConsumersFewMessages();
@@ -79,16 +88,19 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
assertMessagesDividedAmongConsumers();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerManyConsumersManyMessages() throws Exception {
super.testOneProducerManyConsumersManyMessages();
assertMessagesDividedAmongConsumers();
}
+ @Test(timeout = 60 * 1000)
public void testManyProducersManyConsumers() throws Exception {
super.testManyProducersManyConsumers();
assertMessagesDividedAmongConsumers();
}
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
// Create consumer that won't consume any message
createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1");
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
index 0455950..e181fe2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
@@ -27,7 +27,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import static org.junit.Assert.*;
+
+@RunWith(BlockJUnit4ClassRunner.class)
public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
@Override
@@ -46,6 +52,7 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
}
@Override
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
@@ -54,6 +61,7 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
}
@Override
+ @Test(timeout = 60 * 1000)
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
index 91ef89c..dbeabfe 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
@@ -26,7 +26,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import static org.junit.Assert.*;
+
+@RunWith(BlockJUnit4ClassRunner.class)
public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
@Override
@@ -44,6 +50,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
return broker;
}
+ @Test
@Override
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
@@ -51,6 +58,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+ @Test
@Override
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
@@ -58,6 +66,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+ @Test
@Override
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
@@ -65,6 +74,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+ @Test
@Override
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
@@ -72,6 +82,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+ @Test
@Override
public void testOneProducerManyConsumersFewMessages() throws Exception {
super.testOneProducerManyConsumersFewMessages();
@@ -79,6 +90,8 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+
+ @Test
@Override
public void testOneProducerManyConsumersManyMessages() throws Exception {
super.testOneProducerManyConsumersManyMessages();
@@ -86,6 +99,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+ @Test
@Override
public void testManyProducersOneConsumer() throws Exception {
super.testManyProducersOneConsumer();
@@ -93,6 +107,7 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
+ @Test
@Override
public void testManyProducersManyConsumers() throws Exception {
super.testManyProducersManyConsumers();
http://git-wip-us.apache.org/repos/asf/activemq/blob/bec711c7/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
index 66cc516..f665431 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
@@ -16,11 +16,6 @@
*/
package org.apache.activemq.bugs;
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
@@ -28,7 +23,18 @@ import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePo
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+@RunWith(BlockJUnit4ClassRunner.class)
public class AMQ2910Test extends JmsMultipleClientsTestSupport {
final int maxConcurrency = 60;
@@ -55,6 +61,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
return broker;
}
+ @Test(timeout = 30 * 1000)
public void testConcurrentSendToPendingCursor() throws Exception {
final ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
@@ -103,6 +110,7 @@ public class AMQ2910Test extends JmsMultipleClientsTestSupport {
if (allMessagesList.getMessageCount() != numExpected) {
dumpAllThreads(getName());
+
}
allMessagesList.assertMessagesReceivedNoWait(numExpected);