You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/17 20:44:28 UTC

svn commit: r1504231 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/

Author: tabish
Date: Wed Jul 17 18:44:27 2013
New Revision: 1504231

URL: http://svn.apache.org/r1504231
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-4621

New SlowConsumerStrategy implementation for aborting consumers that haven't ack'd in the configured interval.  Can also be used to kick idle consumers if you disable the ignore idle consumers option.  

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java   (with props)
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed Jul 17 18:44:27 2013
@@ -20,14 +20,17 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.management.ObjectName;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.LogicExpression;
@@ -49,7 +52,7 @@ public abstract class AbstractSubscripti
     private ObjectName objectName;
     private int cursorMemoryHighWaterMark = 70;
     private boolean slowConsumer;
-
+    private long lastAckTime;
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         this.broker = broker;
@@ -57,6 +60,7 @@ public abstract class AbstractSubscripti
         this.info = info;
         this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
         this.selectorExpression = parseSelector(info);
+        this.lastAckTime = System.currentTimeMillis();
     }
 
     private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
@@ -81,6 +85,12 @@ public abstract class AbstractSubscripti
         return rc;
     }
 
+    @Override
+    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
+        this.lastAckTime = System.currentTimeMillis();
+    }
+
+    @Override
     public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
         ConsumerId targetConsumerId = node.getTargetConsumerId();
         if (targetConsumerId != null) {
@@ -96,26 +106,32 @@ public abstract class AbstractSubscripti
         }
     }
 
+    @Override
     public boolean matches(ActiveMQDestination destination) {
         return destinationFilter.matches(destination);
     }
 
+    @Override
     public void add(ConnectionContext context, Destination destination) throws Exception {
         destinations.add(destination);
     }
 
+    @Override
     public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
         destinations.remove(destination);
         return Collections.EMPTY_LIST;
     }
 
+    @Override
     public ConsumerInfo getConsumerInfo() {
         return info;
     }
 
+    @Override
     public void gc() {
     }
 
+    @Override
     public ConnectionContext getContext() {
         return context;
     }
@@ -128,10 +144,12 @@ public abstract class AbstractSubscripti
         return selectorExpression;
     }
 
+    @Override
     public String getSelector() {
         return info.getSelector();
     }
 
+    @Override
     public void setSelector(String selector) throws InvalidSelectorException {
         ConsumerInfo copy = info.copy();
         copy.setSelector(selector);
@@ -141,14 +159,17 @@ public abstract class AbstractSubscripti
         this.selectorExpression = newSelector;
     }
 
+    @Override
     public ObjectName getObjectName() {
         return objectName;
     }
 
+    @Override
     public void setObjectName(ObjectName objectName) {
         this.objectName = objectName;
     }
 
+    @Override
     public int getPrefetchSize() {
         return info.getPrefetchSize();
     }
@@ -156,18 +177,21 @@ public abstract class AbstractSubscripti
         info.setPrefetchSize(newSize);
     }
 
+    @Override
     public boolean isRecoveryRequired() {
         return true;
     }
-    
+
+    @Override
     public boolean isSlowConsumer() {
         return slowConsumer;
     }
-    
+
     public void setSlowConsumer(boolean val) {
         slowConsumer = val;
     }
 
+    @Override
     public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
         boolean result = false;
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
@@ -186,50 +210,56 @@ public abstract class AbstractSubscripti
         return result;
     }
 
+    @Override
     public ActiveMQDestination getActiveMQDestination() {
         return info != null ? info.getDestination() : null;
     }
-    
+
+    @Override
     public boolean isBrowser() {
         return info != null && info.isBrowser();
     }
-    
+
+    @Override
     public int getInFlightUsage() {
         if (info.getPrefetchSize() > 0) {
         return (getInFlightSize() * 100)/info.getPrefetchSize();
         }
         return Integer.MAX_VALUE;
     }
-    
+
     /**
      * Add a destination
      * @param destination
      */
     public void addDestination(Destination destination) {
-        
+
     }
-       
-    
+
     /**
      * Remove a destination
      * @param destination
      */
     public void removeDestination(Destination destination) {
-        
+
     }
-    
+
+    @Override
     public int getCursorMemoryHighWaterMark(){
-    	return this.cursorMemoryHighWaterMark;
+        return this.cursorMemoryHighWaterMark;
+    }
+
+    @Override
+    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
+        this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
     }
 
-	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
-		this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
-	}
-    
+    @Override
     public int countBeforeFull() {
         return getDispatchedQueueSize() - info.getPrefetchSize();
     }
 
+    @Override
     public void unmatched(MessageReference node) throws IOException {
         // only durable topic subs have something to do here
     }
@@ -237,4 +267,13 @@ public abstract class AbstractSubscripti
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         add(message);
     }
+
+    @Override
+    public long getTimeOfLastMessageAck() {
+        return lastAckTime;
+    }
+
+    public void setTimeOfLastMessageAck(long value) {
+        this.lastAckTime = value;
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Jul 17 18:44:27 2013
@@ -310,6 +310,7 @@ public class DurableTopicSubscription ex
 
     @Override
     protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
+        this.setTimeOfLastMessageAck(System.currentTimeMillis());
         Destination regionDestination = (Destination) node.getRegionDestination();
         regionDestination.acknowledge(context, this, ack, node);
         redeliveredMessages.remove(node.getMessageId());

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Jul 17 18:44:27 2013
@@ -47,6 +47,8 @@ public class QueueSubscription extends P
      */
     @Override
     protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
+        this.setTimeOfLastMessageAck(System.currentTimeMillis());
+
         final Destination q = (Destination) n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed Jul 17 18:44:27 2013
@@ -32,27 +32,26 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 /**
- * 
+ *
  */
 public interface Subscription extends SubscriptionRecovery {
 
     /**
      * Used to add messages that match the subscription.
      * @param node
-     * @throws Exception 
-     * @throws InterruptedException 
-     * @throws IOException 
+     * @throws Exception
+     * @throws InterruptedException
+     * @throws IOException
      */
     void add(MessageReference node) throws Exception;
-    
+
     /**
-     * Used when client acknowledge receipt of dispatched message. 
+     * Used when client acknowledge receipt of dispatched message.
      * @param node
-     * @throws IOException 
-     * @throws Exception 
+     * @throws IOException
+     * @throws Exception
      */
     void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception;
-    
 
     /**
      * Allows a consumer to pull a message on demand
@@ -61,36 +60,36 @@ public interface Subscription extends Su
 
     /**
      * Is the subscription interested in the message?
-     * @param node 
+     * @param node
      * @param context
      * @return
-     * @throws IOException 
+     * @throws IOException
      */
     boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
-    
+
     /**
      * Is the subscription interested in messages in the destination?
      * @param context
      * @return
      */
     boolean matches(ActiveMQDestination destination);
-    
+
     /**
      * The subscription will be receiving messages from the destination.
-     * @param context 
+     * @param context
      * @param destination
-     * @throws Exception 
+     * @throws Exception
      */
     void add(ConnectionContext context, Destination destination) throws Exception;
-    
+
     /**
      * The subscription will be no longer be receiving messages from the destination.
-     * @param context 
+     * @param context
      * @param destination
      * @return a list of un-acked messages that were added to the subscription.
      */
     List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
-    
+
     /**
      * The ConsumerInfo object that created the subscription.
      * @param destination
@@ -102,11 +101,11 @@ public interface Subscription extends Su
      * reclaim memory.
      */
     void gc();
-    
+
     /**
      * Used by a Slave Broker to update dispatch infomation
      * @param mdn
-     * @throws Exception 
+     * @throws Exception
      */
     void processMessageDispatchNotification(MessageDispatchNotification  mdn) throws Exception;
 
@@ -114,17 +113,17 @@ public interface Subscription extends Su
      * @return number of messages pending delivery
      */
     int getPendingQueueSize();
-    
+
     /**
      * @return number of messages dispatched to the client
      */
     int getDispatchedQueueSize();
-        
+
     /**
      * @return number of messages dispatched to the client
      */
     long getDispatchedCounter();
-    
+
     /**
      * @return number of messages that matched the subscription
      */
@@ -139,7 +138,7 @@ public interface Subscription extends Su
      * @return the JMS selector on the current subscription
      */
     String getSelector();
-    
+
     /**
      * Attempts to change the current active selector on the subscription.
      * This operation is not supported for persistent topics.
@@ -155,29 +154,28 @@ public interface Subscription extends Su
      * Set when the subscription is registered in JMX
      */
     void setObjectName(ObjectName objectName);
-    
+
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     boolean isLowWaterMark();
-    
+
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     boolean isHighWaterMark();
-    
+
     /**
      * @return true if there is no space to dispatch messages
      */
     boolean isFull();
-    
+
     /**
      * inform the MessageConsumer on the client to change it's prefetch
      * @param newPrefetch
      */
     void updateConsumerPrefetch(int newPrefetch);
-    
-        
+
     /**
      * Called when the subscription is destroyed.
      */
@@ -187,17 +185,17 @@ public interface Subscription extends Su
      * @return the prefetch size that is configured for the subscription
      */
     int getPrefetchSize();
-    
+
     /**
      * @return the number of messages awaiting acknowledgement
      */
     int getInFlightSize();
-    
+
     /**
      * @return the in flight messages as a percentage of the prefetch size
      */
     int getInFlightUsage();
-    
+
     /**
      * Informs the Broker if the subscription needs to intervention to recover it's state
      * e.g. DurableTopicSubscriber may do
@@ -205,25 +203,35 @@ public interface Subscription extends Su
      * @return true if recovery required
      */
     boolean isRecoveryRequired();
-    
-    
+
     /**
      * @return true if a browser
      */
     boolean isBrowser();
-    
+
     /**
      * @return the number of messages this subscription can accept before its full
      */
     int countBeforeFull();
 
     ConnectionContext getContext();
-    
+
     public int getCursorMemoryHighWaterMark();
 
-	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
 
     boolean isSlowConsumer();
 
     void unmatched(MessageReference node) throws IOException;
+
+    /**
+     * Returns the time since the last Ack message was received by this subscription.
+     *
+     * If there has never been an ack this value should be set to the creation time of the
+     * subscription.
+     *
+     * @return time of last received Ack message or Subscription create time if no Acks.
+     */
+    long getTimeOfLastMessageAck();
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Jul 17 18:44:27 2013
@@ -261,6 +261,8 @@ public class TopicSubscription extends A
 
     @Override
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
+        super.acknowledge(context, ack);
+
         // Handle the standard acknowledgment case.
         if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
             if (context.isInTransaction()) {

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java?rev=1504231&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java Wed Jul 17 18:44:27 2013
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abort slow consumers when they reach the configured threshold of slowness,
+ *
+ * default is that a consumer that has not Ack'd a message for 30 seconds is slow.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerStrategy.class);
+
+    private final List<Destination> destinations = new LinkedList<Destination>();
+    private long maxTimeSinceLastAck = 30*1000;
+    private boolean ignoreIdleConsumers = true;
+
+    public AbortSlowAckConsumerStrategy() {
+        this.name = "AbortSlowAckConsumerStrategy@" + hashCode();
+    }
+
+    @Override
+    public void setBrokerService(Broker broker) {
+        super.setBrokerService(broker);
+
+        // Task starts right away since we may not receive any slow consumer events.
+        if (taskStarted.compareAndSet(false, true)) {
+            scheduler.executePeriodically(this, getCheckPeriod());
+        }
+    }
+
+    @Override
+    public void slowConsumer(ConnectionContext context, Subscription subs) {
+        // Ignore these events, we just look at time since last Ack.
+    }
+
+    @Override
+    public void run() {
+
+        if (maxTimeSinceLastAck < 0) {
+            // nothing to do
+            LOG.info("no limit set, slowConsumer strategy has nothing to do");
+            return;
+        }
+
+        if (getMaxSlowDuration() > 0) {
+            // For subscriptions that are already slow we mark them again and check below if
+            // they've exceeded their configured lifetime.
+            for (SlowConsumerEntry entry : slowConsumers.values()) {
+                entry.mark();
+            }
+        }
+
+        List<Destination> disposed = new ArrayList<Destination>();
+
+        for (Destination destination : destinations) {
+            if (destination.isDisposed()) {
+                disposed.add(destination);
+                continue;
+            }
+
+            // Not explicitly documented but this returns a stable copy.
+            List<Subscription> subscribers = destination.getConsumers();
+
+            updateSlowConsumersList(subscribers);
+        }
+
+        // Clean up an disposed destinations to save space.
+        destinations.removeAll(disposed);
+
+        abortAllQualifiedSlowConsumers();
+    }
+
+    private void updateSlowConsumersList(List<Subscription> subscribers) {
+        for (Subscription subscriber : subscribers) {
+
+            if (isIgnoreIdleConsumers() && subscriber.getDispatchedQueueSize() == 0) {
+                // Not considered Idle so ensure its cleared from the list
+                if (slowConsumers.remove(subscriber) != null) {
+                    LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
+                }
+            }
+
+            long lastAckTime = subscriber.getTimeOfLastMessageAck();
+            long timeDelta = System.currentTimeMillis() - lastAckTime;
+
+            if (timeDelta > maxTimeSinceLastAck) {
+                if (!slowConsumers.containsKey(subscriber)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId());
+                    }
+                    slowConsumers.put(subscriber, new SlowConsumerEntry(subscriber.getContext()));
+                } else if (getMaxSlowCount() > 0) {
+                    slowConsumers.get(subscriber).slow();
+                }
+            } else {
+                if (slowConsumers.remove(subscriber) != null) {
+                    LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
+                }
+            }
+        }
+    }
+
+    private void abortAllQualifiedSlowConsumers() {
+        HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
+        for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+            if (entry.getKey().isSlowConsumer()) {
+                if (getMaxSlowDuration() > 0 &&
+                    (entry.getValue().markCount * getCheckPeriod() > getMaxSlowDuration()) ||
+                    getMaxSlowCount() > 0 && entry.getValue().slowCount > getMaxSlowCount()) {
+
+                    toAbort.put(entry.getKey(), entry.getValue());
+                    slowConsumers.remove(entry.getKey());
+                }
+            }
+        }
+
+        // Now if any subscriptions made it into the aborts list we can kick them.
+        abortSubscription(toAbort, isAbortConnection());
+    }
+
+    @Override
+    public void addDestination(Destination destination) {
+        this.destinations.add(destination);
+    }
+
+    /**
+     * Gets the maximum time since last Ack before a subscription is considered to be slow.
+     *
+     * @return the maximum time since last Ack before the consumer is considered to be slow.
+     */
+    public long getMaxTimeSinceLastAck() {
+        return maxTimeSinceLastAck;
+    }
+
+    /**
+     * Sets the maximum time since last Ack before a subscription is considered to be slow.
+     *
+     * @param maxTimeSinceLastAck
+     *      the maximum time since last Ack (mills) before the consumer is considered to be slow.
+     */
+    public void setMaxTimeSinceLastAck(long maxTimeSinceLastAck) {
+        this.maxTimeSinceLastAck = maxTimeSinceLastAck;
+    }
+
+    /**
+     * Returns whether the strategy is configured to ignore consumers that are simply idle, i.e
+     * consumers that have no pending acks (dispatch queue is empty).
+     *
+     * @return true if the strategy will ignore idle consumer when looking for slow consumers.
+     */
+    public boolean isIgnoreIdleConsumers() {
+        return ignoreIdleConsumers;
+    }
+
+    /**
+     * Sets whether the strategy is configured to ignore consumers that are simply idle, i.e
+     * consumers that have no pending acks (dispatch queue is empty).
+     *
+     * When configured to not ignore idle consumers this strategy acks not only on consumers
+     * that are actually slow but also on any consumer that has not received any messages for
+     * the maxTimeSinceLastAck.  This allows for a way to evict idle consumers while also
+     * aborting slow consumers.
+     *
+     * @param ignoreIdleConsumers
+     *      Should this strategy ignore idle consumers or consider all consumers when checking
+     *      the last ack time verses the maxTimeSinceLastAck value.
+     */
+    public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
+        this.ignoreIdleConsumers = ignoreIdleConsumers;
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed Jul 17 18:44:27 2013
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.thread.Scheduler;
@@ -34,40 +35,43 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
- * 
+ *
  * @org.apache.xbean.XBean
  */
 public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
 
-    private String name = "AbortSlowConsumerStrategy@" + hashCode();
-    private Scheduler scheduler;
-    private Broker broker;
-    private final AtomicBoolean taskStarted = new AtomicBoolean(false);
-    private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
+    protected String name = "AbortSlowConsumerStrategy@" + hashCode();
+    protected Scheduler scheduler;
+    protected Broker broker;
+    protected final AtomicBoolean taskStarted = new AtomicBoolean(false);
+    protected final Map<Subscription, SlowConsumerEntry> slowConsumers =
+        new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
 
     private long maxSlowCount = -1;
     private long maxSlowDuration = 30*1000;
     private long checkPeriod = 30*1000;
     private boolean abortConnection = false;
 
+    @Override
     public void setBrokerService(Broker broker) {
        this.scheduler = broker.getScheduler();
        this.broker = broker;
     }
 
+    @Override
     public void slowConsumer(ConnectionContext context, Subscription subs) {
         if (maxSlowCount < 0 && maxSlowDuration < 0) {
             // nothing to do
             LOG.info("no limits set, slowConsumer strategy has nothing to do");
             return;
         }
-        
+
         if (taskStarted.compareAndSet(false, true)) {
             scheduler.executePeriodically(this, checkPeriod);
         }
-            
+
         if (!slowConsumers.containsKey(subs)) {
             slowConsumers.put(subs, new SlowConsumerEntry(context));
         } else if (maxSlowCount > 0) {
@@ -75,6 +79,7 @@ public class AbortSlowConsumerStrategy i
         }
     }
 
+    @Override
     public void run() {
         if (maxSlowDuration > 0) {
             // mark
@@ -82,12 +87,12 @@ public class AbortSlowConsumerStrategy i
                 entry.mark();
             }
         }
-        
+
         HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
         for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
             if (entry.getKey().isSlowConsumer()) {
                 if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
-                        || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { 
+                        || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) {
                     toAbort.put(entry.getKey(), entry.getValue());
                     slowConsumers.remove(entry.getKey());
                 }
@@ -100,19 +105,20 @@ public class AbortSlowConsumerStrategy i
         abortSubscription(toAbort, abortConnection);
     }
 
-    private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
+    protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
         for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
             ConnectionContext connectionContext = entry.getValue().context;
             if (connectionContext!= null) {
                 try {
                     LOG.info("aborting "
-                            + (abortSubscriberConnection ? "connection" : "consumer") 
+                            + (abortSubscriberConnection ? "connection" : "consumer")
                             + ", slow consumer: " + entry.getKey());
 
                     final Connection connection = connectionContext.getConnection();
                     if (connection != null) {
                         if (abortSubscriberConnection) {
                             scheduler.executeAfterDelay(new Runnable() {
+                                @Override
                                 public void run() {
                                     connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
                                             + maxSlowCount +  ") or too long (>"
@@ -137,12 +143,11 @@ public class AbortSlowConsumerStrategy i
         }
     }
 
-
     public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
         if (sub != null) {
             SlowConsumerEntry entry = slowConsumers.remove(sub);
             if (entry != null) {
-                Map toAbort = new HashMap<Subscription, SlowConsumerEntry>();
+                Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
                 toAbort.put(sub, entry);
                 abortSubscription(toAbort, abortSubscriberConnection);
             } else {
@@ -151,7 +156,6 @@ public class AbortSlowConsumerStrategy i
         }
     }
 
-
     public long getMaxSlowCount() {
         return maxSlowCount;
     }
@@ -204,7 +208,7 @@ public class AbortSlowConsumerStrategy i
     public void setName(String name) {
         this.name = name;
     }
-    
+
     public String getName() {
         return name;
     }
@@ -212,4 +216,9 @@ public class AbortSlowConsumerStrategy i
     public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
         return slowConsumers;
     }
+
+    @Override
+    public void addDestination(Destination destination) {
+        // Not needed for this strategy.
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jul 17 18:44:27 2013
@@ -145,7 +145,7 @@ public class PolicyEntry extends Destina
         topic.setLazyDispatch(isLazyDispatch());
     }
 
-    public void baseConfiguration(Broker broker,BaseDestination destination) {
+    public void baseConfiguration(Broker broker, BaseDestination destination) {
         destination.setProducerFlowControl(isProducerFlowControl());
         destination.setAlwaysRetroactive(isAlwaysRetroactive());
         destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
@@ -170,6 +170,7 @@ public class PolicyEntry extends Destina
         SlowConsumerStrategy scs = getSlowConsumerStrategy();
         if (scs != null) {
             scs.setBrokerService(broker);
+            scs.addDestination(destination);
         }
         destination.setSlowConsumerStrategy(scs);
         destination.setPrioritizedMessages(isPrioritizedMessages());
@@ -179,7 +180,6 @@ public class PolicyEntry extends Destina
         destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
         destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
         destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
-
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed Jul 17 18:44:27 2013
@@ -18,13 +18,41 @@ package org.apache.activemq.broker.regio
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 
-/*
- * a strategy for dealing with slow consumers
+/**
+ * Interface for a strategy for dealing with slow consumers
  */
 public interface SlowConsumerStrategy {
 
+    /**
+     * Slow consumer event.
+     *
+     * @param context
+     *      Connection context of the subscription.
+     * @param subs
+     *      The subscription object for the slow consumer.
+     */
     void slowConsumer(ConnectionContext context, Subscription subs);
+
+    /**
+     * Sets the Broker instance which can provide a Scheduler among other things.
+     *
+     * @param broker
+     *      The running Broker.
+     */
     void setBrokerService(Broker broker);
+
+    /**
+     * For Strategies that need to examine assigned destination for slow consumers
+     * periodically the destination is assigned here.
+     *
+     * If the strategy doesn't is event driven it can just ignore assigned destination.
+     *
+     * @param destination
+     *      A destination to add to a watch list.
+     */
+    void addDestination(Destination destination);
+
 }

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java?rev=1504231&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java Wed Jul 17 18:44:27 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class);
+
+    protected long maxTimeSinceLastAck = 5 * 1000;
+
+    @Override
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyEntry policy = new PolicyEntry();
+
+        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
+        strategy.setAbortConnection(abortConnection);
+        strategy.setCheckPeriod(checkPeriod);
+        strategy.setMaxSlowDuration(maxSlowDuration);
+        strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
+
+        policy.setSlowConsumerStrategy(strategy);
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+        return broker;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.getPrefetchPolicy().setAll(1);
+        return factory;
+    }
+
+    @Override
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
+        strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
+        super.testSlowConsumerIsAbortedViaJmx();
+    }
+
+    @Override
+    public void initCombosForTestSlowConsumerIsAborted() {
+        addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testZeroPrefetchConsumerIsAborted() throws Exception {
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+
+        try {
+            consumer.receive(20000);
+            fail("Slow consumer not aborted.");
+        } catch(Exception ex) {
+        }
+    }
+
+    public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
+        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
+        strategy.setIgnoreIdleConsumers(false);
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        try {
+            consumer.receive(20000);
+            fail("Idle consumer not aborted.");
+        } catch(Exception ex) {
+        }
+    }
+
+    public void testIdleConsumerCanBeAborted() throws Exception {
+        AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
+        strategy.setIgnoreIdleConsumers(false);
+
+        ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
+        conn.setExceptionListener(this);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(destination);
+        assertNotNull(consumer);
+        conn.start();
+        startProducers(destination, 20);
+
+        Message message = consumer.receive(5000);
+        assertNotNull(message);
+        message.acknowledge();
+
+        try {
+            consumer.receive(20000);
+            fail("Slow consumer not aborted.");
+        } catch(Exception ex) {
+        }
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=1504231&r1=1504230&r2=1504231&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed Jul 17 18:44:27 2013
@@ -51,23 +51,25 @@ public class AbortSlowConsumerTest exten
 
     private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
 
-    AbortSlowConsumerStrategy underTest;
-
-    public boolean abortConnection = false;
-    public long checkPeriod = 2 * 1000;
-    public long maxSlowDuration = 5 * 1000;
-
-    private final List<Throwable> exceptions = new ArrayList<Throwable>();
+    protected AbortSlowConsumerStrategy underTest;
+    protected boolean abortConnection = false;
+    protected long checkPeriod = 2 * 1000;
+    protected long maxSlowDuration = 5 * 1000;
+    protected final List<Throwable> exceptions = new ArrayList<Throwable>();
 
     @Override
     protected void setUp() throws Exception {
         exceptions.clear();
         topic = true;
-        underTest = new AbortSlowConsumerStrategy();
+        underTest = createSlowConsumerStrategy();
         super.setUp();
         createDestination();
     }
 
+    protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
+        return new AbortSlowConsumerStrategy();
+    }
+
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
@@ -244,7 +246,6 @@ public class AbortSlowConsumerTest exten
         assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
     }
 
-
     public void initCombosForTestAbortAlreadyClosedConnection() {
         addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
         addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});