You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/29 21:52:43 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5337

Repository: activemq
Updated Branches:
  refs/heads/trunk 60bdfc061 -> b2afb8c96


https://issues.apache.org/jira/browse/AMQ-5337

Switch the advisory consumers collection to a LinkedHashMap guarded by a
read/write lock to protect against concurrent add / remove.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b2afb8c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b2afb8c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b2afb8c9

Branch: refs/heads/trunk
Commit: b2afb8c9698875ca4b6f2ef4f1f18abfcf9fc15e
Parents: 60bdfc0
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 29 15:52:23 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 29 15:52:23 2014 -0400

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b2afb8c9/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 2583a23..58947ad 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -16,12 +16,14 @@
  */
 package org.apache.activemq.advisory;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
@@ -70,7 +72,8 @@ public class AdvisoryBroker extends BrokerFilter {
 
     protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
 
-    protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
+    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
+    protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
 
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
@@ -103,7 +106,12 @@ public class AdvisoryBroker extends BrokerFilter {
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            consumers.offer(info);
+            consumersLock.writeLock().lock();
+            try {
+                consumers.put(info.getConsumerId(), info);
+            } finally {
+                consumersLock.writeLock().unlock();
+            }
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
         } else {
             // We need to replay all the previously collected state objects
@@ -148,10 +156,15 @@ public class AdvisoryBroker extends BrokerFilter {
 
             // Replay the consumers.
             if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
-                for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) {
-                    ConsumerInfo value = iter.next();
-                    ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
-                    fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
+                consumersLock.readLock().lock();
+                try {
+                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
+                        ConsumerInfo value = iter.next();
+                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
+                        fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
+                    }
+                } finally {
+                    consumersLock.readLock().unlock();
                 }
             }
 
@@ -266,7 +279,12 @@ public class AdvisoryBroker extends BrokerFilter {
         ActiveMQDestination dest = info.getDestination();
         if (!AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            consumers.remove(info);
+            consumersLock.writeLock().lock();
+            try {
+                consumers.remove(info.getConsumerId());
+            } finally {
+                consumersLock.writeLock().unlock();
+            }
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
                 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
             }
@@ -623,8 +641,13 @@ public class AdvisoryBroker extends BrokerFilter {
         return connections;
     }
 
-    public Queue<ConsumerInfo> getAdvisoryConsumers() {
-        return consumers;
+    public Collection<ConsumerInfo> getAdvisoryConsumers() {
+        consumersLock.readLock().lock();
+        try {
+            return new ArrayList<ConsumerInfo>(consumers.values());
+        } finally {
+            consumersLock.readLock().unlock();
+        }
     }
 
     public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {