You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/19 13:56:53 UTC
svn commit: r946138 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/
test/java/org/apache/activemq/ test/java/org/apache/activemq/broker/policy/
test/java/org/apach...
Author: gtully
Date: Wed May 19 11:56:52 2010
New Revision: 946138
URL: http://svn.apache.org/viewvc?rev=946138&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-378 - add AbortSlowConsumerStrategy destination policy that will abort consumers that are repeatedly slow or slow for a defined period. Slowness is a product of the prefetch and message production rate.
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed May 19 11:56:52 2010
@@ -48,6 +48,7 @@ public abstract class AbstractSubscripti
private BooleanExpression selectorExpression;
private ObjectName objectName;
private int cursorMemoryHighWaterMark = 70;
+ private boolean slowConsumer;
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@@ -162,6 +163,14 @@ public abstract class AbstractSubscripti
public boolean isRecoveryRequired() {
return true;
}
+
+ /**
+  * @return the current value of this subscription's slow-consumer flag
+  */
+ public boolean isSlowConsumer() {
+     return this.slowConsumer;
+ }
+
+ /**
+  * Records whether this subscription is currently considered slow.
+  *
+  * @param val the new value of the slow-consumer flag
+  */
+ public void setSlowConsumer(boolean val) {
+     this.slowConsumer = val;
+ }
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
boolean result = false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed May 19 11:56:52 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@@ -81,6 +82,7 @@ public abstract class BaseDestination im
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
protected int cursorMemoryHighWaterMark = 70;
protected int storeUsageHighWaterMark = 100;
+ private SlowConsumerStrategy slowConsumerStrategy;
/**
* @param broker
@@ -449,6 +451,9 @@ public abstract class BaseDestination im
if (advisoryForSlowConsumers) {
broker.slowConsumer(context, this, subs);
}
+ if (slowConsumerStrategy != null) {
+ slowConsumerStrategy.slowConsumer(context, subs);
+ }
}
/**
@@ -573,5 +578,9 @@ public abstract class BaseDestination im
}
protected abstract Log getLog();
+
+ /**
+  * Installs the strategy that is notified when a subscription on this
+  * destination is reported slow.
+  *
+  * @param slowConsumerStrategy the strategy to use; when {@code null} no
+  *        strategy is invoked
+  */
+ public void setSlowConsumerStrategy(final SlowConsumerStrategy slowConsumerStrategy) {
+     this.slowConsumerStrategy = slowConsumerStrategy;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed May 19 11:56:52 2010
@@ -262,6 +262,7 @@ public class DurableTopicSubscription ex
}
dispatched.clear();
}
+ setSlowConsumer(false);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed May 19 11:56:52 2010
@@ -70,8 +70,6 @@ public abstract class PrefetchSubscripti
private final Object pendingLock = new Object();
private final Object dispatchLock = new Object();
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
- private boolean slowConsumer;
-
private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
@@ -565,7 +563,7 @@ public abstract class PrefetchSubscripti
try {
int numberToDispatch = countBeforeFull();
if (numberToDispatch > 0) {
- slowConsumer=false;
+ setSlowConsumer(false);
pending.setMaxBatchSize(numberToDispatch);
int count = 0;
pending.reset();
@@ -598,15 +596,10 @@ public abstract class PrefetchSubscripti
}
}
}
- }else {
- if (!slowConsumer) {
- slowConsumer=true;
- ConnectionContext c = new ConnectionContext();
- c.setBroker(context.getBroker());
- for (Destination dest :destinations) {
- dest.slowConsumer(c,this);
- }
-
+ } else if (!isSlowConsumer()) {
+ setSlowConsumer(true);
+ for (Destination dest :destinations) {
+ dest.slowConsumer(context, this);
}
}
} finally {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed May 19 11:56:52 2010
@@ -103,6 +103,7 @@ public class QueueSubscription extends P
/**
*/
public void destroy() {
+ setSlowConsumer(false);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed May 19 11:56:52 2010
@@ -227,4 +227,6 @@ public interface Subscription extends Su
public int getCursorMemoryHighWaterMark();
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
+ boolean isSlowConsumer();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed May 19 11:56:52 2010
@@ -62,8 +62,6 @@ public class TopicSubscription extends A
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
- private boolean slowConsumer;
-
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
protected int maxAuditDepth = 1000;
@@ -99,11 +97,11 @@ public class TopicSubscription extends A
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
- slowConsumer=false;
+ setSlowConsumer(false);
} else {
//we are slow
- if(!slowConsumer) {
- slowConsumer=true;
+ if(!isSlowConsumer()) {
+ setSlowConsumer(true);
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
}
@@ -540,6 +538,7 @@ public class TopicSubscription extends A
LOG.warn("Failed to destroy cursor", e);
}
}
+ setSlowConsumer(false);
}
public int getPrefetchSize() {
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=946138&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed May 19 11:56:52 2010
@@ -0,0 +1,179 @@
+package org.apache.activemq.broker.region.policy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
+ *
+ * @org.apache.xbean.XBean
+ */
+public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
+
+ private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
+
+ private static final Scheduler scheduler = Scheduler.getInstance();
+ private AtomicBoolean taskStarted = new AtomicBoolean(false);
+ private Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
+
+ private long maxSlowCount = -1;
+ private long maxSlowDuration = 30*1000;
+ private long checkPeriod = 30*1000;
+ private boolean abortConnection = false;
+
+ public void slowConsumer(ConnectionContext context, Subscription subs) {
+ if (maxSlowCount < 0 && maxSlowDuration < 0) {
+ // nothing to do
+ LOG.info("no limits set, slowConsumer strategy has nothing to do");
+ return;
+ }
+
+ if (taskStarted.compareAndSet(false, true)) {
+ scheduler.executePeriodically(this, checkPeriod);
+ }
+
+ if (!slowConsumers.containsKey(subs)) {
+ slowConsumers.put(subs, new SlowConsumerEntry(context));
+ } else if (maxSlowCount > 0) {
+ slowConsumers.get(subs).slow();
+ }
+ }
+
+ public void run() {
+ if (maxSlowDuration > 0) {
+ // mark
+ for (SlowConsumerEntry entry : slowConsumers.values()) {
+ entry.mark();
+ }
+ }
+
+ HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
+ for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+ if (entry.getKey().isSlowConsumer()) {
+ if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
+ || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) {
+ toAbort.put(entry.getKey(), entry.getValue());
+ slowConsumers.remove(entry.getKey());
+ }
+ } else {
+ LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
+ slowConsumers.remove(entry.getKey());
+ }
+ }
+
+ for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
+ ConnectionContext connectionContext = entry.getValue().context;
+ if (connectionContext!= null) {
+ try {
+ LOG.info("aborting "
+ + (abortConnection ? "connection" : "consumer")
+ + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
+
+ final Connection connection = connectionContext.getConnection();
+ if (connection != null) {
+ if (abortConnection) {
+ scheduler.executeAfterDelay(new Runnable() {
+ public void run() {
+ connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
+ + maxSlowCount + ") or too long (>"
+ + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
+ }}, 0l);
+ } else {
+ // just abort the consumer by telling it to stop
+ ConsumerControl stopConsumer = new ConsumerControl();
+ stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
+ stopConsumer.setClose(true);
+ connection.dispatchAsync(stopConsumer);
+ }
+ } else {
+ LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
+ }
+ } catch (Exception e) {
+ LOG.info("exception on stopping "
+ + (abortConnection ? "connection" : "consumer")
+ + " to abort slow consumer: " + entry.getKey(), e);
+ }
+ }
+ }
+ }
+
+ public long getMaxSlowCount() {
+ return maxSlowCount;
+ }
+
+ /**
+ * number of times a subscription can be deemed slow before triggering abort
+ * effect depends on dispatch rate as slow determination is done on dispatch
+ */
+ public void setMaxSlowCount(int maxSlowCount) {
+ this.maxSlowCount = maxSlowCount;
+ }
+
+ public long getMaxSlowDuration() {
+ return maxSlowDuration;
+ }
+
+ /**
+ * time in milliseconds that a sub can remain slow before triggering
+ * an abort.
+ * @param maxSlowDuration
+ */
+ public void setMaxSlowDuration(long maxSlowDuration) {
+ this.maxSlowDuration = maxSlowDuration;
+ }
+
+ public long getCheckPeriod() {
+ return checkPeriod;
+ }
+
+ /**
+ * time in milliseconds between checks for slow subscriptions
+ * @param checkPeriod
+ */
+ public void setCheckPeriod(long checkPeriod) {
+ this.checkPeriod = checkPeriod;
+ }
+
+ public boolean isAbortConnection() {
+ return abortConnection;
+ }
+
+ /**
+ * abort the consumers connection rather than sending a stop command to the remote consumer
+ * @param abortConnection
+ */
+ public void setAbortConnection(boolean abortConnection) {
+ this.abortConnection = abortConnection;
+ }
+
+ static class SlowConsumerEntry {
+
+ final ConnectionContext context;
+ int slowCount = 1;
+ int markCount = 0;
+
+ SlowConsumerEntry(ConnectionContext context) {
+ this.context = context;
+ }
+
+ public void slow() {
+ slowCount++;
+ }
+
+ public void mark() {
+ markCount++;
+ }
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed May 19 11:56:52 2010
@@ -86,6 +86,7 @@ public class PolicyEntry extends Destina
private boolean usePrefetchExtension = true;
private int cursorMemoryHighWaterMark = 70;
private int storeUsageHighWaterMark = 100;
+ private SlowConsumerStrategy slowConsumerStrategy;
public void configure(Broker broker,Queue queue) {
@@ -147,6 +148,7 @@ public class PolicyEntry extends Destina
destination.setMaxExpirePageSize(getMaxExpirePageSize());
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
+ destination.setSlowConsumerStrategy(getSlowConsumerStrategy());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -724,4 +726,12 @@ public class PolicyEntry extends Destina
return storeUsageHighWaterMark;
}
+ /**
+  * Sets the slow-consumer strategy that this policy entry passes on to the
+  * destinations it configures.
+  *
+  * @param slowConsumerStrategy the strategy to apply
+  */
+ public void setSlowConsumerStrategy(final SlowConsumerStrategy slowConsumerStrategy) {
+     this.slowConsumerStrategy = slowConsumerStrategy;
+ }
+
+ /**
+  * @return the slow-consumer strategy held by this policy entry
+  */
+ public SlowConsumerStrategy getSlowConsumerStrategy() {
+     return slowConsumerStrategy;
+ }
+
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=946138&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed May 19 11:56:52 2010
@@ -0,0 +1,13 @@
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+
+/**
+ * A strategy for dealing with slow consumers.
+ * <p>
+ * {@code slowConsumer(context, subs)} is the single callback: it receives the
+ * connection context and the subscription that was reported slow. In this
+ * change, BaseDestination forwards its slow-consumer notification to the
+ * configured strategy, if one is set.
+ */
+public interface SlowConsumerStrategy {
+
+ void slowConsumer(ConnectionContext context, Subscription subs);
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java Wed May 19 11:56:52 2010
@@ -61,7 +61,7 @@ public class JmsMultipleClientsTestSuppo
protected boolean useConcurrentSend = true;
protected boolean durable;
- protected boolean topic;
+ public boolean topic;
protected boolean persistent;
protected BrokerService broker;
@@ -115,6 +115,7 @@ public class JmsMultipleClientsTestSuppo
}
protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+ connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -195,6 +196,9 @@ public class JmsMultipleClientsTestSuppo
protected ActiveMQDestination createDestination() throws JMSException {
String name = "." + getClass().getName() + "." + getName();
+ // ensure not inadvertently composite because of combos
+ name = name.replace(' ','_');
+ name = name.replace(',','&');
if (topic) {
destination = new ActiveMQTopic("Topic" + name);
return (ActiveMQDestination)destination;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java Wed May 19 11:56:52 2010
@@ -52,7 +52,6 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.ThreadTracker;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -82,7 +81,6 @@ public class MessageEvictionTest {
@After
public void tearDown() throws Exception {
- ThreadTracker.result();
connection.stop();
broker.stop();
}
@@ -155,6 +153,7 @@ public class MessageEvictionTest {
ExecutorService executor = Executors.newCachedThreadPool();
final CountDownLatch doAck = new CountDownLatch(1);
+ final CountDownLatch ackDone = new CountDownLatch(1);
final CountDownLatch consumerRegistered = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
@@ -167,15 +166,18 @@ public class MessageEvictionTest {
doAck.await(60, TimeUnit.SECONDS);
LOG.info("acking: " + message.getJMSMessageID());
message.acknowledge();
+ ackDone.countDown();
} catch (Exception e) {
- e.printStackTrace();
- consumerRegistered.countDown();
+ e.printStackTrace();
fail(e.toString());
+ } finally {
+ consumerRegistered.countDown();
+ ackDone.countDown();
}
}
});
consumerRegistered.countDown();
- doAck.await(60, TimeUnit.SECONDS);
+ ackDone.await(60, TimeUnit.SECONDS);
consumer.close();
} catch (Exception e) {
e.printStackTrace();
@@ -256,7 +258,8 @@ public class MessageEvictionTest {
// to keep the limit in check and up to date rather than just the first few, evict some
OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
- messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(100);
+ // whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test
+ //messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000);
entry.setMessageEvictionStrategy(messageEvictionStrategy);
// let evicted messaged disappear
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=946138&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed May 19 11:56:52 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Tests for {@link AbortSlowConsumerStrategy}: the broker is configured with the strategy as the
+ * default destination policy (prefetch 10), and the tests check which consumers are, or are not,
+ * aborted. JMS exceptions reported to this listener are collected in {@code exceptions}.
+ */
+public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
+
+    private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class);
+
+    AbortSlowConsumerStrategy underTest;
+
+    // public so they can be set by name through addCombinationValues(...)
+    public boolean abortConnection = false;
+    public long checkPeriod = 2*1000;
+    public long maxSlowDuration = 5*1000;
+
+    private List<Throwable> exceptions = new ArrayList<Throwable>();
+
+    @Override
+    protected void setUp() throws Exception {
+        exceptions.clear();
+        // NOTE(review): this forces topic = true; confirm it does not override the
+        // "topic" combination values declared by the initCombos* methods below.
+        topic = true;
+        underTest = new AbortSlowConsumerStrategy();
+        super.setUp();
+        createDestination();
+    }
+
+    /**
+     * Creates the broker with the strategy under test installed as the default policy entry,
+     * using a prefetch of 10 for both queues and topics.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+        underTest.setAbortConnection(abortConnection);
+        underTest.setCheckPeriod(checkPeriod);
+        underTest.setMaxSlowDuration(maxSlowDuration);
+
+        policy.setSlowConsumerStrategy(underTest);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    /** Consumers with no processing delay: at least 10 of the produced messages must arrive. */
+    public void testRegularConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+
+    public void initCombosForTestLittleSlowConsumerIsNotAborted() {
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    /** One consumer with a 500 ms processing delay: at least 10 messages must still arrive. */
+    public void testLittleSlowConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(500);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 12);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+
+
+    public void initCombosForTestSlowConsumerIsAborted() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    /**
+     * One consumer with an 8 s processing delay (longer than maxSlowDuration): it must have
+     * received exactly one message, and still at most one after a further 5 s.
+     */
+    public void testSlowConsumerIsAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8*1000);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+
+        TimeUnit.SECONDS.sleep(5);
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+
+    /**
+     * Ten consumers, one of them with an 8 s processing delay: at least 99 messages must arrive
+     * overall while the delayed consumer stays at one message.
+     */
+    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
+        consumerCount = 10;
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8*1000);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        allMessagesList.waitForMessagesToArrive(99);
+        allMessagesList.assertAtLeastMessagesReceived(99);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+
+        TimeUnit.SECONDS.sleep(5);
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+    /** Closes a consumer that has a 6 s processing delay after its first message arrived. */
+    public void testAbortAlreadyClosingConsumers() throws Exception {
+        consumerCount = 1;
+        startConsumers(destination);
+        for (MessageIdList list : consumers.values()) {
+            list.setProcessingDelay(6*1000);
+        }
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(consumerCount);
+
+        for (MessageConsumer consumer : consumers.keySet()) {
+            LOG.info("closing consumer: " + consumer);
+            // will block waiting for onMessage until the 6 sec processing delay expires
+            consumer.close();
+        }
+    }
+
+    public void initCombosForTestAbortAlreadyClosedConsumers() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    /**
+     * A consumer that never acknowledges is closed after 1 s; no exception may have been
+     * reported to the listener 5 s later.
+     */
+    public void testAbortAlreadyClosedConsumers() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing consumer: " + consumer);
+        consumer.close();
+
+        TimeUnit.SECONDS.sleep(5);
+        // concatenate the list itself so a failure shows the collected exceptions
+        // (toArray() would only print the array's identity string)
+        assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
+    }
+
+
+    public void initCombosForTestAbortAlreadyClosedConnection() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    /**
+     * Same as {@link #testAbortAlreadyClosedConsumers()} but the whole connection is closed
+     * after 1 s instead of only the consumer.
+     */
+    public void testAbortAlreadyClosedConnection() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing connection: " + conn);
+        conn.close();
+
+        TimeUnit.SECONDS.sleep(5);
+        // concatenate the list itself so a failure shows the collected exceptions
+        assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
+    }
+
+    /** Placeholder: not implemented yet. */
+    public void testAbortConsumerOnDeadConnection() throws Exception {
+        // socket proxy on pause, close could hang??
+    }
+
+    /** Collects every exception reported on a connection so tests can assert on them. */
+    public void onException(JMSException exception) {
+        exceptions.add(exception);
+        exception.printStackTrace();
+    }
+
+    public static Test suite() {
+        return suite(AbortSlowConsumerTest.class);
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed May 19 11:56:52 2010
@@ -291,8 +291,12 @@ public class QueueDuplicatesFromStoreTes
}
public void setCursorMemoryHighWaterMark(
- int cursorMemoryHighWaterMark) {
+ int cursorMemoryHighWaterMark) {
}
+
+ public boolean isSlowConsumer() { // stub for the isSlowConsumer() method this change adds to Subscription
+ return false; // this test subscription always reports itself as not slow
+ }
};
queue.addSubscription(contextNotInTx, subscription);