You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/08 14:06:22 UTC
activemq git commit: AMQ-6451 - catch errors on preallocation. allocate direct buffer on start and reuse to ensure resource availability at runtime
Repository: activemq
Updated Branches:
refs/heads/master dca066287 -> 3b7613d93
AMQ-6451 - catch errors on preallocation. allocate direct buffer on start and reuse to ensure resource availability at runtime
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3b7613d9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3b7613d9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3b7613d9
Branch: refs/heads/master
Commit: 3b7613d9300443898b608a52cb4e763821fdc163
Parents: dca0662
Author: gtully <ga...@gmail.com>
Authored: Tue Nov 8 14:05:24 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Nov 8 14:05:46 2016 +0000
----------------------------------------------------------------------
.../store/kahadb/disk/journal/Journal.java | 71 +++++++++++++-------
.../disk/journal/PreallocationJournalTest.java | 35 ++++++++++
2 files changed, 80 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3b7613d9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index d780b46..65a952b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -215,6 +215,8 @@ public class Journal {
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
private File osKernelCopyTemplateFile = null;
+ private ByteBuffer preAllocateDirectBuffer = null;
+
protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
public interface DataFileRemovedListener {
@@ -276,13 +278,24 @@ public class Journal {
}
}
- if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
- // create a template file that will be used to pre-allocate the journal files
- if (osKernelCopyTemplateFile == null) {
- osKernelCopyTemplateFile = createJournalTemplateFile();
+ if (preallocationScope != PreallocationScope.NONE) {
+ switch (preallocationStrategy) {
+ case SPARSE_FILE:
+ break;
+ case OS_KERNEL_COPY: {
+ osKernelCopyTemplateFile = createJournalTemplateFile();
+ }
+ break;
+ case CHUNKED_ZEROS: {
+ preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
+ }
+ break;
+ case ZEROS: {
+ preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
+ }
+ break;
}
}
-
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -323,18 +336,29 @@ public class Journal {
LOG.trace("Startup took: "+(end-start)+" ms");
}
+ private ByteBuffer allocateDirectBuffer(int size) {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+ buffer.put(EOF_RECORD);
+ return buffer;
+ }
+
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
if (PreallocationScope.NONE != preallocationScope) {
- if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
- doPreallocationKernelCopy(file);
- } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
- doPreallocationZeros(file);
- } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
- doPreallocationChunkedZeros(file);
- } else {
- doPreallocationSparseFile(file);
+ try {
+ if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
+ doPreallocationKernelCopy(file);
+ } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
+ doPreallocationZeros(file);
+ } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
+ doPreallocationChunkedZeros(file);
+ } else {
+ doPreallocationSparseFile(file);
+ }
+ } catch (Throwable continueWithNoPrealloc) {
+ // error on preallocation is non fatal, and we don't want to leak the journal handle
+ LOG.error("cound not preallocate journal data file", continueWithNoPrealloc);
}
}
}
@@ -358,12 +382,10 @@ public class Journal {
}
private void doPreallocationZeros(RecoverableRandomAccessFile file) {
- ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
- buffer.put(EOF_RECORD);
- buffer.rewind();
+ preAllocateDirectBuffer.rewind();
try {
FileChannel channel = file.getChannel();
- channel.write(buffer);
+ channel.write(preAllocateDirectBuffer);
channel.force(false);
channel.position(0);
} catch (ClosedByInterruptException ignored) {
@@ -401,22 +423,19 @@ public class Journal {
}
private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
-
- ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
- buffer.put(EOF_RECORD);
- buffer.rewind();
-
+ preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
+ preAllocateDirectBuffer.rewind();
try {
FileChannel channel = file.getChannel();
int remLen = maxFileLength;
while (remLen > 0) {
- if (remLen < buffer.remaining()) {
- buffer.limit(remLen);
+ if (remLen < preAllocateDirectBuffer.remaining()) {
+ preAllocateDirectBuffer.limit(remLen);
}
- int writeLen = channel.write(buffer);
+ int writeLen = channel.write(preAllocateDirectBuffer);
remLen -= writeLen;
- buffer.rewind();
+ preAllocateDirectBuffer.rewind();
}
channel.force(false);
http://git-wip-us.apache.org/repos/asf/activemq/blob/3b7613d9/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
index d4095f3..1c98743 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.store.kahadb.disk.journal;
import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
@@ -55,6 +56,40 @@ public class PreallocationJournalTest {
executeTest("zeros");
}
+ @Test
+ public void testZerosLoop() throws Exception {
+ Random rand = new Random();
+ int randInt = rand.nextInt(100);
+ File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
+
+ KahaDBStore store = new KahaDBStore();
+ store.setJournalMaxFileLength(5*1024*1024);
+ store.deleteAllMessages();
+ store.setDirectory(dataDirectory);
+ store.setPreallocationStrategy("zeros");
+ store.start();
+
+ final File journalLog = new File(dataDirectory, "db-1.log");
+ assertTrue("file exists", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return journalLog.exists();
+ }
+ }));
+
+
+ KahaTraceCommand traceCommand = new KahaTraceCommand();
+ traceCommand.setMessage(new String(new byte[2*1024*1024]));
+ Location location = null;
+ for (int i=0; i<20; i++) {
+ location = store.store(traceCommand);
+ }
+ LOG.info("Last location:" + location);
+
+ LOG.info("Store journal files:" + store.getJournal().getFiles().size());
+
+ }
+
private void executeTest(String preallocationStrategy)throws Exception {
Random rand = new Random();
int randInt = rand.nextInt(100);