You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/04/20 17:17:39 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5578 - preallocation could occur after a restart over an existing journal file! - fix and test

Repository: activemq
Updated Branches:
  refs/heads/master 01f56d0ca -> 4a821186a


https://issues.apache.org/jira/browse/AMQ-5578 - preallocation could occur after a restart over an existing journal file! - fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4a821186
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4a821186
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4a821186

Branch: refs/heads/master
Commit: 4a821186a4c8e7296637438fee932365d73b936e
Parents: 01f56d0
Author: gtully <ga...@gmail.com>
Authored: Mon Apr 20 16:01:50 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon Apr 20 16:04:01 2015 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java     |  1 +
 .../kahadb/disk/journal/DataFileAppender.java      |  7 +++++--
 .../JournalCorruptionEofIndexRecoveryTest.java     | 17 +++++++++++++++++
 .../disk/journal/PreallocationJournalTest.java     |  8 ++++++--
 4 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 32b7170..41c9aba 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -610,6 +610,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                         redoCounter++;
                     } catch (IOException failedRecovery) {
                         if (isIgnoreMissingJournalfiles()) {
+                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                             // track this dud location
                             journal.corruptRecoveryLocation(recoveryPosition);
                         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index fbb276a..0ce647a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -314,8 +314,11 @@ class DataFileAppender implements FileAppender {
                     }
                     dataFile = wb.dataFile;
                     file = dataFile.openRandomAccessFile();
-                    // pre allocate on first open
-                    journal.preallocateEntireJournalDataFile(file);
+                    // pre allocate on first open of new file (length==0)
+                    // note dataFile.length cannot be used because it is updated in enqueue
+                    if (file.length() == 0l) {
+                        journal.preallocateEntireJournalDataFile(file);
+                    }
                 }
 
                 Journal.WriteCommand write = wb.writes.getHead();

http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 948b543..bb56e7d 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -116,6 +116,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
         adapter.setCheckForCorruptJournalFiles(true);
         adapter.setIgnoreMissingJournalfiles(true);
 
+        adapter.setPreallocationStrategy("zeros");
+        adapter.setPreallocationScope("entire_journal");
+
     }
 
     @After
@@ -186,6 +189,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
 
     }
 
+    @Test
+    public void testRecoverIndex() throws Exception {
+        startBroker();
+
+        final int numToSend = 4;
+        produceMessagesToConsumeMultipleDataFiles(numToSend);
+
+        // force journal replay by whacking the index
+        restartBroker(false, true);
+
+        assertEquals("Drain", numToSend, drainQueue(numToSend));
+
+    }
+
     private void corruptBatchCheckSumSplash(int id) throws Exception{
         Collection<DataFile> files =
                 ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();

http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
index 9c21a56..d4095f3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb.disk.journal;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.util.Wait;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -36,6 +38,8 @@ import static org.junit.Assert.assertTrue;
  */
 public class PreallocationJournalTest  {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PreallocationJournalTest.class);
+
     @Test
     public void testSparseFilePreallocation() throws Exception {
         executeTest("sparse_file");
@@ -76,6 +80,7 @@ public class PreallocationJournalTest  {
         assertTrue("file size as expected", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
+                LOG.info ("file size:" + journalLog + ", chan.size " + channel.size() + ", jfileSize.length: " + journalLog.length());
                 return Journal.DEFAULT_MAX_FILE_LENGTH == channel.size();
             }
         }));
@@ -87,8 +92,7 @@ public class PreallocationJournalTest  {
         buff.position(0);
         assertEquals(0x00, buff.get());
 
-        System.out.println("File size: " + channel.size());
-
+        LOG.info("File size: " + channel.size());
 
         store.stop();
     }