You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/25 16:44:50 UTC
activemq git commit: [AMQ-6520] respect kahadb indexDirectory for
perDestination mKahaDB - fix and test
Repository: activemq
Updated Branches:
refs/heads/master cfdff4edc -> 0a29533ed
[AMQ-6520] respect kahadb indexDirectory for perDestination mKahaDB - fix and test
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a29533e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a29533e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a29533e
Branch: refs/heads/master
Commit: 0a29533ed7ef49798a3fb4fa8eed7fb78d0a868a
Parents: cfdff4e
Author: gtully <ga...@gmail.com>
Authored: Fri Nov 25 16:44:28 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Fri Nov 25 16:44:28 2016 +0000
----------------------------------------------------------------------
.../store/kahadb/KahaDBPersistenceAdapter.java | 2 +-
.../kahadb/MultiKahaDBPersistenceAdapter.java | 36 +++-
.../store/kahadb/MKahaDBIndexLocationTest.java | 163 +++++++++++++++++++
3 files changed, 198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0a29533e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 5eef750..fd77e49 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -779,7 +779,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
- return "KahaDBPersistenceAdapter[" + path + "]";
+ return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? ",Index:" + getIndexDirectory().getAbsolutePath() : "") + "]";
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/0a29533e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 68f0ed6..223afb9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -64,6 +64,8 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.activemq.store.kahadb.MessageDatabase.DEFAULT_DIRECTORY;
+
/**
* An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
* distribution of destinations across multiple kahaDB persistence adapters
@@ -150,7 +152,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
destinationMap.setEntries(entries);
}
- private String nameFromDestinationFilter(ActiveMQDestination destination) {
+ public static String nameFromDestinationFilter(ActiveMQDestination destination) {
if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
"potential problem with recovery can result from name truncation.");
@@ -242,6 +244,20 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
transactionStore.deleteAllMessages();
IOHelper.deleteChildren(getDirectory());
+ for (Object o : destinationMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}))) {
+ if (o instanceof FilteredKahaDBPersistenceAdapter) {
+ FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = (FilteredKahaDBPersistenceAdapter) o;
+ if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory() != DEFAULT_DIRECTORY) {
+ IOHelper.deleteChildren(filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory());
+ }
+ if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) filteredKahaDBPersistenceAdapter.getPersistenceAdapter();
+ if (kahaDBPersistenceAdapter.getIndexDirectory() != null) {
+ IOHelper.deleteChildren(kahaDBPersistenceAdapter.getIndexDirectory());
+ }
+ }
+ }
+ }
}
@Override
@@ -394,12 +410,28 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
PersistenceAdapter adapter = kahaDBFromTemplate(template);
configureAdapter(adapter);
configureDirectory(adapter, destinationName);
+ configureIndexDirectory(adapter, template, destinationName);
return adapter;
}
+ /**
+  * Propagates the index directory configured on a KahaDB template to the adapter
+  * created from it.
+  * <p>
+  * Nothing happens unless both {@code template} and {@code adapter} are
+  * {@link KahaDBPersistenceAdapter} instances and the template has an index
+  * directory set. When {@code destinationName} is non-null it is appended as a
+  * sub-directory of the template's index directory.
+  *
+  * @param adapter         the adapter whose index directory is to be set
+  * @param template        the adapter the index directory is read from
+  * @param destinationName optional sub-directory name, may be {@code null}
+  */
+ private void configureIndexDirectory(PersistenceAdapter adapter, PersistenceAdapter template, String destinationName) {
+     if (!(template instanceof KahaDBPersistenceAdapter)) {
+         return;
+     }
+     File templateIndexDir = ((KahaDBPersistenceAdapter) template).getIndexDirectory();
+     if (templateIndexDir == null || !(adapter instanceof KahaDBPersistenceAdapter)) {
+         return;
+     }
+     File target = destinationName == null ? templateIndexDir : new File(templateIndexDir, destinationName);
+     ((KahaDBPersistenceAdapter) adapter).setIndexDirectory(target);
+ }
+
private void configureDirectory(PersistenceAdapter adapter, String fileName) {
File directory = null;
- File defaultDir = MessageDatabase.DEFAULT_DIRECTORY;
+ File defaultDir = DEFAULT_DIRECTORY;
try {
defaultDir = adapter.getClass().newInstance().getDirectory();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/0a29533e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java
new file mode 100644
index 0000000..a7ecc0c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter.nameFromDestinationFilter;
+import static org.junit.Assert.*;
+
+/**
+ * Verifies that an {@code indexDirectory} configured on the KahaDB template of a
+ * per-destination {@link MultiKahaDBPersistenceAdapter} is honoured (AMQ-6520).
+ * <p>
+ * The test expects index files below {@code kahaIndexDir/destinationName} and journal
+ * files below {@code kahaDataDir/destinationName}, and then checks that the produced
+ * messages can still be consumed after the broker is re-created on the same directories.
+ */
+public class MKahaDBIndexLocationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MKahaDBIndexLocationTest.class);
+
+    /** Number of messages produced before, and expected after, the broker restart. */
+    private static final int MESSAGE_COUNT = 5;
+
+    private BrokerService broker;
+
+    private final File testDataDir = new File("target/activemq-data/ConfigIndexDir");
+    private final File kahaDataDir = new File(testDataDir, "log");
+    private final File kahaIndexDir = new File(testDataDir, "index");
+    private final ActiveMQQueue queue = new ActiveMQQueue("Qq");
+
+    /** Creates a broker that deletes all messages on startup and waits until it is started. */
+    @Before
+    public void startBroker() throws Exception {
+        createBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /** Stops the current broker, if any, and waits until it is stopped. */
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Builds (but does not start) a broker backed by a multi-KahaDB adapter whose single
+     * per-destination template has a dedicated index directory.
+     */
+    private void createBroker() throws Exception {
+        broker = new BrokerService();
+
+        // setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(kahaDataDir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 512);
+        kahaStore.setIndexDirectory(kahaIndexDir);
+
+        // set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setSchedulerSupport(false);
+        // The adapter is installed exactly once; the original set it twice.
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    @Test
+    public void testIndexDirExists() throws Exception {
+
+        produceMessages();
+
+        LOG.info("Index dir is configured as: {}", kahaIndexDir);
+        assertTrue(kahaDataDir.exists());
+        assertTrue(kahaIndexDir.exists());
+
+        String destName = nameFromDestinationFilter(queue);
+        String[] index = new File(kahaIndexDir, destName).list(new FilenameFilter() {
+
+            @Override
+            public boolean accept(File dir, String name) {
+                LOG.info("Testing index filename: {}", name);
+                return name.endsWith("data") || name.endsWith("redo");
+            }
+        });
+
+        String[] journal = new File(kahaDataDir, destName).list(new FilenameFilter() {
+
+            @Override
+            public boolean accept(File dir, String name) {
+                LOG.info("Testing log filename: {}", name);
+                return name.endsWith("log") || name.equals("lock");
+            }
+        });
+
+        // Should be db.data and db.redo and nothing else.
+        assertNotNull(index);
+        assertEquals(2, index.length);
+
+        // Should contain the initial log for the journal
+        assertNotNull(journal);
+        assertEquals(1, journal.length);
+
+        // Restart without deleteAllMessagesOnStartup so the stored messages must be
+        // recovered from the configured journal and index locations.
+        stopBroker();
+        createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        consume();
+    }
+
+    /**
+     * Receives {@link #MESSAGE_COUNT} messages from the test queue, failing if any
+     * receive times out. The connection is closed even when an assertion fails.
+     */
+    private void consume() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            for (int i = 0; i < MESSAGE_COUNT; ++i) {
+                assertNotNull("message[" + i + "]", consumer.receive(4000));
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} text messages to the test queue. The connection is
+     * closed even when sending fails.
+     */
+    private void produceMessages() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(queue);
+            for (int i = 0; i < MESSAGE_COUNT; ++i) {
+                producer.send(session.createTextMessage("test:" + i));
+            }
+        } finally {
+            connection.close();
+        }
+    }
+}