You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/23 23:51:10 UTC

svn commit: r1160894 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: AbstractRegion.java TopicRegion.java

Author: tabish
Date: Tue Aug 23 21:51:10 2011
New Revision: 1160894

URL: http://svn.apache.org/viewvc?rev=1160894&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3454

Use a read/write lock on the destination mutex, as many of the access points are simply reads of the underlying maps or a quick copy to another Collection. The only time a write is done is during addDestination and removeDestination.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1160894&r1=1160893&r2=1160894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Aug 23 21:51:10 2011
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import javax.jms.JMSException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ConsumerBrokerExchange;
@@ -65,7 +67,7 @@ public abstract class AbstractRegion imp
     protected final RegionBroker broker;
     protected boolean autoCreateDestinations = true;
     protected final TaskRunnerFactory taskRunnerFactory;
-    protected final Object destinationsMutex = new Object();
+    protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
     protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
     protected boolean started;
 
@@ -96,21 +98,27 @@ public abstract class AbstractRegion imp
             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
             context.getBroker().addDestination(context, dest, false);
         }
-        synchronized (destinationsMutex) {
+        destinationsLock.readLock().lock();
+        try{
             for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
                 Destination dest = i.next();
                 dest.start();
             }
+        } finally {
+            destinationsLock.readLock().unlock();
         }
     }
 
     public void stop() throws Exception {
         started = false;
-        synchronized (destinationsMutex) {
+        destinationsLock.readLock().lock();
+        try{
             for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
                 Destination dest = i.next();
                 dest.stop();
             }
+        } finally {
+            destinationsLock.readLock().unlock();
         }
         destinations.clear();
     }
@@ -118,7 +126,9 @@ public abstract class AbstractRegion imp
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
             boolean createIfTemporary) throws Exception {
         LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
-        synchronized (destinationsMutex) {
+
+        destinationsLock.writeLock().lock();
+        try {
             Destination dest = destinations.get(destination);
             if (dest == null) {
                 if (destination.isTemporary() == false || createIfTemporary) {
@@ -150,6 +160,8 @@ public abstract class AbstractRegion imp
                 }
             }
             return dest;
+        } finally {
+            destinationsLock.writeLock().unlock();
         }
     }
 
@@ -197,7 +209,8 @@ public abstract class AbstractRegion imp
 
         LOG.debug("Removing destination: " + destination);
 
-        synchronized (destinationsMutex) {
+        destinationsLock.writeLock().lock();
+        try {
             Destination dest = destinations.remove(destination);
             if (dest != null) {
                 // timeout<0 or we timed out, we now force any remaining
@@ -218,6 +231,8 @@ public abstract class AbstractRegion imp
             } else {
                 LOG.debug("Destination doesn't exist: " + dest);
             }
+        } finally {
+            destinationsLock.writeLock().unlock();
         }
     }
 
@@ -226,18 +241,26 @@ public abstract class AbstractRegion imp
      *
      * @return a set of matching destination objects.
      */
+    @SuppressWarnings("unchecked")
     public Set<Destination> getDestinations(ActiveMQDestination destination) {
-        synchronized (destinationsMutex) {
+        destinationsLock.readLock().lock();
+        try{
             return destinationMap.get(destination);
+        } finally {
+            destinationsLock.readLock().unlock();
         }
     }
 
     public Map<ActiveMQDestination, Destination> getDestinationMap() {
-        synchronized (destinationsMutex) {
+        destinationsLock.readLock().lock();
+        try{
             return new HashMap<ActiveMQDestination, Destination>(destinations);
+        } finally {
+            destinationsLock.readLock().unlock();
         }
     }
 
+    @SuppressWarnings("unchecked")
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
                 + info.getDestination());
@@ -258,8 +281,7 @@ public abstract class AbstractRegion imp
         synchronized (addGuard) {
             Subscription o = subscriptions.get(info.getConsumerId());
             if (o != null) {
-                LOG
-                        .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
+                LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
                 return o;
             }
 
@@ -293,11 +315,13 @@ public abstract class AbstractRegion imp
             // Add the subscription to all the matching queues.
             // But copy the matches first - to prevent deadlocks
             List<Destination> addList = new ArrayList<Destination>();
-            synchronized (destinationsMutex) {
-                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-                    Destination dest = (Destination) iter.next();
+            destinationsLock.readLock().lock();
+            try {
+                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
                     addList.add(dest);
                 }
+            } finally {
+                destinationsLock.readLock().unlock();
             }
 
             for (Destination dest : addList) {
@@ -317,6 +341,7 @@ public abstract class AbstractRegion imp
      *
      * @return Set of all stored destinations
      */
+    @SuppressWarnings("rawtypes")
     public Set getDurableDestinations() {
         return destinationFactory.getDestinations();
     }
@@ -326,12 +351,16 @@ public abstract class AbstractRegion imp
      */
     protected Set<ActiveMQDestination> getInactiveDestinations() {
         Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
-        synchronized (destinationsMutex) {
+        destinationsLock.readLock().lock();
+        try {
             inactiveDests.removeAll(destinations.keySet());
+        } finally {
+            destinationsLock.readLock().unlock();
         }
         return inactiveDests;
     }
 
+    @SuppressWarnings("unchecked")
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
                 + info.getDestination());
@@ -342,12 +371,13 @@ public abstract class AbstractRegion imp
 
             // remove the subscription from all the matching queues.
             List<Destination> removeList = new ArrayList<Destination>();
-            synchronized (destinationsMutex) {
-                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-                    Destination dest = (Destination) iter.next();
+            destinationsLock.readLock().lock();
+            try {
+                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
                     removeList.add(dest);
-
                 }
+            } finally {
+                destinationsLock.readLock().unlock();
             }
             for (Destination dest : removeList) {
                 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
@@ -407,9 +437,14 @@ public abstract class AbstractRegion imp
 
     protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
         Destination dest = null;
-        synchronized (destinationsMutex) {
+
+        destinationsLock.readLock().lock();
+        try {
             dest = destinations.get(destination);
+        } finally {
+            destinationsLock.readLock().unlock();
         }
+
         if (dest == null) {
             if (isAutoCreateDestinations()) {
                 // Try to auto create the destination... re-invoke broker
@@ -423,10 +458,14 @@ public abstract class AbstractRegion imp
                     // this error
                 }
                 // We should now have the dest created.
-                synchronized (destinationsMutex) {
+                destinationsLock.readLock().lock();
+                try {
                     dest = destinations.get(destination);
+                } finally {
+                    destinationsLock.readLock().unlock();
                 }
             }
+
             if (dest == null) {
                 throw new JMSException("The destination " + destination + " does not exist.");
             }
@@ -454,9 +493,13 @@ public abstract class AbstractRegion imp
     protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
             throws Exception {
         Destination dest = null;
-        synchronized (destinationsMutex) {
+        destinationsLock.readLock().lock();
+        try {
             dest = destinations.get(messageDispatchNotification.getDestination());
+        } finally {
+            destinationsLock.readLock().unlock();
         }
+
         if (dest != null) {
             dest.processDispatchNotification(messageDispatchNotification);
         } else {
@@ -468,15 +511,17 @@ public abstract class AbstractRegion imp
     }
 
     public void gc() {
-        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
-            Subscription sub = iter.next();
+        for (Subscription sub : subscriptions.values()) {
             sub.gc();
         }
-        synchronized (destinationsMutex) {
-            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-                Destination dest = iter.next();
+
+        destinationsLock.readLock().lock();
+        try {
+            for (Destination dest : destinations.values()) {
                 dest.gc();
             }
+        } finally {
+            destinationsLock.readLock().unlock();
         }
     }
 
@@ -495,12 +540,15 @@ public abstract class AbstractRegion imp
         this.autoCreateDestinations = autoCreateDestinations;
     }
 
+    @SuppressWarnings("unchecked")
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
-        synchronized (destinationsMutex) {
-            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-                Destination dest = (Destination) iter.next();
+        destinationsLock.readLock().lock();
+        try {
+            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
                 dest.addProducer(context, info);
             }
+        } finally {
+            destinationsLock.readLock().unlock();
         }
     }
 
@@ -512,12 +560,15 @@ public abstract class AbstractRegion imp
      * @throws Exception
      *             TODO
      */
+    @SuppressWarnings("unchecked")
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
-        synchronized (destinationsMutex) {
-            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
-                Destination dest = (Destination) iter.next();
+        destinationsLock.readLock().lock();
+        try {
+            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
                 dest.removeProducer(context, info);
             }
+        } finally {
+            destinationsLock.readLock().unlock();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1160894&r1=1160893&r2=1160894&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Aug 23 21:51:10 2011
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class TopicRegion extends AbstractRegion {
     private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
@@ -60,7 +60,6 @@ public class TopicRegion extends Abstrac
                 public void run() {
                     doCleanup();
                 }
-
             };
             this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
         }
@@ -118,15 +117,17 @@ public class TopicRegion extends Abstrac
                 if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
                     // Remove the consumer first then add it.
                     durableSubscriptions.remove(key);
-                    synchronized (destinationsMutex) {
-                        for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-                            Destination dest = iter.next();
+                    destinationsLock.readLock().lock();
+                    try {
+                        for (Destination dest : destinations.values()) {
                             //Account for virtual destinations
                             if (dest instanceof Topic){
                                 Topic topic = (Topic)dest;
                                 topic.deleteSubscription(context, key);
                             }
                         }
+                    } finally {
+                        destinationsLock.readLock().unlock();
                     }
                     super.removeConsumer(context, sub.getConsumerInfo());
                     super.addConsumer(context, info);
@@ -179,16 +180,19 @@ public class TopicRegion extends Abstrac
             throw new JMSException("Durable consumer is in use");
         }
 
-        synchronized (destinationsMutex) {
-            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-            	Destination dest = iter.next();
-            	//Account for virtual destinations
-            	if (dest instanceof Topic){
-            	    Topic topic = (Topic)dest;
-            	    topic.deleteSubscription(context, key);
-            	}
+        destinationsLock.readLock().lock();
+        try {
+            for (Destination dest : destinations.values()) {
+                //Account for virtual destinations
+                if (dest instanceof Topic){
+                    Topic topic = (Topic)dest;
+                    topic.deleteSubscription(context, key);
+                }
             }
+        } finally {
+            destinationsLock.readLock().unlock();
         }
+
         if (subscriptions.get(sub.getConsumerInfo()) != null) {
             super.removeConsumer(context, sub.getConsumerInfo());
         } else {
@@ -243,8 +247,7 @@ public class TopicRegion extends Abstrac
             // Now perhaps there other durable subscriptions (via wild card)
             // that would match this destination..
             durableSubscriptions.values();
-            for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
-                DurableTopicSubscription sub = iterator.next();
+            for (DurableTopicSubscription sub : durableSubscriptions.values()) {
                 // Skip over subscriptions that we allready added..
                 if (dupChecker.contains(sub)) {
                     continue;
@@ -284,16 +287,16 @@ public class TopicRegion extends Abstrac
     @Override
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
         ActiveMQDestination destination = info.getDestination();
-        
+
         if (info.isDurable()) {
             if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
                 throw new JMSException("Cannot create a durable subscription for an advisory Topic");
             }
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
             DurableTopicSubscription sub = durableSubscriptions.get(key);
-            
+
             if (sub == null) {
-                
+
                 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
 
                 if (destination != null && broker.getDestinationPolicy() != null) {