You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/27 18:31:26 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6377

Repository: activemq
Updated Branches:
  refs/heads/master 822e2be90 -> dd0ed17e5


https://issues.apache.org/jira/browse/AMQ-6377

Introducing JournalDiskSyncStrategy to allow a peridic disk sync mode
instead of always syncing after every write or never syncing.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dd0ed17e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dd0ed17e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dd0ed17e

Branch: refs/heads/master
Commit: dd0ed17e59da3ac19f9e528c8b071322d3035404
Parents: 822e2be
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 27 14:20:48 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Jul 27 14:31:07 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  52 ++++++-
 .../kahadb/disk/journal/DataFileAppender.java   |   6 +
 .../store/kahadb/disk/journal/Journal.java      |  52 ++++++-
 .../disk/journal/JournalSyncStrategyTest.java   | 152 +++++++++++++++++++
 4 files changed, 256 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index eb3a5ee..a58d614 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -92,6 +92,7 @@ import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
 import org.apache.activemq.store.kahadb.disk.page.Page;
@@ -252,10 +253,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     protected ScheduledExecutorService scheduler;
     private final Object schedulerLock = new Object();
 
-    protected boolean enableJournalDiskSyncs = true;
+    protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
     protected boolean archiveDataLogs;
     protected File directoryArchive;
     protected AtomicLong journalSize = new AtomicLong(0);
+    long journalDiskSyncInterval = 1000;
     long checkpointInterval = 5*1000;
     long cleanupInterval = 30*1000;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -373,7 +375,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 });
 
                 // Short intervals for check-point and cleanups
-                long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+                long delay;
+                if (journal.isJournalDiskSyncPeriodic()) {
+                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
+                } else {
+                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+                }
 
                 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
             }
@@ -384,6 +391,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         private long lastCheckpoint = System.currentTimeMillis();
         private long lastCleanup = System.currentTimeMillis();
+        private long lastSync = System.currentTimeMillis();
 
         @Override
         public void run() {
@@ -391,6 +399,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 // Decide on cleanup vs full checkpoint here.
                 if (opened.get()) {
                     long now = System.currentTimeMillis();
+                    if (journal.isJournalDiskSyncPeriodic() &&
+                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
+                        journal.syncCurrentDataFile();
+                        lastSync = now;
+                    }
                     if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
                         checkpointCleanup(true);
                         lastCleanup = now;
@@ -3110,6 +3123,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
         manager.setPreallocationStrategy(
                 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
+        manager.setJournalDiskSyncStrategy(
+                Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()));
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());
@@ -3166,12 +3181,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return enableIndexWriteAsync;
     }
 
+    /**
+     * @deprecated use {@link #getJournalDiskSyncStrategy} instead
+     * @return
+     */
     public boolean isEnableJournalDiskSyncs() {
-        return enableJournalDiskSyncs;
+        return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
+                journalDiskSyncStrategy.trim().toUpperCase());
     }
 
+    /**
+     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
+     * @param syncWrites
+     */
     public void setEnableJournalDiskSyncs(boolean syncWrites) {
-        this.enableJournalDiskSyncs = syncWrites;
+        if (syncWrites) {
+            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
+        } else {
+            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name();
+        }
+    }
+
+    public String getJournalDiskSyncStrategy() {
+        return journalDiskSyncStrategy;
+    }
+
+    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
+        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
+    }
+
+    public long getJournalDiskSyncInterval() {
+        return journalDiskSyncInterval;
+    }
+
+    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
+        this.journalDiskSyncInterval = journalDiskSyncInterval;
     }
 
     public long getCheckpointInterval() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 792431c..8df834c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
 import org.apache.activemq.util.ByteSequence;
@@ -53,6 +54,7 @@ class DataFileAppender implements FileAppender {
     protected final CountDownLatch shutdownDone = new CountDownLatch(1);
     protected int maxWriteBatchSize;
     protected final boolean syncOnComplete;
+    protected final boolean periodicSync;
 
     protected boolean running;
     private Thread thread;
@@ -107,6 +109,8 @@ class DataFileAppender implements FileAppender {
         this.inflightWrites = this.journal.getInflightWrites();
         this.maxWriteBatchSize = this.journal.getWriteBatchSize();
         this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
+        this.periodicSync = JournalDiskSyncStrategy.PERIODIC.equals(
+                this.journal.getJournalDiskSyncStrategy());
     }
 
     @Override
@@ -338,6 +342,8 @@ class DataFileAppender implements FileAppender {
 
                 if (forceToDisk) {
                     file.sync();
+                } else if (periodicSync) {
+                    journal.currentFileNeedSync.set(true);
                 }
 
                 Journal.WriteCommand lastWrite = wb.writes.getTail();

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 0b05c56..93d5f7f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -25,7 +25,15 @@ import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -33,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
@@ -75,6 +84,7 @@ public class Journal {
     public static final byte EOF_EOT = '4';
     public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
 
+    protected final AtomicBoolean currentFileNeedSync = new AtomicBoolean();
     private ScheduledExecutorService scheduler;
 
     // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
@@ -115,6 +125,12 @@ public class Journal {
         NONE;
     }
 
+    public enum JournalDiskSyncStrategy {
+        ALWAYS,
+        PERIODIC,
+        NEVER;
+    }
+
     private static byte[] createBatchControlRecordHeader() {
         try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
             os.writeInt(BATCH_CONTROL_RECORD_SIZE);
@@ -195,12 +211,13 @@ public class Journal {
     protected boolean enableAsyncDiskSync = true;
     private int nextDataFileId = 1;
     private Object dataFileIdLock = new Object();
-    private  final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
+    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
     private volatile DataFile nextDataFile;
 
     protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
     private File osKernelCopyTemplateFile = null;
+    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
 
     public interface DataFileRemovedListener {
         void fileRemoved(DataFile datafile);
@@ -580,6 +597,7 @@ public class Journal {
                 dataFile = newDataFile();
             }
             synchronized (currentDataFile) {
+                syncCurrentDataFile();
                 fileMap.put(dataFile.getDataFileId(), dataFile);
                 fileByFileMap.put(dataFile.getFile(), dataFile);
                 dataFiles.addLast(dataFile);
@@ -592,6 +610,23 @@ public class Journal {
         }
     }
 
+    public void syncCurrentDataFile() throws IOException {
+        synchronized (currentDataFile) {
+            DataFile dataFile = currentDataFile.get();
+            if (dataFile != null && isJournalDiskSyncPeriodic()) {
+                if (currentFileNeedSync.compareAndSet(true, false)) {
+                    LOG.trace("Syncing Journal file: {}", dataFile.getFile().getName());
+                    RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
+                    try {
+                        file.sync();
+                    } finally {
+                        file.close();
+                    }
+                }
+            }
+        }
+    }
+
     private Runnable preAllocateNextDataFileTask = new Runnable() {
         @Override
         public void run() {
@@ -670,6 +705,7 @@ public class Journal {
         // the appender can be calling back to to the journal blocking a close AMQ-5620
         appender.close();
         synchronized (currentDataFile) {
+            syncCurrentDataFile();
             fileMap.clear();
             fileByFileMap.clear();
             dataFiles.clear();
@@ -1051,6 +1087,18 @@ public class Journal {
         return enableAsyncDiskSync;
     }
 
+    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
+        return journalDiskSyncStrategy;
+    }
+
+    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
+        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
+    }
+
+    public boolean isJournalDiskSyncPeriodic() {
+        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
+    }
+
     public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
         this.dataFileRemovedListener = dataFileRemovedListener;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
new file mode 100644
index 0000000..af618d5
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+public class JournalSyncStrategyTest  {
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
+
+    private KahaDBStore store;
+    private int defaultJournalLength = 10 * 1024;
+
+    @After
+    public void after() throws Exception {
+        if (store != null) {
+            store.stop();
+        }
+    }
+
+    @Test
+    public void testPeriodicSync()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.PERIODIC);
+        store.start();
+        final Journal journal = store.getJournal();
+        assertTrue(journal.isJournalDiskSyncPeriodic());
+        assertFalse(store.isEnableJournalDiskSyncs());
+
+        MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
+
+        //write a message to the store
+        writeMessage(messageStore, 1);
+
+        //Make sure the flag was set to true
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return journal.currentFileNeedSync.get();
+            }
+        }));
+
+        //Make sure a disk sync was done by the executor because a message was added
+        //which will cause the flag to be set to false
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !journal.currentFileNeedSync.get();
+            }
+        }));
+
+    }
+
+    @Test
+    public void testSyncRotate()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.PERIODIC);
+        //Set a long interval to make sure it isn't called in this test
+        store.setJournalDiskSyncInterval(10 * 1000);
+        store.start();
+
+        final Journal journal = store.getJournal();
+        assertTrue(journal.isJournalDiskSyncPeriodic());
+        assertFalse(store.isEnableJournalDiskSyncs());
+        assertEquals(10 * 1000, store.getJournalDiskSyncInterval());
+        journal.currentFileNeedSync.set(true);        //Make sure a disk sync was done by the executor because a message was added
+
+        //get the current file but pass in a size greater than the
+        //journal length to trigger a rotation so we can verify that it was synced
+        journal.getCurrentDataFile(2 * defaultJournalLength);
+
+        //verify a sync was called (which will set this flag to false)
+        assertFalse(journal.currentFileNeedSync.get());
+    }
+
+    @Test
+    public void testAlwaysSync()throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.ALWAYS);
+        store.start();
+        assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
+        assertTrue(store.isEnableJournalDiskSyncs());
+    }
+
+    @Test
+    public void testNeverSync() throws Exception {
+        store = configureStore(JournalDiskSyncStrategy.NEVER);
+        store.start();
+        assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
+        assertFalse(store.isEnableJournalDiskSyncs());
+    }
+
+    private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
+        KahaDBStore store = new KahaDBStore();
+        store.setJournalMaxFileLength(defaultJournalLength);
+        store.deleteAllMessages();
+        store.setDirectory(dataFileDir.getRoot());
+        if (strategy != null) {
+            store.setJournalDiskSyncStrategy(strategy.name());
+        }
+
+        return store;
+    }
+
+    private void writeMessage(final MessageStore messageStore, int num) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("testtesttest");
+        MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + num);
+        messageId.setBrokerSequenceId(num);
+        message.setMessageId(messageId);
+        messageStore.addMessage(new ConnectionContext(), message);
+    }
+
+
+}