You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/10/18 15:50:02 UTC

activemq git commit: AMQ-7079 AMQ-7077 AMQ-6421 - check for consumers that have been destroyed

Repository: activemq
Updated Branches:
  refs/heads/master b9c8f6228 -> 9dd751149


AMQ-7079 AMQ-7077 AMQ-6421 - check for consumers that have been destroyed


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9dd75114
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9dd75114
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9dd75114

Branch: refs/heads/master
Commit: 9dd751149f7489f99d430d3f1240f2bfa2e70c69
Parents: b9c8f62
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 18 16:49:29 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 18 16:49:29 2018 +0100

----------------------------------------------------------------------
 .../policy/AbortSlowAckConsumerStrategy.java    | 27 ++++++++------
 .../org/apache/activemq/bugs/AMQ7077Test.java   | 38 ++++++++++++++++++--
 2 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9dd75114/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
index 063fdec..edb0cef 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -74,14 +71,24 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
             return;
         }
 
-        if (getMaxSlowDuration() > 0) {
-            // For subscriptions that are already slow we mark them again and check below if
-            // they've exceeded their configured lifetime.
-            for (SlowConsumerEntry entry : slowConsumers.values()) {
-                entry.mark();
+
+        List<Subscription> subscribersDestroyed  = new LinkedList<Subscription>();
+        // also collect entries whose subscription is no longer flagged slow; they are removed from slowConsumers below
+        for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
+            if (getMaxSlowDuration() > 0) {
+                // For subscriptions that are already slow we mark them again and check below if
+                // they've exceeded their configured lifetime.
+                entry.getValue().mark();
+            }
+            if (!entry.getKey().isSlowConsumer()) {
+                subscribersDestroyed.add(entry.getKey());
             }
         }
 
+        for (Subscription subscription: subscribersDestroyed) {
+            slowConsumers.remove(subscription);
+        }
+
         List<Destination> disposed = new ArrayList<Destination>();
 
         for (Destination destination : destinations.values()) {
@@ -134,7 +141,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
                         if (!abstractSubscription.isSlowConsumer()) {
                             abstractSubscription.setSlowConsumer(true);
                             for (Destination destination: abstractSubscription.getDestinations()) {
-                               // destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription);
+                               destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9dd75114/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
index 8b41a14..67d478f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java
@@ -20,10 +20,13 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,9 +34,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
 import java.net.URI;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class AMQ7077Test {
 
@@ -52,7 +59,8 @@ public class AMQ7077Test {
         AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
         strategy.setCheckPeriod(500);
         strategy.setMaxTimeSinceLastAck(1000);
-        strategy.setMaxSlowCount(2);
+        strategy.setMaxSlowDuration(0);
+        strategy.setMaxSlowCount(4);
         strategy.setIgnoreIdleConsumers(false);
         return strategy;
     }
@@ -68,7 +76,6 @@ public class AMQ7077Test {
         policy.setAdvisoryForSlowConsumers(true);
         PolicyMap pMap = new PolicyMap();
         pMap.put(new ActiveMQQueue(">"), policy);
-        brokerService.setUseJmx(false);
         brokerService.setDestinationPolicy(pMap);
         brokerService.addConnector("tcp://0.0.0.0:0");
         brokerService.start();
@@ -84,16 +91,41 @@ public class AMQ7077Test {
 
         Destination destination = session.createQueue("DD");
 
+        MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
+
         MessageConsumer consumer = session.createConsumer(destination);
         // will be idle and can get removed but will be marked slow and now produce an advisory
 
-        MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
         Message message = advisoryConsumer.receive(10000);
         if (message == null) {
             message = advisoryConsumer.receive(2000);
         }
         assertNotNull("Got advisory", message);
         connection.close();
+
+        QueueViewMBean queue = getProxyToQueue(((Queue) destination).getQueueName());
+        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
+        assertNotNull(slowConsumerPolicyMBeanName);
+
+        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
+                brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
+
+        assertTrue("slow list is gone on remove", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                TabularData slowOnes = abortPolicy.getSlowConsumers();
+                LOG.info("slow ones:"  + slowOnes);
+                return slowOnes.size() == 0;
+            }
+        }));
+
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
     }
 
     @After