Posted to commits@activemq.apache.org by gt...@apache.org on 2018/10/31 14:29:25 UTC

activemq git commit: AMQ-7086 - make kahadb gc/cleanup on shutdown optional, trading extra disk usage for availability (fast failover)

Repository: activemq
Updated Branches:
  refs/heads/master e093a8c1d -> bf8eb08ac


AMQ-7086 - make kahadb gc/cleanup on shutdown optional, trading extra disk usage for availability (fast failover)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bf8eb08a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bf8eb08a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bf8eb08a

Branch: refs/heads/master
Commit: bf8eb08acaeec653d04daa0b8b6dd889ef990bed
Parents: e093a8c
Author: gtully <ga...@gmail.com>
Authored: Wed Oct 31 14:29:05 2018 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Oct 31 14:29:05 2018 +0000

----------------------------------------------------------------------
 .../activemq/store/PersistenceAdapter.java      |   4 +-
 .../store/kahadb/AbstractKahaDBStore.java       |  11 +-
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  17 +-
 .../activemq/store/kahadb/MessageDatabase.java  |  16 +-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   |   4 +-
 .../kahadb/scheduler/JobSchedulerStoreImpl.java |   6 +-
 .../store/kahadb/scheduler/AMQ7086Test.java     | 184 +++++++++++++++++++
 7 files changed, 228 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
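
For readers of this change, a minimal usage sketch (not part of the commit) built only from the setters it adds and the existing BrokerService / KahaDBPersistenceAdapter API: a broker that skips the final gc/cleanup pass on stop, accepting extra journal files on disk in exchange for a faster shutdown and failover.

    import java.io.File;

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

    public class FastFailoverBroker {
        public static void main(String[] args) throws Exception {
            BrokerService broker = new BrokerService();

            KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
            adapter.setDirectory(new File("target/kahadb"));
            // New in this commit: do not run the journal gc/cleanup as part of
            // stop(); unreferenced data files are reclaimed by the periodic
            // cleanup task or on the next start instead.
            adapter.setCleanupOnStop(false);

            broker.setPersistenceAdapter(adapter);
            broker.start();
            // ... produce/consume ...
            broker.stop();   // returns without the final cleanup pass
        }
    }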


http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 07063b4..7bad926 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -183,11 +183,11 @@ public interface PersistenceAdapter extends Service {
     /**
      * checkpoint any
      *
-     * @param sync
+     * @param cleanup
      * @throws IOException
      *
      */
-    void checkpoint(boolean sync) throws IOException;
+    void checkpoint(boolean cleanup) throws IOException;
 
     /**
      * A hint to return the size of the store on disk
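
The parameter rename above is cosmetic at the interface level, but it reflects what the flag selects in the KahaDB implementation: whether the checkpoint also garbage-collects unreferenced journal files. A hedged caller-side sketch (the helper class is hypothetical; PersistenceAdapter.checkpoint(boolean) is the existing API):

    import java.io.IOException;

    import org.apache.activemq.store.PersistenceAdapter;

    public final class CheckpointHelper {

        private CheckpointHelper() {}

        // Force a checkpoint now; when 'cleanup' is true the store also
        // garbage-collects journal data files that are no longer referenced.
        public static void checkpointNow(PersistenceAdapter adapter, boolean cleanup) throws IOException {
            adapter.checkpoint(cleanup);
        }
    }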

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
index 6003c87..70be2fb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
@@ -55,6 +55,7 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport {
     protected boolean failIfDatabaseIsLocked;
     protected long checkpointInterval = 5*1000;
     protected long cleanupInterval = 30*1000;
+    private boolean cleanupOnStop = true;
     protected boolean checkForCorruptJournalFiles = false;
     protected boolean checksumJournalFiles = true;
     protected boolean forceRecoverIndex = false;
@@ -202,6 +203,14 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport {
         this.cleanupInterval = cleanupInterval;
     }
 
+    public void setCleanupOnStop(boolean cleanupOnStop) {
+        this.cleanupOnStop = cleanupOnStop;
+    }
+
+    public boolean getCleanupOnStop() {
+        return this.cleanupOnStop;
+    }
+
     public boolean isChecksumJournalFiles() {
         return checksumJournalFiles;
     }
@@ -666,7 +675,7 @@ public abstract class AbstractKahaDBStore extends LockableServiceSupport {
      */
     protected void startCheckpoint() {
         if (checkpointInterval == 0 && cleanupInterval == 0) {
-            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
+            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
             return;
         }
         synchronized (checkpointThreadLock) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index fbeda4c..5d6e896 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -79,13 +79,13 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
     }
 
     /**
-     * @param sync
+     * @param cleanup
      * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
      */
     @Override
-    public void checkpoint(boolean sync) throws IOException {
-        this.letter.checkpoint(sync);
+    public void checkpoint(boolean cleanup) throws IOException {
+        this.letter.checkpoint(cleanup);
     }
 
     /**
@@ -817,4 +817,15 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
     public boolean isPersistNoLocal() {
         return this.letter.isPersistNoLocal();
     }
+
+    /*
+     * When set, ensure that the cleanup/gc operation is executed during the stop procedure
+     */
+    public void setCleanupOnStop(boolean cleanupOnStop) {
+        this.letter.setCleanupOnStop(cleanupOnStop);
+    }
+
+    public boolean getCleanupOnStop() {
+        return this.letter.getCleanupOnStop();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 22a2c1e..db6239a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -266,6 +266,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     long journalDiskSyncInterval = 1000;
     long checkpointInterval = 5*1000;
     long cleanupInterval = 30*1000;
+    boolean cleanupOnStop = true;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
@@ -375,7 +376,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     private void startCheckpoint() {
         if (checkpointInterval == 0 && cleanupInterval == 0) {
-            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
+            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
             return;
         }
         synchronized (schedulerLock) {
@@ -508,7 +509,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             checkpointLock.writeLock().lock();
             try {
                 if (metadata.page != null) {
-                    checkpointUpdate(true);
+                    checkpointUpdate(getCleanupOnStop());
                 }
                 pageFile.unload();
                 metadata = createMetadata();
@@ -1147,9 +1148,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 after.run();
             }
 
-            if (scheduler == null && opened.get()) {
-                startCheckpoint();
-            }
             return location;
         } catch (IOException ioe) {
             LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
@@ -3311,6 +3309,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.cleanupInterval = cleanupInterval;
     }
 
+    public boolean getCleanupOnStop() {
+        return cleanupOnStop;
+    }
+
+    public void setCleanupOnStop(boolean cleanupOnStop) {
+        this.cleanupOnStop = cleanupOnStop;
+    }
+
     public void setJournalMaxFileLength(int journalMaxFileLength) {
         this.journalMaxFileLength = journalMaxFileLength;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4bdb8de..f143a07 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -173,9 +173,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
     }
 
     @Override
-    public void checkpoint(final boolean sync) throws IOException {
+    public void checkpoint(final boolean cleanup) throws IOException {
         for (PersistenceAdapter persistenceAdapter : adapters) {
-            persistenceAdapter.checkpoint(sync);
+            persistenceAdapter.checkpoint(cleanup);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 05ca383..79059f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -59,6 +59,10 @@ import org.apache.activemq.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/*
+ * @org.apache.xbean.XBean element="kahaDBJobScheduler"
+ */
+
 public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
@@ -230,7 +234,7 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
             checkpointLock.writeLock().lock();
             try {
                 if (metaData.getPage() != null) {
-                    checkpointUpdate(true);
+                    checkpointUpdate(getCleanupOnStop());
                 }
             } finally {
                 checkpointLock.writeLock().unlock();
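
The XBean annotation added at the top of this file registers a kahaDBJobScheduler element, so the scheduler store should be tunable the same way in XML. A hedged sketch, assuming the standard XBean mapping of BrokerService.setJobSchedulerStore and of the cleanupOnStop setter inherited from AbstractKahaDBStore:

    <broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">
      <jobSchedulerStore>
        <!-- skip the scheduler journal gc/cleanup on stop as well -->
        <kahaDBJobScheduler directory="${activemq.data}/scheduler" cleanupOnStop="false"/>
      </jobSchedulerStore>
    </broker>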

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb08a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
new file mode 100644
index 0000000..3028dd0
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.scheduler;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.IOHelper;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ7086Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ7086Test.class);
+
+    BrokerService brokerService;
+    JobSchedulerStoreImpl jobSchedulerStore;
+    KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
+
+    @Test
+    public void testGcDoneAtStop() throws Exception {
+
+        brokerService = createBroker(true);
+        brokerService.start();
+
+        produceWithScheduledDelayAndConsume();
+
+        LOG.info("job store: " + jobSchedulerStore);
+        int numSchedulerFiles = jobSchedulerStore.getJournal().getFileMap().size();
+        LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
+        int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
+
+        LOG.info("Num files, job store: {}, messge store: {}", numKahadbFiles, numKahadbFiles);
+
+        // pull the dirs before we stop
+        File jobDir = jobSchedulerStore.getJournal().getDirectory();
+        File kahaDir = kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();
+
+        brokerService.stop();
+
+
+        assertEquals("Expected job store data files", 1, verifyFilesOnDisk(jobDir));
+        assertEquals("Expected kahadb data files", 1, verifyFilesOnDisk(kahaDir));
+    }
+
+    @Test
+    public void testNoGcAtStop() throws Exception {
+
+        brokerService = createBroker(false);
+        brokerService.start();
+
+        produceWithScheduledDelayAndConsume();
+
+        LOG.info("job store: " + jobSchedulerStore);
+        int numSchedulerFiles = jobSchedulerStore.getJournal().getFileMap().size();
+        LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
+        int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
+
+        LOG.info("Num files, job store: {}, messge store: {}", numKahadbFiles, numKahadbFiles);
+
+        // pull the dirs before we stop
+        File jobDir = jobSchedulerStore.getJournal().getDirectory();
+        File kahaDir = kahaDBPersistenceAdapter.getStore().getJournal().getDirectory();
+
+        brokerService.stop();
+
+        assertEquals("Expected job store data files", numSchedulerFiles, verifyFilesOnDisk(jobDir));
+        assertEquals("Expected kahadb data files", numKahadbFiles, verifyFilesOnDisk(kahaDir));
+    }
+
+    private int verifyFilesOnDisk(File directory) {
+
+        LOG.info("Broker: " + brokerService);
+        LOG.info("dir: " + directory);
+        int result = 0;
+
+        File[] files = directory.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(Journal.DEFAULT_FILE_PREFIX) && n.endsWith(Journal.DEFAULT_FILE_SUFFIX);
+            }
+        });
+
+        LOG.info("File count: " + (files != null ? files.length : " empty!"));
+
+        if (files != null) {
+            result = files.length;
+            for (File file : files) {
+                LOG.info("name: " + file.getAbsolutePath());
+            }
+        }
+        return result;
+    }
+
+    protected BrokerService createBroker(boolean doCleanupOnStop) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        File kahadbDir = new File("target/kahadb");
+
+        for (File directory: new File[]{schedulerDirectory, kahadbDir}) {
+            IOHelper.mkdirs(directory);
+            IOHelper.deleteChildren(directory);
+        }
+
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setSchedulerSupport(true);
+
+
+        jobSchedulerStore = new JobSchedulerStoreImpl();
+        jobSchedulerStore.setDirectory(schedulerDirectory);
+        jobSchedulerStore.setJournalMaxFileLength(16*1024);
+
+        jobSchedulerStore.setCheckpointInterval(0);
+        jobSchedulerStore.setCleanupOnStop(doCleanupOnStop);
+
+        broker.setJobSchedulerStore(jobSchedulerStore);
+
+
+        kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+        kahaDBPersistenceAdapter.setDirectory(kahadbDir);
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
+
+        kahaDBPersistenceAdapter.setCleanupInterval(0);
+        kahaDBPersistenceAdapter.setCleanupOnStop(doCleanupOnStop);
+
+        broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
+
+        return broker;
+    }
+
+    public void produceWithScheduledDelayAndConsume() throws Exception {
+        Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        connection.start();
+        final ActiveMQQueue destination = new ActiveMQQueue("QQ");
+        final int numMessages = 50;
+        final long time = 1000L;
+        final byte[] payload = new byte[1024];
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < numMessages; i++) {
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(payload);
+            bytesMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(bytesMessage);
+        }
+
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+        for (int i = 0; i < numMessages; i++) {
+            assertNotNull(messageConsumer.receive(5000));
+        }
+        connection.close();
+
+        // let last ack settle
+        TimeUnit.SECONDS.sleep(1);
+
+    }
+}