You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/10/13 03:58:50 UTC

[activemq] branch activemq-5.17.x updated: remove consumers more efficiently

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 51c390737 remove consumers more efficiently
51c390737 is described below

commit 51c390737aacd2d3349c7e043ebb32fb815c0d1d
Author: Lucas Tétreault <te...@amazon.com>
AuthorDate: Mon Oct 10 13:30:35 2022 -0700

    remove consumers more efficiently
    
    (cherry pick from commit b7f73d4875d767f704c6720e66da1bf8609d7ed7)
---
 .../activemq/broker/jmx/ManagedRegionBroker.java   | 50 +++++++++++-----------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index e856c5da5..5decd6c3c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
 
 import javax.jms.IllegalStateException;
 import javax.management.InstanceNotFoundException;
@@ -79,23 +80,24 @@ public class ManagedRegionBroker extends RegionBroker {
     private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
     private final ManagementContext managementContext;
     private final ObjectName brokerObjectName;
-    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
-    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
-    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
-    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
-    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
-    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
-    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
-    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
-    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
-    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
-    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
-    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
-    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
-    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
-    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
-    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
-    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
+    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<>();
+    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<>();
+    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<>();
+    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<>();
+    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<>();
+    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<>();
+    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<>();
+    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<>();
+    private final Map<ConsumerInfo, Subscription> consumerSubscriptionMap = new ConcurrentHashMap<>();
     private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
@@ -215,6 +217,7 @@ public class ManagedRegionBroker extends RegionBroker {
                 registerSubscription(objectName, sub.getConsumerInfo(), key, view);
             }
             subscriptionMap.put(sub, objectName);
+            consumerSubscriptionMap.put(sub.getConsumerInfo(), sub);
             return objectName;
         } catch (Exception e) {
             LOG.error("Failed to register subscription {}", sub, e);
@@ -249,11 +252,9 @@ public class ManagedRegionBroker extends RegionBroker {
 
     @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        for (Subscription sub : subscriptionMap.keySet()) {
-            if (sub.getConsumerInfo().equals(info)) {
-               // unregister all consumer subs
-               unregisterSubscription(subscriptionMap.get(sub), true);
-            }
+        if (consumerSubscriptionMap.containsKey(info)){
+            Subscription sub = consumerSubscriptionMap.get(info);
+            unregisterSubscription(subscriptionMap.get(sub), true);
         }
         super.removeConsumer(context, info);
     }
@@ -295,6 +296,7 @@ public class ManagedRegionBroker extends RegionBroker {
 
     public void unregisterSubscription(Subscription sub) {
         ObjectName name = subscriptionMap.remove(sub);
+        consumerSubscriptionMap.remove(sub.getConsumerInfo());
         if (name != null) {
             try {
                 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
@@ -484,7 +486,7 @@ public class ManagedRegionBroker extends RegionBroker {
     }
 
     protected void buildExistingSubscriptions() throws Exception {
-        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
+        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<>();
         Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
         if (destinations != null) {
             for (ActiveMQDestination dest : destinations) {
@@ -609,7 +611,7 @@ public class ManagedRegionBroker extends RegionBroker {
     }
 
     private ObjectName[] onlyNonSuppressed (Set<ObjectName> set){
-        List<ObjectName> nonSuppressed = new ArrayList<ObjectName>();
+        List<ObjectName> nonSuppressed = new ArrayList<>();
         for(ObjectName key : set){
             if (managementContext.isAllowedToRegister(key)){
                 nonSuppressed.add(key);