You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/21 19:09:33 UTC
svn commit: r966319 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/a...
Author: gtully
Date: Wed Jul 21 17:09:33 2010
New Revision: 966319
URL: http://svn.apache.org/viewvc?rev=966319&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2741 - visibility of abort slow consumer policy in via jmx
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jul 21 17:09:33 2010
@@ -657,7 +657,9 @@ public class ActiveMQMessageConsumer imp
void doClose() throws JMSException {
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
- LOG.info("remove: " + this.getConsumerId() + ", lasteDeliveredSequenceId:" + lastDeliveredSequenceId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
+ }
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
this.session.asyncSendPacket(removeCommand);
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java?rev=966319&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java Wed Jul 21 17:09:33 2010
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerEntry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.Map;
+
+public class AbortSlowConsumerStrategyView implements AbortSlowConsumerStrategyViewMBean {
+ private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategyView.class);
+ private ManagedRegionBroker broker;
+ private AbortSlowConsumerStrategy strategy;
+
+
+ public AbortSlowConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowConsumerStrategy slowConsumerStrategy) {
+ this.broker = managedRegionBroker;
+ this.strategy = slowConsumerStrategy;
+ }
+
+ public long getMaxSlowCount() {
+ return strategy.getMaxSlowCount();
+ }
+
+ public void setMaxSlowCount(long maxSlowCount) {
+ strategy.setMaxSlowCount(maxSlowCount);
+ }
+
+ public long getMaxSlowDuration() {
+ return strategy.getMaxSlowDuration();
+ }
+
+ public void setMaxSlowDuration(long maxSlowDuration) {
+ strategy.setMaxSlowDuration(maxSlowDuration);
+ }
+
+ public long getCheckPeriod() {
+ return strategy.getCheckPeriod();
+ }
+
+ public TabularData getSlowConsumers() throws OpenDataException {
+
+ OpenTypeSupport.OpenTypeFactory factory = OpenTypeSupport.getFactory(SlowConsumerEntry.class);
+ CompositeType ct = factory.getCompositeType();
+ TabularType tt = new TabularType("SlowConsumers", "Table of current slow Consumers", ct, new String[] {"subscription" });
+ TabularDataSupport rc = new TabularDataSupport(tt);
+
+ int index = 0;
+ Map<Subscription, SlowConsumerEntry> slowConsumers = strategy.getSlowConsumers();
+ for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+ entry.getValue().setSubscription(broker.getSubscriberObjectName(entry.getKey()));
+ rc.put(OpenTypeSupport.convert(entry.getValue()));
+ }
+ return rc;
+ }
+
+ public void abortConsumer(ObjectName consumerToAbort) {
+ Subscription sub = broker.getSubscriber(consumerToAbort);
+ if (sub != null) {
+ LOG.info("aborting consumer via jmx: " + sub.getConsumerInfo().getConsumerId());
+ strategy.abortConsumer(sub, false);
+ } else {
+ LOG.warn("cannot resolve subscription matching name: " + consumerToAbort);
+ }
+
+ }
+
+ public void abortConnection(ObjectName consumerToAbort) {
+ Subscription sub = broker.getSubscriber(consumerToAbort);
+ if (sub != null) {
+ LOG.info("aborting consumer connection via jmx: " + sub.getConsumerInfo().getConsumerId().getConnectionId());
+ strategy.abortConsumer(sub, true);
+ } else {
+ LOG.warn("cannot resolve subscription matching name: " + consumerToAbort);
+ }
+ }
+
+ public void abortConsumer(String objectNameOfConsumerToAbort) {
+ abortConsumer(toObjectName(objectNameOfConsumerToAbort));
+ }
+
+ public void abortConnection(String objectNameOfConsumerToAbort) {
+ abortConnection(toObjectName(objectNameOfConsumerToAbort));
+ }
+
+ private ObjectName toObjectName(String objectName) {
+ ObjectName result = null;
+ try {
+ result = new ObjectName(objectName);
+ } catch (Exception e) {
+ LOG.warn("cannot create subscription ObjectName to abort, from string: " + objectName);
+ }
+ return result;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java?rev=966319&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java Wed Jul 21 17:09:33 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.ObjectName;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
+public interface AbortSlowConsumerStrategyViewMBean {
+
+ @MBeanInfo("returns the current max slow count, -1 disables")
+ long getMaxSlowCount();
+
+ @MBeanInfo("sets the count after which a slow consumer will be aborted, -1 disables")
+ void setMaxSlowCount(long maxSlowCount);
+
+ @MBeanInfo("returns the current max slow (milliseconds) duration")
+ long getMaxSlowDuration();
+
+ @MBeanInfo("sets the duration (milliseconds) after which a continually slow consumer will be aborted")
+ void setMaxSlowDuration(long maxSlowDuration);
+
+ @MBeanInfo("returns the check period at which a sweep of consumers is done to determine continued slowness")
+ public long getCheckPeriod();
+
+ @MBeanInfo("returns the current list of slow consumers, Not HTML friendly")
+ TabularData getSlowConsumers() throws OpenDataException;
+
+ @MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer")
+ void abortConsumer(ObjectName consumerToAbort);
+
+ @MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected")
+ void abortConnection(ObjectName consumerToAbort);
+
+ @MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer")
+ void abortConsumer(String objectNameOfConsumerToAbort);
+
+ @MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected")
+ void abortConnection(String objectNameOfConsumerToAbort);
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Wed Jul 21 17:09:33 2010
@@ -39,6 +39,8 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
@@ -388,4 +390,13 @@ public class DestinationView implements
return answer;
}
+ public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
+ ObjectName result = null;
+ SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
+ if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
+ result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
+ }
+ return result;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Wed Jul 21 17:09:33 2010
@@ -332,4 +332,13 @@ public interface DestinationViewMBean {
@MBeanInfo("returns all the current subscription MBeans matching this destination")
ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException;
+
+ /**
+ * Returns the slow consumer strategy MBean for this destination
+ *
+ * @return the name of the slow consumer handler MBean for this destination
+ */
+ @MBeanInfo("returns the optional slowConsumer handler MBeans for this destination")
+ ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException;
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jul 21 17:09:33 2010
@@ -53,6 +53,8 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -260,10 +262,12 @@ public class ManagedRegionBroker extends
}
protected void unregisterDestination(ObjectName key) throws Exception {
- topics.remove(key);
- queues.remove(key);
- temporaryQueues.remove(key);
- temporaryTopics.remove(key);
+
+ DestinationView view = null;
+ removeAndRemember(topics, key, view);
+ removeAndRemember(queues, key, view);
+ removeAndRemember(temporaryQueues, key, view);
+ removeAndRemember(temporaryTopics, key, view);
if (registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
@@ -272,6 +276,24 @@ public class ManagedRegionBroker extends
LOG.debug("Failure reason: " + e, e);
}
}
+ if (view != null) {
+ key = view.getSlowConsumerStrategy();
+ if (key!= null && registeredMBeans.remove(key)) {
+ try {
+ managementContext.unregisterMBean(key);
+ } catch (Throwable e) {
+ LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
+ LOG.debug("Failure reason: " + e, e);
+ }
+ }
+ }
+ }
+
+ private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
+ DestinationView candidate = map.remove(key);
+ if (candidate != null && view == null) {
+ view = candidate;
+ }
}
protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
@@ -527,4 +549,42 @@ public class ManagedRegionBroker extends
+ JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
return objectName;
}
+
+ public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
+ ObjectName objectName = null;
+ try {
+ objectName = createObjectName(strategy);
+ if (!registeredMBeans.contains(objectName)) {
+ AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
+ AnnotatedMBean.registerMBean(managementContext, view, objectName);
+ registeredMBeans.add(objectName);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to register MBean: " + strategy);
+ LOG.debug("Failure reason: " + e, e);
+ }
+ return objectName;
+ }
+
+ private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
+ Hashtable map = brokerObjectName.getKeyPropertyList();
+ ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
+ + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
+ return objectName;
+ }
+
+ public ObjectName getSubscriberObjectName(Subscription key) {
+ return subscriptionMap.get(key);
+ }
+
+ public Subscription getSubscriber(ObjectName key) {
+ Subscription sub = null;
+ for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
+ if (entry.getValue().equals(key)) {
+ sub = entry.getKey();
+ break;
+ }
+ }
+ return sub;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Wed Jul 21 17:09:33 2010
@@ -41,6 +41,9 @@ import javax.management.openmbean.OpenTy
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.SlowConsumerEntry;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
@@ -429,7 +432,30 @@ public final class OpenTypeSupport {
}
}
+ static class SlowConsumerEntryOpenTypeFactory extends AbstractOpenTypeFactory {
+ @Override
+ protected String getTypeName() {
+ return SlowConsumerEntry.class.getName();
+ }
+
+ @Override
+ protected void init() throws OpenDataException {
+ super.init();
+ addItem("subscription", "the subscription view", SimpleType.OBJECTNAME);
+ addItem("slowCount", "number of times deemed slow", SimpleType.INTEGER);
+ addItem("markCount", "number of periods remaining slow", SimpleType.INTEGER);
+ }
+ @Override
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
+ SlowConsumerEntry entry = (SlowConsumerEntry) o;
+ Map<String, Object> rc = super.getFields(o);
+ rc.put("subscription", entry.getSubscription());
+ rc.put("slowCount", Integer.valueOf(entry.getSlowCount()));
+ rc.put("markCount", Integer.valueOf(entry.getMarkCount()));
+ return rc;
+ }
+ }
static {
OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
@@ -439,6 +465,7 @@ public final class OpenTypeSupport {
OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(Job.class, new JobOpenTypeFactory());
+ OPEN_TYPE_FACTORIES.put(SlowConsumerEntry.class, new SlowConsumerEntryOpenTypeFactory());
}
private OpenTypeSupport() {
@@ -448,7 +475,7 @@ public final class OpenTypeSupport {
return OPEN_TYPE_FACTORIES.get(clazz);
}
- public static CompositeData convert(Message message) throws OpenDataException {
+ public static CompositeData convert(Object message) throws OpenDataException {
OpenTypeFactory f = getFactory(message.getClass());
if (f == null) {
throw new OpenDataException("Cannot create a CompositeData for type: " + message.getClass().getName());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Jul 21 17:09:33 2010
@@ -602,6 +602,10 @@ public abstract class BaseDestination im
this.slowConsumerStrategy = slowConsumerStrategy;
}
+ public SlowConsumerStrategy getSlowConsumerStrategy() {
+ return this.slowConsumerStrategy;
+ }
+
public boolean isPrioritizedMessages() {
return this.prioritizedMessages;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Wed Jul 21 17:09:33 2010
@@ -23,6 +23,7 @@ import org.apache.activemq.broker.Connec
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -215,4 +216,6 @@ public interface Destination extends Ser
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
boolean isPrioritizedMessages();
+
+ SlowConsumerStrategy getSlowConsumerStrategy();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Wed Jul 21 17:09:33 2010
@@ -23,6 +23,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -273,4 +274,8 @@ public class DestinationFilter implement
public boolean isPrioritizedMessages() {
return next.isPrioritizedMessages();
}
+
+ public SlowConsumerStrategy getSlowConsumerStrategy() {
+ return next.getSlowConsumerStrategy();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jul 21 17:09:33 2010
@@ -407,8 +407,6 @@ public abstract class PrefetchSubscripti
* Checks an ack versus the contents of the dispatched list.
*
* @param ack
- * @param firstAckedMsg
- * @param lastAckedMsg
* @throws JMSException if it does not match
*/
protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed Jul 21 17:09:33 2010
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.broker.region.policy;
import java.util.HashMap;
@@ -5,6 +21,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
@@ -23,7 +41,9 @@ public class AbortSlowConsumerStrategy i
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
+ private String name = "AbortSlowConsumerStrategy@" + hashCode();
private Scheduler scheduler;
+ private Broker broker;
private final AtomicBoolean taskStarted = new AtomicBoolean(false);
private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
@@ -32,10 +52,11 @@ public class AbortSlowConsumerStrategy i
private long checkPeriod = 30*1000;
private boolean abortConnection = false;
- public void setScheduler(Scheduler s) {
- this.scheduler=s;
- }
-
+ public void setBrokerService(Broker broker) {
+ this.scheduler = broker.getScheduler();
+ this.broker = broker;
+ }
+
public void slowConsumer(ConnectionContext context, Subscription subs) {
if (maxSlowCount < 0 && maxSlowDuration < 0) {
// nothing to do
@@ -75,21 +96,25 @@ public class AbortSlowConsumerStrategy i
slowConsumers.remove(entry.getKey());
}
}
-
+
+ abortSubscription(toAbort, abortConnection);
+ }
+
+ private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
ConnectionContext connectionContext = entry.getValue().context;
if (connectionContext!= null) {
try {
- LOG.info("aborting "
- + (abortConnection ? "connection" : "consumer")
- + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
+ LOG.info("aborting "
+ + (abortSubscriberConnection ? "connection" : "consumer")
+ + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
final Connection connection = connectionContext.getConnection();
- if (connection != null) {
- if (abortConnection) {
+ if (connection != null) {
+ if (abortSubscriberConnection) {
scheduler.executeAfterDelay(new Runnable() {
public void run() {
- connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
+ connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
+ maxSlowCount + ") or too long (>"
+ maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
}}, 0l);
@@ -97,21 +122,36 @@ public class AbortSlowConsumerStrategy i
// just abort the consumer by telling it to stop
ConsumerControl stopConsumer = new ConsumerControl();
stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
- stopConsumer.setClose(true);
+ stopConsumer.setClose(true);
connection.dispatchAsync(stopConsumer);
}
} else {
LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
}
} catch (Exception e) {
- LOG.info("exception on stopping "
- + (abortConnection ? "connection" : "consumer")
- + " to abort slow consumer: " + entry.getKey(), e);
+ LOG.info("exception on stopping "
+ + (abortSubscriberConnection ? "connection" : "consumer")
+ + " to abort slow consumer: " + entry.getKey(), e);
}
}
}
}
-
+
+
+ public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
+ if (sub != null) {
+ SlowConsumerEntry entry = slowConsumers.remove(sub);
+ if (entry != null) {
+ Map toAbort = new HashMap<Subscription, SlowConsumerEntry>();
+ toAbort.put(sub, entry);
+ abortSubscription(toAbort, abortSubscriberConnection);
+ } else {
+ LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
+ }
+ }
+ }
+
+
public long getMaxSlowCount() {
return maxSlowCount;
}
@@ -120,7 +160,7 @@ public class AbortSlowConsumerStrategy i
* number of times a subscription can be deemed slow before triggering abort
* effect depends on dispatch rate as slow determination is done on dispatch
*/
- public void setMaxSlowCount(int maxSlowCount) {
+ public void setMaxSlowCount(long maxSlowCount) {
this.maxSlowCount = maxSlowCount;
}
@@ -161,22 +201,15 @@ public class AbortSlowConsumerStrategy i
this.abortConnection = abortConnection;
}
- static class SlowConsumerEntry {
-
- final ConnectionContext context;
- int slowCount = 1;
- int markCount = 0;
-
- SlowConsumerEntry(ConnectionContext context) {
- this.context = context;
- }
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
- public void slow() {
- slowCount++;
- }
-
- public void mark() {
- markCount++;
- }
+ public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
+ return slowConsumers;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jul 21 17:09:33 2010
@@ -157,7 +157,7 @@ public class PolicyEntry extends Destina
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
SlowConsumerStrategy scs = getSlowConsumerStrategy();
if (scs != null) {
- scs.setScheduler(broker.getScheduler());
+ scs.setBrokerService(broker);
}
destination.setSlowConsumerStrategy(scs);
destination.setPrioritizedMessages(isPrioritizedMessages());
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java?rev=966319&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java Wed Jul 21 17:09:33 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ConnectionContext;
+
+public class SlowConsumerEntry {
+
+ final ConnectionContext context;
+ Object subscription;
+ int slowCount = 1;
+ int markCount = 0;
+
+ SlowConsumerEntry(ConnectionContext context) {
+ this.context = context;
+ }
+
+ public void slow() {
+ slowCount++;
+ }
+
+ public void mark() {
+ markCount++;
+ }
+
+ public void setSubscription(Object subscriptionObjectName) {
+ this.subscription = subscriptionObjectName;
+ }
+
+ public Object getSubscription() {
+ return subscription;
+ }
+
+ public int getSlowCount() {
+ return slowCount;
+ }
+
+ public int getMarkCount() {
+ return markCount;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed Jul 21 17:09:33 2010
@@ -1,8 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.thread.Scheduler;
/*
* a strategy for dealing with slow consumers
@@ -10,6 +26,5 @@ import org.apache.activemq.thread.Schedu
public interface SlowConsumerStrategy {
void slowConsumer(ConnectionContext context, Subscription subs);
- void setScheduler(Scheduler scheduler);
-
+ void setBrokerService(Broker broker);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=966319&r1=966318&r2=966319&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed Jul 21 17:09:33 2010
@@ -16,41 +16,45 @@
*/
package org.apache.activemq.broker.policy;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
import junit.framework.Test;
-
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class);
-
+
AbortSlowConsumerStrategy underTest;
-
+
public boolean abortConnection = false;
- public long checkPeriod = 2*1000;
- public long maxSlowDuration = 5*1000;
+ public long checkPeriod = 2 * 1000;
+ public long maxSlowDuration = 5 * 1000;
private List<Throwable> exceptions = new ArrayList<Throwable>();
-
+
@Override
protected void setUp() throws Exception {
exceptions.clear();
@@ -59,7 +63,7 @@ public class AbortSlowConsumerTest exten
super.setUp();
createDestination();
}
-
+
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
@@ -79,7 +83,7 @@ public class AbortSlowConsumerTest exten
public void testRegularConsumerIsNotAborted() throws Exception {
startConsumers(destination);
- for (Connection c: connections) {
+ for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
@@ -90,12 +94,12 @@ public class AbortSlowConsumerTest exten
public void initCombosForTestLittleSlowConsumerIsNotAborted() {
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
-
+
public void testLittleSlowConsumerIsNotAborted() throws Exception {
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(500);
- for (Connection c: connections) {
+ for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 12);
@@ -103,56 +107,102 @@ public class AbortSlowConsumerTest exten
allMessagesList.assertAtLeastMessagesReceived(10);
}
-
+
public void initCombosForTestSlowConsumerIsAborted() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
-
+
public void testSlowConsumerIsAborted() throws Exception {
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
- consumertoAbort.getValue().setProcessingDelay(8*1000);
- for (Connection c: connections) {
+ consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+ for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
-
+
consumertoAbort.getValue().assertMessagesReceived(1);
-
+
TimeUnit.SECONDS.sleep(5);
-
- consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+
+ consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+ }
+
+
+ public void testSlowConsumerIsAbortedViaJmx() throws Exception {
+ underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
+ startConsumers(destination);
+ Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
+ consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+ for (Connection c : connections) {
+ c.setExceptionListener(this);
+ }
+ startProducers(destination, 100);
+
+ consumertoAbort.getValue().assertMessagesReceived(1);
+
+ ActiveMQDestination amqDest = (ActiveMQDestination)destination;
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" +
+ (amqDest.isTopic() ? "Topic" : "Queue") +",Destination="
+ + amqDest.getPhysicalName() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+
+ assertNotNull(slowConsumerPolicyMBeanName);
+
+ AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+ broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+
+ TimeUnit.SECONDS.sleep(3);
+
+ TabularData slowOnes = abortPolicy.getSlowConsumers();
+ assertEquals("one slow consumers", 1, slowOnes.size());
+
+ LOG.info("slow ones:" + slowOnes);
+
+ CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
+ LOG.info("Slow one: " + slowOne);
+
+ assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
+ abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
+
+ consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+
+ slowOnes = abortPolicy.getSlowConsumers();
+ assertEquals("no slow consumers left", 0, slowOnes.size());
+
}
-
+
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
consumerCount = 10;
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
- consumertoAbort.getValue().setProcessingDelay(8*1000);
- for (Connection c: connections) {
+ consumertoAbort.getValue().setProcessingDelay(8 * 1000);
+ for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
-
+
allMessagesList.waitForMessagesToArrive(99);
allMessagesList.assertAtLeastMessagesReceived(99);
-
+
consumertoAbort.getValue().assertMessagesReceived(1);
-
+
TimeUnit.SECONDS.sleep(5);
-
- consumertoAbort.getValue().assertAtMostMessagesReceived(1);
+
+ consumertoAbort.getValue().assertAtMostMessagesReceived(1);
}
-
+
public void testAbortAlreadyClosingConsumers() throws Exception {
consumerCount = 1;
startConsumers(destination);
for (MessageIdList list : consumers.values()) {
- list.setProcessingDelay(6*1000);
+ list.setProcessingDelay(6 * 1000);
}
- for (Connection c: connections) {
+ for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
@@ -164,12 +214,12 @@ public class AbortSlowConsumerTest exten
consumer.close();
}
}
-
+
public void initCombosForTestAbortAlreadyClosedConsumers() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
-
+
public void testAbortAlreadyClosedConsumers() throws Exception {
Connection conn = createConnectionFactory().createConnection();
conn.setExceptionListener(this);
@@ -182,17 +232,17 @@ public class AbortSlowConsumerTest exten
TimeUnit.SECONDS.sleep(1);
LOG.info("closing consumer: " + consumer);
consumer.close();
-
+
TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
}
-
+
public void initCombosForTestAbortAlreadyClosedConnection() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
-
+
public void testAbortAlreadyClosedConnection() throws Exception {
Connection conn = createConnectionFactory().createConnection();
conn.setExceptionListener(this);
@@ -204,7 +254,7 @@ public class AbortSlowConsumerTest exten
TimeUnit.SECONDS.sleep(1);
LOG.info("closing connection: " + conn);
conn.close();
-
+
TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
}
@@ -212,12 +262,12 @@ public class AbortSlowConsumerTest exten
public void testAbortConsumerOnDeadConnection() throws Exception {
// socket proxy on pause, close could hang??
}
-
+
public void onException(JMSException exception) {
exceptions.add(exception);
- exception.printStackTrace();
+ exception.printStackTrace();
}
-
+
public static Test suite() {
return suite(AbortSlowConsumerTest.class);
}