You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/08 23:18:27 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5875
Repository: activemq
Updated Branches:
refs/heads/master c853bcf43 -> b0952d874
https://issues.apache.org/jira/browse/AMQ-5875
Resolves an issue when using mKahaDB that caused a MessageStore
being used by more than one destination to be deleted even though
there was still at least 1 other destination using the store.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/73d1bcd7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/73d1bcd7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/73d1bcd7
Branch: refs/heads/master
Commit: 73d1bcd7ac4fe7bd94da9fe178f6f17d30bc41f2
Parents: c853bcf
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 8 13:02:43 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Jul 8 18:32:15 2015 +0000
----------------------------------------------------------------------
.../activemq/store/kahadb/KahaDBStore.java | 18 +-
.../kahadb/MultiKahaDBPersistenceAdapter.java | 26 +--
.../kahadb/MultiKahaDBTopicDeletionTest.java | 234 +++++++++++++++++++
3 files changed, 248 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/73d1bcd7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 44f93a6..89ee40c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1049,23 +1049,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
- if (!isEmptyTopic(entry, tx)) {
- rc.add(convert(entry.getKey()));
- }
- }
- }
-
- private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
- throws IOException {
- boolean isEmptyTopic = false;
- ActiveMQDestination dest = convert(entry.getKey());
- if (dest.isTopic()) {
- StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
- if (loadedStore.subscriptionAcks.isEmpty(tx)) {
- isEmptyTopic = true;
- }
+ //Removing isEmpty topic check - see AMQ-5875
+ rc.add(convert(entry.getKey()));
}
- return isEmptyTopic;
}
});
}finally {
http://git-wip-us.apache.org/repos/asf/activemq/blob/73d1bcd7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 286931e..d46108a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -277,7 +277,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
} catch (IOException e) {
throw new RuntimeException(e);
}
- if (adapter instanceof PersistenceAdapter) {
+ if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
adapter.removeQueueMessageStore(destination);
removeMessageStore(adapter, destination);
destinationMap.removeAll(destination);
@@ -292,7 +292,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
} catch (IOException e) {
throw new RuntimeException(e);
}
- if (adapter instanceof PersistenceAdapter) {
+ if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
adapter.removeTopicMessageStore(destination);
removeMessageStore(adapter, destination);
destinationMap.removeAll(destination);
@@ -300,18 +300,16 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
- if (adapter.getDestinations().isEmpty()) {
- stopAdapter(adapter, destination.toString());
- File adapterDir = adapter.getDirectory();
- if (adapterDir != null) {
- if (IOHelper.deleteFile(adapterDir)) {
- if (LOG.isTraceEnabled()) {
- LOG.info("deleted per destination adapter directory for: " + destination);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.info("failed to deleted per destination adapter directory for: " + destination);
- }
+ stopAdapter(adapter, destination.toString());
+ File adapterDir = adapter.getDirectory();
+ if (adapterDir != null) {
+ if (IOHelper.deleteFile(adapterDir)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.info("deleted per destination adapter directory for: " + destination);
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.info("failed to deleted per destination adapter directory for: " + destination);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/73d1bcd7/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
new file mode 100644
index 0000000..4380f5a
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQ-5875
+ *
+ * This test shows that when multiple destinations share a single KahaDB
+ * instance under mKahaDB, deleting one Topic no longer causes an
+ * IllegalStateException, and the store is properly kept around
+ * until all destinations associated with the store are gone.
+ *
+ * */
+@RunWith(Parameterized.class)
+public class MultiKahaDBTopicDeletionTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(MultiKahaDBTopicDeletionTest.class);
+
+ protected BrokerService brokerService;
+ protected Broker broker;
+ protected URI brokerConnectURI;
+ protected File storeDir;
+ protected ActiveMQTopic topic1;
+ protected ActiveMQTopic topic2;
+
+ protected static ActiveMQTopic TOPIC1 = new ActiveMQTopic("test.>");
+ protected static ActiveMQTopic TOPIC2 = new ActiveMQTopic("test.t.topic");
+
+
+ @Parameters
+ public static Collection<Object[]> data() {
+
+ //Test with topics created in different orders
+ return Arrays.asList(new Object[][] {
+ {TOPIC1, TOPIC2},
+ {TOPIC2, TOPIC1}
+ });
+ }
+
+ public MultiKahaDBTopicDeletionTest(ActiveMQTopic topic1, ActiveMQTopic topic2) {
+ this.topic1 = topic1;
+ this.topic2 = topic2;
+ }
+
+ @Rule
+ public TemporaryFolder tempTestDir = new TemporaryFolder();
+
+ @Before
+ public void startBroker() throws Exception {
+ setUpBroker(true);
+ }
+
+ protected void setUpBroker(boolean clearDataDir) throws Exception {
+ brokerService = new BrokerService();
+ this.initPersistence(brokerService);
+ // set up a transport
+ TransportConnector connector = brokerService
+ .addConnector(new TransportConnector());
+ connector.setUri(new URI("tcp://0.0.0.0:0"));
+ connector.setName("tcp");
+
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ brokerConnectURI = brokerService.getConnectorByName("tcp")
+ .getConnectUri();
+ broker = brokerService.getBroker();
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+
+ protected void initPersistence(BrokerService brokerService)
+ throws IOException {
+ storeDir = tempTestDir.getRoot();
+ brokerService.setPersistent(true);
+
+ // setup multi-kaha adapter
+ MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+ persistenceAdapter.setDirectory(storeDir);
+
+ KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+ kahaStore.setJournalMaxFileLength(1024 * 512);
+
+ // set up a store per destination
+ FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+ filtered.setPersistenceAdapter(kahaStore);
+ filtered.setPerDestination(true);
+ List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+ stores.add(filtered);
+
+ persistenceAdapter.setFilteredPersistenceAdapters(stores);
+ brokerService.setPersistenceAdapter(persistenceAdapter);
+ }
+
+ /**
+ * Test that a topic can be deleted and the other topic can still be subscribed to
+ * @throws Exception
+ */
+ @Test
+ public void testTopic1Deletion() throws Exception {
+ LOG.info("Creating {} first, {} second", topic1, topic2);
+ LOG.info("Removing {}, subscribing to {}", topic1, topic2);
+
+ // Create two topics
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+ // remove topic1
+ broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
+
+ // try and create a subscription on topic2, before AMQ-5875 this
+ //would cause an IllegalStateException
+ createSubscriber(topic2);
+ }
+
+
+ @Test
+ public void testTopic2Deletion() throws Exception {
+ LOG.info("Creating {} first, {} second", topic1, topic2);
+ LOG.info("Removing {}, subscribing to {}", topic2, topic1);
+
+ // Create two topics
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+ // remove topic2
+ broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
+
+ // try and create a subscription on topic1, before AMQ-5875 this
+ //would cause an IllegalStateException
+ createSubscriber(topic1);
+ }
+
+
+ @Test
+ public void testStoreCleanupDeleteTopic1First() throws Exception {
+ LOG.info("Creating {} first, {} second", topic1, topic2);
+ LOG.info("Deleting {} first, {} second", topic1, topic2);
+
+ // Create two topics
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+ // remove both topics
+ broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
+ broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
+
+ //Assert that a store with no more destinations attached has been cleaned up
+ Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*"));
+ assertEquals("Store files should be deleted", 0, storeFiles.size());
+
+ }
+
+ @Test
+ public void testStoreCleanupDeleteTopic2First() throws Exception {
+ LOG.info("Creating {} first, {} second", topic1, topic2);
+ LOG.info("Deleting {} first, {} second", topic2, topic1);
+
+ // Create two topics
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+ broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+ // remove both topics
+ broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
+ broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
+
+ //Assert that a store with no more destinations attached has been cleaned up
+ Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*"));
+ assertEquals("Store files should be deleted", 0, storeFiles.size());
+
+ }
+
+
+ protected void createSubscriber(ActiveMQTopic topic) throws JMSException {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ brokerConnectURI);
+ Connection connection = factory.createConnection();
+ connection.setClientID("client1");
+ connection.start();
+ Session session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "sub1");
+ }
+
+}
[2/2] activemq git commit: This closes #130
Posted by ta...@apache.org.
This closes #130
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0952d87
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0952d87
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0952d87
Branch: refs/heads/master
Commit: b0952d8747fea3490beedeab30969c8c4296c9ba
Parents: c853bcf 73d1bcd
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 8 17:18:17 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jul 8 17:18:17 2015 -0400
----------------------------------------------------------------------
.../activemq/store/kahadb/KahaDBStore.java | 18 +-
.../kahadb/MultiKahaDBPersistenceAdapter.java | 26 +--
.../kahadb/MultiKahaDBTopicDeletionTest.java | 234 +++++++++++++++++++
3 files changed, 248 insertions(+), 30 deletions(-)
----------------------------------------------------------------------