You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/01 00:22:04 UTC

svn commit: r1141741 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Author: tabish
Date: Thu Jun 30 22:22:04 2011
New Revision: 1141741

URL: http://svn.apache.org/viewvc?rev=1141741&view=rev
Log:
Apply patch for: https://issues.apache.org/jira/browse/AMQ-3326

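The use of a synchronized block was causing unnecessary serialization of calls to add and remove producers and consumers, even when the inactive-destination purge wasn't running. It is replaced with a ReentrantReadWriteLock: the add/remove calls take the read lock, so they can proceed concurrently with each other, and the purge takes the write lock.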

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1141741&r1=1141740&r2=1141741&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Jun 30 22:22:04 2011
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -105,6 +106,8 @@ public class RegionBroker extends EmptyB
     private final Scheduler scheduler;
     private final ThreadPoolExecutor executor;
     private boolean allowTempAutoCreationOnSend;
+
+    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
     private final Runnable purgeInactiveDestinationsTask = new Runnable() {
         public void run() {
             purgeInactiveDestinations();
@@ -388,9 +391,9 @@ public class RegionBroker extends EmptyB
     @Override
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        synchronized (purgeInactiveDestinationsTask) {
-            if (destination != null) {
-
+        if (destination != null) {
+            inactiveDestinationsPurgeLock.readLock().lock();
+            try {
                 // This seems to cause the destination to be added but without
                 // advisories firing...
                 context.getBroker().addDestination(context, destination, true);
@@ -408,6 +411,8 @@ public class RegionBroker extends EmptyB
                     tempTopicRegion.addProducer(context, info);
                     break;
                 }
+            } finally {
+                inactiveDestinationsPurgeLock.readLock().unlock();
             }
         }
     }
@@ -415,8 +420,9 @@ public class RegionBroker extends EmptyB
     @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        synchronized (purgeInactiveDestinationsTask) {
-            if (destination != null) {
+        if (destination != null) {
+            inactiveDestinationsPurgeLock.readLock().lock();
+            try {
                 switch (destination.getDestinationType()) {
                 case ActiveMQDestination.QUEUE_TYPE:
                     queueRegion.removeProducer(context, info);
@@ -431,6 +437,8 @@ public class RegionBroker extends EmptyB
                     tempTopicRegion.removeProducer(context, info);
                     break;
                 }
+            } finally {
+                inactiveDestinationsPurgeLock.readLock().unlock();
             }
         }
     }
@@ -441,7 +449,8 @@ public class RegionBroker extends EmptyB
         if (destinationInterceptor != null) {
             destinationInterceptor.create(this, context, destination);
         }
-        synchronized (purgeInactiveDestinationsTask) {
+        inactiveDestinationsPurgeLock.readLock().lock();
+        try {
             switch (destination.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:
                 return queueRegion.addConsumer(context, info);
@@ -458,13 +467,16 @@ public class RegionBroker extends EmptyB
             default:
                 throw createUnknownDestinationTypeException(destination);
             }
+        } finally {
+            inactiveDestinationsPurgeLock.readLock().unlock();
         }
     }
 
     @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        synchronized (purgeInactiveDestinationsTask) {
+        inactiveDestinationsPurgeLock.readLock().lock();
+        try {
             switch (destination.getDestinationType()) {
 
             case ActiveMQDestination.QUEUE_TYPE:
@@ -482,13 +494,18 @@ public class RegionBroker extends EmptyB
             default:
                 throw createUnknownDestinationTypeException(destination);
             }
+        } finally {
+            inactiveDestinationsPurgeLock.readLock().unlock();
         }
     }
 
     @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
-        synchronized (purgeInactiveDestinationsTask) {
+        inactiveDestinationsPurgeLock.readLock().lock();
+        try {
             topicRegion.removeSubscription(context, info);
+        } finally {
+            inactiveDestinationsPurgeLock.readLock().unlock();
         }
     }
 
@@ -934,7 +951,8 @@ public class RegionBroker extends EmptyB
     }
 
     protected void purgeInactiveDestinations() {
-        synchronized (purgeInactiveDestinationsTask) {
+        inactiveDestinationsPurgeLock.writeLock().lock();
+        try {
             List<BaseDestination> list = new ArrayList<BaseDestination>();
             Map<ActiveMQDestination, Destination> map = getDestinationMap();
             if (isAllowTempAutoCreationOnSend()) {
@@ -968,6 +986,8 @@ public class RegionBroker extends EmptyB
                     }
                 }
             }
+        } finally {
+            inactiveDestinationsPurgeLock.writeLock().unlock();
         }
     }