You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/11 18:08:16 UTC
svn commit: r963095 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
BrokerService.java region/BaseDestination.java region/Queue.java
region/RegionBroker.java region/Topic.java
Author: rajdavies
Date: Sun Jul 11 16:08:15 2010
New Revision: 963095
URL: http://svn.apache.org/viewvc?rev=963095&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2821
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sun Jul 11 16:08:15 2010
@@ -195,6 +195,8 @@ public class BrokerService implements Se
private Scheduler scheduler;
private ThreadPoolExecutor executor;
private boolean slave = true;
+ private int schedulePeriodForDestinationPurge=5000;
+
static {
String localHostName = "localhost";
@@ -2303,5 +2305,13 @@ public class BrokerService implements Se
public void setSchedulerDirectory(String schedulerDirectory) {
setSchedulerDirectoryFile(new File(schedulerDirectory));
+ }
+
+ public int getSchedulePeriodForDestinationPurge() {
+ return this.schedulePeriodForDestinationPurge;
+ }
+
+ public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
+ this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Sun Jul 11 16:08:15 2010
@@ -48,6 +48,7 @@ public abstract class BaseDestination im
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
+ public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@@ -82,6 +83,9 @@ public abstract class BaseDestination im
protected int storeUsageHighWaterMark = 100;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
+ private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
+ private boolean gcIfInactive;
+ private long lastActiveTime=0l;
/**
* @param broker
@@ -196,11 +200,22 @@ public abstract class BaseDestination im
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().increment();
+ this.lastActiveTime=0l;
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().decrement();
}
+
+ public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
+ destinationStatistics.getConsumers().increment();
+ this.lastActiveTime=0l;
+ }
+
+ public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
+ destinationStatistics.getConsumers().decrement();
+ }
+
public final MemoryUsage getMemoryUsage() {
return memoryUsage;
@@ -595,5 +610,50 @@ public abstract class BaseDestination im
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
+
+ /**
+ * @return the inactiveTimoutBeforeGC
+ */
+ public long getInactiveTimoutBeforeGC() {
+ return this.inactiveTimoutBeforeGC;
+ }
+
+ /**
+ * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
+ */
+ public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
+ this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
+ }
+
+ /**
+ * @return the gcIfInactive
+ */
+ public boolean isGcIfInactive() {
+ return this.gcIfInactive;
+ }
+
+ /**
+ * @param gcIfInactive the gcIfInactive to set
+ */
+ public void setGcIfInactive(boolean gcIfInactive) {
+ this.gcIfInactive = gcIfInactive;
+ }
+
+ public void markForGC(long timeStamp) {
+ if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
+ && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
+ this.lastActiveTime = timeStamp;
+ }
+ }
+
+ public boolean canGC() {
+ boolean result = false;
+ if (isGcIfInactive()&& this.lastActiveTime != 0l) {
+ if ((System.currentTimeMillis() - this.lastActiveTime) > getInactiveTimoutBeforeGC()) {
+ result = true;
+ }
+ }
+ return result;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sun Jul 11 16:08:15 2010
@@ -355,6 +355,7 @@ public class Queue extends BaseDestinati
LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+ super.addSubscription(context, sub);
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
@@ -362,8 +363,7 @@ public class Queue extends BaseDestinati
try {
sub.add(context, this);
- destinationStatistics.getConsumers().increment();
-
+
// needs to be synchronized - so no contention with dispatching
// consumersLock.
consumersLock.writeLock().lock();
@@ -423,7 +423,7 @@ public class Queue extends BaseDestinati
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
throws Exception {
- destinationStatistics.getConsumers().decrement();
+ super.removeSubscription(context, sub, lastDeiveredSequenceId);
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
pagedInPendingDispatchLock.writeLock().lock();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sun Jul 11 16:08:15 2010
@@ -102,6 +102,12 @@ public class RegionBroker extends EmptyB
private ConnectionContext adminConnectionContext;
private final Scheduler scheduler;
private final ThreadPoolExecutor executor;
+
+ private final Runnable purgeInactiveDestinationsTask = new Runnable() {
+ public void run() {
+ purgeInactiveDestinations();
+ }
+ };
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
@@ -191,11 +197,16 @@ public class RegionBroker extends EmptyB
topicRegion.start();
tempQueueRegion.start();
tempTopicRegion.start();
+ int period = this.brokerService.getSchedulePeriodForDestinationPurge();
+ if (period > 0) {
+ this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
+ }
}
@Override
public void stop() throws Exception {
started = false;
+ this.scheduler.cancel(purgeInactiveDestinationsTask);
ServiceStopper ss = new ServiceStopper();
doStop(ss);
ss.throwFirstException();
@@ -351,26 +362,28 @@ public class RegionBroker extends EmptyB
}
@Override
- public void addProducer(ConnectionContext context, ProducerInfo info)
- throws Exception {
+ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- if (destination != null) {
+ synchronized (purgeInactiveDestinationsTask) {
+ if (destination != null) {
- // This seems to cause the destination to be added but without advisories firing...
- context.getBroker().addDestination(context, destination,false);
- switch (destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- queueRegion.addProducer(context, info);
- break;
- case ActiveMQDestination.TOPIC_TYPE:
- topicRegion.addProducer(context, info);
- break;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- tempQueueRegion.addProducer(context, info);
- break;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- tempTopicRegion.addProducer(context, info);
- break;
+ // This seems to cause the destination to be added but without
+ // advisories firing...
+ context.getBroker().addDestination(context, destination, false);
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ queueRegion.addProducer(context, info);
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ topicRegion.addProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ tempQueueRegion.addProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ tempTopicRegion.addProducer(context, info);
+ break;
+ }
}
}
}
@@ -378,20 +391,22 @@ public class RegionBroker extends EmptyB
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- if (destination != null) {
- switch (destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- queueRegion.removeProducer(context, info);
- break;
- case ActiveMQDestination.TOPIC_TYPE:
- topicRegion.removeProducer(context, info);
- break;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- tempQueueRegion.removeProducer(context, info);
- break;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- tempTopicRegion.removeProducer(context, info);
- break;
+ synchronized (purgeInactiveDestinationsTask) {
+ if (destination != null) {
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ queueRegion.removeProducer(context, info);
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ topicRegion.removeProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ tempQueueRegion.removeProducer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ tempTopicRegion.removeProducer(context, info);
+ break;
+ }
}
}
}
@@ -399,48 +414,55 @@ public class RegionBroker extends EmptyB
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- switch (destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- return queueRegion.addConsumer(context, info);
+ synchronized (purgeInactiveDestinationsTask) {
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ return queueRegion.addConsumer(context, info);
- case ActiveMQDestination.TOPIC_TYPE:
- return topicRegion.addConsumer(context, info);
+ case ActiveMQDestination.TOPIC_TYPE:
+ return topicRegion.addConsumer(context, info);
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- return tempQueueRegion.addConsumer(context, info);
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ return tempQueueRegion.addConsumer(context, info);
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- return tempTopicRegion.addConsumer(context, info);
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ return tempTopicRegion.addConsumer(context, info);
- default:
- throw createUnknownDestinationTypeException(destination);
+ default:
+ throw createUnknownDestinationTypeException(destination);
+ }
}
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
- switch (destination.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- queueRegion.removeConsumer(context, info);
- break;
- case ActiveMQDestination.TOPIC_TYPE:
- topicRegion.removeConsumer(context, info);
- break;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- tempQueueRegion.removeConsumer(context, info);
- break;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- tempTopicRegion.removeConsumer(context, info);
- break;
- default:
- throw createUnknownDestinationTypeException(destination);
+ synchronized (purgeInactiveDestinationsTask) {
+ switch (destination.getDestinationType()) {
+
+ case ActiveMQDestination.QUEUE_TYPE:
+ queueRegion.removeConsumer(context, info);
+ break;
+ case ActiveMQDestination.TOPIC_TYPE:
+ topicRegion.removeConsumer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ tempQueueRegion.removeConsumer(context, info);
+ break;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ tempTopicRegion.removeConsumer(context, info);
+ break;
+ default:
+ throw createUnknownDestinationTypeException(destination);
+ }
}
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
- topicRegion.removeSubscription(context, info);
+ synchronized (purgeInactiveDestinationsTask) {
+ topicRegion.removeSubscription(context, info);
+ }
}
@Override
@@ -868,4 +890,38 @@ public class RegionBroker extends EmptyB
}
}
}
+
+ protected void purgeInactiveDestinations() {
+ synchronized (purgeInactiveDestinationsTask) {
+ List<BaseDestination> list = new ArrayList<BaseDestination>();
+ Map<ActiveMQDestination, Destination> map = getDestinationMap();
+ long timeStamp = System.currentTimeMillis();
+ for (Destination d : map.values()) {
+ if (d instanceof BaseDestination) {
+ BaseDestination bd = (BaseDestination) d;
+ bd.markForGC(timeStamp);
+ if (bd.canGC()) {
+ list.add(bd);
+ }
+ }
+ }
+
+ if (list.isEmpty() == false) {
+
+ ConnectionContext context = new ConnectionContext();
+ context.setBroker(this);
+
+ for (BaseDestination dest : list) {
+ dest.getLog().info(
+ dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
+ + " ms - removing ...");
+ try {
+ getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
+ } catch (Exception e) {
+ LOG.error("Failed to remove inactive destination " + dest, e);
+ }
+ }
+ }
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Sun Jul 11 16:08:15 2010
@@ -112,7 +112,7 @@ public class Topic extends BaseDestinati
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
- destinationStatistics.getConsumers().increment();
+ super.addSubscription(context, sub);
if (!sub.getConsumerInfo().isDurable()) {
@@ -152,7 +152,7 @@ public class Topic extends BaseDestinati
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
- destinationStatistics.getConsumers().decrement();
+ super.removeSubscription(context, sub, lastDeliveredSequenceId);
synchronized (consumers) {
consumers.remove(sub);
}