You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 03:41:45 UTC

[15/17] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5337

https://issues.apache.org/jira/browse/AMQ-5337

Switch to LinkedHashMap with R/W locking for concurrent add / remove
protection


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4349e77e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4349e77e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4349e77e

Branch: refs/heads/activemq-5.10.x
Commit: 4349e77eefa560f0dff2eee31cb6a9881fee3559
Parents: 41311df
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 29 15:52:23 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 19:50:55 2014 -0500

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4349e77e/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 2583a23..58947ad 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -16,12 +16,14 @@
  */
 package org.apache.activemq.advisory;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
@@ -70,7 +72,8 @@ public class AdvisoryBroker extends BrokerFilter {
 
     protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
 
-    protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
+    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
+    protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
 
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
@@ -103,7 +106,12 @@ public class AdvisoryBroker extends BrokerFilter {
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            consumers.offer(info);
+            consumersLock.writeLock().lock();
+            try {
+                consumers.put(info.getConsumerId(), info);
+            } finally {
+                consumersLock.writeLock().unlock();
+            }
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
         } else {
             // We need to replay all the previously collected state objects
@@ -148,10 +156,15 @@ public class AdvisoryBroker extends BrokerFilter {
 
             // Replay the consumers.
             if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
-                for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) {
-                    ConsumerInfo value = iter.next();
-                    ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
-                    fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
+                consumersLock.readLock().lock();
+                try {
+                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
+                        ConsumerInfo value = iter.next();
+                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
+                        fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
+                    }
+                } finally {
+                    consumersLock.readLock().unlock();
                 }
             }
 
@@ -266,7 +279,12 @@ public class AdvisoryBroker extends BrokerFilter {
         ActiveMQDestination dest = info.getDestination();
         if (!AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            consumers.remove(info);
+            consumersLock.writeLock().lock();
+            try {
+                consumers.remove(info.getConsumerId());
+            } finally {
+                consumersLock.writeLock().unlock();
+            }
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
                 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
             }
@@ -623,8 +641,13 @@ public class AdvisoryBroker extends BrokerFilter {
         return connections;
     }
 
-    public Queue<ConsumerInfo> getAdvisoryConsumers() {
-        return consumers;
+    public Collection<ConsumerInfo> getAdvisoryConsumers() {
+        consumersLock.readLock().lock();
+        try {
+            return new ArrayList<ConsumerInfo>(consumers.values());
+        } finally {
+            consumersLock.readLock().unlock();
+        }
     }
 
     public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {