You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/21 19:09:33 UTC

svn commit: r966319 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/a...

Author: gtully
Date: Wed Jul 21 17:09:33 2010
New Revision: 966319

URL: http://svn.apache.org/viewvc?rev=966319&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2741 - visibility of abort slow consumer policy via jmx

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jul 21 17:09:33 2010
@@ -657,7 +657,9 @@ public class ActiveMQMessageConsumer imp
     void doClose() throws JMSException {
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
-        LOG.info("remove: " + this.getConsumerId() + ", lasteDeliveredSequenceId:" + lastDeliveredSequenceId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
+        }
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         this.session.asyncSendPacket(removeCommand);
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java?rev=966319&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java Wed Jul 21 17:09:33 2010
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerEntry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.Map;
+
+public class AbortSlowConsumerStrategyView implements AbortSlowConsumerStrategyViewMBean {
+    private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategyView.class);
+    private ManagedRegionBroker broker;
+    private AbortSlowConsumerStrategy strategy;
+
+
+    public AbortSlowConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowConsumerStrategy slowConsumerStrategy) {
+        this.broker = managedRegionBroker;
+        this.strategy = slowConsumerStrategy;
+    }
+
+    public long getMaxSlowCount() {
+        return strategy.getMaxSlowCount();
+    }
+
+    public void setMaxSlowCount(long maxSlowCount) {
+        strategy.setMaxSlowCount(maxSlowCount);
+    }
+
+    public long getMaxSlowDuration() {
+        return strategy.getMaxSlowDuration();
+    }
+
+    public void setMaxSlowDuration(long maxSlowDuration) {
+       strategy.setMaxSlowDuration(maxSlowDuration);
+    }
+
+    public long getCheckPeriod() {
+        return strategy.getCheckPeriod();
+    }
+
+    public TabularData getSlowConsumers() throws OpenDataException {
+
+        OpenTypeSupport.OpenTypeFactory factory = OpenTypeSupport.getFactory(SlowConsumerEntry.class);
+        CompositeType ct = factory.getCompositeType();
+        TabularType tt = new TabularType("SlowConsumers", "Table of current slow Consumers", ct, new String[] {"subscription" });
+        TabularDataSupport rc = new TabularDataSupport(tt);
+        
+        int index = 0;
+        Map<Subscription, SlowConsumerEntry> slowConsumers = strategy.getSlowConsumers();
+        for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+            entry.getValue().setSubscription(broker.getSubscriberObjectName(entry.getKey()));
+            rc.put(OpenTypeSupport.convert(entry.getValue()));
+        }
+        return rc;
+    }
+
+    public void abortConsumer(ObjectName consumerToAbort) {
+        Subscription sub = broker.getSubscriber(consumerToAbort);
+        if (sub != null) {
+            LOG.info("aborting consumer via jmx: " + sub.getConsumerInfo().getConsumerId());           
+            strategy.abortConsumer(sub, false);
+        } else {
+            LOG.warn("cannot resolve subscription matching name: " + consumerToAbort);
+        }
+
+    }
+
+    public void abortConnection(ObjectName consumerToAbort) {
+        Subscription sub = broker.getSubscriber(consumerToAbort);
+        if (sub != null) {
+            LOG.info("aborting consumer connection via jmx: " + sub.getConsumerInfo().getConsumerId().getConnectionId());
+            strategy.abortConsumer(sub, true);
+        } else {
+            LOG.warn("cannot resolve subscription matching name: " + consumerToAbort);
+        }
+    }
+
+    public void abortConsumer(String objectNameOfConsumerToAbort) {
+        abortConsumer(toObjectName(objectNameOfConsumerToAbort));
+    }
+
+    public void abortConnection(String objectNameOfConsumerToAbort) {
+        abortConnection(toObjectName(objectNameOfConsumerToAbort));
+    }
+
+    private ObjectName toObjectName(String objectName) {
+        ObjectName result = null;
+        try {
+            result = new ObjectName(objectName);
+        } catch (Exception e) {
+            LOG.warn("cannot create subscription ObjectName to abort, from string: " + objectName);
+        }
+        return result;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java?rev=966319&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java Wed Jul 21 17:09:33 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.ObjectName;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
+public interface AbortSlowConsumerStrategyViewMBean {
+    
+    @MBeanInfo("returns the current max slow count, -1 disables")
+    long getMaxSlowCount();
+
+    @MBeanInfo("sets the count after which a slow consumer will be aborted, -1 disables")
+    void setMaxSlowCount(long maxSlowCount);
+    
+    @MBeanInfo("returns the current max slow (milliseconds) duration")
+    long getMaxSlowDuration();
+
+    @MBeanInfo("sets the duration (milliseconds) after which a continually slow consumer will be aborted")
+    void setMaxSlowDuration(long maxSlowDuration);
+
+    @MBeanInfo("returns the check period at which a sweep of consumers is done to determine continued slowness")
+    public long getCheckPeriod();
+    
+    @MBeanInfo("returns the current list of slow consumers, Not HTML friendly")
+    TabularData getSlowConsumers() throws OpenDataException;
+    
+    @MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer")
+    void abortConsumer(ObjectName consumerToAbort);
+    
+    @MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected")
+    void abortConnection(ObjectName consumerToAbort);
+
+    @MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer")
+    void abortConsumer(String objectNameOfConsumerToAbort);
+    
+    @MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected")
+    void abortConnection(String objectNameOfConsumerToAbort);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Wed Jul 21 17:09:33 2010
@@ -39,6 +39,8 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
@@ -388,4 +390,13 @@ public class DestinationView implements 
         return answer;
     }
 
+    public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
+        ObjectName result = null;
+        SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
+        if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
+            result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
+        }
+        return result;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Wed Jul 21 17:09:33 2010
@@ -332,4 +332,13 @@ public interface DestinationViewMBean {
     @MBeanInfo("returns all the current subscription MBeans matching this destination")
     ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException;
 
+
+    /**
+     * Returns the slow consumer strategy MBean for this destination
+     *
+     * @return the name of the slow consumer handler MBean for this destination
+     */
+    @MBeanInfo("returns the optional slowConsumer handler MBeans for this destination")
+    ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException;
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jul 21 17:09:33 2010
@@ -53,6 +53,8 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -260,10 +262,12 @@ public class ManagedRegionBroker extends
     }
 
     protected void unregisterDestination(ObjectName key) throws Exception {
-        topics.remove(key);
-        queues.remove(key);
-        temporaryQueues.remove(key);
-        temporaryTopics.remove(key);
+
+        DestinationView view = null;
+        removeAndRemember(topics, key, view);
+        removeAndRemember(queues, key, view);
+        removeAndRemember(temporaryQueues, key, view);
+        removeAndRemember(temporaryTopics, key, view);
         if (registeredMBeans.remove(key)) {
             try {
                 managementContext.unregisterMBean(key);
@@ -272,6 +276,24 @@ public class ManagedRegionBroker extends
                 LOG.debug("Failure reason: " + e, e);
             }
         }
+        if (view != null) {
+            key = view.getSlowConsumerStrategy();
+            if (key!= null && registeredMBeans.remove(key)) {
+                try {
+                    managementContext.unregisterMBean(key);
+                } catch (Throwable e) {
+                    LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
+                    LOG.debug("Failure reason: " + e, e);
+                }
+            }
+        }
+    }
+
+    private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
+        DestinationView candidate = map.remove(key);
+        if (candidate != null && view == null) {
+            view = candidate;
+        }
     }
 
     protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
@@ -527,4 +549,42 @@ public class ManagedRegionBroker extends
                                                + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
         return objectName;
     }
+
+    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
+        ObjectName objectName = null;
+        try {
+            objectName = createObjectName(strategy);
+            if (!registeredMBeans.contains(objectName))  {
+                AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
+                AnnotatedMBean.registerMBean(managementContext, view, objectName);
+                registeredMBeans.add(objectName);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to register MBean: " + strategy);
+            LOG.debug("Failure reason: " + e, e);
+        }
+        return objectName;
+    }
+
+    private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
+        Hashtable map = brokerObjectName.getKeyPropertyList();
+        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
+                            + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
+        return objectName;            
+    }
+
+    public ObjectName getSubscriberObjectName(Subscription key) {
+        return subscriptionMap.get(key);
+    }
+
+    public Subscription getSubscriber(ObjectName key) {
+        Subscription sub = null;
+        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
+            if (entry.getValue().equals(key)) {
+                sub = entry.getKey();
+                break;
+            }
+        }
+        return sub;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Wed Jul 21 17:09:33 2010
@@ -41,6 +41,9 @@ import javax.management.openmbean.OpenTy
 import javax.management.openmbean.SimpleType;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.SlowConsumerEntry;
 import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQMapMessage;
@@ -429,7 +432,30 @@ public final class OpenTypeSupport {
         }
     }
 
+    static class SlowConsumerEntryOpenTypeFactory extends AbstractOpenTypeFactory {
+       @Override
+        protected String getTypeName() {
+            return SlowConsumerEntry.class.getName();
+        }
+
+        @Override
+        protected void init() throws OpenDataException {
+            super.init();
+            addItem("subscription", "the subscription view", SimpleType.OBJECTNAME);
+            addItem("slowCount", "number of times deemed slow", SimpleType.INTEGER);
+            addItem("markCount", "number of periods remaining slow", SimpleType.INTEGER);
+        }
 
+        @Override
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
+            SlowConsumerEntry entry = (SlowConsumerEntry) o;
+            Map<String, Object> rc = super.getFields(o);
+            rc.put("subscription", entry.getSubscription());
+            rc.put("slowCount", Integer.valueOf(entry.getSlowCount()));
+            rc.put("markCount", Integer.valueOf(entry.getMarkCount()));
+            return rc;
+        }
+    }
 
     static {
         OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
@@ -439,6 +465,7 @@ public final class OpenTypeSupport {
         OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
         OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
         OPEN_TYPE_FACTORIES.put(Job.class, new JobOpenTypeFactory());
+        OPEN_TYPE_FACTORIES.put(SlowConsumerEntry.class, new SlowConsumerEntryOpenTypeFactory());
     }
 
     private OpenTypeSupport() {
@@ -448,7 +475,7 @@ public final class OpenTypeSupport {
         return OPEN_TYPE_FACTORIES.get(clazz);
     }
 
-    public static CompositeData convert(Message message) throws OpenDataException {
+    public static CompositeData convert(Object message) throws OpenDataException {
         OpenTypeFactory f = getFactory(message.getClass());
         if (f == null) {
             throw new OpenDataException("Cannot create a CompositeData for type: " + message.getClass().getName());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Jul 21 17:09:33 2010
@@ -602,6 +602,10 @@ public abstract class BaseDestination im
         this.slowConsumerStrategy = slowConsumerStrategy;
     }
 
+    public SlowConsumerStrategy getSlowConsumerStrategy() {
+        return this.slowConsumerStrategy;
+    }
+
    
     public boolean isPrioritizedMessages() {
         return this.prioritizedMessages;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Wed Jul 21 17:09:33 2010
@@ -23,6 +23,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -215,4 +216,6 @@ public interface Destination extends Ser
     void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
     
     boolean isPrioritizedMessages();
+
+    SlowConsumerStrategy getSlowConsumerStrategy();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Wed Jul 21 17:09:33 2010
@@ -23,6 +23,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -273,4 +274,8 @@ public class DestinationFilter implement
     public boolean isPrioritizedMessages() {
         return next.isPrioritizedMessages();
     }
+
+    public SlowConsumerStrategy getSlowConsumerStrategy() {
+        return next.getSlowConsumerStrategy();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jul 21 17:09:33 2010
@@ -407,8 +407,6 @@ public abstract class PrefetchSubscripti
      * Checks an ack versus the contents of the dispatched list.
      * 
      * @param ack
-     * @param firstAckedMsg
-     * @param lastAckedMsg
      * @throws JMSException if it does not match
      */
 	protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed Jul 21 17:09:33 2010
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.broker.region.policy;
 
 import java.util.HashMap;
@@ -5,6 +21,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Subscription;
@@ -23,7 +41,9 @@ public class AbortSlowConsumerStrategy i
     
     private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
 
+    private String name = "AbortSlowConsumerStrategy@" + hashCode();
     private Scheduler scheduler;
+    private Broker broker;
     private final AtomicBoolean taskStarted = new AtomicBoolean(false);
     private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
 
@@ -32,10 +52,11 @@ public class AbortSlowConsumerStrategy i
     private long checkPeriod = 30*1000;
     private boolean abortConnection = false;
 
-   public void setScheduler(Scheduler s) {
-       this.scheduler=s;
-   }
-   
+    public void setBrokerService(Broker broker) {
+       this.scheduler = broker.getScheduler();
+       this.broker = broker;
+    }
+
     public void slowConsumer(ConnectionContext context, Subscription subs) {
         if (maxSlowCount < 0 && maxSlowDuration < 0) {
             // nothing to do
@@ -75,21 +96,25 @@ public class AbortSlowConsumerStrategy i
                 slowConsumers.remove(entry.getKey());
             }
         }
-        
+
+        abortSubscription(toAbort, abortConnection);
+    }
+
+    private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
         for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
             ConnectionContext connectionContext = entry.getValue().context;
             if (connectionContext!= null) {
                 try {
-                    LOG.info("aborting " 
-                            + (abortConnection ? "connection" : "consumer") 
-                            + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId()); 
+                    LOG.info("aborting "
+                            + (abortSubscriberConnection ? "connection" : "consumer") 
+                            + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
 
                     final Connection connection = connectionContext.getConnection();
-                    if (connection != null) {    
-                        if (abortConnection) {
+                    if (connection != null) {
+                        if (abortSubscriberConnection) {
                             scheduler.executeAfterDelay(new Runnable() {
                                 public void run() {
-                                    connection.serviceException(new InactivityIOException("Consumer was slow too often (>" 
+                                    connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
                                             + maxSlowCount +  ") or too long (>"
                                             + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
                                 }}, 0l);
@@ -97,21 +122,36 @@ public class AbortSlowConsumerStrategy i
                             // just abort the consumer by telling it to stop
                             ConsumerControl stopConsumer = new ConsumerControl();
                             stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
-                            stopConsumer.setClose(true);    
+                            stopConsumer.setClose(true);
                             connection.dispatchAsync(stopConsumer);
                         }
                     } else {
                         LOG.debug("slowConsumer abort ignored, no connection in context:"  + connectionContext);
                     }
                 } catch (Exception e) {
-                    LOG.info("exception on stopping " 
-                            + (abortConnection ? "connection" : "consumer") 
-                            + " to abort slow consumer: " + entry.getKey(), e);   
+                    LOG.info("exception on stopping "
+                            + (abortSubscriberConnection ? "connection" : "consumer")
+                            + " to abort slow consumer: " + entry.getKey(), e);
                 }
             }
         }
     }
-    
+
+    /** Aborts sub on demand, but only while it is still tracked in slowConsumers (null/untracked: no abort).
+     *  abortSubscriberConnection is passed to abortSubscription: true targets the connection, false the consumer. */
+    public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
+        if (sub != null) {
+            SlowConsumerEntry entry = slowConsumers.remove(sub); // untrack first; null if not (or no longer) tracked
+            if (entry != null) {
+                Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
+                toAbort.put(sub, entry);
+                abortSubscription(toAbort, abortSubscriberConnection);
+            } else {
+                LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
+            }
+        }
+    }
+
     public long getMaxSlowCount() {
         return maxSlowCount;
     }
@@ -120,7 +160,7 @@ public class AbortSlowConsumerStrategy i
      * number of times a subscription can be deemed slow before triggering abort
      * effect depends on dispatch rate as slow determination is done on dispatch
      */
-    public void setMaxSlowCount(int maxSlowCount) {
+    public void setMaxSlowCount(long maxSlowCount) {
         this.maxSlowCount = maxSlowCount;
     }
 
@@ -161,22 +201,15 @@ public class AbortSlowConsumerStrategy i
         this.abortConnection = abortConnection;
     }
 
-    static class SlowConsumerEntry {
-        
-        final ConnectionContext context;
-        int slowCount = 1;
-        int markCount = 0;
-        
-        SlowConsumerEntry(ConnectionContext context) {
-            this.context = context;
-        }
+    public void setName(String name) {
+        this.name = name; // replaces the default "AbortSlowConsumerStrategy@" + hashCode() label
+    }
+    
+    public String getName() {
+        return name; // "AbortSlowConsumerStrategy@" + hashCode() unless setName was called
+    }
 
-        public void slow() {
-            slowCount++;
-        }
-        
-        public void mark() {
-            markCount++;
-        }
+    public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
+        return slowConsumers; // the live internal map itself, not a copy
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jul 21 17:09:33 2010
@@ -157,7 +157,7 @@ public class PolicyEntry extends Destina
         destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
         SlowConsumerStrategy scs = getSlowConsumerStrategy();
         if (scs != null) {
-            scs.setScheduler(broker.getScheduler());
+            scs.setBrokerService(broker);
         }
         destination.setSlowConsumerStrategy(scs);
         destination.setPrioritizedMessages(isPrioritizedMessages());

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java?rev=966319&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java Wed Jul 21 17:09:33 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ConnectionContext;
+
+/** Tracking record for one slow subscription; AbortSlowConsumerStrategy keeps these as its slowConsumers map values. */
+public class SlowConsumerEntry {
+    final ConnectionContext context; // read by AbortSlowConsumerStrategy.abortSubscription to reach the connection
+    Object subscription;             // presumably a JMX ObjectName (see setSubscription) - TODO confirm
+    int slowCount = 1;               // starts at 1, not 0
+    int markCount = 0;
+
+    SlowConsumerEntry(ConnectionContext context) {
+        this.context = context;
+    }
+
+    public int getSlowCount() {
+        return this.slowCount;
+    }
+
+    public void slow() {
+        this.slowCount += 1;
+    }
+
+    public int getMarkCount() {
+        return this.markCount;
+    }
+
+    public void mark() {
+        this.markCount += 1;
+    }
+
+    public Object getSubscription() {
+        return this.subscription;
+    }
+
+    public void setSubscription(Object subscriptionObjectName) {
+        this.subscription = subscriptionObjectName;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed Jul 21 17:09:33 2010
@@ -1,8 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.thread.Scheduler;
 
 /*
  * a strategy for dealing with slow consumers
@@ -10,6 +26,5 @@ import org.apache.activemq.thread.Schedu
 public interface SlowConsumerStrategy {
 
     void slowConsumer(ConnectionContext context, Subscription subs);
-    void setScheduler(Scheduler scheduler);
-
+    void setBrokerService(Broker broker);
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed Jul 21 17:09:33 2010
@@ -16,41 +16,45 @@
  */
 package org.apache.activemq.broker.policy;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
 import junit.framework.Test;
-
 import org.apache.activemq.JmsMultipleClientsTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
 
 public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
 
     private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class);
-    
+
     AbortSlowConsumerStrategy underTest;
-    
+
     public boolean abortConnection = false;
-    public long checkPeriod = 2*1000;
-    public long maxSlowDuration = 5*1000;
+    public long checkPeriod = 2 * 1000;
+    public long maxSlowDuration = 5 * 1000;
 
     private List<Throwable> exceptions = new ArrayList<Throwable>();
-    
+
     @Override
     protected void setUp() throws Exception {
         exceptions.clear();
@@ -59,7 +63,7 @@ public class AbortSlowConsumerTest exten
         super.setUp();
         createDestination();
     }
-    
+
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
@@ -79,7 +83,7 @@ public class AbortSlowConsumerTest exten
 
     public void testRegularConsumerIsNotAborted() throws Exception {
         startConsumers(destination);
-        for (Connection c: connections) {
+        for (Connection c : connections) {
             c.setExceptionListener(this);
         }
         startProducers(destination, 100);
@@ -90,12 +94,12 @@ public class AbortSlowConsumerTest exten
     public void initCombosForTestLittleSlowConsumerIsNotAborted() {
         addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
-    
+
     public void testLittleSlowConsumerIsNotAborted() throws Exception {
         startConsumers(destination);
         Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
         consumertoAbort.getValue().setProcessingDelay(500);
-        for (Connection c: connections) {
+        for (Connection c : connections) {
             c.setExceptionListener(this);
         }
         startProducers(destination, 12);
@@ -103,56 +107,102 @@ public class AbortSlowConsumerTest exten
         allMessagesList.assertAtLeastMessagesReceived(10);
     }
 
-    
+
     public void initCombosForTestSlowConsumerIsAborted() {
         addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
         addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
-    
+
     public void testSlowConsumerIsAborted() throws Exception {
         startConsumers(destination);
         Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
-        consumertoAbort.getValue().setProcessingDelay(8*1000);
-        for (Connection c: connections) {
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
             c.setExceptionListener(this);
         }
         startProducers(destination, 100);
-        
+
         consumertoAbort.getValue().assertMessagesReceived(1);
-     
+
         TimeUnit.SECONDS.sleep(5);
-        
-        consumertoAbort.getValue().assertAtMostMessagesReceived(1);        
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+    }
+
+
+    /** Checks that a consumer listed as slow can be found and aborted through the strategy's JMX view. */
+    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+        underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
+        startConsumers(destination);
+        Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
+            c.setExceptionListener(this);
+        }
+        startProducers(destination, 100);
+
+        consumertoAbort.getValue().assertMessagesReceived(1);
+
+        ActiveMQDestination amqDest = (ActiveMQDestination)destination;
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" +
+                (amqDest.isTopic() ? "Topic" : "Queue") +",Destination="
+                + amqDest.getPhysicalName() + ",BrokerName=localhost");
+        // NOTE(review): proxy is typed QueueViewMBean even when the name above is a Topic's - confirm intended
+        QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+
+        assertNotNull(slowConsumerPolicyMBeanName);
+
+        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+                broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+        // allow time for the delayed consumer to show up in the slow consumer table (asserted next)
+        TimeUnit.SECONDS.sleep(3);
+
+        TabularData slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("one slow consumers", 1, slowOnes.size());
+
+        LOG.info("slow ones:"  + slowOnes);
+
+        CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
+        LOG.info("Slow one: " + slowOne);
+        // the row's "subscription" item is the ObjectName handed to abortConsumer
+        assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
+        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+        // the aborted consumer must no longer be listed
+        slowOnes = abortPolicy.getSlowConsumers();
+        assertEquals("no slow consumers left", 0, slowOnes.size());
     }
 
-    
+
     public void testOnlyOneSlowConsumerIsAborted() throws Exception {
         consumerCount = 10;
         startConsumers(destination);
         Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
-        consumertoAbort.getValue().setProcessingDelay(8*1000);
-        for (Connection c: connections) {
+        consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+        for (Connection c : connections) {
             c.setExceptionListener(this);
         }
         startProducers(destination, 100);
-        
+
         allMessagesList.waitForMessagesToArrive(99);
         allMessagesList.assertAtLeastMessagesReceived(99);
-        
+
         consumertoAbort.getValue().assertMessagesReceived(1);
-     
+
         TimeUnit.SECONDS.sleep(5);
-        
-        consumertoAbort.getValue().assertAtMostMessagesReceived(1);        
+
+        consumertoAbort.getValue().assertAtMostMessagesReceived(1);
     }
-    
+
     public void testAbortAlreadyClosingConsumers() throws Exception {
         consumerCount = 1;
         startConsumers(destination);
         for (MessageIdList list : consumers.values()) {
-            list.setProcessingDelay(6*1000);
+            list.setProcessingDelay(6 * 1000);
         }
-        for (Connection c: connections) {
+        for (Connection c : connections) {
             c.setExceptionListener(this);
         }
         startProducers(destination, 100);
@@ -164,12 +214,12 @@ public class AbortSlowConsumerTest exten
             consumer.close();
         }
     }
-    
+
     public void initCombosForTestAbortAlreadyClosedConsumers() {
         addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
         addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
-    
+
     public void testAbortAlreadyClosedConsumers() throws Exception {
         Connection conn = createConnectionFactory().createConnection();
         conn.setExceptionListener(this);
@@ -182,17 +232,17 @@ public class AbortSlowConsumerTest exten
         TimeUnit.SECONDS.sleep(1);
         LOG.info("closing consumer: " + consumer);
         consumer.close();
-        
+
         TimeUnit.SECONDS.sleep(5);
         assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
     }
 
-    
+
     public void initCombosForTestAbortAlreadyClosedConnection() {
         addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
         addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
-    
+
     public void testAbortAlreadyClosedConnection() throws Exception {
         Connection conn = createConnectionFactory().createConnection();
         conn.setExceptionListener(this);
@@ -204,7 +254,7 @@ public class AbortSlowConsumerTest exten
         TimeUnit.SECONDS.sleep(1);
         LOG.info("closing connection: " + conn);
         conn.close();
-        
+
         TimeUnit.SECONDS.sleep(5);
         assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
     }
@@ -212,12 +262,12 @@ public class AbortSlowConsumerTest exten
     public void testAbortConsumerOnDeadConnection() throws Exception {
         // socket proxy on pause, close could hang??
     }
-    
+
     public void onException(JMSException exception) {
         exceptions.add(exception);
-        exception.printStackTrace();        
+        exception.printStackTrace();
     }
-    
+
     public static Test suite() {
         return suite(AbortSlowConsumerTest.class);
     }