You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/04/20 17:17:39 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5578 -
preallocation could occur after a restart over an existing journal file! -
fix and test
Repository: activemq
Updated Branches:
refs/heads/master 01f56d0ca -> 4a821186a
https://issues.apache.org/jira/browse/AMQ-5578 - preallocation could occur after a restart over an existing journal file! - fix and test
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4a821186
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4a821186
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4a821186
Branch: refs/heads/master
Commit: 4a821186a4c8e7296637438fee932365d73b936e
Parents: 01f56d0
Author: gtully <ga...@gmail.com>
Authored: Mon Apr 20 16:01:50 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon Apr 20 16:04:01 2015 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 1 +
.../kahadb/disk/journal/DataFileAppender.java | 7 +++++--
.../JournalCorruptionEofIndexRecoveryTest.java | 17 +++++++++++++++++
.../disk/journal/PreallocationJournalTest.java | 8 ++++++--
4 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 32b7170..41c9aba 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -610,6 +610,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
redoCounter++;
} catch (IOException failedRecovery) {
if (isIgnoreMissingJournalfiles()) {
+ LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
// track this dud location
journal.corruptRecoveryLocation(recoveryPosition);
} else {
http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index fbb276a..0ce647a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -314,8 +314,11 @@ class DataFileAppender implements FileAppender {
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
- // pre allocate on first open
- journal.preallocateEntireJournalDataFile(file);
+ // pre allocate on first open of new file (length==0)
+ // note dataFile.length cannot be used because it is updated in enqueue
+ if (file.length() == 0l) {
+ journal.preallocateEntireJournalDataFile(file);
+ }
}
Journal.WriteCommand write = wb.writes.getHead();
http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 948b543..bb56e7d 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -116,6 +116,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(true);
+ adapter.setPreallocationStrategy("zeros");
+ adapter.setPreallocationScope("entire_journal");
+
}
@After
@@ -186,6 +189,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
+ @Test
+ public void testRecoverIndex() throws Exception {
+ startBroker();
+
+ final int numToSend = 4;
+ produceMessagesToConsumeMultipleDataFiles(numToSend);
+
+ // force journal replay by whacking the index
+ restartBroker(false, true);
+
+ assertEquals("Drain", numToSend, drainQueue(numToSend));
+
+ }
+
private void corruptBatchCheckSumSplash(int id) throws Exception{
Collection<DataFile> files =
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
index 9c21a56..d4095f3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb.disk.journal;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
@@ -36,6 +38,8 @@ import static org.junit.Assert.assertTrue;
*/
public class PreallocationJournalTest {
+ private static final Logger LOG = LoggerFactory.getLogger(PreallocationJournalTest.class);
+
@Test
public void testSparseFilePreallocation() throws Exception {
executeTest("sparse_file");
@@ -76,6 +80,7 @@ public class PreallocationJournalTest {
assertTrue("file size as expected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
+ LOG.info ("file size:" + journalLog + ", chan.size " + channel.size() + ", jfileSize.length: " + journalLog.length());
return Journal.DEFAULT_MAX_FILE_LENGTH == channel.size();
}
}));
@@ -87,8 +92,7 @@ public class PreallocationJournalTest {
buff.position(0);
assertEquals(0x00, buff.get());
- System.out.println("File size: " + channel.size());
-
+ LOG.info("File size: " + channel.size());
store.stop();
}