You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/07/19 14:11:46 UTC

svn commit: r1148289 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/usecases/

Author: dejanb
Date: Tue Jul 19 12:11:45 2011
New Revision: 1148289

URL: http://svn.apache.org/viewvc?rev=1148289&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3408 - remove durable subscribers after certain time of inactivity

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionRemoveOfflineTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1148289&r1=1148288&r2=1148289&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Jul 19 12:11:45 2011
@@ -201,6 +201,9 @@ public class BrokerService implements Se
     private boolean networkConnectorStartAsync = false;
     private boolean allowTempAutoCreationOnSend;
 
+    private int offlineDurableSubscriberTimeout = -1;
+    private int offlineDurableSubscriberTaskSchedule = 30000;
+
     static {
         String localHostName = "localhost";
         try {
@@ -2448,4 +2451,20 @@ public class BrokerService implements Se
     public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
         this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
     }
+
+    public int getOfflineDurableSubscriberTimeout() {
+        return offlineDurableSubscriberTimeout;
+    }
+
+    public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
+        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
+    }
+
+    public int getOfflineDurableSubscriberTaskSchedule() {
+        return offlineDurableSubscriberTaskSchedule;
+    }
+
+    public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
+        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1148289&r1=1148288&r2=1148289&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Jul 19 12:11:45 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -34,7 +35,6 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
@@ -51,6 +51,7 @@ public class DurableTopicSubscription ex
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
     private AtomicBoolean active = new AtomicBoolean();
+    private AtomicLong offlineTimestamp = new AtomicLong(-1);
 
     public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws JMSException {
@@ -67,6 +68,10 @@ public class DurableTopicSubscription ex
         return active.get();
     }
 
+    public final long getOfflineTimestamp() {
+        return offlineTimestamp.get();
+    }
+
     public boolean isFull() {
         return !active.get() || super.isFull();
     }
@@ -149,6 +154,7 @@ public class DurableTopicSubscription ex
                 }
             }
             this.active.set(true);
+            this.offlineTimestamp.set(-1);
             dispatchPending();
             this.usageManager.getMemoryUsage().addUsageListener(this);
         }
@@ -157,6 +163,7 @@ public class DurableTopicSubscription ex
     public void deactivate(boolean keepDurableSubsActive) throws Exception {
         LOG.debug("Deactivating " + this);
         active.set(false);
+        offlineTimestamp.set(System.currentTimeMillis());
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (pending) {
             pending.stop();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1148289&r1=1148288&r2=1148289&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Jul 19 12:11:45 2011
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
@@ -51,10 +48,54 @@ public class TopicRegion extends Abstrac
     private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
     private boolean keepDurableSubsActive;
 
+    private Timer cleanupTimer;
+    private TimerTask cleanupTask;
+
     public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                        DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+        if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
+            this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
+            this.cleanupTask = new TimerTask() {
+                public void run() {
+                    doCleanup();
+                }
 
+            };
+            this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+        if (cleanupTimer != null) {
+            cleanupTimer.cancel();
+        }
+    }
+
+    public void doCleanup() {
+        long now = System.currentTimeMillis();
+        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
+            DurableTopicSubscription sub = entry.getValue();
+            if (!sub.isActive()) {
+               long offline = sub.getOfflineTimestamp();
+                if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
+                    LOG.info("Destroying durable subscriber due to inactivity: " + sub);
+                    try {
+                        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+                        info.setClientId(entry.getKey().getClientId());
+                        info.setSubscriptionName(entry.getKey().getSubscriptionName());
+                        ConnectionContext context = new ConnectionContext();
+                        context.setBroker(broker);
+                        context.setClientId(entry.getKey().getClientId());
+                        removeSubscription(context, info);
+                    } catch (Exception e) {
+                        LOG.error("Failed to remove inactive durable subscriber", e);
+                    }
+                }
+            }
+        }
     }
 
     @Override

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionRemoveOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionRemoveOfflineTest.java?rev=1148289&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionRemoveOfflineTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionRemoveOfflineTest.java Tue Jul 19 12:11:45 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.Test;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+public class DurableSubscriptionRemoveOfflineTest extends EmbeddedBrokerTestSupport {
+
+    protected void setUp() throws Exception {
+        useTopic = true;
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = super.createBroker();
+        answer.setOfflineDurableSubscriberTaskSchedule(3 * 1000);
+        answer.setOfflineDurableSubscriberTimeout(5 * 1000);
+        answer.setDeleteAllMessagesOnStartup(true);
+        return answer;
+    }
+
+    public void testRemove() throws Exception {
+        Connection connection = createConnection();
+        connection.setClientID("cliID");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber subscriber = session.createDurableSubscriber((Topic) createDestination(), "subName");
+        subscriber.close();
+        connection.close();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                 return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
+            }
+        }, 10000);
+
+    }
+
+
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    public static Test suite() {
+        return suite(DurableSubscriptionRemoveOfflineTest.class);
+     }
+}