You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/10/18 15:50:02 UTC
activemq git commit: AMQ-7079 AMQ-7077 AMQ-6421 - check for consumers that have been destroyed
Repository: activemq
Updated Branches:
refs/heads/master b9c8f6228 -> 9dd751149
AMQ-7079 AMQ-7077 AMQ-6421 - check for consumers that have been destroyed
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9dd75114
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9dd75114
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9dd75114
Branch: refs/heads/master
Commit: 9dd751149f7489f99d430d3f1240f2bfa2e70c69
Parents: b9c8f62
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 18 16:49:29 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 18 16:49:29 2018 +0100
----------------------------------------------------------------------
.../policy/AbortSlowAckConsumerStrategy.java | 27 ++++++++------
.../org/apache/activemq/bugs/AMQ7077Test.java | 38 ++++++++++++++++++--
2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9dd75114/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
index 063fdec..edb0cef 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
@@ -16,10 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -74,14 +71,24 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
return;
}
- if (getMaxSlowDuration() > 0) {
- // For subscriptions that are already slow we mark them again and check below if
- // they've exceeded their configured lifetime.
- for (SlowConsumerEntry entry : slowConsumers.values()) {
- entry.mark();
+
+ List<Subscription> subscribersDestroyed = new LinkedList<Subscription>();
+ // check for removed consumers also
+ for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+ if (getMaxSlowDuration() > 0) {
+ // For subscriptions that are already slow we mark them again and check below if
+ // they've exceeded their configured lifetime.
+ entry.getValue().mark();
+ }
+ if (!entry.getKey().isSlowConsumer()) {
+ subscribersDestroyed.add(entry.getKey());
}
}
+ for (Subscription subscription: subscribersDestroyed) {
+ slowConsumers.remove(subscription);
+ }
+
List<Destination> disposed = new ArrayList<Destination>();
for (Destination destination : destinations.values()) {
@@ -134,7 +141,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
if (!abstractSubscription.isSlowConsumer()) {
abstractSubscription.setSlowConsumer(true);
for (Destination destination: abstractSubscription.getDestinations()) {
- // destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription);
+ destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9dd75114/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
index 8b41a14..67d478f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
@@ -20,10 +20,13 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -31,9 +34,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
import java.net.URI;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class AMQ7077Test {
@@ -52,7 +59,8 @@ public class AMQ7077Test {
AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
strategy.setCheckPeriod(500);
strategy.setMaxTimeSinceLastAck(1000);
- strategy.setMaxSlowCount(2);
+ strategy.setMaxSlowDuration(0);
+ strategy.setMaxSlowCount(4);
strategy.setIgnoreIdleConsumers(false);
return strategy;
}
@@ -68,7 +76,6 @@ public class AMQ7077Test {
policy.setAdvisoryForSlowConsumers(true);
PolicyMap pMap = new PolicyMap();
pMap.put(new ActiveMQQueue(">"), policy);
- brokerService.setUseJmx(false);
brokerService.setDestinationPolicy(pMap);
brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
@@ -84,16 +91,41 @@ public class AMQ7077Test {
Destination destination = session.createQueue("DD");
+ MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
+
MessageConsumer consumer = session.createConsumer(destination);
// will be idle and can get removed but will be marked slow and now produce an advisory
- MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
Message message = advisoryConsumer.receive(10000);
if (message == null) {
message = advisoryConsumer.receive(2000);
}
assertNotNull("Got advisory", message);
connection.close();
+
+ QueueViewMBean queue = getProxyToQueue(((Queue) destination).getQueueName());
+ ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+ assertNotNull(slowConsumerPolicyMBeanName);
+
+ AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+ brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+
+ assertTrue("slow list is gone on remove", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ TabularData slowOnes = abortPolicy.getSlowConsumers();
+ LOG.info("slow ones:" + slowOnes);
+ return slowOnes.size() == 0;
+ }
+ }));
+
+ }
+
+ protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+ QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ return proxy;
}
@After