You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/03 10:51:02 UTC

[activemq] branch activemq-5.17.x updated: AMQ-9107 - rework performance improvement for consumer closing in managed region broker

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 8cc7a4545 AMQ-9107 - rework performance improvement for consumer closing in managed region broker
8cc7a4545 is described below

commit 8cc7a4545510197fe3074ed598a811bdef6bd5d8
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Tue Nov 1 10:45:24 2022 -0400

    AMQ-9107 - rework performance improvement for consumer closing in
    managed region broker
    
    This new approach just looks up matching Subscriptions from the region for the
    destination, which avoids having to store another map, and falls back to
    the old approach if something went wrong.
    
    (cherry picked from commit d46b74d674c2a67193bb95384290da266adf9a25)
---
 .../activemq/broker/jmx/ManagedRegionBroker.java   |  64 +++++-
 .../broker/jmx/JmxConsumerRemovalTest.java         | 214 +++++++++++++++++++++
 2 files changed, 270 insertions(+), 8 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index 5decd6c3c..fa2837956 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -23,9 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.function.Consumer;
 
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
@@ -42,6 +42,7 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
+import org.apache.activemq.broker.region.AbstractRegion;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
 import org.apache.activemq.broker.region.DestinationInterceptor;
@@ -61,6 +62,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -97,8 +99,7 @@ public class ManagedRegionBroker extends RegionBroker {
     private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<>();
     private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<>();
     private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<>();
-    private final Map<ConsumerInfo, Subscription> consumerSubscriptionMap = new ConcurrentHashMap<>();
-    private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
+    private final Set<ObjectName> registeredMBeans = ConcurrentHashMap.newKeySet();
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 
@@ -217,7 +218,6 @@ public class ManagedRegionBroker extends RegionBroker {
                 registerSubscription(objectName, sub.getConsumerInfo(), key, view);
             }
             subscriptionMap.put(sub, objectName);
-            consumerSubscriptionMap.put(sub.getConsumerInfo(), sub);
             return objectName;
         } catch (Exception e) {
             LOG.error("Failed to register subscription {}", sub, e);
@@ -252,13 +252,62 @@ public class ManagedRegionBroker extends RegionBroker {
 
     @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        if (consumerSubscriptionMap.containsKey(info)){
-            Subscription sub = consumerSubscriptionMap.get(info);
-            unregisterSubscription(subscriptionMap.get(sub), true);
+        //Find subscriptions quickly by relying on the maps contained in the different Regions
+        //that map consumer ids and subscriptions
+        final Set<Subscription> subscriptions = findSubscriptions(info);
+
+        if (!subscriptions.isEmpty()) {
+            for (Subscription sub : subscriptions) {
+                // unregister every sub returned by findSubscriptions; do not stop after
+                // the first one or the remaining subs would stay registered
+                unregisterSubscription(subscriptionMap.get(sub), true);
+            }
+        } else {
+            //Fall back to old slow approach where we go through the entire subscription map in case something went wrong
+            //and no subscriptions were found - should generally not happen
+            for (Subscription sub : subscriptionMap.keySet()) {
+                if (sub.getConsumerInfo().equals(info)) {
+                    unregisterSubscription(subscriptionMap.get(sub), true);
+                }
+            }
         }
+
         super.removeConsumer(context, info);
     }
 
+    private Set<Subscription> findSubscriptions(final ConsumerInfo info) {
+        final Set<Subscription> subscriptions = new HashSet<>();
+
+        try {
+            if (info.getDestination() != null) {
+                final ActiveMQDestination consumerDest = info.getDestination();
+                //If it's composite then go through and find the subscription for every dest in case they differ
+                if (consumerDest.isComposite()) {
+                    ActiveMQDestination[] destinations = consumerDest.getCompositeDestinations();
+                    for (ActiveMQDestination destination : destinations) {
+                        addSubscriptionToList(subscriptions, info.getConsumerId(), destination);
+                    }
+                } else {
+                    //This is the case for a non-composite destination which would be most of the time
+                    addSubscriptionToList(subscriptions, info.getConsumerId(), info.getDestination());
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Error finding subscription {}: {}", info, e.getMessage());
+        }
+
+        return subscriptions;
+    }
+
+    private void addSubscriptionToList(Set<Subscription> subscriptions,
+        ConsumerId consumerId, ActiveMQDestination dest) throws JMSException {
+        final Subscription matchingSub = ((AbstractRegion) this.getRegion(dest))
+            .getSubscriptions().get(consumerId);
+        if (matchingSub != null) {
+            subscriptions.add(matchingSub);
+        }
+    }
+
     @Override
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         super.addProducer(context, info);
@@ -296,7 +345,6 @@ public class ManagedRegionBroker extends RegionBroker {
 
     public void unregisterSubscription(Subscription sub) {
         ObjectName name = subscriptionMap.remove(sub);
-        consumerSubscriptionMap.remove(sub.getConsumerInfo());
         if (name != null) {
             try {
                 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxConsumerRemovalTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxConsumerRemovalTest.java
new file mode 100644
index 000000000..2e356504a
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxConsumerRemovalTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import junit.textui.TestRunner;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JmxConsumerRemovalTest extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmxConsumerRemovalTest.class);
+
+    protected MBeanServer mbeanServer;
+    protected ManagedRegionBroker regionBroker;
+    protected Session session;
+    protected String clientID = "foo";
+
+    protected Connection connection;
+    protected boolean transacted;
+
+    public static void main(String[] args) {
+        TestRunner.run(JmxConsumerRemovalTest.class);
+    }
+
+    public static Test suite() {
+        return suite(JmxConsumerRemovalTest.class);
+    }
+
+    public void testCompositeDestConsumerRemoval() throws Exception {
+        Map<Subscription, ObjectName> subscriptionMap = getSubscriptionMap();
+        int consumersToAdd = 1000;
+        Set<MessageConsumer> consumers = new HashSet<>();
+
+        final ActiveMQDestination dest = new ActiveMQQueue("test");
+        dest.setCompositeDestinations(new ActiveMQDestination[]{new ActiveMQQueue("test1"),
+            new ActiveMQQueue("test2"), new ActiveMQQueue("test3")});
+
+        for (int i = 0; i < consumersToAdd; i++) {
+            consumers.add(session.createConsumer(dest));
+        }
+
+        //Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
+        assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500));
+
+        for (MessageConsumer consumer : consumers) {
+            consumer.close();
+        }
+
+        //Make sure map removed all consumers after close
+        assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500));
+        assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length +
+            regionBroker.getTopicSubscribers().length, 5000, 500));
+    }
+
+    public void testDurableConsumerRemoval() throws Exception {
+        testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.1"));
+    }
+
+    public void testDurableConsumerWildcardRemoval() throws Exception {
+        testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.>"));
+
+    }
+    public void testDurableConsumerRemoval(ActiveMQDestination dest) throws Exception {
+        int consumersToAdd = 1000;
+        Set<MessageConsumer> durables = new HashSet<>();
+
+        //Create a lot of durables and then close them all
+        for (int i = 0; i < consumersToAdd; i++) {
+            durables.add(session.createDurableSubscriber((Topic) dest, "sub" + i));
+        }
+
+        //Make sure all of the durables are tracked in ManagedRegionBroker map
+        assertTrue(Wait.waitFor(() -> consumersToAdd == getSubscriptionMap().size(), 5000, 500));
+
+        for (MessageConsumer consumer : durables) {
+            consumer.close();
+        }
+
+        //Make sure no durable subscribers are reported after close
+        assertTrue(Wait.waitFor(() -> 0 == regionBroker.getDurableTopicSubscribers().length, 5000, 500));
+        //Note we can't check the subscription map as the durables still exist, just offline
+    }
+
+    public void testQueueConsumerRemoval() throws Exception {
+        testConsumerRemoval(new ActiveMQQueue("wildcard.queue.1"));
+    }
+
+    public void testQueueConsumerRemovalWildcard() throws Exception {
+        testConsumerRemoval(new ActiveMQQueue("wildcard.queue.>"));
+    }
+
+    public void testTopicConsumerRemoval() throws Exception {
+        testConsumerRemoval(new ActiveMQTopic("wildcard.topic.1"));
+    }
+
+    public void testTopicConsumerRemovalWildcard() throws Exception {
+        testConsumerRemoval(new ActiveMQTopic("wildcard.topic.>"));
+    }
+
+    private void testConsumerRemoval(ActiveMQDestination dest) throws Exception {
+        Map<Subscription, ObjectName> subscriptionMap = getSubscriptionMap();
+        int consumersToAdd = 1000;
+        Set<MessageConsumer> consumers = new HashSet<>();
+
+        for (int i = 0; i < consumersToAdd; i++) {
+            consumers.add(session.createConsumer(dest));
+        }
+
+        //Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
+        assertTrue(Wait.waitFor(() ->  consumersToAdd == subscriptionMap.size(), 5000, 500));
+
+        for (MessageConsumer consumer : consumers) {
+            consumer.close();
+        }
+
+        //Make sure map removed all consumers after close
+        assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500));
+        assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length &&
+            0 == regionBroker.getTopicSubscribers().length, 5000, 500));
+    }
+
+    private Map<Subscription, ObjectName> getSubscriptionMap() throws Exception {
+        ManagedRegionBroker regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class);
+        Field subMapField = ManagedRegionBroker.class.getDeclaredField("subscriptionMap");
+        subMapField.setAccessible(true);
+        return (Map<Subscription, ObjectName>) subMapField.get(regionBroker);
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        useTopic = true;
+        super.setUp();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+        regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class);
+        ((ActiveMQConnectionFactory)connectionFactory).setWatchTopicAdvisories(false);
+        connection = connectionFactory.createConnection();
+        connection.setClientID(clientID);
+        connection.start();
+        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(true);
+        answer.addConnector(bindAddress);
+        answer.deleteAllMessages();
+        return answer;
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    protected void echo(String text) {
+        LOG.info(text);
+    }
+
+    /**
+     * Returns the name of the destination used in this test case
+     */
+    protected String getDestinationString() {
+        return getClass().getName() + "." + getName(true);
+    }
+}