You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/29 21:52:43 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5337
Repository: activemq
Updated Branches:
refs/heads/trunk 60bdfc061 -> b2afb8c96
https://issues.apache.org/jira/browse/AMQ-5337
Switch to LinkedHashMap with R/W locking for concurrent add / remove
protection
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b2afb8c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b2afb8c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b2afb8c9
Branch: refs/heads/trunk
Commit: b2afb8c9698875ca4b6f2ef4f1f18abfcf9fc15e
Parents: 60bdfc0
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 29 15:52:23 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 29 15:52:23 2014 -0400
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b2afb8c9/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 2583a23..58947ad 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -16,12 +16,14 @@
*/
package org.apache.activemq.advisory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
@@ -70,7 +72,8 @@ public class AdvisoryBroker extends BrokerFilter {
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
- protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
+ private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
+ protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
@@ -103,7 +106,12 @@ public class AdvisoryBroker extends BrokerFilter {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
- consumers.offer(info);
+ consumersLock.writeLock().lock();
+ try {
+ consumers.put(info.getConsumerId(), info);
+ } finally {
+ consumersLock.writeLock().unlock();
+ }
fireConsumerAdvisory(context, info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
@@ -148,10 +156,15 @@ public class AdvisoryBroker extends BrokerFilter {
// Replay the consumers.
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
- for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) {
- ConsumerInfo value = iter.next();
- ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
- fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
+ consumersLock.readLock().lock();
+ try {
+ for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
+ ConsumerInfo value = iter.next();
+ ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
+ fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
+ }
+ } finally {
+ consumersLock.readLock().unlock();
}
}
@@ -266,7 +279,12 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQDestination dest = info.getDestination();
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
- consumers.remove(info);
+ consumersLock.writeLock().lock();
+ try {
+ consumers.remove(info.getConsumerId());
+ } finally {
+ consumersLock.writeLock().unlock();
+ }
if (!dest.isTemporary() || destinations.containsKey(dest)) {
fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
}
@@ -623,8 +641,13 @@ public class AdvisoryBroker extends BrokerFilter {
return connections;
}
- public Queue<ConsumerInfo> getAdvisoryConsumers() {
- return consumers;
+ public Collection<ConsumerInfo> getAdvisoryConsumers() {
+ consumersLock.readLock().lock();
+ try {
+ return new ArrayList<ConsumerInfo>(consumers.values());
+ } finally {
+ consumersLock.readLock().unlock();
+ }
}
public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {