You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/08 14:06:22 UTC

activemq git commit: AMQ-6451 - Catch errors on preallocation. Allocate the direct buffer on start and reuse it to ensure resource availability at runtime

Repository: activemq
Updated Branches:
  refs/heads/master dca066287 -> 3b7613d93


AMQ-6451 - Catch errors on preallocation. Allocate the direct buffer on start and reuse it to ensure resource availability at runtime


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3b7613d9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3b7613d9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3b7613d9

Branch: refs/heads/master
Commit: 3b7613d9300443898b608a52cb4e763821fdc163
Parents: dca0662
Author: gtully <ga...@gmail.com>
Authored: Tue Nov 8 14:05:24 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Nov 8 14:05:46 2016 +0000

----------------------------------------------------------------------
 .../store/kahadb/disk/journal/Journal.java      | 71 +++++++++++++-------
 .../disk/journal/PreallocationJournalTest.java  | 35 ++++++++++
 2 files changed, 80 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3b7613d9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index d780b46..65a952b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -215,6 +215,8 @@ public class Journal {
     protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
     private File osKernelCopyTemplateFile = null;
+    private ByteBuffer preAllocateDirectBuffer = null;
+
     protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
 
     public interface DataFileRemovedListener {
@@ -276,13 +278,24 @@ public class Journal {
             }
         }
 
-        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
-            // create a template file that will be used to pre-allocate the journal files
-            if (osKernelCopyTemplateFile == null) {
-                osKernelCopyTemplateFile = createJournalTemplateFile();
+        if (preallocationScope != PreallocationScope.NONE) {
+            switch (preallocationStrategy) {
+                case SPARSE_FILE:
+                    break;
+                case OS_KERNEL_COPY: {
+                    osKernelCopyTemplateFile = createJournalTemplateFile();
+                }
+                break;
+                case CHUNKED_ZEROS: {
+                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
+                }
+                break;
+                case ZEROS: {
+                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
+                }
+                break;
             }
         }
-
         scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
@@ -323,18 +336,29 @@ public class Journal {
         LOG.trace("Startup took: "+(end-start)+" ms");
     }
 
+    private ByteBuffer allocateDirectBuffer(int size) {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+        buffer.put(EOF_RECORD);
+        return buffer;
+    }
+
     public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
 
         if (PreallocationScope.NONE != preallocationScope) {
 
-            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
-                doPreallocationKernelCopy(file);
-            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
-                doPreallocationZeros(file);
-            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
-                doPreallocationChunkedZeros(file);
-            } else {
-                doPreallocationSparseFile(file);
+            try {
+                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
+                    doPreallocationKernelCopy(file);
+                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
+                    doPreallocationZeros(file);
+                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
+                    doPreallocationChunkedZeros(file);
+                } else {
+                    doPreallocationSparseFile(file);
+                }
+            } catch (Throwable continueWithNoPrealloc) {
+                // error on preallocation is non fatal, and we don't want to leak the journal handle
+                LOG.error("cound not preallocate journal data file", continueWithNoPrealloc);
             }
         }
     }
@@ -358,12 +382,10 @@ public class Journal {
     }
 
     private void doPreallocationZeros(RecoverableRandomAccessFile file) {
-        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
-        buffer.put(EOF_RECORD);
-        buffer.rewind();
+        preAllocateDirectBuffer.rewind();
         try {
             FileChannel channel = file.getChannel();
-            channel.write(buffer);
+            channel.write(preAllocateDirectBuffer);
             channel.force(false);
             channel.position(0);
         } catch (ClosedByInterruptException ignored) {
@@ -401,22 +423,19 @@ public class Journal {
     }
 
     private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
-
-        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
-        buffer.put(EOF_RECORD);
-        buffer.rewind();
-
+        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
+        preAllocateDirectBuffer.rewind();
         try {
             FileChannel channel = file.getChannel();
 
             int remLen = maxFileLength;
             while (remLen > 0) {
-                if (remLen < buffer.remaining()) {
-                    buffer.limit(remLen);
+                if (remLen < preAllocateDirectBuffer.remaining()) {
+                    preAllocateDirectBuffer.limit(remLen);
                 }
-                int writeLen = channel.write(buffer);
+                int writeLen = channel.write(preAllocateDirectBuffer);
                 remLen -= writeLen;
-                buffer.rewind();
+                preAllocateDirectBuffer.rewind();
             }
 
             channel.force(false);

http://git-wip-us.apache.org/repos/asf/activemq/blob/3b7613d9/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
index d4095f3..1c98743 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.store.kahadb.disk.journal;
 
 import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
 import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -55,6 +56,40 @@ public class PreallocationJournalTest  {
         executeTest("zeros");
     }
 
+    @Test
+    public void testZerosLoop() throws Exception {
+        Random rand = new Random();
+        int randInt = rand.nextInt(100);
+        File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
+
+        KahaDBStore store = new KahaDBStore();
+        store.setJournalMaxFileLength(5*1024*1024);
+        store.deleteAllMessages();
+        store.setDirectory(dataDirectory);
+        store.setPreallocationStrategy("zeros");
+        store.start();
+
+        final File journalLog = new File(dataDirectory, "db-1.log");
+        assertTrue("file exists", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return journalLog.exists();
+            }
+        }));
+
+
+        KahaTraceCommand traceCommand = new KahaTraceCommand();
+        traceCommand.setMessage(new String(new byte[2*1024*1024]));
+        Location location = null;
+        for (int i=0; i<20; i++) {
+            location = store.store(traceCommand);
+        }
+        LOG.info("Last location:" + location);
+
+        LOG.info("Store journal files:" + store.getJournal().getFiles().size());
+
+    }
+
     private void executeTest(String preallocationStrategy)throws Exception {
         Random rand = new Random();
         int randInt = rand.nextInt(100);