You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/10/31 14:29:25 UTC
activemq git commit: AMQ-7086 - make kahadb gc/cleanup on shutdown
optional, favouring availability (fast failover) over disk usage
Repository: activemq
Updated Branches:
refs/heads/master e093a8c1d -> bf8eb08ac
AMQ-7086 - make kahadb gc/cleanup on shutdown optional, favouring availability (fast failover) over disk usage
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bf8eb08a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bf8eb08a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bf8eb08a
Branch: refs/heads/master
Commit: bf8eb08acaeec653d04daa0b8b6dd889ef990bed
Parents: e093a8c
Author: gtully <ga...@gmail.com>
Authored: Wed Oct 31 14:29:05 2018 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Oct 31 14:29:05 2018 +0000
----------------------------------------------------------------------
.../activemq/store/PersistenceAdapter.java | 4 +-
.../store/kahadb/AbstractKahaDBStore.java | 11 +-
.../store/kahadb/KahaDBPersistenceAdapter.java | 17 +-
.../activemq/store/kahadb/MessageDatabase.java | 16 +-
.../kahadb/MultiKahaDBPersistenceAdapter.java | 4 +-
.../kahadb/scheduler/JobSchedulerStoreImpl.java | 6 +-
.../store/kahadb/scheduler/AMQ7086Test.java | 184 +++++++++++++++++++
7 files changed, 228 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 07063b4..7bad926 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -183,11 +183,11 @@ public interface PersistenceAdapter extends Service {
/**
* checkpoint any
*
- * @param sync
+ * @param cleanup
* @throws IOException
*
*/
- void checkpoint(boolean sync) throws IOException;
+ void checkpoint(boolean cleanup) throws IOException;
/**
* A hint to return the size of the store on disk
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
index 6003c87..70be2fb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
@@ -55,6 +55,7 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport {
protected boolean failIfDatabaseIsLocked;
protected long checkpointInterval = 5*1000;
protected long cleanupInterval = 30*1000;
+ private boolean cleanupOnStop = true;
protected boolean checkForCorruptJournalFiles = false;
protected boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
@@ -202,6 +203,14 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport {
this.cleanupInterval = cleanupInterval;
}
+ /**
+ * Sets the cleanupOnStop flag of this store. The field is initialised to
+ * {@code true}; stores read it back through {@link #getCleanupOnStop()}.
+ *
+ * @param cleanupOnStop the new value of the flag
+ */
+ public void setCleanupOnStop(boolean cleanupOnStop) {
+ this.cleanupOnStop = cleanupOnStop;
+ }
+
+ /**
+ * Returns the cleanupOnStop flag of this store.
+ *
+ * @return the current value of the flag ({@code true} unless changed via the setter)
+ */
+ public boolean getCleanupOnStop() {
+ return this.cleanupOnStop;
+ }
+
public boolean isChecksumJournalFiles() {
return checksumJournalFiles;
}
@@ -666,7 +675,7 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport {
*/
protected void startCheckpoint() {
if (checkpointInterval == 0 && cleanupInterval == 0) {
- LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
+ LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
return;
}
synchronized (checkpointThreadLock) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index fbeda4c..5d6e896 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -79,13 +79,13 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
}
/**
- * @param sync
+ * @param cleanup
* @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
*/
@Override
- public void checkpoint(boolean sync) throws IOException {
- this.letter.checkpoint(sync);
+ public void checkpoint(boolean cleanup) throws IOException {
+ this.letter.checkpoint(cleanup);
}
/**
@@ -817,4 +817,15 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
public boolean isPersistNoLocal() {
return this.letter.isPersistNoLocal();
}
+
+ /**
+ * When set, ensure that the cleanup/gc operation is executed during the stop procedure.
+ * The value is forwarded unchanged to the wrapped store ({@code letter}).
+ *
+ * @param cleanupOnStop {@code true} to run cleanup/gc on stop, {@code false} to skip it
+ */
+ public void setCleanupOnStop(boolean cleanupOnStop) {
+ this.letter.setCleanupOnStop(cleanupOnStop);
+ }
+
+ /**
+ * Returns the cleanupOnStop setting held by the wrapped store ({@code letter}).
+ *
+ * @return the value reported by the wrapped store
+ */
+ public boolean getCleanupOnStop() {
+ return this.letter.getCleanupOnStop();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 22a2c1e..db6239a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -266,6 +266,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long journalDiskSyncInterval = 1000;
long checkpointInterval = 5*1000;
long cleanupInterval = 30*1000;
+ boolean cleanupOnStop = true;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
@@ -375,7 +376,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private void startCheckpoint() {
if (checkpointInterval == 0 && cleanupInterval == 0) {
- LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
+ LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
return;
}
synchronized (schedulerLock) {
@@ -508,7 +509,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
checkpointLock.writeLock().lock();
try {
if (metadata.page != null) {
- checkpointUpdate(true);
+ checkpointUpdate(getCleanupOnStop());
}
pageFile.unload();
metadata = createMetadata();
@@ -1147,9 +1148,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
after.run();
}
- if (scheduler == null && opened.get()) {
- startCheckpoint();
- }
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
@@ -3311,6 +3309,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.cleanupInterval = cleanupInterval;
}
+ /**
+ * Returns the cleanupOnStop flag. Its value is what this class passes to
+ * {@code checkpointUpdate(...)} for the checkpoint taken just before the
+ * page file is unloaded.
+ *
+ * @return the current value of the flag ({@code true} unless changed via the setter)
+ */
+ public boolean getCleanupOnStop() {
+ return cleanupOnStop;
+ }
+
+ /**
+ * Sets the cleanupOnStop flag. The field is initialised to {@code true}.
+ *
+ * @param cleanupOnStop the new value of the flag
+ */
+ public void setCleanupOnStop(boolean cleanupOnStop) {
+ this.cleanupOnStop = cleanupOnStop;
+ }
+
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4bdb8de..f143a07 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -173,9 +173,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
@Override
- public void checkpoint(final boolean sync) throws IOException {
+ public void checkpoint(final boolean cleanup) throws IOException {
for (PersistenceAdapter persistenceAdapter : adapters) {
- persistenceAdapter.checkpoint(sync);
+ persistenceAdapter.checkpoint(cleanup);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 05ca383..79059f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -59,6 +59,10 @@ import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/*
+ * @org.apache.xbean.XBean element="kahaDBJobScheduler"
+ */
+
public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
@@ -230,7 +234,7 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
checkpointLock.writeLock().lock();
try {
if (metaData.getPage() != null) {
- checkpointUpdate(true);
+ checkpointUpdate(getCleanupOnStop());
}
} finally {
checkpointLock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
new file mode 100644
index 0000000..3028dd0
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.scheduler;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ7086Test {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ7086Test.class);
+
+ BrokerService brokerService;
+ JobSchedulerStoreImpl jobSchedulerStore;
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
+
+    /**
+     * Verifies that, with cleanupOnStop enabled on both stores, stopping the
+     * broker leaves exactly one journal data file in the job scheduler store
+     * directory and one in the KahaDB message store directory.
+     *
+     * @throws Exception if the broker cannot be started or stopped, or the messaging step fails
+     */
+    @Test
+    public void testGcDoneAtStop() throws Exception {
+
+        brokerService = createBroker(true);
+        brokerService.start();
+
+        final File jobDir;
+        final File kahaDir;
+        try {
+            produceWithScheduledDelayAndConsume();
+
+            LOG.info("job store: " + jobSchedulerStore);
+            int numSchedulerFiles = jobSchedulerStore.getJournal().getFileMap().size();
+            LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
+            int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
+
+            // report each store's own count (the job store count was previously never logged)
+            LOG.info("Num files, job store: {}, message store: {}", numSchedulerFiles, numKahadbFiles);
+
+            // pull the dirs before we stop
+            jobDir = jobSchedulerStore.getJournal().getDirectory();
+            kahaDir = kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();
+        } finally {
+            // stop even when a step above fails, so the broker is not left running
+            // on the target/ directories that every test in this class reuses
+            brokerService.stop();
+        }
+
+        assertEquals("Expected job store data files", 1, verifyFilesOnDisk(jobDir));
+        assertEquals("Expected kahadb data files", 1, verifyFilesOnDisk(kahaDir));
+    }
+
+    /**
+     * Verifies that, with cleanupOnStop disabled on both stores, stopping the
+     * broker leaves as many journal data files on disk as each journal's file
+     * map reported just before the stop.
+     *
+     * @throws Exception if the broker cannot be started or stopped, or the messaging step fails
+     */
+    @Test
+    public void testNoGcAtStop() throws Exception {
+
+        brokerService = createBroker(false);
+        brokerService.start();
+
+        final int numSchedulerFiles;
+        final int numKahadbFiles;
+        final File jobDir;
+        final File kahaDir;
+        try {
+            produceWithScheduledDelayAndConsume();
+
+            LOG.info("job store: " + jobSchedulerStore);
+            numSchedulerFiles = jobSchedulerStore.getJournal().getFileMap().size();
+            LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
+            numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
+
+            // report each store's own count (the job store count was previously never logged)
+            LOG.info("Num files, job store: {}, message store: {}", numSchedulerFiles, numKahadbFiles);
+
+            // pull the dirs before we stop
+            jobDir = jobSchedulerStore.getJournal().getDirectory();
+            kahaDir = kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();
+        } finally {
+            // stop even when a step above fails, so the broker is not left running
+            // on the target/ directories that every test in this class reuses
+            brokerService.stop();
+        }
+
+        assertEquals("Expected job store data files", numSchedulerFiles, verifyFilesOnDisk(jobDir));
+        assertEquals("Expected kahadb data files", numKahadbFiles, verifyFilesOnDisk(kahaDir));
+    }
+
+    /**
+     * Counts the journal data files located directly in the given directory,
+     * logging the directory, the count and each matching file.
+     *
+     * A file matches when its name starts with {@code Journal.DEFAULT_FILE_PREFIX}
+     * and ends with {@code Journal.DEFAULT_FILE_SUFFIX}.
+     *
+     * @param directory the journal directory to inspect
+     * @return the number of matching files, or 0 when the directory cannot be listed
+     */
+    private int verifyFilesOnDisk(File directory) {
+
+        LOG.info("Broker: " + brokerService);
+        LOG.info("dir: " + directory);
+
+        File[] files = directory.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(Journal.DEFAULT_FILE_PREFIX) && n.endsWith(Journal.DEFAULT_FILE_SUFFIX);
+            }
+        });
+
+        // listFiles returns null when the path is not a directory or an I/O error
+        // occurs; bail out here rather than hitting a NullPointerException in the loop
+        if (files == null) {
+            LOG.info("File count: " + " empty!");
+            return 0;
+        }
+
+        LOG.info("File count: " + files.length);
+        for (File file : files) {
+            LOG.info("name :" + file.getAbsolutePath());
+        }
+        return files.length;
+    }
+
+    /**
+     * Builds an unstarted broker with JMX disabled and scheduler support
+     * enabled, backed by a job scheduler store under {@code target/scheduler}
+     * and a KahaDB persistence adapter under {@code target/kahadb}. The two
+     * stores are also kept in the test's fields for later inspection.
+     *
+     * @param doCleanupOnStop value applied to cleanupOnStop on both stores
+     * @return the configured broker; the caller is responsible for starting it
+     * @throws Exception if preparing the directories or configuring the broker fails
+     */
+    protected BrokerService createBroker(boolean doCleanupOnStop) throws Exception {
+        final File schedulerDir = new File("target/scheduler");
+        final File messageStoreDir = new File("target/kahadb");
+
+        // prepare both directories via IOHelper: mkdirs, then deleteChildren,
+        // presumably so each run starts from empty stores
+        IOHelper.mkdirs(schedulerDir);
+        IOHelper.deleteChildren(schedulerDir);
+        IOHelper.mkdirs(messageStoreDir);
+        IOHelper.deleteChildren(messageStoreDir);
+
+        final BrokerService service = new BrokerService();
+        service.setUseJmx(false);
+        service.setSchedulerSupport(true);
+
+        // job scheduler store: 16KB journal files, checkpoint interval of 0
+        jobSchedulerStore = new JobSchedulerStoreImpl();
+        jobSchedulerStore.setDirectory(schedulerDir);
+        jobSchedulerStore.setJournalMaxFileLength(16*1024);
+        jobSchedulerStore.setCheckpointInterval(0);
+        jobSchedulerStore.setCleanupOnStop(doCleanupOnStop);
+        service.setJobSchedulerStore(jobSchedulerStore);
+
+        // message store: 16KB journal files, cleanup interval of 0
+        kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+        kahaDBPersistenceAdapter.setDirectory(messageStoreDir);
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
+        kahaDBPersistenceAdapter.setCleanupInterval(0);
+        kahaDBPersistenceAdapter.setCleanupOnStop(doCleanupOnStop);
+        service.setPersistenceAdapter(kahaDBPersistenceAdapter);
+
+        return service;
+    }
+
+    /**
+     * Sends 50 bytes messages with a 1KB payload to queue {@code QQ} over a
+     * {@code vm://localhost} connection, each carrying a one second scheduled
+     * delay, then consumes all of them and waits a second for the final
+     * acknowledgement to settle.
+     *
+     * @throws Exception if connecting, sending or receiving fails, or a message
+     *                   does not arrive within the 5 second receive timeout
+     */
+    public void produceWithScheduledDelayAndConsume() throws Exception {
+        Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            connection.start();
+            final ActiveMQQueue destination = new ActiveMQQueue("QQ");
+            final int numMessages = 50;
+            final long time = 1000L;
+            final byte[] payload = new byte[1024];
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < numMessages; i++) {
+                BytesMessage bytesMessage = session.createBytesMessage();
+                bytesMessage.writeBytes(payload);
+                bytesMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+                producer.send(bytesMessage);
+            }
+
+            MessageConsumer messageConsumer = session.createConsumer(destination);
+            for (int i = 0; i < numMessages; i++) {
+                assertNotNull(messageConsumer.receive(5000));
+            }
+        } finally {
+            // close even when a send, receive or assertion fails, so the
+            // connection is not leaked
+            connection.close();
+        }
+
+        // let last ack settle
+        TimeUnit.SECONDS.sleep(1);
+
+    }
+}