You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/01 00:22:04 UTC
svn commit: r1141741 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Author: tabish
Date: Thu Jun 30 22:22:04 2011
New Revision: 1141741
URL: http://svn.apache.org/viewvc?rev=1141741&view=rev
Log:
Apply patch for: https://issues.apache.org/jira/browse/AMQ-3326
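Use of a synchronized block was causing unnecessary serialization of calls to add and remove producers and consumers, even when the inactive destination purge wasn't running. The synchronized block is replaced with a ReentrantReadWriteLock: the add/remove calls take the read lock and can proceed concurrently, while the purge takes the write lock.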
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1141741&r1=1141740&r2=1141741&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Jun 30 22:22:04 2011
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
@@ -105,6 +106,8 @@ public class RegionBroker extends EmptyB
private final Scheduler scheduler;
private final ThreadPoolExecutor executor;
private boolean allowTempAutoCreationOnSend;
+
+ private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
private final Runnable purgeInactiveDestinationsTask = new Runnable() {
public void run() {
purgeInactiveDestinations();
@@ -388,9 +391,9 @@ public class RegionBroker extends EmptyB
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- synchronized (purgeInactiveDestinationsTask) {
- if (destination != null) {
-
+ if (destination != null) {
+ inactiveDestinationsPurgeLock.readLock().lock();
+ try {
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, true);
@@ -408,6 +411,8 @@ public class RegionBroker extends EmptyB
tempTopicRegion.addProducer(context, info);
break;
}
+ } finally {
+ inactiveDestinationsPurgeLock.readLock().unlock();
}
}
}
@@ -415,8 +420,9 @@ public class RegionBroker extends EmptyB
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- synchronized (purgeInactiveDestinationsTask) {
- if (destination != null) {
+ if (destination != null) {
+ inactiveDestinationsPurgeLock.readLock().lock();
+ try {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeProducer(context, info);
@@ -431,6 +437,8 @@ public class RegionBroker extends EmptyB
tempTopicRegion.removeProducer(context, info);
break;
}
+ } finally {
+ inactiveDestinationsPurgeLock.readLock().unlock();
}
}
}
@@ -441,7 +449,8 @@ public class RegionBroker extends EmptyB
if (destinationInterceptor != null) {
destinationInterceptor.create(this, context, destination);
}
- synchronized (purgeInactiveDestinationsTask) {
+ inactiveDestinationsPurgeLock.readLock().lock();
+ try {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.addConsumer(context, info);
@@ -458,13 +467,16 @@ public class RegionBroker extends EmptyB
default:
throw createUnknownDestinationTypeException(destination);
}
+ } finally {
+ inactiveDestinationsPurgeLock.readLock().unlock();
}
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- synchronized (purgeInactiveDestinationsTask) {
+ inactiveDestinationsPurgeLock.readLock().lock();
+ try {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
@@ -482,13 +494,18 @@ public class RegionBroker extends EmptyB
default:
throw createUnknownDestinationTypeException(destination);
}
+ } finally {
+ inactiveDestinationsPurgeLock.readLock().unlock();
}
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
- synchronized (purgeInactiveDestinationsTask) {
+ inactiveDestinationsPurgeLock.readLock().lock();
+ try {
topicRegion.removeSubscription(context, info);
+ } finally {
+ inactiveDestinationsPurgeLock.readLock().unlock();
}
}
@@ -934,7 +951,8 @@ public class RegionBroker extends EmptyB
}
protected void purgeInactiveDestinations() {
- synchronized (purgeInactiveDestinationsTask) {
+ inactiveDestinationsPurgeLock.writeLock().lock();
+ try {
List<BaseDestination> list = new ArrayList<BaseDestination>();
Map<ActiveMQDestination, Destination> map = getDestinationMap();
if (isAllowTempAutoCreationOnSend()) {
@@ -968,6 +986,8 @@ public class RegionBroker extends EmptyB
}
}
}
+ } finally {
+ inactiveDestinationsPurgeLock.writeLock().unlock();
}
}