You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/10/13 03:58:50 UTC
[activemq] branch activemq-5.17.x updated: remove consumers more efficiently
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
new 51c390737 remove consumers more efficiently
51c390737 is described below
commit 51c390737aacd2d3349c7e043ebb32fb815c0d1d
Author: Lucas Tétreault <te...@amazon.com>
AuthorDate: Mon Oct 10 13:30:35 2022 -0700
remove consumers more efficiently
(cherry pick from commit b7f73d4875d767f704c6720e66da1bf8609d7ed7)
---
.../activemq/broker/jmx/ManagedRegionBroker.java | 50 +++++++++++-----------
1 file changed, 26 insertions(+), 24 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index e856c5da5..5decd6c3c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
import javax.jms.IllegalStateException;
import javax.management.InstanceNotFoundException;
@@ -79,23 +80,24 @@ public class ManagedRegionBroker extends RegionBroker {
private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
private final ManagementContext managementContext;
private final ObjectName brokerObjectName;
- private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
- private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
- private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
- private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
- private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
- private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
- private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
- private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
- private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
- private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
- private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
- private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
- private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
- private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
- private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
- private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
- private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
+ private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<>();
+ private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<>();
+ private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<>();
+ private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<>();
+ private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<>();
+ private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<>();
+ private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<>();
+ private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<>();
+ private final Map<ConsumerInfo, Subscription> consumerSubscriptionMap = new ConcurrentHashMap<>();
private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
@@ -215,6 +217,7 @@ public class ManagedRegionBroker extends RegionBroker {
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
}
subscriptionMap.put(sub, objectName);
+ consumerSubscriptionMap.put(sub.getConsumerInfo(), sub);
return objectName;
} catch (Exception e) {
LOG.error("Failed to register subscription {}", sub, e);
@@ -249,11 +252,9 @@ public class ManagedRegionBroker extends RegionBroker {
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
- for (Subscription sub : subscriptionMap.keySet()) {
- if (sub.getConsumerInfo().equals(info)) {
- // unregister all consumer subs
- unregisterSubscription(subscriptionMap.get(sub), true);
- }
+ if (consumerSubscriptionMap.containsKey(info)){
+ Subscription sub = consumerSubscriptionMap.get(info);
+ unregisterSubscription(subscriptionMap.get(sub), true);
}
super.removeConsumer(context, info);
}
@@ -295,6 +296,7 @@ public class ManagedRegionBroker extends RegionBroker {
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
+ consumerSubscriptionMap.remove(sub.getConsumerInfo());
if (name != null) {
try {
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
@@ -484,7 +486,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected void buildExistingSubscriptions() throws Exception {
- Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
+ Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<>();
Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
if (destinations != null) {
for (ActiveMQDestination dest : destinations) {
@@ -609,7 +611,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
private ObjectName[] onlyNonSuppressed (Set<ObjectName> set){
- List<ObjectName> nonSuppressed = new ArrayList<ObjectName>();
+ List<ObjectName> nonSuppressed = new ArrayList<>();
for(ObjectName key : set){
if (managementContext.isAllowedToRegister(key)){
nonSuppressed.add(key);