You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/19 13:56:53 UTC

svn commit: r946138 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/ test/java/org/apache/activemq/broker/policy/ test/java/org/apach...

Author: gtully
Date: Wed May 19 11:56:52 2010
New Revision: 946138

URL: http://svn.apache.org/viewvc?rev=946138&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-378 - add AbortSlowConsumerStrategy destination policy that will abort consumers that are repeatildy slow or slow for a defined period. Slowness is a product of the prefetch and message production rate.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed May 19 11:56:52 2010
@@ -48,6 +48,7 @@ public abstract class AbstractSubscripti
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
     private int cursorMemoryHighWaterMark = 70;
+    private boolean slowConsumer;
 
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@@ -162,6 +163,14 @@ public abstract class AbstractSubscripti
     public boolean isRecoveryRequired() {
         return true;
     }
+    
+    public boolean isSlowConsumer() {
+        return slowConsumer;
+    }
+    
+    public void setSlowConsumer(boolean val) {
+        slowConsumer = val;
+    }
 
     public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
         boolean result = false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed May 19 11:56:52 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
@@ -81,6 +82,7 @@ public abstract class BaseDestination im
     private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
     protected int cursorMemoryHighWaterMark = 70;
     protected int storeUsageHighWaterMark = 100;
+    private SlowConsumerStrategy slowConsumerStrategy;
 
     /**
      * @param broker
@@ -449,6 +451,9 @@ public abstract class BaseDestination im
         if (advisoryForSlowConsumers) {
             broker.slowConsumer(context, this, subs);
         }
+        if (slowConsumerStrategy != null) {
+            slowConsumerStrategy.slowConsumer(context, subs);
+        }
     }
 
     /**
@@ -573,5 +578,9 @@ public abstract class BaseDestination im
     }
 
     protected abstract Log getLog();
+
+    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
+        this.slowConsumerStrategy = slowConsumerStrategy;
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed May 19 11:56:52 2010
@@ -262,6 +262,7 @@ public class DurableTopicSubscription ex
             }
             dispatched.clear();
         }
+        setSlowConsumer(false);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed May 19 11:56:52 2010
@@ -70,8 +70,6 @@ public abstract class PrefetchSubscripti
     private final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
-    private boolean slowConsumer;
-
     private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
     
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
@@ -565,7 +563,7 @@ public abstract class PrefetchSubscripti
                 try {
                     int numberToDispatch = countBeforeFull();
                     if (numberToDispatch > 0) {
-                        slowConsumer=false;
+                        setSlowConsumer(false);
                         pending.setMaxBatchSize(numberToDispatch);
                         int count = 0;
                         pending.reset();
@@ -598,15 +596,10 @@ public abstract class PrefetchSubscripti
                                 }
                             }
                         }
-                    }else {
-                        if (!slowConsumer) {
-                            slowConsumer=true;
-                            ConnectionContext c = new ConnectionContext();
-                            c.setBroker(context.getBroker());
-                            for (Destination dest :destinations) {
-                                dest.slowConsumer(c,this);
-                            }
-                            
+                    } else if (!isSlowConsumer()) {
+                        setSlowConsumer(true);
+                        for (Destination dest :destinations) {
+                            dest.slowConsumer(context, this);
                         }
                     }
                 } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed May 19 11:56:52 2010
@@ -103,6 +103,7 @@ public class QueueSubscription extends P
     /**
      */
     public void destroy() {
+        setSlowConsumer(false);
     }
 
    

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed May 19 11:56:52 2010
@@ -227,4 +227,6 @@ public interface Subscription extends Su
     public int getCursorMemoryHighWaterMark();
 
 	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
+    boolean isSlowConsumer();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed May 19 11:56:52 2010
@@ -62,8 +62,6 @@ public class TopicSubscription extends A
     private final AtomicLong enqueueCounter = new AtomicLong(0);
     private final AtomicLong dequeueCounter = new AtomicLong(0);
     private int memoryUsageHighWaterMark = 95;
-    private boolean slowConsumer;
-    
     // allow duplicate suppression in a ring network of brokers
     protected int maxProducersToAudit = 1024;
     protected int maxAuditDepth = 1000;
@@ -99,11 +97,11 @@ public class TopicSubscription extends A
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
             dispatch(node);
-            slowConsumer=false;
+            setSlowConsumer(false);
         } else {
             //we are slow
-            if(!slowConsumer) {
-                slowConsumer=true;
+            if(!isSlowConsumer()) {
+                setSlowConsumer(true);
                 for (Destination dest: destinations) {
                     dest.slowConsumer(getContext(), this);
                 }
@@ -540,6 +538,7 @@ public class TopicSubscription extends A
                 LOG.warn("Failed to destroy cursor", e);
             }
         }
+        setSlowConsumer(false);
     }
 
     public int getPrefetchSize() {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=946138&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed May 19 11:56:52 2010
@@ -0,0 +1,179 @@
+package org.apache.activemq.broker.region.policy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
+ * 
+ * @org.apache.xbean.XBean
+ */
+public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
+    
+    private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
+
+    private static final Scheduler scheduler = Scheduler.getInstance();
+    private AtomicBoolean taskStarted = new AtomicBoolean(false);
+    private Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
+
+    private long maxSlowCount = -1;
+    private long maxSlowDuration = 30*1000;
+    private long checkPeriod = 30*1000;
+    private boolean abortConnection = false;
+
+    public void slowConsumer(ConnectionContext context, Subscription subs) {
+        if (maxSlowCount < 0 && maxSlowDuration < 0) {
+            // nothing to do
+            LOG.info("no limits set, slowConsumer strategy has nothing to do");
+            return;
+        }
+        
+        if (taskStarted.compareAndSet(false, true)) {
+            scheduler.executePeriodically(this, checkPeriod);
+        }
+            
+        if (!slowConsumers.containsKey(subs)) {
+            slowConsumers.put(subs, new SlowConsumerEntry(context));
+        } else if (maxSlowCount > 0) {
+            slowConsumers.get(subs).slow();
+        }
+    }
+
+    public void run() {
+        if (maxSlowDuration > 0) {
+            // mark
+            for (SlowConsumerEntry entry : slowConsumers.values()) {
+                entry.mark();
+            }
+        }
+        
+        HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
+        for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+            if (entry.getKey().isSlowConsumer()) {
+                if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
+                        || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { 
+                    toAbort.put(entry.getKey(), entry.getValue());
+                    slowConsumers.remove(entry.getKey());
+                }
+            } else {
+                LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
+                slowConsumers.remove(entry.getKey());
+            }
+        }
+        
+        for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
+            ConnectionContext connectionContext = entry.getValue().context;
+            if (connectionContext!= null) {
+                try {
+                    LOG.info("aborting " 
+                            + (abortConnection ? "connection" : "consumer") 
+                            + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId()); 
+
+                    final Connection connection = connectionContext.getConnection();
+                    if (connection != null) {    
+                        if (abortConnection) {
+                            scheduler.executeAfterDelay(new Runnable() {
+                                public void run() {
+                                    connection.serviceException(new InactivityIOException("Consumer was slow too often (>" 
+                                            + maxSlowCount +  ") or too long (>"
+                                            + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
+                                }}, 0l);
+                        } else {
+                            // just abort the consumer by telling it to stop
+                            ConsumerControl stopConsumer = new ConsumerControl();
+                            stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
+                            stopConsumer.setClose(true);    
+                            connection.dispatchAsync(stopConsumer);
+                        }
+                    } else {
+                        LOG.debug("slowConsumer abort ignored, no connection in context:"  + connectionContext);
+                    }
+                } catch (Exception e) {
+                    LOG.info("exception on stopping " 
+                            + (abortConnection ? "connection" : "consumer") 
+                            + " to abort slow consumer: " + entry.getKey(), e);   
+                }
+            }
+        }
+    }
+    
+    public long getMaxSlowCount() {
+        return maxSlowCount;
+    }
+
+    /**
+     * number of times a subscription can be deemed slow before triggering abort
+     * effect depends on dispatch rate as slow determination is done on dispatch
+     */
+    public void setMaxSlowCount(int maxSlowCount) {
+        this.maxSlowCount = maxSlowCount;
+    }
+
+    public long getMaxSlowDuration() {
+        return maxSlowDuration;
+    }
+
+    /**
+     * time in milliseconds that a sub can remain slow before triggering
+     * an abort.
+     * @param maxSlowDuration
+     */
+    public void setMaxSlowDuration(long maxSlowDuration) {
+        this.maxSlowDuration = maxSlowDuration;
+    }
+
+    public long getCheckPeriod() {
+        return checkPeriod;
+    }
+
+    /**
+     * time in milliseconds between checks for slow subscriptions
+     * @param checkPeriod
+     */
+    public void setCheckPeriod(long checkPeriod) {
+        this.checkPeriod = checkPeriod;
+    }
+
+    public boolean isAbortConnection() {
+        return abortConnection;
+    }
+
+    /**
+     * abort the consumers connection rather than sending a stop command to the remote consumer
+     * @param abortConnection
+     */
+    public void setAbortConnection(boolean abortConnection) {
+        this.abortConnection = abortConnection;
+    }
+
+    static class SlowConsumerEntry {
+        
+        final ConnectionContext context;
+        int slowCount = 1;
+        int markCount = 0;
+        
+        SlowConsumerEntry(ConnectionContext context) {
+            this.context = context;
+        }
+
+        public void slow() {
+            slowCount++;
+        }
+        
+        public void mark() {
+            markCount++;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed May 19 11:56:52 2010
@@ -86,6 +86,7 @@ public class PolicyEntry extends Destina
     private boolean usePrefetchExtension = true;
     private int cursorMemoryHighWaterMark = 70;
     private int storeUsageHighWaterMark = 100;
+    private SlowConsumerStrategy slowConsumerStrategy;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -147,6 +148,7 @@ public class PolicyEntry extends Destina
         destination.setMaxExpirePageSize(getMaxExpirePageSize());
         destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
+        destination.setSlowConsumerStrategy(getSlowConsumerStrategy());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -724,4 +726,12 @@ public class PolicyEntry extends Destina
         return storeUsageHighWaterMark;
     }
 
+    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
+        this.slowConsumerStrategy = slowConsumerStrategy;
+    }
+    
+    public SlowConsumerStrategy getSlowConsumerStrategy() {
+        return this.slowConsumerStrategy;
+    }
+
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=946138&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed May 19 11:56:52 2010
@@ -0,0 +1,13 @@
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+
+/*
+ * a strategy for dealing with slow consumers
+ */
+public interface SlowConsumerStrategy {
+
+    void slowConsumer(ConnectionContext context, Subscription subs);
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java Wed May 19 11:56:52 2010
@@ -61,7 +61,7 @@ public class JmsMultipleClientsTestSuppo
 
     protected boolean useConcurrentSend = true;
     protected boolean durable;
-    protected boolean topic;
+    public boolean topic;
     protected boolean persistent;
 
     protected BrokerService broker;
@@ -115,6 +115,7 @@ public class JmsMultipleClientsTestSuppo
     }
 
     protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+        connections.add(connection);
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -195,6 +196,9 @@ public class JmsMultipleClientsTestSuppo
 
     protected ActiveMQDestination createDestination() throws JMSException {
         String name = "." + getClass().getName() + "." + getName();
+        // ensure not inadvertently composite because of combos
+        name = name.replace(' ','_');
+        name = name.replace(',','&');
         if (topic) {
             destination = new ActiveMQTopic("Topic" + name);
             return (ActiveMQDestination)destination;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java Wed May 19 11:56:52 2010
@@ -52,7 +52,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.ThreadTracker;
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -82,7 +81,6 @@ public class MessageEvictionTest {
     
     @After
     public void tearDown() throws Exception {
-        ThreadTracker.result();
         connection.stop();
         broker.stop();
     }
@@ -155,6 +153,7 @@ public class MessageEvictionTest {
         
         ExecutorService executor = Executors.newCachedThreadPool();
         final CountDownLatch doAck = new CountDownLatch(1);
+        final CountDownLatch ackDone = new CountDownLatch(1);
         final CountDownLatch consumerRegistered = new CountDownLatch(1);
         executor.execute(new Runnable() {
             public void run() {
@@ -167,15 +166,18 @@ public class MessageEvictionTest {
                                 doAck.await(60, TimeUnit.SECONDS);
                                 LOG.info("acking: " + message.getJMSMessageID());
                                 message.acknowledge();
+                                ackDone.countDown();
                             } catch (Exception e) {
-                                e.printStackTrace();
-                                consumerRegistered.countDown();
+                                e.printStackTrace();   
                                 fail(e.toString());
+                            } finally {
+                                consumerRegistered.countDown();
+                                ackDone.countDown();
                             }
                         }           
                     });
                     consumerRegistered.countDown();
-                    doAck.await(60, TimeUnit.SECONDS);
+                    ackDone.await(60, TimeUnit.SECONDS);
                     consumer.close();
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -256,7 +258,8 @@ public class MessageEvictionTest {
 
         // to keep the limit in check and up to date rather than just the first few, evict some
         OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
-        messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(100);
+        // whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test
+        //messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000);
         entry.setMessageEvictionStrategy(messageEvictionStrategy);
         
         // let evicted messaged disappear

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=946138&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed May 19 11:56:52 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
+
+    private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class);
+    
+    AbortSlowConsumerStrategy underTest;
+    
+    public boolean abortConnection = false;
+    public long checkPeriod = 2*1000;
+    public long maxSlowDuration = 5*1000;
+
+    private List<Throwable> exceptions = new ArrayList<Throwable>();
+    
+    @Override
+    protected void setUp() throws Exception {
+        exceptions.clear();
+        topic = true;
+        underTest = new AbortSlowConsumerStrategy();
+        super.setUp();
+        createDestination();
+    }
+    
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+        underTest.setAbortConnection(abortConnection);
+        underTest.setCheckPeriod(checkPeriod);
+        underTest.setMaxSlowDuration(maxSlowDuration);
+
+        policy.setSlowConsumerStrategy(underTest);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    public void testRegularConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+
+    public void initCombosForTestLittleSlowConsumerIsNotAborted() {
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+    
+    public void testLittleSlowConsumerIsNotAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(500);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 12);
+        allMessagesList.waitForMessagesToArrive(10);
+        allMessagesList.assertAtLeastMessagesReceived(10);
+    }
+
+    
+    public void initCombosForTestSlowConsumerIsAborted() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+    
+    public void testSlowConsumerIsAborted() throws Exception {
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8*1000);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        
+        consumertoAbort.getValue().assertMessagesReceived(1);
+     
+        TimeUnit.SECONDS.sleep(5);
+        
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);        
+    }
+
+    
+    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
+        consumerCount = 10;
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8*1000);
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        
+        allMessagesList.waitForMessagesToArrive(99);
+        allMessagesList.assertAtLeastMessagesReceived(99);
+        
+        consumertoAbort.getValue().assertMessagesReceived(1);
+     
+        TimeUnit.SECONDS.sleep(5);
+        
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);        
+    }
+    
+    public void testAbortAlreadyClosingConsumers() throws Exception {
+        consumerCount = 1;
+        startConsumers(destination);
+        for (MessageIdList list : consumers.values()) {
+            list.setProcessingDelay(6*1000);
+        }
+        for (Connection c: connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+        allMessagesList.waitForMessagesToArrive(consumerCount);
+
+        for (MessageConsumer consumer : consumers.keySet()) {
+            LOG.info("closing consumer: " + consumer);
+            /// will block waiting for on message till 6secs expire
+            consumer.close();
+        }
+    }
+    
+    public void initCombosForTestAbortAlreadyClosedConsumers() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+    
+    public void testAbortAlreadyClosedConsumers() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing consumer: " + consumer);
+        consumer.close();
+        
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+    }
+
+    
+    public void initCombosForTestAbortAlreadyClosedConnection() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+    
+    public void testAbortAlreadyClosedConnection() throws Exception {
+        Connection conn = createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        sess.createConsumer(destination);
+        conn.start();
+        startProducers(destination, 20);
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("closing connection: " + conn);
+        conn.close();
+        
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
+    }
+
+    public void testAbortConsumerOnDeadConnection() throws Exception {
+        // socket proxy on pause, close could hang??
+    }
+    
+    public void onException(JMSException exception) {
+        exceptions.add(exception);
+        exception.printStackTrace();        
+    }
+    
+    public static Test suite() {
+        return suite(AbortSlowConsumerTest.class);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=946138&r1=946137&r2=946138&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed May 19 11:56:52 2010
@@ -291,8 +291,12 @@ public class QueueDuplicatesFromStoreTes
 			}
 
 			public void setCursorMemoryHighWaterMark(
-					int cursorMemoryHighWaterMark) {				
+			        int cursorMemoryHighWaterMark) {				
 			}
+
+            public boolean isSlowConsumer() {
+                return false;
+            }
         };
 
         queue.addSubscription(contextNotInTx, subscription);