You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/09/07 16:17:40 UTC
[04/14] activemq git commit: [AMQ-6771] do linear sequential scan of
journal when validating checksums - remove batch reads via seek/read which
depend on write batch size
[AMQ-6771] do linear sequential scan of journal when validating checksums - remove batch reads via seek/read which depend on write batch size
(cherry picked from commit 8c218ee05d2529e62bb1792b20d22e7086169e6a)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ba5e8146
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ba5e8146
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ba5e8146
Branch: refs/heads/activemq-5.15.x
Commit: ba5e814667e4382717a64a11982aef217464d9e5
Parents: 7a2c4ee
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 17 12:18:25 2017 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Sep 7 12:10:20 2017 -0400
----------------------------------------------------------------------
.../org/apache/activemq/util/ByteSequence.java | 22 +++-
.../activemq/util/DataByteArrayInputStream.java | 4 +-
.../activemq/store/kahadb/MessageDatabase.java | 3 +-
.../kahadb/disk/journal/DataFileAccessor.java | 37 +-----
.../store/kahadb/disk/journal/Journal.java | 125 +++++++++++--------
.../util/RecoverableRandomAccessFile.java | 2 +-
.../JournalCorruptionEofIndexRecoveryTest.java | 23 +++-
.../store/kahadb/JournalFdRecoveryTest.java | 37 +++++-
8 files changed, 158 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
index 2699856..ac1e01a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
@@ -50,6 +50,8 @@ public class ByteSequence {
return offset;
}
+ public int remaining() { return length - offset; }
+
public void setData(byte[] data) {
this.data = data;
}
@@ -71,8 +73,14 @@ public class ByteSequence {
}
}
+ public void reset() {
+ length = remaining();
+ System.arraycopy(data, offset, data, 0, length);
+ offset = 0;
+ }
+
public int indexOf(ByteSequence needle, int pos) {
- int max = length - needle.length;
+ int max = length - needle.length - offset;
for (int i = pos; i < max; i++) {
if (matches(needle, i)) {
return i;
@@ -102,4 +110,16 @@ public class ByteSequence {
}
return -1;
}
+
+ public boolean startsWith(final byte[] bytes) {
+ if (length - offset < bytes.length) {
+ return false;
+ }
+ for (int i = 0; i<bytes.length; i++) {
+ if (data[offset+i] != bytes[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
index 2fe92a1..3b42c9f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
@@ -65,6 +65,8 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return pos - offset;
}
+ public int position() { return pos; }
+
/**
* @return the underlying data array
*/
@@ -224,7 +226,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
}
public long readLong() throws IOException {
- if (pos >= buf.length ) {
+ if (pos + 8 >= buf.length ) {
throw new EOFException();
}
long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3321b35..40e8f95 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -480,6 +480,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
IOHelper.mkdirs(directory);
if (deleteAllMessages) {
+ getJournal().setCheckForCorruptionOnStartup(false);
getJournal().start();
getJournal().delete();
getJournal().close();
@@ -1048,7 +1049,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private Location getNextInitializedLocation(Location location) throws IOException {
Location mayNotBeInitialized = journal.getNextLocation(location);
- if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
+ if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
// need to init size and type to skip
return journal.getNextLocation(mayNotBeInitialized);
} else {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 71c2195..548d3b1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -17,6 +17,7 @@
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.util.ByteSequence;
@@ -115,38 +116,6 @@ final class DataFileAccessor {
}
}
-// public boolean readLocationDetailsAndValidate(Location location) {
-// try {
-// WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
-// if (asyncWrite != null) {
-// location.setSize(asyncWrite.location.getSize());
-// location.setType(asyncWrite.location.getType());
-// } else {
-// file.seek(location.getOffset());
-// location.setSize(file.readInt());
-// location.setType(file.readByte());
-//
-// byte data[] = new byte[3];
-// file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
-// file.readFully(data);
-// if (data[0] != Journal.ITEM_HEAD_SOR[0]
-// || data[1] != Journal.ITEM_HEAD_SOR[1]
-// || data[2] != Journal.ITEM_HEAD_SOR[2]) {
-// return false;
-// }
-// file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
-// file.readFully(data);
-// if (data[0] != Journal.ITEM_HEAD_EOR[0]
-// || data[1] != Journal.ITEM_HEAD_EOR[1]
-// || data[2] != Journal.ITEM_HEAD_EOR[2]) {
-// return false;
-// }
-// }
-// } catch (IOException e) {
-// return false;
-// }
-// return true;
-// }
public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
@@ -157,4 +126,8 @@ final class DataFileAccessor {
file.sync();
}
}
+
+ public RecoverableRandomAccessFile getRaf() {
+ return file;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index a78cc65..5edee92 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -90,12 +89,21 @@ public class Journal {
// with corruption on recovery we have no faith in the content - slip to the next batch record or eof
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
- Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
+ RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+ randomAccessFile.seek(recoveryPosition.getOffset() + 1);
+ byte[] data = new byte[getWriteBatchSize()];
+ ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+ int nextOffset = 0;
+ if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+ nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
+ } else {
+ nextOffset = Math.toIntExact(randomAccessFile.length());
+ }
+ Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
// skip corruption on getNextLocation
- recoveryPosition.setOffset((int) sequence.getLast() + 1);
+ recoveryPosition.setOffset(nextOffset);
recoveryPosition.setSize(-1);
dataFile.corruptedBlocks.add(sequence);
@@ -463,21 +471,19 @@ public class Journal {
}
public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
- int firstBatchRecordSize = -1;
if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
- Location location = new Location();
- location.setDataFileId(dataFile.getDataFileId());
- location.setOffset(0);
-
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
+ byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
+ reader.readFully(0, firstFewBytes);
+ ByteSequence bs = new ByteSequence(firstFewBytes);
+ return bs.startsWith(EOF_RECORD);
} catch (Exception ignored) {
} finally {
accessorPool.closeDataFileAccessor(reader);
}
}
- return firstBatchRecordSize == 0;
+ return false;
}
protected Location recoveryCheck(DataFile dataFile) throws IOException {
@@ -487,9 +493,15 @@ public class Journal {
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
+ RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+ randomAccessFile.seek(0);
+ final long totalFileLength = randomAccessFile.length();
+ byte[] data = new byte[getWriteBatchSize()];
+ ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+
while (true) {
- int size = checkBatchRecord(reader, location.getOffset());
- if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+ int size = checkBatchRecord(bs, randomAccessFile);
+ if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
if (size == 0) {
// eof batch record
break;
@@ -500,8 +512,8 @@ public class Journal {
// Perhaps it's just some corruption... scan through the
// file to find the next valid batch record. We
// may have subsequent valid batch records.
- int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
- if (nextOffset >= 0) {
+ if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+ int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
dataFile.corruptedBlocks.add(sequence);
@@ -533,41 +545,33 @@ public class Journal {
return location;
}
- private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
- ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
- byte data[] = new byte[1024*4];
- ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
-
+ private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
+ final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
int pos = 0;
while (true) {
- pos = bs.indexOf(header, pos);
+ pos = bs.indexOf(header, 0);
if (pos >= 0) {
- return offset + pos;
+ bs.setOffset(bs.offset + pos);
+ return pos;
} else {
// need to load the next data chunck in..
- if (bs.length != data.length) {
+ if (bs.length != bs.data.length) {
// If we had a short read then we were at EOF
return -1;
}
- offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
- bs = new ByteSequence(data, 0, reader.read(offset, data));
- pos = 0;
+ bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
+ bs.reset();
+ bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
}
}
}
- public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
- byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+ private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
- try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
-
- reader.readFully(offset, controlRecord);
-
- // check for journal eof
- if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
- // eof batch
- return 0;
- }
+ if (bs.startsWith(EOF_RECORD)) {
+ return 0; // eof
+ }
+ try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
// Assert that it's a batch record.
for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
@@ -578,28 +582,43 @@ public class Journal {
int size = controlIs.readInt();
if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
- return -1;
+ return -2;
}
- if (isChecksum()) {
-
- long expectedChecksum = controlIs.readLong();
- if (expectedChecksum == 0) {
- // Checksuming was not enabled when the record was stored.
- // we can't validate the record :(
- return size;
- }
-
- byte data[] = new byte[size];
- reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
+ long expectedChecksum = controlIs.readLong();
+ Checksum checksum = null;
+ if (isChecksum() && expectedChecksum > 0) {
+ checksum = new Adler32();
+ }
- Checksum checksum = new Adler32();
- checksum.update(data, 0, data.length);
+ // revert to bs to consume data
+ bs.setOffset(controlIs.position());
+ int toRead = size;
+ while (toRead > 0) {
+ if (bs.remaining() >= toRead) {
+ if (checksum != null) {
+ checksum.update(bs.getData(), bs.getOffset(), toRead);
+ }
+ bs.setOffset(bs.offset + toRead);
+ toRead = 0;
+ } else {
+ if (bs.length != bs.data.length) {
+ // buffer exhausted
+ return -3;
+ }
- if (expectedChecksum != checksum.getValue()) {
- return -1;
+ toRead -= bs.remaining();
+ if (checksum != null) {
+ checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
+ }
+ bs.setLength(reader.read(bs.data));
+ bs.setOffset(0);
}
}
+ if (checksum != null && expectedChecksum != checksum.getValue()) {
+ return -4;
+ }
+
return size;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
index f9e6a45..309272a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -48,7 +48,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
this(new File(name), mode);
}
- protected RandomAccessFile getRaf() throws IOException {
+ public RandomAccessFile getRaf() throws IOException {
if (raf == null) {
raf = new RandomAccessFile(file, mode);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 221b087..16598ea 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -64,10 +64,12 @@ public class JournalCorruptionEofIndexRecoveryTest {
private String connectionUri;
private KahaDBPersistenceAdapter adapter;
private boolean ignoreMissingJournalFiles = false;
+ private int journalMaxBatchSize;
private final Destination destination = new ActiveMQQueue("Test");
private final String KAHADB_DIRECTORY = "target/activemq-data/";
private final String payload = new String(new byte[1024]);
+ File brokerDataDir = null;
protected void startBroker() throws Exception {
doStartBroker(true, false);
@@ -78,14 +80,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
- File dataDir = broker.getPersistenceAdapter().getDirectory();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
if (whackIndex) {
- File indexToDelete = new File(dataDir, "db.data");
+ File indexToDelete = new File(brokerDataDir, "db.data");
LOG.info("Whacking index: " + indexToDelete);
indexToDelete.delete();
}
@@ -113,6 +114,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
cf = new ActiveMQConnectionFactory(connectionUri);
broker.start();
+ brokerDataDir = broker.getPersistenceAdapter().getDirectory();
LOG.info("Starting broker..");
}
@@ -124,6 +126,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
// ensure there are a bunch of data files but multiple entries in each
adapter.setJournalMaxFileLength(1024 * 20);
+ adapter.setJournalMaxWriteBatchSize(journalMaxBatchSize);
+
// speed up the test case, checkpoint an cleanup early and often
adapter.setCheckpointInterval(5000);
adapter.setCleanupInterval(5000);
@@ -146,6 +150,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
@Before
public void reset() throws Exception {
ignoreMissingJournalFiles = true;
+ journalMaxBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
}
@Test
@@ -234,6 +239,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
assertEquals("Drain", numToSend, drainQueue(numToSend));
}
+ @Test
+ public void testRecoverIndexWithSmallBatch() throws Exception {
+ journalMaxBatchSize = 2 * 1024;
+ startBroker();
+
+ final int numToSend = 4;
+ produceMessagesToConsumeMultipleDataFiles(numToSend);
+
+ // force journal replay by whacking the index
+ restartBroker(false, true);
+
+ assertEquals("Drain", numToSend, drainQueue(numToSend));
+ }
+
@Test
public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ba5e8146/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index 633ab5c..ffe8ab6 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import javax.management.Attribute;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -54,8 +57,7 @@ public class JournalFdRecoveryTest {
private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
private final String KAHADB_DIRECTORY = "target/activemq-data/";
- private final String payload = new String(new byte[1024]);
-
+ private String payload;
private ActiveMQConnectionFactory cf = null;
private BrokerService broker = null;
private final Destination destination = new ActiveMQQueue("Test");
@@ -63,6 +65,7 @@ public class JournalFdRecoveryTest {
private KahaDBPersistenceAdapter adapter;
public byte fill = Byte.valueOf("3");
+ private int maxJournalSizeBytes;
protected void startBroker() throws Exception {
doStartBroker(true);
@@ -88,7 +91,6 @@ public class JournalFdRecoveryTest {
}
private void doCreateBroker(boolean delete) throws Exception {
-
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(delete);
broker.setPersistent(true);
@@ -112,7 +114,7 @@ public class JournalFdRecoveryTest {
adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
// ensure there are a bunch of data files but multiple entries in each
- adapter.setJournalMaxFileLength(1024 * 20);
+ adapter.setJournalMaxFileLength(maxJournalSizeBytes);
// speed up the test case, checkpoint an cleanup early and often
adapter.setCheckpointInterval(5000);
@@ -132,6 +134,12 @@ public class JournalFdRecoveryTest {
}
}
+ @Before
+ public void initPayLoad() {
+ payload = new String(new byte[1024]);
+ maxJournalSizeBytes = 1024 * 20;
+ }
+
@Test
public void testStopOnPageInIOError() throws Exception {
@@ -236,6 +244,27 @@ public class JournalFdRecoveryTest {
}
+ @Test
+ public void testRecoveryCheckSpeedSmallMessages() throws Exception {
+ maxJournalSizeBytes = Journal.DEFAULT_MAX_FILE_LENGTH;
+ doCreateBroker(true);
+ broker.start();
+
+ int toSend = 20000;
+ payload = new String(new byte[100]);
+ produceMessagesToConsumeMultipleDataFiles(toSend);
+
+ broker.stop();
+ broker.waitUntilStopped();
+
+ Instant b = Instant.now();
+ doStartBroker(false);
+ Instant e = Instant.now();
+
+ Duration timeElapsed = Duration.between(b, e);
+ LOG.info("Elapsed: " + timeElapsed);
+ }
+
private long totalOpenFileDescriptorCount(BrokerService broker) {
long result = 0;
try {