You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/11 18:08:16 UTC

svn commit: r963095 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: BrokerService.java region/BaseDestination.java region/Queue.java region/RegionBroker.java region/Topic.java

Author: rajdavies
Date: Sun Jul 11 16:08:15 2010
New Revision: 963095

URL: http://svn.apache.org/viewvc?rev=963095&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2821

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sun Jul 11 16:08:15 2010
@@ -195,6 +195,8 @@ public class BrokerService implements Se
     private Scheduler scheduler;
     private ThreadPoolExecutor executor;
     private boolean slave = true;
+    private int schedulePeriodForDestinationPurge=5000;
+    
     
 	static {
         String localHostName = "localhost";
@@ -2303,5 +2305,13 @@ public class BrokerService implements Se
     
     public void setSchedulerDirectory(String schedulerDirectory) {
         setSchedulerDirectoryFile(new File(schedulerDirectory));
+    }
+
+    public int getSchedulePeriodForDestinationPurge() {
+        return this.schedulePeriodForDestinationPurge;
+    }
+
+    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
+        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
     }   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Sun Jul 11 16:08:15 2010
@@ -48,6 +48,7 @@ public abstract class BaseDestination im
     public static final int MAX_PAGE_SIZE = 200;
     public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
     public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
+    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
@@ -82,6 +83,9 @@ public abstract class BaseDestination im
     protected int storeUsageHighWaterMark = 100;
     private SlowConsumerStrategy slowConsumerStrategy;
     private boolean prioritizedMessages;
+    private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
+    private boolean gcIfInactive;
+    private long lastActiveTime=0l;
 
     /**
      * @param broker
@@ -196,11 +200,22 @@ public abstract class BaseDestination im
 
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationStatistics.getProducers().increment();
+        this.lastActiveTime=0l;
     }
 
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationStatistics.getProducers().decrement();
     }
+    
+    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
+        destinationStatistics.getConsumers().increment();
+        this.lastActiveTime=0l;
+    }
+
+    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
+        destinationStatistics.getConsumers().decrement();
+    }
+
 
     public final MemoryUsage getMemoryUsage() {
         return memoryUsage;
@@ -595,5 +610,50 @@ public abstract class BaseDestination im
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         this.prioritizedMessages = prioritizedMessages;
     }
+
+    /**
+     * @return the inactiveTimoutBeforeGC
+     */
+    public long getInactiveTimoutBeforeGC() {
+        return this.inactiveTimoutBeforeGC;
+    }
+
+    /**
+     * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
+     */
+    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
+        this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
+    }
+
+    /**
+     * @return the gcIfInactive
+     */
+    public boolean isGcIfInactive() {
+        return this.gcIfInactive;
+    }
+
+    /**
+     * @param gcIfInactive the gcIfInactive to set
+     */
+    public void setGcIfInactive(boolean gcIfInactive) {
+        this.gcIfInactive = gcIfInactive;
+    }
+    
+    public void markForGC(long timeStamp) {
+        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
+                && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
+            this.lastActiveTime = timeStamp;
+        }
+    }
+
+    public boolean canGC() {
+        boolean result = false;
+        if (isGcIfInactive()&& this.lastActiveTime != 0l) {
+            if ((System.currentTimeMillis() - this.lastActiveTime) > getInactiveTimoutBeforeGC()) {
+                result = true;
+            }
+        }
+        return result;
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sun Jul 11 16:08:15 2010
@@ -355,6 +355,7 @@ public class Queue extends BaseDestinati
     LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+        super.addSubscription(context, sub);
         // synchronize with dispatch method so that no new messages are sent
         // while setting up a subscription. avoid out of order messages,
         // duplicates, etc.
@@ -362,8 +363,7 @@ public class Queue extends BaseDestinati
         try {
 
             sub.add(context, this);
-            destinationStatistics.getConsumers().increment();
-
+           
             // needs to be synchronized - so no contention with dispatching
            // consumersLock.
             consumersLock.writeLock().lock();
@@ -423,7 +423,7 @@ public class Queue extends BaseDestinati
 
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
             throws Exception {
-        destinationStatistics.getConsumers().decrement();
+        super.removeSubscription(context, sub, lastDeiveredSequenceId);
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
         pagedInPendingDispatchLock.writeLock().lock();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sun Jul 11 16:08:15 2010
@@ -102,6 +102,12 @@ public class RegionBroker extends EmptyB
     private ConnectionContext adminConnectionContext;
     private final Scheduler scheduler;
     private final ThreadPoolExecutor executor;
+    
+    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
+        public void run() {
+            purgeInactiveDestinations();
+        }
+    };
 
     public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
                         DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
@@ -191,11 +197,16 @@ public class RegionBroker extends EmptyB
         topicRegion.start();
         tempQueueRegion.start();
         tempTopicRegion.start();
+        int period = this.brokerService.getSchedulePeriodForDestinationPurge();
+        if (period > 0) {
+            this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
+        }
     }
 
     @Override
     public void stop() throws Exception {
         started = false;
+        this.scheduler.cancel(purgeInactiveDestinationsTask);
         ServiceStopper ss = new ServiceStopper();
         doStop(ss);
         ss.throwFirstException();
@@ -351,26 +362,28 @@ public class RegionBroker extends EmptyB
     }
 
     @Override
-    public void addProducer(ConnectionContext context, ProducerInfo info)
-            throws Exception {
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        if (destination != null) {
+        synchronized (purgeInactiveDestinationsTask) {
+            if (destination != null) {
 
-            // This seems to cause the destination to be added but without advisories firing...
-            context.getBroker().addDestination(context, destination,false);
-            switch (destination.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                queueRegion.addProducer(context, info);
-                break;
-            case ActiveMQDestination.TOPIC_TYPE:
-                topicRegion.addProducer(context, info);
-                break;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                tempQueueRegion.addProducer(context, info);
-                break;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                tempTopicRegion.addProducer(context, info);
-                break;
+                // This seems to cause the destination to be added but without
+                // advisories firing...
+                context.getBroker().addDestination(context, destination, false);
+                switch (destination.getDestinationType()) {
+                case ActiveMQDestination.QUEUE_TYPE:
+                    queueRegion.addProducer(context, info);
+                    break;
+                case ActiveMQDestination.TOPIC_TYPE:
+                    topicRegion.addProducer(context, info);
+                    break;
+                case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                    tempQueueRegion.addProducer(context, info);
+                    break;
+                case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                    tempTopicRegion.addProducer(context, info);
+                    break;
+                }
             }
         }
     }
@@ -378,20 +391,22 @@ public class RegionBroker extends EmptyB
     @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        if (destination != null) {
-            switch (destination.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                queueRegion.removeProducer(context, info);
-                break;
-            case ActiveMQDestination.TOPIC_TYPE:
-                topicRegion.removeProducer(context, info);
-                break;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                tempQueueRegion.removeProducer(context, info);
-                break;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                tempTopicRegion.removeProducer(context, info);
-                break;
+        synchronized (purgeInactiveDestinationsTask) {
+            if (destination != null) {
+                switch (destination.getDestinationType()) {
+                case ActiveMQDestination.QUEUE_TYPE:
+                    queueRegion.removeProducer(context, info);
+                    break;
+                case ActiveMQDestination.TOPIC_TYPE:
+                    topicRegion.removeProducer(context, info);
+                    break;
+                case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                    tempQueueRegion.removeProducer(context, info);
+                    break;
+                case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                    tempTopicRegion.removeProducer(context, info);
+                    break;
+                }
             }
         }
     }
@@ -399,48 +414,55 @@ public class RegionBroker extends EmptyB
     @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            return queueRegion.addConsumer(context, info);
+        synchronized (purgeInactiveDestinationsTask) {
+            switch (destination.getDestinationType()) {
+            case ActiveMQDestination.QUEUE_TYPE:
+                return queueRegion.addConsumer(context, info);
 
-        case ActiveMQDestination.TOPIC_TYPE:
-            return topicRegion.addConsumer(context, info);
+            case ActiveMQDestination.TOPIC_TYPE:
+                return topicRegion.addConsumer(context, info);
 
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            return tempQueueRegion.addConsumer(context, info);
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                return tempQueueRegion.addConsumer(context, info);
 
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            return tempTopicRegion.addConsumer(context, info);
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                return tempTopicRegion.addConsumer(context, info);
 
-        default:
-            throw createUnknownDestinationTypeException(destination);
+            default:
+                throw createUnknownDestinationTypeException(destination);
+            }
         }
     }
 
     @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
-        switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            queueRegion.removeConsumer(context, info);
-            break;
-        case ActiveMQDestination.TOPIC_TYPE:
-            topicRegion.removeConsumer(context, info);
-            break;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            tempQueueRegion.removeConsumer(context, info);
-            break;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            tempTopicRegion.removeConsumer(context, info);
-            break;
-        default:
-            throw createUnknownDestinationTypeException(destination);
+        synchronized (purgeInactiveDestinationsTask) {
+            switch (destination.getDestinationType()) {
+
+            case ActiveMQDestination.QUEUE_TYPE:
+                queueRegion.removeConsumer(context, info);
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                topicRegion.removeConsumer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                tempQueueRegion.removeConsumer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                tempTopicRegion.removeConsumer(context, info);
+                break;
+            default:
+                throw createUnknownDestinationTypeException(destination);
+            }
         }
     }
 
     @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
-        topicRegion.removeSubscription(context, info);
+        synchronized (purgeInactiveDestinationsTask) {
+            topicRegion.removeSubscription(context, info);
+        }
     }
 
     @Override
@@ -868,4 +890,38 @@ public class RegionBroker extends EmptyB
             }
         }
     }
+    
+    protected void purgeInactiveDestinations() {
+        synchronized (purgeInactiveDestinationsTask) {
+            List<BaseDestination> list = new ArrayList<BaseDestination>();
+            Map<ActiveMQDestination, Destination> map = getDestinationMap();
+            long timeStamp = System.currentTimeMillis();
+            for (Destination d : map.values()) {
+                if (d instanceof BaseDestination) {
+                    BaseDestination bd = (BaseDestination) d;
+                    bd.markForGC(timeStamp);
+                    if (bd.canGC()) {
+                        list.add(bd);
+                    }
+                }
+            }
+
+            if (list.isEmpty() == false) {
+
+                ConnectionContext context = new ConnectionContext();
+                context.setBroker(this);
+
+                for (BaseDestination dest : list) {
+                    dest.getLog().info(
+                            dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
+                                    + " ms - removing ...");
+                    try {
+                        getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
+                    } catch (Exception e) {
+                        LOG.error("Failed to remove inactive destination " + dest, e);
+                    }
+                }
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=963095&r1=963094&r2=963095&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Sun Jul 11 16:08:15 2010
@@ -112,7 +112,7 @@ public class Topic extends BaseDestinati
 
     public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
 
-        destinationStatistics.getConsumers().increment();
+       super.addSubscription(context, sub);
 
         if (!sub.getConsumerInfo().isDurable()) {
 
@@ -152,7 +152,7 @@ public class Topic extends BaseDestinati
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
             throws Exception {
         if (!sub.getConsumerInfo().isDurable()) {
-            destinationStatistics.getConsumers().decrement();
+            super.removeSubscription(context, sub, lastDeliveredSequenceId);
             synchronized (consumers) {
                 consumers.remove(sub);
             }