You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/07 00:21:30 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-4814

Repository: activemq
Updated Branches:
  refs/heads/master c38a61d7a -> 7a68ad5d9


https://issues.apache.org/jira/browse/AMQ-4814

Remove of a durable subscription when the keepDurableSubActive flag is
true (default) did not actually stop and cleanup the still active cursor
due to the fact that the cursor checks the flag of its subscription to
see if it should stay active when its stop method is called.  The
subscription needs to disable that flag when the subscription is being
removed so that the cursor will correctly shutdown. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7a68ad5d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a68ad5d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a68ad5d

Branch: refs/heads/master
Commit: 7a68ad5d987caa20e6517beb12ee67d538b2ee08
Parents: c38a61d
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 6 18:21:20 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 6 18:21:20 2015 -0400

----------------------------------------------------------------------
 .../broker/region/DurableTopicSubscription.java |   5 +-
 .../java/org/apache/activemq/usage/Usage.java   |   4 +
 .../org/apache/activemq/bugs/AMQ4814Test.java   | 136 +++++++++++++++++++
 3 files changed, 143 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7a68ad5d/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index d87bd54..12c418a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -54,7 +54,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
     private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
     private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
-    private final boolean keepDurableSubsActive;
+    private boolean keepDurableSubsActive;
     private final AtomicBoolean active = new AtomicBoolean();
     private final AtomicLong offlineTimestamp = new AtomicLong(-1);
 
@@ -189,8 +189,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
     public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception {
         LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
         active.set(false);
+        this.keepDurableSubsActive = keepDurableSubsActive;
         offlineTimestamp.set(System.currentTimeMillis());
-        this.usageManager.getMemoryUsage().removeUsageListener(this);
+        usageManager.getMemoryUsage().removeUsageListener(this);
 
         ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
         List<MessageReference> savedDispateched = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/7a68ad5d/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
index cfe7de2..f075187 100755
--- a/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java
@@ -138,6 +138,10 @@ public abstract class Usage<T extends Usage> implements Service {
         listeners.remove(listener);
     }
 
+    public int getNumUsageListeners() {
+        return listeners.size();
+    }
+
     public long getLimit() {
         usageLock.readLock().lock();
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7a68ad5d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4814Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4814Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4814Test.java
new file mode 100644
index 0000000..ca43263
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4814Test.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4814Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4814Test.class);
+
+    private static final String CONNECTION_URL = "tcp://127.0.0.1:0";
+    private static final String KAHADB_DIRECTORY = "./target/activemq-data/";
+
+    private BrokerService broker;
+    private String connectionURI;
+
+    @Before
+    public void setup() throws Exception {
+
+        PolicyMap pm = new PolicyMap();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setGcInactiveDestinations(true);
+        pe.setInactiveTimeoutBeforeGC(1000L);
+
+        pe.setProducerFlowControl(false);
+
+        ActiveMQDestination d = new ActiveMQTopic(">");
+        pe.setDestination(d);
+        pm.put(d, pe);
+
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File(KAHADB_DIRECTORY + "-LEAKTEST"));
+
+        broker = new BrokerService();
+        broker.setBrokerName("broker1");
+        broker.setUseJmx(false);
+        broker.setPersistenceAdapter(kaha);
+        broker.setDestinationPolicy(pm);
+        broker.setSchedulePeriodForDestinationPurge(1000);
+        broker.setTimeBeforePurgeTempDestinations(1000);
+        broker.setMaxPurgedDestinationsPerSweep(5000);
+        broker.setOfflineDurableSubscriberTaskSchedule(1000L);
+        broker.setOfflineDurableSubscriberTimeout(1000L);
+        broker.setKeepDurableSubsActive(true);
+
+        TransportConnector connector = broker.addConnector(CONNECTION_URL);
+
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionURI = connector.getPublishableConnectString();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    @Test(timeout=60000)
+    public void testDurableTopicResourcesAreRemoved() throws Exception {
+
+        LOG.info("Test starting.");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
+
+        for (int i = 0; i < 2; ++i) {
+            LOG.info("Test main loop starting iteration: {}", i + 1);
+            Connection connection = factory.createConnection();
+            connection.setClientID("client_id");
+            connection.start();
+
+            for (int j = 0; j < 8; j++) {
+                LOG.info("Test sub loop starting iteration: {}", j + 1);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                String topicName = "durabletopic_" + j;
+                String subscriberName = "subscriber_" + j;
+                Topic topic = session.createTopic(topicName);
+
+                TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriberName);
+                subscriber.close();
+                session.unsubscribe(subscriberName);
+                session.close();
+            }
+
+            connection.stop();
+            connection.close();
+            connection = null;
+
+            Thread.sleep(10);
+        }
+
+        assertEquals(0, broker.getSystemUsage().getMemoryUsage().getNumUsageListeners());
+
+        LOG.info("Test completed.");
+    }
+}