You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/23 23:51:10 UTC
svn commit: r1160894 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
AbstractRegion.java TopicRegion.java
Author: tabish
Date: Tue Aug 23 21:51:10 2011
New Revision: 1160894
URL: http://svn.apache.org/viewvc?rev=1160894&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3454
Use a read/write lock in place of the destination mutex, as many of the access points are simply reads of the underlying maps or a quick copy to another Collection. The only time a write is done is during addDestination and removeDestination.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1160894&r1=1160893&r2=1160894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Aug 23 21:51:10 2011
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
@@ -65,7 +67,7 @@ public abstract class AbstractRegion imp
protected final RegionBroker broker;
protected boolean autoCreateDestinations = true;
protected final TaskRunnerFactory taskRunnerFactory;
- protected final Object destinationsMutex = new Object();
+ protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
protected boolean started;
@@ -96,21 +98,27 @@ public abstract class AbstractRegion imp
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.getBroker().addDestination(context, dest, false);
}
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try{
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
Destination dest = i.next();
dest.start();
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
public void stop() throws Exception {
started = false;
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try{
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
Destination dest = i.next();
dest.stop();
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
destinations.clear();
}
@@ -118,7 +126,9 @@ public abstract class AbstractRegion imp
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {
LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
- synchronized (destinationsMutex) {
+
+ destinationsLock.writeLock().lock();
+ try {
Destination dest = destinations.get(destination);
if (dest == null) {
if (destination.isTemporary() == false || createIfTemporary) {
@@ -150,6 +160,8 @@ public abstract class AbstractRegion imp
}
}
return dest;
+ } finally {
+ destinationsLock.writeLock().unlock();
}
}
@@ -197,7 +209,8 @@ public abstract class AbstractRegion imp
LOG.debug("Removing destination: " + destination);
- synchronized (destinationsMutex) {
+ destinationsLock.writeLock().lock();
+ try {
Destination dest = destinations.remove(destination);
if (dest != null) {
// timeout<0 or we timed out, we now force any remaining
@@ -218,6 +231,8 @@ public abstract class AbstractRegion imp
} else {
LOG.debug("Destination doesn't exist: " + dest);
}
+ } finally {
+ destinationsLock.writeLock().unlock();
}
}
@@ -226,18 +241,26 @@ public abstract class AbstractRegion imp
*
* @return a set of matching destination objects.
*/
+ @SuppressWarnings("unchecked")
public Set<Destination> getDestinations(ActiveMQDestination destination) {
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try{
return destinationMap.get(destination);
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
public Map<ActiveMQDestination, Destination> getDestinationMap() {
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try{
return new HashMap<ActiveMQDestination, Destination>(destinations);
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
+ @SuppressWarnings("unchecked")
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
@@ -258,8 +281,7 @@ public abstract class AbstractRegion imp
synchronized (addGuard) {
Subscription o = subscriptions.get(info.getConsumerId());
if (o != null) {
- LOG
- .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
+ LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
return o;
}
@@ -293,11 +315,13 @@ public abstract class AbstractRegion imp
// Add the subscription to all the matching queues.
// But copy the matches first - to prevent deadlocks
List<Destination> addList = new ArrayList<Destination>();
- synchronized (destinationsMutex) {
- for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
- Destination dest = (Destination) iter.next();
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
addList.add(dest);
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
for (Destination dest : addList) {
@@ -317,6 +341,7 @@ public abstract class AbstractRegion imp
*
* @return Set of all stored destinations
*/
+ @SuppressWarnings("rawtypes")
public Set getDurableDestinations() {
return destinationFactory.getDestinations();
}
@@ -326,12 +351,16 @@ public abstract class AbstractRegion imp
*/
protected Set<ActiveMQDestination> getInactiveDestinations() {
Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try {
inactiveDests.removeAll(destinations.keySet());
+ } finally {
+ destinationsLock.readLock().unlock();
}
return inactiveDests;
}
+ @SuppressWarnings("unchecked")
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
@@ -342,12 +371,13 @@ public abstract class AbstractRegion imp
// remove the subscription from all the matching queues.
List<Destination> removeList = new ArrayList<Destination>();
- synchronized (destinationsMutex) {
- for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
- Destination dest = (Destination) iter.next();
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
removeList.add(dest);
-
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
for (Destination dest : removeList) {
dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
@@ -407,9 +437,14 @@ public abstract class AbstractRegion imp
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
Destination dest = null;
- synchronized (destinationsMutex) {
+
+ destinationsLock.readLock().lock();
+ try {
dest = destinations.get(destination);
+ } finally {
+ destinationsLock.readLock().unlock();
}
+
if (dest == null) {
if (isAutoCreateDestinations()) {
// Try to auto create the destination... re-invoke broker
@@ -423,10 +458,14 @@ public abstract class AbstractRegion imp
// this error
}
// We should now have the dest created.
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try {
dest = destinations.get(destination);
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
+
if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist.");
}
@@ -454,9 +493,13 @@ public abstract class AbstractRegion imp
protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
throws Exception {
Destination dest = null;
- synchronized (destinationsMutex) {
+ destinationsLock.readLock().lock();
+ try {
dest = destinations.get(messageDispatchNotification.getDestination());
+ } finally {
+ destinationsLock.readLock().unlock();
}
+
if (dest != null) {
dest.processDispatchNotification(messageDispatchNotification);
} else {
@@ -468,15 +511,17 @@ public abstract class AbstractRegion imp
}
public void gc() {
- for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = iter.next();
+ for (Subscription sub : subscriptions.values()) {
sub.gc();
}
- synchronized (destinationsMutex) {
- for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
- Destination dest = iter.next();
+
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : destinations.values()) {
dest.gc();
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
@@ -495,12 +540,15 @@ public abstract class AbstractRegion imp
this.autoCreateDestinations = autoCreateDestinations;
}
+ @SuppressWarnings("unchecked")
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
- synchronized (destinationsMutex) {
- for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
- Destination dest = (Destination) iter.next();
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
dest.addProducer(context, info);
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
@@ -512,12 +560,15 @@ public abstract class AbstractRegion imp
* @throws Exception
* TODO
*/
+ @SuppressWarnings("unchecked")
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
- synchronized (destinationsMutex) {
- for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
- Destination dest = (Destination) iter.next();
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
dest.removeProducer(context, info);
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1160894&r1=1160893&r2=1160894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Aug 23 21:51:10 2011
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class TopicRegion extends AbstractRegion {
private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
@@ -60,7 +60,6 @@ public class TopicRegion extends Abstrac
public void run() {
doCleanup();
}
-
};
this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
}
@@ -118,15 +117,17 @@ public class TopicRegion extends Abstrac
if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
// Remove the consumer first then add it.
durableSubscriptions.remove(key);
- synchronized (destinationsMutex) {
- for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
- Destination dest = iter.next();
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : destinations.values()) {
//Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
}
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info);
@@ -179,16 +180,19 @@ public class TopicRegion extends Abstrac
throw new JMSException("Durable consumer is in use");
}
- synchronized (destinationsMutex) {
- for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
- Destination dest = iter.next();
- //Account for virtual destinations
- if (dest instanceof Topic){
- Topic topic = (Topic)dest;
- topic.deleteSubscription(context, key);
- }
+ destinationsLock.readLock().lock();
+ try {
+ for (Destination dest : destinations.values()) {
+ //Account for virtual destinations
+ if (dest instanceof Topic){
+ Topic topic = (Topic)dest;
+ topic.deleteSubscription(context, key);
+ }
}
+ } finally {
+ destinationsLock.readLock().unlock();
}
+
if (subscriptions.get(sub.getConsumerInfo()) != null) {
super.removeConsumer(context, sub.getConsumerInfo());
} else {
@@ -243,8 +247,7 @@ public class TopicRegion extends Abstrac
// Now perhaps there other durable subscriptions (via wild card)
// that would match this destination..
durableSubscriptions.values();
- for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
- DurableTopicSubscription sub = iterator.next();
+ for (DurableTopicSubscription sub : durableSubscriptions.values()) {
// Skip over subscriptions that we allready added..
if (dupChecker.contains(sub)) {
continue;
@@ -284,16 +287,16 @@ public class TopicRegion extends Abstrac
@Override
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
ActiveMQDestination destination = info.getDestination();
-
+
if (info.isDurable()) {
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
}
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key);
-
+
if (sub == null) {
-
+
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
if (destination != null && broker.getDestinationPolicy() != null) {