You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/27 18:31:26 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6377
Repository: activemq
Updated Branches:
refs/heads/master 822e2be90 -> dd0ed17e5
https://issues.apache.org/jira/browse/AMQ-6377
Introducing JournalDiskSyncStrategy to allow a periodic disk sync mode
instead of always syncing after every write or never syncing.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dd0ed17e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dd0ed17e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dd0ed17e
Branch: refs/heads/master
Commit: dd0ed17e59da3ac19f9e528c8b071322d3035404
Parents: 822e2be
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 27 14:20:48 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Jul 27 14:31:07 2016 -0400
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 52 ++++++-
.../kahadb/disk/journal/DataFileAppender.java | 6 +
.../store/kahadb/disk/journal/Journal.java | 52 ++++++-
.../disk/journal/JournalSyncStrategyTest.java | 152 +++++++++++++++++++
4 files changed, 256 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index eb3a5ee..a58d614 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -92,6 +92,7 @@ import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
import org.apache.activemq.store.kahadb.disk.page.Page;
@@ -252,10 +253,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected ScheduledExecutorService scheduler;
private final Object schedulerLock = new Object();
- protected boolean enableJournalDiskSyncs = true;
+ protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
protected boolean archiveDataLogs;
protected File directoryArchive;
protected AtomicLong journalSize = new AtomicLong(0);
+ long journalDiskSyncInterval = 1000;
long checkpointInterval = 5*1000;
long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -373,7 +375,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
// Short intervals for check-point and cleanups
- long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+ long delay;
+ if (journal.isJournalDiskSyncPeriodic()) {
+ delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
+ } else {
+ delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+ }
scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
}
@@ -384,6 +391,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private long lastCheckpoint = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
+ private long lastSync = System.currentTimeMillis();
@Override
public void run() {
@@ -391,6 +399,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Decide on cleanup vs full checkpoint here.
if (opened.get()) {
long now = System.currentTimeMillis();
+ if (journal.isJournalDiskSyncPeriodic() &&
+ journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
+ journal.syncCurrentDataFile();
+ lastSync = now;
+ }
if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
checkpointCleanup(true);
lastCleanup = now;
@@ -3110,6 +3123,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
manager.setPreallocationStrategy(
Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
+ manager.setJournalDiskSyncStrategy(
+ Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()));
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive());
@@ -3166,12 +3181,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return enableIndexWriteAsync;
}
+ /**
+ * @deprecated use {@link #getJournalDiskSyncStrategy} instead
+ * @return
+ */
public boolean isEnableJournalDiskSyncs() {
- return enableJournalDiskSyncs;
+ return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
+ journalDiskSyncStrategy.trim().toUpperCase());
}
+ /**
+ * @deprecated use {@link #setJournalDiskSyncStrategy} instead
+ * @param syncWrites
+ */
public void setEnableJournalDiskSyncs(boolean syncWrites) {
- this.enableJournalDiskSyncs = syncWrites;
+ if (syncWrites) {
+ journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
+ } else {
+ journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name();
+ }
+ }
+
+ public String getJournalDiskSyncStrategy() {
+ return journalDiskSyncStrategy;
+ }
+
+ public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
+ this.journalDiskSyncStrategy = journalDiskSyncStrategy;
+ }
+
+ public long getJournalDiskSyncInterval() {
+ return journalDiskSyncInterval;
+ }
+
+ public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
+ this.journalDiskSyncInterval = journalDiskSyncInterval;
}
public long getCheckpointInterval() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 792431c..8df834c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.util.ByteSequence;
@@ -53,6 +54,7 @@ class DataFileAppender implements FileAppender {
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
protected int maxWriteBatchSize;
protected final boolean syncOnComplete;
+ protected final boolean periodicSync;
protected boolean running;
private Thread thread;
@@ -107,6 +109,8 @@ class DataFileAppender implements FileAppender {
this.inflightWrites = this.journal.getInflightWrites();
this.maxWriteBatchSize = this.journal.getWriteBatchSize();
this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
+ this.periodicSync = JournalDiskSyncStrategy.PERIODIC.equals(
+ this.journal.getJournalDiskSyncStrategy());
}
@Override
@@ -338,6 +342,8 @@ class DataFileAppender implements FileAppender {
if (forceToDisk) {
file.sync();
+ } else if (periodicSync) {
+ journal.currentFileNeedSync.set(true);
}
Journal.WriteCommand lastWrite = wb.writes.getTail();
http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 0b05c56..93d5f7f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -25,7 +25,15 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -33,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
@@ -75,6 +84,7 @@ public class Journal {
public static final byte EOF_EOT = '4';
public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
+ protected final AtomicBoolean currentFileNeedSync = new AtomicBoolean();
private ScheduledExecutorService scheduler;
// tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
@@ -115,6 +125,12 @@ public class Journal {
NONE;
}
+ public enum JournalDiskSyncStrategy {
+ ALWAYS,
+ PERIODIC,
+ NEVER;
+ }
+
private static byte[] createBatchControlRecordHeader() {
try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
os.writeInt(BATCH_CONTROL_RECORD_SIZE);
@@ -195,12 +211,13 @@ public class Journal {
protected boolean enableAsyncDiskSync = true;
private int nextDataFileId = 1;
private Object dataFileIdLock = new Object();
- private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
+ private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
private volatile DataFile nextDataFile;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
private File osKernelCopyTemplateFile = null;
+ protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
@@ -580,6 +597,7 @@ public class Journal {
dataFile = newDataFile();
}
synchronized (currentDataFile) {
+ syncCurrentDataFile();
fileMap.put(dataFile.getDataFileId(), dataFile);
fileByFileMap.put(dataFile.getFile(), dataFile);
dataFiles.addLast(dataFile);
@@ -592,6 +610,23 @@ public class Journal {
}
}
+ public void syncCurrentDataFile() throws IOException {
+ synchronized (currentDataFile) {
+ DataFile dataFile = currentDataFile.get();
+ if (dataFile != null && isJournalDiskSyncPeriodic()) {
+ if (currentFileNeedSync.compareAndSet(true, false)) {
+ LOG.trace("Syncing Journal file: {}", dataFile.getFile().getName());
+ RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
+ try {
+ file.sync();
+ } finally {
+ file.close();
+ }
+ }
+ }
+ }
+ }
+
private Runnable preAllocateNextDataFileTask = new Runnable() {
@Override
public void run() {
@@ -670,6 +705,7 @@ public class Journal {
// the appender can be calling back to to the journal blocking a close AMQ-5620
appender.close();
synchronized (currentDataFile) {
+ syncCurrentDataFile();
fileMap.clear();
fileByFileMap.clear();
dataFiles.clear();
@@ -1051,6 +1087,18 @@ public class Journal {
return enableAsyncDiskSync;
}
+ public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
+ return journalDiskSyncStrategy;
+ }
+
+ public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
+ this.journalDiskSyncStrategy = journalDiskSyncStrategy;
+ }
+
+ public boolean isJournalDiskSyncPeriodic() {
+ return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
+ }
+
public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
this.dataFileRemovedListener = dataFileRemovedListener;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/dd0ed17e/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
new file mode 100644
index 0000000..af618d5
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalSyncStrategyTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+public class JournalSyncStrategyTest {
+
+ @Rule
+ public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+ @Rule
+ public Timeout globalTimeout= new Timeout(10, TimeUnit.SECONDS);
+
+ private KahaDBStore store;
+ private int defaultJournalLength = 10 * 1024;
+
+ @After
+ public void after() throws Exception {
+ if (store != null) {
+ store.stop();
+ }
+ }
+
+ @Test
+ public void testPeriodicSync()throws Exception {
+ store = configureStore(JournalDiskSyncStrategy.PERIODIC);
+ store.start();
+ final Journal journal = store.getJournal();
+ assertTrue(journal.isJournalDiskSyncPeriodic());
+ assertFalse(store.isEnableJournalDiskSyncs());
+
+ MessageStore messageStore = store.createQueueMessageStore(new ActiveMQQueue("test"));
+
+ //write a message to the store
+ writeMessage(messageStore, 1);
+
+ //Make sure the flag was set to true
+ assertTrue(Wait.waitFor(new Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return journal.currentFileNeedSync.get();
+ }
+ }));
+
+ //Make sure a disk sync was done by the executor because a message was added
+ //which will cause the flag to be set to false
+ assertTrue(Wait.waitFor(new Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !journal.currentFileNeedSync.get();
+ }
+ }));
+
+ }
+
+ @Test
+ public void testSyncRotate()throws Exception {
+ store = configureStore(JournalDiskSyncStrategy.PERIODIC);
+ //Set a long interval to make sure it isn't called in this test
+ store.setJournalDiskSyncInterval(10 * 1000);
+ store.start();
+
+ final Journal journal = store.getJournal();
+ assertTrue(journal.isJournalDiskSyncPeriodic());
+ assertFalse(store.isEnableJournalDiskSyncs());
+ assertEquals(10 * 1000, store.getJournalDiskSyncInterval());
+ journal.currentFileNeedSync.set(true); //Flag the current file as needing a sync so the rotation below has something to sync
+
+ //get the current file but pass in a size greater than the
+ //journal length to trigger a rotation so we can verify that it was synced
+ journal.getCurrentDataFile(2 * defaultJournalLength);
+
+ //verify a sync was called (which will set this flag to false)
+ assertFalse(journal.currentFileNeedSync.get());
+ }
+
+ @Test
+ public void testAlwaysSync()throws Exception {
+ store = configureStore(JournalDiskSyncStrategy.ALWAYS);
+ store.start();
+ assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
+ assertTrue(store.isEnableJournalDiskSyncs());
+ }
+
+ @Test
+ public void testNeverSync() throws Exception {
+ store = configureStore(JournalDiskSyncStrategy.NEVER);
+ store.start();
+ assertFalse(store.getJournal().isJournalDiskSyncPeriodic());
+ assertFalse(store.isEnableJournalDiskSyncs());
+ }
+
+ private KahaDBStore configureStore(JournalDiskSyncStrategy strategy) throws Exception {
+ KahaDBStore store = new KahaDBStore();
+ store.setJournalMaxFileLength(defaultJournalLength);
+ store.deleteAllMessages();
+ store.setDirectory(dataFileDir.getRoot());
+ if (strategy != null) {
+ store.setJournalDiskSyncStrategy(strategy.name());
+ }
+
+ return store;
+ }
+
+ private void writeMessage(final MessageStore messageStore, int num) throws Exception {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("testtesttest");
+ MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + num);
+ messageId.setBrokerSequenceId(num);
+ message.setMessageId(messageId);
+ messageStore.addMessage(new ConnectionContext(), message);
+ }
+
+
+}