You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/04/29 18:00:55 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5603 -
add preallocationScope=entire_journal_async that will preallocate a journal in
advance of use to avoid latency jitter on journal rotation. Added none option
to disable preallocation
Repository: activemq
Updated Branches:
refs/heads/master 3c342ffce -> 62bdbb0db
https://issues.apache.org/jira/browse/AMQ-5603 - add preallocationScope=entire_journal_async that will preallocate a journal in advance of use to avoid latency jitter on journal rotation. Added none option to disable preallocation
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/62bdbb0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/62bdbb0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/62bdbb0d
Branch: refs/heads/master
Commit: 62bdbb0db5dc4354f0e00fd5259b3db53eb1432d
Parents: 3c342ff
Author: gtully <ga...@gmail.com>
Authored: Fri Apr 29 16:57:03 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Apr 29 16:57:28 2016 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 45 ++-
.../store/kahadb/disk/journal/DataFile.java | 11 +
.../kahadb/disk/journal/DataFileAppender.java | 38 +-
.../store/kahadb/disk/journal/Journal.java | 391 +++++++++++++------
.../disk/journal/TargetedDataFileAppender.java | 8 +-
.../JournalCorruptionEofIndexRecoveryTest.java | 5 +-
.../JournalCorruptionIndexRecoveryTest.java | 2 +
.../store/kahadb/disk/journal/JournalTest.java | 1 +
.../PreallocationJournalLatencyTest.java | 15 +-
.../journal/TargetedDataFileAppenderTest.java | 1 +
.../activemq/bugs/AMQ2584ConcurrentDlqTest.java | 2 +
.../org/apache/activemq/bugs/AMQ3120Test.java | 4 +-
.../org/apache/activemq/bugs/AMQ4323Test.java | 4 +-
.../store/kahadb/KahaDBIndexLocationTest.java | 2 +-
14 files changed, 339 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8bb9491..3e754f7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -257,7 +257,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
- private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
+ private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name();
private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
protected AtomicBoolean opened = new AtomicBoolean();
@@ -1860,32 +1860,37 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@Override
public void run() {
+
+ int journalToAdvance = -1;
+ Set<Integer> journalLogsReferenced = new HashSet<Integer>();
+
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
- // Map keys might not be sorted, find the earliest log file to forward acks
- // from and move only those, future cycles can chip away at more as needed.
- // We won't move files that are themselves rewritten on a previous compaction.
- List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
- Collections.sort(journalFileIds);
- int journalToAdvance = -1;
- for (Integer journalFileId : journalFileIds) {
- DataFile current = journal.getDataFileById(journalFileId);
- if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
- journalToAdvance = journalFileId;
- break;
+ try {
+ // Map keys might not be sorted, find the earliest log file to forward acks
+ // from and move only those, future cycles can chip away at more as needed.
+ // We won't move files that are themselves rewritten on a previous compaction.
+ List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+ Collections.sort(journalFileIds);
+ for (Integer journalFileId : journalFileIds) {
+ DataFile current = journal.getDataFileById(journalFileId);
+ if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+ journalToAdvance = journalFileId;
+ break;
+ }
}
- }
- // Check if we found one, or if we only found the current file being written to.
- if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
- return;
- }
+ // Check if we found one, or if we only found the current file being written to.
+ if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+ return;
+ }
- Set<Integer> journalLogsReferenced =
- new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
+ journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
- indexLock.writeLock().unlock();
+ } finally {
+ indexLock.writeLock().unlock();
+ }
try {
// Background rewrite of the old acks
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index 126d82b..5b96adf 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -36,6 +36,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
protected volatile int length;
protected int typeCode = STANDARD_LOG_FILE;
protected final SequenceSet corruptedBlocks = new SequenceSet();
+ protected RecoverableRandomAccessFile appendRandomAccessFile;
DataFile(File file, int number) {
this.file = file;
@@ -76,12 +77,22 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
return file.getName() + " number = " + dataFileId + " , length = " + length;
}
+ public synchronized RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
+ if (appendRandomAccessFile == null) {
+ appendRandomAccessFile = new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
+ }
+ return appendRandomAccessFile;
+ }
+
public synchronized RecoverableRandomAccessFile openRandomAccessFile() throws IOException {
return new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
}
public synchronized void closeRandomAccessFile(RecoverableRandomAccessFile file) throws IOException {
file.close();
+ if (file == appendRandomAccessFile) {
+ appendRandomAccessFile = null;
+ }
}
public synchronized boolean delete() throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index e2f173a..792431c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -31,6 +31,9 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.EMPTY_BATCH_CONTROL_RECORD;
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.RECORD_HEAD_SPACE;
+
/**
* An optimized writer to do batch appends to a data file. This object is thread
* safe and gains throughput as you increase the number of concurrent writes it
@@ -110,7 +113,7 @@ class DataFileAppender implements FileAppender {
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet our internal buffer.
- int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+ int size = data.getLength() + RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@@ -138,7 +141,7 @@ class DataFileAppender implements FileAppender {
@Override
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet our internal buffer.
- int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+ int size = data.getLength() + RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@@ -179,12 +182,7 @@ class DataFileAppender implements FileAppender {
while ( true ) {
if (nextWriteBatch == null) {
- DataFile file = journal.getOrCreateCurrentWriteFile();
- if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
- file = journal.rotateWriteFile();
- }
-
-
+ DataFile file = journal.getCurrentDataFile(write.location.getSize());
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();
break;
@@ -285,23 +283,14 @@ class DataFileAppender implements FileAppender {
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
- file = dataFile.openRandomAccessFile();
- // pre allocate on first open of new file (length==0)
- // note dataFile.length cannot be used because it is updated in enqueue
- if (file.length() == 0l) {
- journal.preallocateEntireJournalDataFile(file);
- }
+ file = dataFile.appendRandomAccessFile();
}
Journal.WriteCommand write = wb.writes.getHead();
// Write an empty batch control record.
buff.reset();
- buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
- buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
- buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
- buff.writeInt(0);
- buff.writeLong(0);
+ buff.write(EMPTY_BATCH_CONTROL_RECORD);
boolean forceToDisk = false;
while (write != null) {
@@ -312,19 +301,18 @@ class DataFileAppender implements FileAppender {
write = write.getNext();
}
- // append 'unset' next batch (5 bytes) so read can always find eof
- buff.writeInt(0);
- buff.writeByte(0);
+ // append 'unset', zero length next batch so read can always find eof
+ buff.write(Journal.EOF_RECORD);
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
- buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
- buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+ buff.skip(RECORD_HEAD_SPACE + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+ buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
- checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+ checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE-Journal.EOF_RECORD.length);
buff.writeLong(checksum.getValue());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index da0d5b4..182a3d7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -23,19 +23,16 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeMap;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
@@ -43,13 +40,13 @@ import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
-import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +70,12 @@ public class Journal {
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
+ public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
+ public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
+ public static final byte EOF_EOT = '4';
+ public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
+
+ private ScheduledExecutorService scheduler;
// tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
@@ -103,7 +106,9 @@ public class Journal {
}
public enum PreallocationScope {
- ENTIRE_JOURNAL;
+ ENTIRE_JOURNAL,
+ ENTIRE_JOURNAL_ASYNC,
+ NONE;
}
private static byte[] createBatchControlRecordHeader() {
@@ -119,13 +124,39 @@ public class Journal {
}
}
+ private static byte[] createEmptyBatchControlRecordHeader() {
+ try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
+ os.writeInt(BATCH_CONTROL_RECORD_SIZE);
+ os.writeByte(BATCH_CONTROL_RECORD_TYPE);
+ os.write(BATCH_CONTROL_RECORD_MAGIC);
+ os.writeInt(0);
+ os.writeLong(0l);
+ ByteSequence sequence = os.toByteSequence();
+ sequence.compact();
+ return sequence.getData();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create empty batch control record header.", e);
+ }
+ }
+
+ private static byte[] createEofBatchAndLocationRecord() {
+ try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
+ os.writeInt(EOF_INT);
+ os.writeByte(EOF_EOT);
+ ByteSequence sequence = os.toByteSequence();
+ sequence.compact();
+ return sequence.getData();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create eof header.", e);
+ }
+ }
+
public static final String DEFAULT_DIRECTORY = ".";
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "db-";
public static final String DEFAULT_FILE_SUFFIX = ".log";
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
- public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
@@ -151,18 +182,21 @@ public class Journal {
protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
- protected Runnable cleanupTask;
+ protected ScheduledFuture cleanupTask;
protected AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget;
protected boolean checksum;
protected boolean checkForCorruptionOnStartup;
protected boolean enableAsyncDiskSync = true;
- private Timer timer;
private int nextDataFileId = 1;
+ private Object dataFileIdLock = new Object();
+ private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
+ private volatile DataFile nextDataFile;
- protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
+ protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL_ASYNC;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
+ private File osKernelCopyTemplateFile = null;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
@@ -204,13 +238,15 @@ public class Journal {
// Sort the list so that we can link the DataFiles together in the
// right order.
- List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+ LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
Collections.sort(l);
for (DataFile df : l) {
if (df.getLength() == 0) {
// possibly the result of a previous failed write
LOG.info("ignoring zero length, partially initialised journal data file: " + df);
continue;
+ } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
+ continue;
}
dataFiles.addLast(df);
fileByFileMap.put(df.getFile(), df);
@@ -221,9 +257,31 @@ public class Journal {
}
}
- nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+ if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
+ // create a template file that will be used to pre-allocate the journal files
+ if (osKernelCopyTemplateFile == null) {
+ osKernelCopyTemplateFile = createJournalTemplateFile();
+ }
+ }
- getOrCreateCurrentWriteFile();
+ scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread schedulerThread = new Thread(r);
+ schedulerThread.setName("ActiveMQ Journal Scheduled executor");
+ schedulerThread.setDaemon(true);
+ return schedulerThread;
+ }
+ });
+
+ // init current write file
+ if (dataFiles.isEmpty()) {
+ nextDataFileId = 1;
+ rotateWriteFile();
+ } else {
+ currentDataFile.set(dataFiles.getTail());
+ nextDataFileId = currentDataFile.get().dataFileId + 1;
+ }
if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
@@ -239,23 +297,20 @@ public class Journal {
totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
}
- cleanupTask = new Runnable() {
+ cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanup();
}
- };
+ }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
- this.timer = new Timer("KahaDB Scheduler", true);
- TimerTask task = new SchedulerTimerTask(cleanupTask);
- this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
long end = System.currentTimeMillis();
LOG.trace("Startup took: "+(end-start)+" ms");
}
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
- if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
+ if (PreallocationScope.NONE != preallocationScope) {
if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
doPreallocationKernelCopy(file);
@@ -266,58 +321,68 @@ public class Journal {
} else {
doPreallocationSparseFile(file);
}
- } else {
- LOG.info("Using journal preallocation scope of batch allocation");
}
}
private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
+ final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
try {
- file.seek(maxFileLength - 1);
- file.write((byte)0x00);
+ FileChannel channel = file.getChannel();
+ channel.position(0);
+ channel.write(journalEof);
+ channel.position(maxFileLength - 5);
+ journalEof.rewind();
+ channel.write(journalEof);
+ channel.force(false);
+ channel.position(0);
+ } catch (ClosedByInterruptException ignored) {
+ LOG.trace("Could not preallocate journal file with sparse file", ignored);
} catch (IOException e) {
- LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
+ LOG.error("Could not preallocate journal file with sparse file", e);
}
}
private void doPreallocationZeros(RecoverableRandomAccessFile file) {
ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
-
+ buffer.put(EOF_RECORD);
+ buffer.rewind();
try {
FileChannel channel = file.getChannel();
channel.write(buffer);
channel.force(false);
channel.position(0);
+ } catch (ClosedByInterruptException ignored) {
+ LOG.trace("Could not preallocate journal file with zeros", ignored);
} catch (IOException e) {
- LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
+ LOG.error("Could not preallocate journal file with zeros", e);
}
}
private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
- // create a template file that will be used to pre-allocate the journal files
- File templateFile = createJournalTemplateFile();
-
- RandomAccessFile templateRaf = null;
try {
- templateRaf = new RandomAccessFile(templateFile, "rw");
- templateRaf.setLength(maxFileLength);
- templateRaf.getChannel().force(true);
+ RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw");
templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
templateRaf.close();
- templateFile.delete();
+ } catch (ClosedByInterruptException ignored) {
+ LOG.trace("Could not preallocate journal file with kernel copy", ignored);
} catch (FileNotFoundException e) {
- LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
+ LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
} catch (IOException e) {
- LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
+ LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
}
}
private File createJournalTemplateFile() {
String fileName = "db-log.template";
File rc = new File(directory, fileName);
- if (rc.exists()) {
- LOG.trace("deleting journal template file because it already exists...");
- rc.delete();
+ try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw");) {
+ templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
+ templateRaf.setLength(maxFileLength);
+ templateRaf.getChannel().force(true);
+ } catch (FileNotFoundException e) {
+ LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
+ } catch (IOException e) {
+ LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
}
return rc;
}
@@ -325,6 +390,8 @@ public class Journal {
private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
+ buffer.put(EOF_RECORD);
+ buffer.rewind();
try {
FileChannel channel = file.getChannel();
@@ -354,6 +421,24 @@ public class Journal {
}
}
+ public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
+ int firstBatchRecordSize = -1;
+ if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
+ Location location = new Location();
+ location.setDataFileId(dataFile.getDataFileId());
+ location.setOffset(0);
+
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
+ } catch (Exception ignored) {
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+ }
+ return firstBatchRecordSize == 0;
+ }
+
protected Location recoveryCheck(DataFile dataFile) throws IOException {
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
@@ -364,6 +449,10 @@ public class Journal {
while (true) {
int size = checkBatchRecord(reader, location.getOffset());
if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+ if (size == 0) {
+ // eof batch record
+ break;
+ }
location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
} else {
@@ -433,6 +522,12 @@ public class Journal {
reader.readFully(offset, controlRecord);
+ // check for journal eof
+ if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
+ // eof batch
+ return 0;
+ }
+
// Assert that it's a batch record.
for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
@@ -476,42 +571,67 @@ public class Journal {
return totalLength.get();
}
- synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
- if (dataFiles.isEmpty()) {
- rotateWriteFile();
+ private void rotateWriteFile() throws IOException {
+ synchronized (dataFileIdLock) {
+ DataFile dataFile = nextDataFile;
+ if (dataFile == null) {
+ dataFile = newDataFile();
+ }
+ synchronized (currentDataFile) {
+ fileMap.put(dataFile.getDataFileId(), dataFile);
+ fileByFileMap.put(dataFile.getFile(), dataFile);
+ dataFiles.addLast(dataFile);
+ currentDataFile.set(dataFile);
+ }
+ nextDataFile = null;
}
-
- DataFile current = dataFiles.getTail();
-
- if (current != null) {
- return current;
- } else {
- return rotateWriteFile();
+ if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
+ preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
}
}
- synchronized DataFile rotateWriteFile() {
+ private Runnable preAllocateNextDataFileTask = new Runnable() {
+ @Override
+ public void run() {
+ if (nextDataFile == null) {
+ synchronized (dataFileIdLock){
+ try {
+ nextDataFile = newDataFile();
+ } catch (IOException e) {
+ LOG.warn("Failed to proactively allocate data file", e);
+ }
+ }
+ }
+ }
+ };
+
+ private volatile Future preAllocateNextDataFileFuture;
+
+ private DataFile newDataFile() throws IOException {
int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum);
- fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
- fileByFileMap.put(file, nextWriteFile);
- dataFiles.addLast(nextWriteFile);
+ preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
return nextWriteFile;
}
- public synchronized DataFile reserveDataFile() {
- int nextNum = nextDataFileId++;
- File file = getFile(nextNum);
- DataFile reservedDataFile = new DataFile(file, nextNum);
- fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
- fileByFileMap.put(file, reservedDataFile);
- if (dataFiles.isEmpty()) {
- dataFiles.addLast(reservedDataFile);
- } else {
- dataFiles.getTail().linkBefore(reservedDataFile);
+
+ public DataFile reserveDataFile() {
+ synchronized (dataFileIdLock) {
+ int nextNum = nextDataFileId++;
+ File file = getFile(nextNum);
+ DataFile reservedDataFile = new DataFile(file, nextNum);
+ synchronized (currentDataFile) {
+ fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
+ fileByFileMap.put(file, reservedDataFile);
+ if (dataFiles.isEmpty()) {
+ dataFiles.addLast(reservedDataFile);
+ } else {
+ dataFiles.getTail().linkBefore(reservedDataFile);
+ }
+ }
+ return reservedDataFile;
}
- return reservedDataFile;
}
public File getFile(int nextNum) {
@@ -520,24 +640,17 @@ public class Journal {
return file;
}
- synchronized DataFile getDataFile(Location item) throws IOException {
+ DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
- DataFile dataFile = fileMap.get(key);
- if (dataFile == null) {
- LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
- throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
+ DataFile dataFile = null;
+ synchronized (currentDataFile) {
+ dataFile = fileMap.get(key);
}
- return dataFile;
- }
-
- synchronized File getFile(Location item) throws IOException {
- Integer key = Integer.valueOf(item.getDataFileId());
- DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
}
- return dataFile.getFile();
+ return dataFile;
}
public void close() throws IOException {
@@ -545,14 +658,16 @@ public class Journal {
if (!started) {
return;
}
- if (this.timer != null) {
- this.timer.cancel();
+ cleanupTask.cancel(true);
+ if (preAllocateNextDataFileFuture != null) {
+ preAllocateNextDataFileFuture.cancel(true);
}
+ ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
accessorPool.close();
}
// the appender can be calling back to to the journal blocking a close AMQ-5620
appender.close();
- synchronized (this) {
+ synchronized (currentDataFile) {
fileMap.clear();
fileByFileMap.clear();
dataFiles.clear();
@@ -579,37 +694,52 @@ public class Journal {
result &= dataFile.delete();
}
- totalLength.set(0);
- fileMap.clear();
- fileByFileMap.clear();
- lastAppendLocation.set(null);
- dataFiles = new LinkedNodeList<DataFile>();
+ if (preAllocateNextDataFileFuture != null) {
+ preAllocateNextDataFileFuture.cancel(true);
+ }
+ synchronized (dataFileIdLock) {
+ if (nextDataFile != null) {
+ nextDataFile.delete();
+ nextDataFile = null;
+ }
+ }
+ totalLength.set(0);
+ synchronized (currentDataFile) {
+ fileMap.clear();
+ fileByFileMap.clear();
+ lastAppendLocation.set(null);
+ dataFiles = new LinkedNodeList<DataFile>();
+ }
// reopen open file handles...
accessorPool = new DataFileAccessorPool(this);
appender = new DataFileAppender(this);
return result;
}
- public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
+ public void removeDataFiles(Set<Integer> files) throws IOException {
for (Integer key : files) {
// Can't remove the data file (or subsequent files) that is currently being written to.
if (key >= lastAppendLocation.get().getDataFileId()) {
continue;
}
- DataFile dataFile = fileMap.get(key);
+ DataFile dataFile = null;
+ synchronized (currentDataFile) {
+ dataFile = fileMap.remove(key);
+ if (dataFile != null) {
+ fileByFileMap.remove(dataFile.getFile());
+ dataFile.unlink();
+ }
+ }
if (dataFile != null) {
forceRemoveDataFile(dataFile);
}
}
}
- private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
+ private void forceRemoveDataFile(DataFile dataFile) throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
- fileByFileMap.remove(dataFile.getFile());
- fileMap.remove(dataFile.getDataFileId());
totalLength.addAndGet(-dataFile.getLength());
- dataFile.unlink();
if (archiveDataLogs) {
File directoryArchive = getDirectoryArchive();
if (directoryArchive.exists()) {
@@ -657,13 +787,15 @@ public class Journal {
return directory.toString();
}
- public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
-
+ public Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null;
while (true) {
if (cur == null) {
if (location == null) {
- DataFile head = dataFiles.getHead();
+ DataFile head = null;
+ synchronized (currentDataFile) {
+ head = dataFiles.getHead();
+ }
if (head == null) {
return null;
}
@@ -687,7 +819,9 @@ public class Journal {
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
- dataFile = dataFile.getNext();
+ synchronized (currentDataFile) {
+ dataFile = dataFile.getNext();
+ }
if (dataFile == null) {
return null;
} else {
@@ -708,9 +842,14 @@ public class Journal {
if (corruptedRange != null) {
// skip corruption
cur.setSize((int) corruptedRange.range());
- } else if (cur.getType() == 0) {
+ } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
+ (cur.getType() == 0 && cur.getSize() == 0)) {
// eof - jump to next datafile
- cur.setOffset(maxFileLength);
+ // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
+ // replay of existing journals
+ // possibly journal is larger than maxFileLength after config change
+ cur.setSize(EOF_RECORD.length);
+ cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
} else if (cur.getType() == USER_RECORD_TYPE) {
// Only return user records.
return cur;
@@ -718,7 +857,7 @@ public class Journal {
}
}
- public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
+ public ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
ByteSequence rc = null;
@@ -816,34 +955,24 @@ public class Journal {
this.archiveDataLogs = archiveDataLogs;
}
- public synchronized DataFile getDataFileById(int dataFileId) {
- if (dataFiles.isEmpty()) {
- return null;
+ public DataFile getDataFileById(int dataFileId) {
+ synchronized (currentDataFile) {
+ return fileMap.get(Integer.valueOf(dataFileId));
}
-
- return fileMap.get(Integer.valueOf(dataFileId));
}
- public synchronized DataFile getCurrentDataFile() {
- if (dataFiles.isEmpty()) {
- return null;
- }
-
- DataFile current = dataFiles.getTail();
-
- if (current != null) {
- return current;
- } else {
- return null;
+ public DataFile getCurrentDataFile(int capacity) throws IOException {
+ synchronized (currentDataFile) {
+ if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
+ rotateWriteFile();
+ }
+ return currentDataFile.get();
}
}
- public synchronized Integer getCurrentDataFileId() {
- DataFile current = getCurrentDataFile();
- if (current != null) {
- return current.getDataFileId();
- } else {
- return null;
+ public Integer getCurrentDataFileId() {
+ synchronized (currentDataFile) {
+ return currentDataFile.get().getDataFileId();
}
}
@@ -853,11 +982,15 @@ public class Journal {
* @return files currently being used
*/
public Set<File> getFiles() {
- return fileByFileMap.keySet();
+ synchronized (currentDataFile) {
+ return fileByFileMap.keySet();
+ }
}
- public synchronized Map<Integer, DataFile> getFileMap() {
- return new TreeMap<Integer, DataFile>(fileMap);
+ public Map<Integer, DataFile> getFileMap() {
+ synchronized (currentDataFile) {
+ return new TreeMap<Integer, DataFile>(fileMap);
+ }
}
public long getDiskSize() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
index 3e3e090..a80328f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
@@ -227,20 +227,18 @@ public class TargetedDataFileAppender implements FileAppender {
}
// append 'unset' next batch (5 bytes) so read can always find eof
- buff.writeInt(0);
- buff.writeByte(0);
-
+ buff.write(Journal.EOF_RECORD);
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
- buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+ buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
if (journal.isChecksum()) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(),
sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
- sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+ sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
buff.writeLong(checksum.getValue());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index cf60a08..faf0022 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -128,8 +128,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles);
- adapter.setPreallocationStrategy("zeros");
- adapter.setPreallocationScope("entire_journal");
+ adapter.setPreallocationStrategy(Journal.PreallocationStrategy.ZEROS.name());
+ adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
}
@After
@@ -259,6 +259,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
corruptOrderIndex(id, size);
randomAccessFile.getChannel().force(true);
+ dataFile.closeRandomAccessFile(randomAccessFile);
}
private void corruptBatchEndEof(int id) throws Exception{
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
index 84c2ab5..2e34686 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
@@ -114,6 +114,8 @@ public class JournalCorruptionIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(true);
+
+ adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
}
@After
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
index 3c65814..987c2d3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
index d6c64f4..32e2f0c 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalLatencyTest.java
@@ -39,10 +39,13 @@ public class PreallocationJournalLatencyTest {
TimeStatisticImpl sparse = executeTest(Journal.PreallocationStrategy.SPARSE_FILE.name());
TimeStatisticImpl chunked_zeros = executeTest(Journal.PreallocationStrategy.CHUNKED_ZEROS.name());
- TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name());
+ //TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name());
+ TimeStatisticImpl kernel = executeTest(Journal.PreallocationStrategy.OS_KERNEL_COPY.name());
+
LOG.info(" sparse: " + sparse);
LOG.info(" chunked: " + chunked_zeros);
- LOG.info(" zeros: " + zeros);
+ //LOG.info(" zeros: " + zeros);
+ LOG.info(" kernel: " + kernel);
}
@@ -50,11 +53,13 @@ public class PreallocationJournalLatencyTest {
int randInt = rand.nextInt(100);
File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
- KahaDBStore store = new KahaDBStore();
- store.setJournalMaxFileLength(16*1204*1024);
+ final KahaDBStore store = new KahaDBStore();
+ store.setCheckpointInterval(5000);
+ store.setJournalMaxFileLength(32*1204*1024);
store.deleteAllMessages();
store.setDirectory(dataDirectory);
store.setPreallocationStrategy(preallocationStrategy);
+ store.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
store.start();
final File journalLog = new File(dataDirectory, "db-1.log");
@@ -66,7 +71,7 @@ public class PreallocationJournalLatencyTest {
}));
final Journal journal = store.getJournal();
- ByteSequence byteSequence = new ByteSequence(new byte[8*1024]);
+ ByteSequence byteSequence = new ByteSequence(new byte[16*1024]);
TimeStatisticImpl timeStatistic = new TimeStatisticImpl("append", "duration");
for (int i=0;i<5000; i++) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
index a6cdb9e..bbdcde7 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.ByteSequence;
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
index 3e41dc9..087b9be 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,6 +225,7 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {
properties.put("maxFileLength", maxFileLengthVal);
properties.put("cleanupInterval", "2000");
properties.put("checkpointInterval", "2000");
+ properties.put("preallocationScope", Journal.PreallocationScope.ENTIRE_JOURNAL.name());
// there are problems with duplicate dispatch in the cursor, which maintain
// a map of messages. A dup dispatch can be dropped.
// see: org.apache.activemq.broker.region.cursors.OrderedPendingList
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
index 6494efe..58e27f8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
@@ -31,6 +31,7 @@ import org.junit.Test;
import javax.jms.*;
import java.io.File;
+import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@@ -101,6 +102,7 @@ public class AMQ3120Test {
private int getFileCount(File dir){
if (dir.isDirectory()) {
String[] children = dir.list();
+ LOG.info("Children: " + Arrays.asList(children));
return children.length;
}
@@ -112,7 +114,7 @@ public class AMQ3120Test {
final int messageCount = 500;
startBroker(true);
int fileCount = getFileCount(kahaDbDir);
- assertEquals(4, fileCount);
+ assertEquals(5, fileCount);
Connection connection = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
index e965731..5db7579 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
@@ -116,7 +116,7 @@ public class AMQ4323Test {
final int messageCount = 500;
startBroker(true);
int fileCount = getFileCount(kahaDbDir);
- assertEquals(4, fileCount);
+ assertEquals(5, fileCount);
Connection connection = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
@@ -149,7 +149,7 @@ public class AMQ4323Test {
public boolean isSatisified() throws Exception {
int fileCount = getFileCount(kahaDbDir);
LOG.info("current filecount:" + fileCount);
- return 4 == fileCount;
+ return 5 == fileCount;
}
}));
http://git-wip-us.apache.org/repos/asf/activemq/blob/62bdbb0d/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
index 4a23331..cf9522f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
@@ -135,7 +135,7 @@ public class KahaDBIndexLocationTest {
// Should contain the initial log for the journal and the lock.
assertNotNull(journal);
- assertEquals(2, journal.length);
+ assertEquals(3, journal.length);
}
@Test