You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:04 UTC
[05/53] [abbrv] [partial] activemq-artemis git commit: automatic
checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 068e697..865cf72 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -76,8 +76,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
* <p></p>
* <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
*/
-public class JournalImpl extends JournalBase implements TestableJournal, JournalRecordProvider
-{
+public class JournalImpl extends JournalBase implements TestableJournal, JournalRecordProvider {
// Constants -----------------------------------------------------
@@ -95,13 +94,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
- private static void trace(final String message)
- {
+ private static void trace(final String message) {
ActiveMQJournalLogger.LOGGER.trace(message);
}
- private static void traceRecord(final String message)
- {
+ private static void traceRecord(final String message) {
ActiveMQJournalLogger.LOGGER.trace(message);
}
@@ -223,12 +220,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxAIO)
- {
+ final int maxAIO) {
this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
}
-
public JournalImpl(final int fileSize,
final int minFiles,
final int compactMinFiles,
@@ -236,29 +231,24 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxAIO, final int userVersion)
- {
+ final int maxAIO,
+ final int userVersion) {
super(fileFactory.isSupportsCallbacks(), fileSize);
- if (fileSize % fileFactory.getAlignment() != 0)
- {
+ if (fileSize % fileFactory.getAlignment() != 0) {
throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
fileFactory.getAlignment());
}
- if (minFiles < 2)
- {
+ if (minFiles < 2) {
throw new IllegalArgumentException("minFiles cannot be less than 2");
}
- if (compactPercentage < 0 || compactPercentage > 100)
- {
+ if (compactPercentage < 0 || compactPercentage > 100) {
throw new IllegalArgumentException("Compact Percentage out of range");
}
- if (compactPercentage == 0)
- {
+ if (compactPercentage == 0) {
this.compactPercentage = 0;
}
- else
- {
+ else {
this.compactPercentage = compactPercentage / 100f;
}
@@ -267,46 +257,34 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
this.fileFactory = fileFactory;
- filesRepository = new JournalFilesRepository(fileFactory,
- this,
- filePrefix,
- fileExtension,
- userVersion,
- maxAIO,
- fileSize,
- minFiles);
+ filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
this.userVersion = userVersion;
}
@Override
- public String toString()
- {
+ public String toString() {
return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
}
- public void runDirectJournalBlast() throws Exception
- {
+ public void runDirectJournalBlast() throws Exception {
final int numIts = 100000000;
ActiveMQJournalLogger.LOGGER.runningJournalBlast(numIts);
final CountDownLatch latch = new CountDownLatch(numIts * 2);
- class MyAIOCallback implements IOCompletion
- {
- public void done()
- {
+ class MyAIOCallback implements IOCompletion {
+
+ public void done() {
latch.countDown();
}
- public void onError(final int errorCode, final String errorMessage)
- {
+ public void onError(final int errorCode, final String errorMessage) {
}
- public void storeLineUp()
- {
+ public void storeLineUp() {
}
}
@@ -316,20 +294,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final byte[] bytes = new byte[recordSize];
- class MyRecord implements EncodingSupport
- {
+ class MyRecord implements EncodingSupport {
- public void decode(final ActiveMQBuffer buffer)
- {
+ public void decode(final ActiveMQBuffer buffer) {
}
- public void encode(final ActiveMQBuffer buffer)
- {
+ public void encode(final ActiveMQBuffer buffer) {
buffer.writeBytes(bytes);
}
- public int getEncodeSize()
- {
+ public int getEncodeSize() {
return recordSize;
}
@@ -337,8 +311,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
MyRecord record = new MyRecord();
- for (int i = 0; i < numIts; i++)
- {
+ for (int i = 0; i < numIts; i++) {
appendAddRecord(i, (byte) 1, record, true, task);
appendDeleteRecord(i, true, task);
}
@@ -346,18 +319,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
latch.await();
}
- public Map<Long, JournalRecord> getRecords()
- {
+ public Map<Long, JournalRecord> getRecords() {
return records;
}
- public JournalFile getCurrentFile()
- {
+ public JournalFile getCurrentFile() {
return currentFile;
}
- public JournalCompactor getCompactor()
- {
+ public JournalCompactor getCompactor() {
return compactor;
}
@@ -365,33 +335,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* this method is used internally only however tools may use it to maintenance.
* It won't be part of the interface as the tools should be specific to the implementation
*/
- public List<JournalFile> orderFiles() throws Exception
- {
+ public List<JournalFile> orderFiles() throws Exception {
List<String> fileNames = fileFactory.listFiles(filesRepository.getFileExtension());
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
- for (String fileName : fileNames)
- {
+ for (String fileName : fileNames) {
SequentialFile file = fileFactory.createSequentialFile(fileName);
- if (file.size() >= SIZE_HEADER)
- {
+ if (file.size() >= SIZE_HEADER) {
file.open();
- try
- {
+ try {
JournalFileImpl jrnFile = readFileHeader(file);
orderedFiles.add(jrnFile);
}
- finally
- {
+ finally {
file.close();
}
}
- else
- {
+ else {
ActiveMQJournalLogger.LOGGER.ignoringShortFile(fileName);
file.delete();
}
@@ -410,20 +374,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
*/
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
- final JournalReaderCallback reader) throws Exception
- {
+ final JournalReaderCallback reader) throws Exception {
file.getFile().open(1, false);
ByteBuffer wholeFileBuffer = null;
- try
- {
+ try {
final int filesize = (int) file.getFile().size();
wholeFileBuffer = fileFactory.newBuffer(filesize);
final int journalFileSize = file.getFile().read(wholeFileBuffer);
- if (journalFileSize != filesize)
- {
+ if (journalFileSize != filesize) {
throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
}
@@ -432,22 +393,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
int lastDataPos = JournalImpl.SIZE_HEADER;
- while (wholeFileBuffer.hasRemaining())
- {
+ while (wholeFileBuffer.hasRemaining()) {
final int pos = wholeFileBuffer.position();
byte recordType = wholeFileBuffer.get();
- if (recordType < JournalImpl.ADD_RECORD || recordType > JournalImpl.ROLLBACK_RECORD)
- {
+ if (recordType < JournalImpl.ADD_RECORD || recordType > JournalImpl.ROLLBACK_RECORD) {
// I - We scan for any valid record on the file. If a hole
// happened on the middle of the file we keep looking until all
// the possibilities are gone
continue;
}
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
reader.markAsDataFile(file);
wholeFileBuffer.position(pos + 1);
@@ -461,18 +419,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
- if (readFileId != file.getRecordID())
- {
+ if (readFileId != file.getRecordID()) {
wholeFileBuffer.position(pos + 1);
continue;
}
short compactCount = 0;
- if (file.getJournalVersion() >= 2)
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
- {
+ if (file.getJournalVersion() >= 2) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE)) {
reader.markAsDataFile(file);
wholeFileBuffer.position(pos + 1);
@@ -484,10 +439,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
long transactionID = 0;
- if (JournalImpl.isTransaction(recordType))
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
- {
+ if (JournalImpl.isTransaction(recordType)) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG)) {
wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
continue;
@@ -499,10 +452,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
long recordID = 0;
// If prepare or commit
- if (!JournalImpl.isCompleteTransaction(recordType))
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
- {
+ if (!JournalImpl.isCompleteTransaction(recordType)) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG)) {
wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
continue;
@@ -525,10 +476,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
byte[] record = null;
- if (JournalImpl.isContainsBody(recordType))
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
+ if (JournalImpl.isContainsBody(recordType)) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
continue;
@@ -536,10 +485,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
variableSize = wholeFileBuffer.getInt();
- if (recordType != JournalImpl.DELETE_RECORD_TX)
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1))
- {
+ if (recordType != JournalImpl.DELETE_RECORD_TX) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1)) {
wholeFileBuffer.position(pos + 1);
continue;
}
@@ -547,8 +494,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
userRecordType = wholeFileBuffer.get();
}
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
- {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize)) {
wholeFileBuffer.position(pos + 1);
continue;
}
@@ -562,20 +508,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// currentFile
int transactionCheckNumberOfRecords = 0;
- if (recordType == JournalImpl.PREPARE_RECORD || recordType == JournalImpl.COMMIT_RECORD)
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
+ if (recordType == JournalImpl.PREPARE_RECORD || recordType == JournalImpl.COMMIT_RECORD) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
wholeFileBuffer.position(pos + 1);
continue;
}
transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
- if (recordType == JournalImpl.PREPARE_RECORD)
- {
- if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
+ if (recordType == JournalImpl.PREPARE_RECORD) {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT)) {
wholeFileBuffer.position(pos + 1);
continue;
}
@@ -591,8 +533,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// of the record,
// But we avoid buffer overflows by damaged data
if (JournalImpl.isInvalidSize(journalFileSize, pos, recordSize + variableSize +
- preparedTransactionExtraDataSize))
- {
+ preparedTransactionExtraDataSize)) {
// Avoid a buffer overflow caused by damaged data... continue
// scanning for more pendingTransactions...
JournalImpl.trace("Record at position " + pos +
@@ -619,8 +560,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
wholeFileBuffer.position(pos + variableSize +
recordSize +
- preparedTransactionExtraDataSize -
- DataConstants.SIZE_INT);
+ preparedTransactionExtraDataSize - DataConstants.SIZE_INT);
int checkSize = wholeFileBuffer.getInt();
@@ -628,8 +568,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// informed at the beginning.
// This is like testing a hash for the record. (We could replace the
// checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize) {
JournalImpl.trace("Record at position " + pos +
" recordType = " +
recordType +
@@ -655,58 +594,38 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// At this point everything is checked. So we relax and just load
// the data now.
- switch (recordType)
- {
- case ADD_RECORD:
- {
+ switch (recordType) {
+ case ADD_RECORD: {
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
- case UPDATE_RECORD:
- {
+ case UPDATE_RECORD: {
reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
- case DELETE_RECORD:
- {
+ case DELETE_RECORD: {
reader.onReadDeleteRecord(recordID);
break;
}
- case ADD_RECORD_TX:
- {
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
- userRecordType,
- record,
- false,
- compactCount));
+ case ADD_RECORD_TX: {
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
- case UPDATE_RECORD_TX:
- {
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
- userRecordType,
- record,
- true,
- compactCount));
+ case UPDATE_RECORD_TX: {
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
- case DELETE_RECORD_TX:
- {
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
- (byte) 0,
- record,
- true,
- compactCount));
+ case DELETE_RECORD_TX: {
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, compactCount));
break;
}
- case PREPARE_RECORD:
- {
+ case PREPARE_RECORD: {
byte[] extraData = new byte[preparedTransactionExtraDataSize];
@@ -716,19 +635,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
break;
}
- case COMMIT_RECORD:
- {
+ case COMMIT_RECORD: {
reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
break;
}
- case ROLLBACK_RECORD:
- {
+ case ROLLBACK_RECORD: {
reader.onReadRollbackRecord(transactionID);
break;
}
- default:
- {
+ default: {
throw new IllegalStateException("Journal " + file.getFile().getFileName() +
" is corrupt, invalid record type " +
recordType);
@@ -740,8 +656,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This is a sanity check about the loading code itself.
// If this checkSize doesn't match, it means the reading method is
// not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize) {
throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
", pos = " +
pos);
@@ -753,24 +668,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return lastDataPos;
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
throw new Exception(e.getMessage(), e);
}
- finally
- {
- if (wholeFileBuffer != null)
- {
+ finally {
+ if (wholeFileBuffer != null) {
fileFactory.releaseBuffer(wholeFileBuffer);
}
- try
- {
+ try {
file.getFile().close();
}
- catch (Throwable ignored)
- {
+ catch (Throwable ignored) {
}
}
}
@@ -783,28 +693,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final byte recordType,
final EncodingSupport record,
final boolean sync,
- final IOCompletion callback) throws Exception
- {
+ final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
- if (callback != null)
- {
+ if (callback != null) {
callback.storeLineUp();
}
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendAddRecord::id=" + id +
", userRecordType=" +
recordType +
@@ -814,13 +719,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
@@ -830,38 +733,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final byte recordType,
final EncodingSupport record,
final boolean sync,
- final IOCompletion callback) throws Exception
- {
+ final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalRecord jrnRecord = records.get(id);
- if (jrnRecord == null)
- {
- if (!(compactor != null && compactor.lookupRecord(id)))
- {
+ if (jrnRecord == null) {
+ if (!(compactor != null && compactor.lookupRecord(id))) {
throw new IllegalStateException("Cannot find add info " + id);
}
}
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
- if (callback != null)
- {
+ if (callback != null) {
callback.storeLineUp();
}
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
", userRecordType=" +
recordType +
@@ -871,91 +767,73 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
- if (jrnRecord == null)
- {
+ if (jrnRecord == null) {
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
}
- else
- {
+ else {
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
-
@Override
- public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
- {
+ public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalRecord record = null;
- if (compactor == null)
- {
+ if (compactor == null) {
record = records.remove(id);
- if (record == null)
- {
+ if (record == null) {
throw new IllegalStateException("Cannot find add info " + id);
}
}
- else
- {
- if (!records.containsKey(id) && !compactor.lookupRecord(id))
- {
+ else {
+ if (!records.containsKey(id) && !compactor.lookupRecord(id)) {
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
}
}
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
- if (callback != null)
- {
+ if (callback != null) {
callback.storeLineUp();
}
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
}
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
- if (record == null)
- {
+ if (record == null) {
compactor.addCommandDelete(id, usedFile);
}
- else
- {
+ else {
record.delete(usedFile);
}
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
@@ -964,25 +842,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception
- {
+ final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
@@ -994,27 +868,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
- private void checkJournalIsLoaded()
- {
- if (state != JournalState.LOADED && state != JournalState.SYNCING)
- {
+ private void checkJournalIsLoaded() {
+ if (state != JournalState.LOADED && state != JournalState.SYNCING) {
throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
}
}
- private void setJournalState(JournalState newState)
- {
+ private void setJournalState(JournalState newState) {
state = newState;
}
@@ -1022,25 +891,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception
- {
+ final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
@@ -1052,38 +917,33 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
-
@Override
- public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
- {
+ public void appendDeleteRecordTransactional(final long txID,
+ final long id,
+ final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendDeleteRecordTransactional::txID=" + txID +
", id=" +
id +
@@ -1093,13 +953,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addNegative(usedFile, id);
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
@@ -1120,197 +978,167 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendPrepareRecord(final long txID,
final EncodingSupport transactionData,
final boolean sync,
- final IOCompletion callback) throws Exception
- {
+ final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalTransaction tx = getTransactionInfo(txID);
- JournalInternalRecord prepareRecord =
- new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+ JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
- if (callback != null)
- {
+ if (callback != null) {
callback.storeLineUp();
}
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
}
tx.prepare(usedFile);
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
@Override
- public void lineUpContext(IOCompletion callback)
- {
+ public void lineUpContext(IOCompletion callback) {
callback.storeLineUp();
}
-
/**
* Regarding the number of operations in a given file see {@link JournalCompleteRecordTX}.
*/
@Override
- public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
- {
+ public void appendCommitRecord(final long txID,
+ final boolean sync,
+ final IOCompletion callback,
+ boolean lineUpContext) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
- try
- {
+ try {
JournalTransaction tx = transactions.remove(txID);
- if (tx == null)
- {
+ if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
- if (callback != null && lineUpContext)
- {
+ if (callback != null && lineUpContext) {
callback.storeLineUp();
}
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
- if (JournalImpl.TRACE_RECORDS)
- {
+ if (JournalImpl.TRACE_RECORDS) {
JournalImpl.traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
}
tx.commit(usedFile);
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
@Override
- public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
- {
+ public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
JournalTransaction tx = null;
- try
- {
+ try {
tx = transactions.remove(txID);
- if (tx == null)
- {
+ if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
- if (callback != null)
- {
+ if (callback != null) {
callback.storeLineUp();
}
lockAppend.lock();
- try
- {
+ try {
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
// XXX make it protected?
- public int getAlignment() throws Exception
- {
+ public int getAlignment() throws Exception {
return fileFactory.getAlignment();
}
- private static final class DummyLoader implements LoaderCallback
- {
+ private static final class DummyLoader implements LoaderCallback {
+
static final LoaderCallback INSTANCE = new DummyLoader();
- public void failedTransaction(final long transactionID, final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete)
- {
+ public void failedTransaction(final long transactionID,
+ final List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete) {
}
- public void updateRecord(final RecordInfo info)
- {
+ public void updateRecord(final RecordInfo info) {
}
- public void deleteRecord(final long id)
- {
+ public void deleteRecord(final long id) {
}
- public void addRecord(final RecordInfo info)
- {
+ public void addRecord(final RecordInfo info) {
}
- public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
- {
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) {
}
}
- public synchronized JournalLoadInformation loadInternalOnly() throws Exception
- {
+ public synchronized JournalLoadInformation loadInternalOnly() throws Exception {
return load(DummyLoader.INSTANCE, true, null);
}
- public synchronized JournalLoadInformation loadSyncOnly(JournalState syncState) throws Exception
- {
+ public synchronized JournalLoadInformation loadSyncOnly(JournalState syncState) throws Exception {
assert syncState == JournalState.SYNCING || syncState == JournalState.SYNCING_UP_TO_DATE;
return load(DummyLoader.INSTANCE, true, syncState);
}
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
- final TransactionFailureCallback failureCallback) throws Exception
- {
+ final TransactionFailureCallback failureCallback) throws Exception {
return load(committedRecords, preparedTransactions, failureCallback, true);
}
@@ -1320,33 +1148,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
- final boolean fixBadTX) throws Exception
- {
+ final boolean fixBadTX) throws Exception {
final Set<Long> recordsToDelete = new HashSet<Long>();
// ArrayList was taking too long to delete elements on checkDeleteSize
final List<RecordInfo> records = new LinkedList<RecordInfo>();
final int DELETE_FLUSH = 20000;
- JournalLoadInformation info = load(new LoaderCallback()
- {
+ JournalLoadInformation info = load(new LoaderCallback() {
Runtime runtime = Runtime.getRuntime();
- private void checkDeleteSize()
- {
+ private void checkDeleteSize() {
// HORNETQ-482 - Flush deletes only if memory is critical
- if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
- {
+ if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2) {
ActiveMQJournalLogger.LOGGER.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
- while (iter.hasNext())
- {
+ while (iter.hasNext()) {
RecordInfo record = iter.next();
- if (recordsToDelete.contains(record.id))
- {
+ if (recordsToDelete.contains(record.id)) {
iter.remove();
}
}
@@ -1357,45 +1179,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
- {
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) {
preparedTransactions.add(preparedTransaction);
checkDeleteSize();
}
- public void addRecord(final RecordInfo info)
- {
+ public void addRecord(final RecordInfo info) {
records.add(info);
checkDeleteSize();
}
- public void updateRecord(final RecordInfo info)
- {
+ public void updateRecord(final RecordInfo info) {
records.add(info);
checkDeleteSize();
}
- public void deleteRecord(final long id)
- {
+ public void deleteRecord(final long id) {
recordsToDelete.add(id);
checkDeleteSize();
}
public void failedTransaction(final long transactionID,
final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete)
- {
- if (failureCallback != null)
- {
+ final List<RecordInfo> recordsToDelete) {
+ if (failureCallback != null) {
failureCallback.failedTransaction(transactionID, records, recordsToDelete);
}
}
}, fixBadTX, null);
- for (RecordInfo record : records)
- {
- if (!recordsToDelete.contains(record.id))
- {
+ for (RecordInfo record : records) {
+ if (!recordsToDelete.contains(record.id)) {
committedRecords.add(record);
}
}
@@ -1403,9 +1217,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return info;
}
-
- public void scheduleCompactAndBlock(int timeout) throws Exception
- {
+ public void scheduleCompactAndBlock(int timeout) throws Exception {
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch latch = newLatch(1);
@@ -1414,40 +1226,32 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// We can't use the executor for the compacting... or we would dead lock because of file open and creation
// operations (that will use the executor)
- compactorExecutor.execute(new Runnable()
- {
- public void run()
- {
+ compactorExecutor.execute(new Runnable() {
+ public void run() {
- try
- {
+ try {
JournalImpl.this.compact();
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
errors.incrementAndGet();
ActiveMQJournalLogger.LOGGER.errorCompacting(e);
e.printStackTrace();
}
- finally
- {
+ finally {
latch.countDown();
}
}
});
- try
- {
+ try {
awaitLatch(latch, timeout);
- if (errors.get() > 0)
- {
+ if (errors.get() > 0) {
throw new RuntimeException("Error during compact, look at the logs");
}
}
- finally
- {
+ finally {
compactorRunning.set(false);
}
}
@@ -1459,22 +1263,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* Note: only synchronized methods on journal are methods responsible for the life-cycle such as
* stop, start records will still come as this is being executed
*/
- public synchronized void compact() throws Exception
- {
- if (compactor != null)
- {
+ public synchronized void compact() throws Exception {
+ if (compactor != null) {
throw new IllegalStateException("There is pending compacting operation");
}
compactorLock.writeLock().lock();
- try
- {
+ try {
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
boolean previousReclaimValue = isAutoReclaim();
- try
- {
+ try {
ActiveMQJournalLogger.LOGGER.debug("Starting compacting operation on journal");
onCompactStart();
@@ -1482,10 +1282,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
journalLock.writeLock().lock();
- try
- {
- if (state != JournalState.LOADED)
- {
+ try {
+ if (state != JournalState.LOADED) {
return;
}
@@ -1502,20 +1300,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
filesRepository.clearDataFiles();
- if (dataFilesToProcess.size() == 0)
- {
+ if (dataFilesToProcess.size() == 0) {
trace("Finishing compacting, nothing to process");
return;
}
- compactor = new JournalCompactor(fileFactory,
- this,
- filesRepository,
- records.keySet(),
- dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID());
- for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
- {
+ for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet()) {
compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
entry.getValue().setCompacting();
}
@@ -1524,8 +1316,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// after compacting
records.clear();
}
- finally
- {
+ finally {
journalLock.writeLock().unlock();
}
@@ -1536,14 +1327,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
// well
- for (final JournalFile file : dataFilesToProcess)
- {
- try
- {
+ for (final JournalFile file : dataFilesToProcess) {
+ try {
JournalImpl.readJournalFile(fileFactory, file, compactor);
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.compactReadError(file);
throw new Exception("Error on reading compacting for " + file, e);
}
@@ -1563,8 +1351,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
journalLock.writeLock().lock();
- try
- {
+ try {
// Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
compactor = null;
@@ -1573,31 +1360,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
newDatafiles = localCompactor.getNewDataFiles();
// Restore newRecords created during compacting
- for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet())
- {
+ for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet()) {
records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
}
// Restore compacted dataFiles
- for (int i = newDatafiles.size() - 1; i >= 0; i--)
- {
+ for (int i = newDatafiles.size() - 1; i >= 0; i--) {
JournalFile fileToAdd = newDatafiles.get(i);
- if (JournalImpl.trace)
- {
+ if (JournalImpl.trace) {
JournalImpl.trace("Adding file " + fileToAdd + " back as datafile");
}
filesRepository.addDataFileOnTop(fileToAdd);
}
- if (JournalImpl.trace)
- {
+ if (JournalImpl.trace) {
JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
}
// Replay pending commands (including updates, deletes and commits)
- for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
- {
+ for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) {
newTransaction.replaceRecordProvider(this);
}
@@ -1607,25 +1389,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This has to be done after the replay pending commands, as we need to delete commits
// that happened during the compacting
- for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
- {
- if (JournalImpl.trace)
- {
+ for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) {
+ if (JournalImpl.trace) {
JournalImpl.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
}
JournalTransaction liveTransaction = transactions.get(newTransaction.getId());
- if (liveTransaction != null)
- {
+ if (liveTransaction != null) {
liveTransaction.merge(newTransaction);
}
- else
- {
+ else {
ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
}
}
}
- finally
- {
+ finally {
journalLock.writeLock().unlock();
}
@@ -1636,17 +1413,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
ActiveMQJournalLogger.LOGGER.debug("Finished compacting on journal");
}
- finally
- {
+ finally {
// An Exception was probably thrown, and the compactor was not cleared
- if (compactor != null)
- {
- try
- {
+ if (compactor != null) {
+ try {
compactor.flush();
}
- catch (Throwable ignored)
- {
+ catch (Throwable ignored) {
}
compactor = null;
@@ -1654,8 +1427,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setAutoReclaim(previousReclaimValue);
}
}
- finally
- {
+ finally {
compactorLock.writeLock().unlock();
}
@@ -1697,8 +1469,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* <p></p>
* <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
*/
- public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
- {
+ public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception {
return load(loadManager, true, null);
}
@@ -1709,16 +1480,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* @return
* @throws Exception
*/
- private synchronized JournalLoadInformation load(final LoaderCallback loadManager, final boolean changeData,
- final JournalState replicationSync) throws Exception
- {
- if (state == JournalState.STOPPED || state == JournalState.LOADED)
- {
+ private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
+ final boolean changeData,
+ final JournalState replicationSync) throws Exception {
+ if (state == JournalState.STOPPED || state == JournalState.LOADED) {
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
state);
}
- if (state == replicationSync)
- {
+ if (state == replicationSync) {
throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
}
@@ -1742,25 +1511,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// AtomicLong is used only as a reference, not as an Atomic value
final AtomicLong maxID = new AtomicLong(-1);
- for (final JournalFile file : orderedFiles)
- {
+ for (final JournalFile file : orderedFiles) {
JournalImpl.trace("Loading file " + file.getFile().getFileName());
final AtomicBoolean hasData = new AtomicBoolean(false);
- int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
- {
+ int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() {
- private void checkID(final long id)
- {
- if (id > maxID.longValue())
- {
+ private void checkID(final long id) {
+ if (id > maxID.longValue()) {
maxID.set(id);
}
}
- public void onReadAddRecord(final RecordInfo info) throws Exception
- {
+ public void onReadAddRecord(final RecordInfo info) throws Exception {
checkID(info.id);
hasData.set(true);
@@ -1770,8 +1534,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
}
- public void onReadUpdateRecord(final RecordInfo info) throws Exception
- {
+ public void onReadUpdateRecord(final RecordInfo info) throws Exception {
checkID(info.id);
hasData.set(true);
@@ -1780,8 +1543,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalRecord posFiles = records.get(info.id);
- if (posFiles != null)
- {
+ if (posFiles != null) {
// It's legal for this to be null. The file(s) with the may
// have been deleted
// just leaving some updates in this file
@@ -1791,27 +1553,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- public void onReadDeleteRecord(final long recordID) throws Exception
- {
+ public void onReadDeleteRecord(final long recordID) throws Exception {
hasData.set(true);
loadManager.deleteRecord(recordID);
JournalRecord posFiles = records.remove(recordID);
- if (posFiles != null)
- {
+ if (posFiles != null) {
posFiles.delete(file);
}
}
- public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
- {
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
onReadAddRecordTX(transactionID, info);
}
- public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
- {
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
checkID(info.id);
@@ -1819,8 +1577,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
TransactionHolder tx = loadTransactions.get(transactionID);
- if (tx == null)
- {
+ if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
@@ -1830,8 +1587,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tnp = transactions.get(transactionID);
- if (tnp == null)
- {
+ if (tnp == null) {
tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
@@ -1841,14 +1597,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// count
}
- public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
- {
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
hasData.set(true);
TransactionHolder tx = loadTransactions.get(transactionID);
- if (tx == null)
- {
+ if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
@@ -1858,8 +1612,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tnp = transactions.get(transactionID);
- if (tnp == null)
- {
+ if (tnp == null) {
tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
@@ -1869,14 +1622,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
- public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
- {
+ public void onReadPrepareRecord(final long transactionID,
+ final byte[] extraData,
+ final int numberOfRecords) throws Exception {
hasData.set(true);
TransactionHolder tx = loadTransactions.get(transactionID);
- if (tx == null)
- {
+ if (tx == null) {
// The user could choose to prepare empty transactions
tx = new TransactionHolder(transactionID);
@@ -1889,8 +1642,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction journalTransaction = transactions.get(transactionID);
- if (journalTransaction == null)
- {
+ if (journalTransaction == null) {
journalTransaction = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, journalTransaction);
@@ -1898,19 +1650,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
- if (healthy)
- {
+ if (healthy) {
journalTransaction.prepare(file);
}
- else
- {
+ else {
ActiveMQJournalLogger.LOGGER.preparedTXIncomplete(transactionID);
tx.invalid = true;
}
}
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
// The commit could be alone on its own journal-file and the
@@ -1920,40 +1669,32 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// point
// If we can't find it, we assume the TX was reclaimed and we
// ignore this
- if (tx != null)
- {
+ if (tx != null) {
JournalTransaction journalTransaction = transactions.remove(transactionID);
- if (journalTransaction == null)
- {
+ if (journalTransaction == null) {
throw new IllegalStateException("Cannot find tx " + transactionID);
}
boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
- if (healthy)
- {
- for (RecordInfo txRecord : tx.recordInfos)
- {
- if (txRecord.isUpdate)
- {
+ if (healthy) {
+ for (RecordInfo txRecord : tx.recordInfos) {
+ if (txRecord.isUpdate) {
loadManager.updateRecord(txRecord);
}
- else
- {
+ else {
loadManager.addRecord(txRecord);
}
}
- for (RecordInfo deleteValue : tx.recordsToDelete)
- {
+ for (RecordInfo deleteValue : tx.recordsToDelete) {
loadManager.deleteRecord(deleteValue.id);
}
journalTransaction.commit(file);
}
- else
- {
+ else {
ActiveMQJournalLogger.LOGGER.txMissingElements(transactionID);
journalTransaction.forget();
@@ -1964,20 +1705,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
+ public void onReadRollbackRecord(final long transactionID) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
// The rollback could be alone on its own journal-file and the
// whole transaction body was reclaimed but the commit-record
// So it is completely legal to not find a transaction at this
// point
- if (tx != null)
- {
+ if (tx != null) {
JournalTransaction tnp = transactions.remove(transactionID);
- if (tnp == null)
- {
+ if (tnp == null) {
throw new IllegalStateException("Cannot find tx " + transactionID);
}
@@ -1989,30 +1727,25 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- public void markAsDataFile(final JournalFile file)
- {
+ public void markAsDataFile(final JournalFile file) {
hasData.set(true);
}
});
- if (hasData.get())
- {
+ if (hasData.get()) {
lastDataPos = resultLastPost;
filesRepository.addDataFileOnBottom(file);
}
- else
- {
- if (changeData)
- {
+ else {
+ if (changeData) {
// Empty dataFiles with no data
filesRepository.addFreeFile(file, false, false);
}
}
}
- if (replicationSync == JournalState.SYNCING)
- {
+ if (replicationSync == JournalState.SYNCING) {
assert filesRepository.getDataFiles().isEmpty();
setJournalState(JournalState.SYNCING);
return new JournalLoadInformation(0, -1);
@@ -2022,28 +1755,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setJournalState(JournalState.LOADED);
- for (TransactionHolder transaction : loadTransactions.values())
- {
- if ((!transaction.prepared || transaction.invalid) && replicationSync != JournalState.SYNCING_UP_TO_DATE)
- {
+ for (TransactionHolder transaction : loadTransactions.values()) {
+ if ((!transaction.prepared || transaction.invalid) && replicationSync != JournalState.SYNCING_UP_TO_DATE) {
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
- if (changeData)
- {
+ if (changeData) {
// I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
this.appendRollbackRecord(transaction.transactionID, false);
}
- loadManager.failedTransaction(transaction.transactionID,
- transaction.recordInfos,
- transaction.recordsToDelete);
+ loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
}
- else
- {
- for (RecordInfo info : transaction.recordInfos)
- {
- if (info.id > maxID.get())
- {
+ else {
+ for (RecordInfo info : transaction.recordInfos) {
+ if (info.id > maxID.get()) {
maxID.set(info.id);
}
}
@@ -2066,17 +1791,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
/**
* @return true if cleanup was called
*/
- public final boolean checkReclaimStatus() throws Exception
- {
+ public final boolean checkReclaimStatus() throws Exception {
- if (compactorRunning.get())
- {
+ if (compactorRunning.get()) {
return false;
}
// We can't start reclaim while compacting is working
- while (true)
- {
+ while (true) {
if (state != JournalImpl.JournalState.LOADED)
return false;
if (!isAutoReclaim())
@@ -2084,17 +1806,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (journalLock.readLock().tryLock(250, TimeUnit.MILLISECONDS))
break;
}
- try
- {
+ try {
reclaimer.scan(getDataFiles());
- for (JournalFile file : filesRepository.getDataFiles())
- {
- if (file.isCanReclaim())
- {
+ for (JournalFile file : filesRepository.getDataFiles()) {
+ if (file.isCanReclaim()) {
// File can be reclaimed or deleted
- if (JournalImpl.trace)
- {
+ if (JournalImpl.trace) {
JournalImpl.trace("Reclaiming file " + file);
}
@@ -2104,22 +1822,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
return false;
}
- private boolean needsCompact() throws Exception
- {
+ private boolean needsCompact() throws Exception {
JournalFile[] dataFiles = getDataFiles();
long totalLiveSize = 0;
- for (JournalFile file : dataFiles)
- {
+ for (JournalFile file : dataFiles) {
totalLiveSize += file.getLiveSize();
}
@@ -2133,49 +1848,38 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
- private void checkCompact() throws Exception
- {
- if (compactMinFiles == 0)
- {
+ private void checkCompact() throws Exception {
+ if (compactMinFiles == 0) {
// compacting is disabled
return;
}
- if (state != JournalState.LOADED)
- {
+ if (state != JournalState.LOADED) {
return;
}
- if (!compactorRunning.get() && needsCompact())
- {
+ if (!compactorRunning.get() && needsCompact()) {
scheduleCompact();
}
}
- private void scheduleCompact()
- {
- if (!compactorRunning.compareAndSet(false, true))
- {
+ private void scheduleCompact() {
+ if (!compactorRunning.compareAndSet(false, true)) {
return;
}
// We can't use the executor for the compacting... or we would dead lock because of file open and creation
// operations (that will use the executor)
- compactorExecutor.execute(new Runnable()
- {
- public void run()
- {
+ compactorExecutor.execute(new Runnable() {
+ public void run() {
- try
- {
+ try {
JournalImpl.this.compact();
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompacting(e);
}
- finally
- {
+ finally {
compactorRunning.set(false);
}
}
@@ -2185,26 +1889,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// TestableJournal implementation
// --------------------------------------------------------------
- public final void setAutoReclaim(final boolean autoReclaim)
- {
+ public final void setAutoReclaim(final boolean autoReclaim) {
this.autoReclaim = autoReclaim;
}
- public final boolean isAutoReclaim()
- {
+ public final boolean isAutoReclaim() {
return autoReclaim;
}
/* Only meant to be used in tests. */
@Override
- public String debug() throws Exception
- {
+ public String debug() throws Exception {
reclaimer.scan(getDataFiles());
StringBuilder builder = new StringBuilder();
- for (JournalFile file : filesRepository.getDataFiles())
- {
+ for (JournalFile file : filesRepository.getDataFiles()) {
builder.append("DataFile:" + file +
" posCounter = " +
file.getPosCount() +
@@ -2213,29 +1913,24 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
" live size = " +
file.getLiveSize() +
"\n");
- if (file instanceof JournalFileImpl)
- {
+ if (file instanceof JournalFileImpl) {
builder.append(((JournalFileImpl) file).debug());
}
}
- for (JournalFile file : filesRepository.getFreeFiles())
- {
+ for (JournalFile file : filesRepository.getFreeFiles()) {
builder.append("FreeFile:" + file + "\n");
}
- if (currentFile != null)
- {
+ if (currentFile != null) {
builder.append("CurrentFile:" + currentFile + " posCounter = " + currentFile.getPosCount() + "\n");
- if (currentFile instanceof JournalFileImpl)
- {
+ if (currentFile instanceof JournalFileImpl) {
builder.append(((JournalFileImpl) currentFile).debug());
}
}
- else
- {
+ else {
builder.append("CurrentFile: No current file at this point!");
}
@@ -2246,25 +1941,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* Method for use on testcases.
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this
*/
- public void debugWait() throws InterruptedException
- {
+ public void debugWait() throws InterruptedException {
fileFactory.flush();
- for (JournalTransaction tx : transactions.values())
- {
+ for (JournalTransaction tx : transactions.values()) {
tx.waitCallbacks();
}
- if (filesExecutor != null && !filesExecutor.isShutdown())
- {
+ if (filesExecutor != null && !filesExecutor.isShutdown()) {
// Send something to the closingExecutor, just to make sure we went
// until its end
final CountDownLatch latch = newLatch(1);
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
+ filesExecutor.execute(new Runnable() {
+ public void run() {
latch.countDown();
}
});
@@ -2274,119 +1964,95 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
- public int getDataFilesCount()
- {
+ public int getDataFilesCount() {
return filesRepository.getDataFilesCount();
}
- public JournalFile[] getDataFiles()
- {
+ public JournalFile[] getDataFiles() {
return filesRepository.getDataFilesArray();
}
- public int getFreeFilesCount()
- {
+ public int getFreeFilesCount() {
return filesRepository.getFreeFilesCount();
}
- public int getOpenedFilesCount()
- {
+ public int getOpenedFilesCount() {
return filesRepository.getOpenedFilesCount();
}
- public int getIDMapSize()
- {
+ public int getIDMapSize() {
return records.size();
}
@Override
- public int getFileSize()
- {
+ public int getFileSize() {
return fileSize;
}
- public int getMinFiles()
- {
+ public int getMinFiles() {
return minFiles;
}
- public String getFilePrefix()
- {
+ public String getFilePrefix() {
return filesRepository.getFilePrefix();
}
- public String getFileExtension()
- {
+ public String getFileExtension() {
return filesRepository.getFileExtension();
}
- public int getMaxAIO()
- {
+ public int getMaxAIO() {
return filesRepository.getMaxAIO();
}
- public int getUserVersion()
- {
+ public int getUserVersion() {
return userVersion;
}
// In some tests we need to force the journal to move to a next file
- public void forceMoveNextFile() throws Exception
- {
+ public void forceMoveNextFile() throws Exception {
journalLock.readLock().lock();
- try
- {
+ try {
lockAppend.lock();
- try
- {
+ try {
moveNextFile(false);
debugWait();
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.readLock().unlock();
}
}
- public void perfBlast(final int pages)
- {
+ public void perfBlast(final int pages) {
new PerfBlast(pages).start();
}
// ActiveMQComponent implementation
// ---------------------------------------------------
- public synchronized boolean isStarted()
- {
+ public synchronized boolean isStarted() {
return state != JournalState.STOPPED;
}
- public synchronized void start()
- {
- if (state != JournalState.STOPPED)
- {
+ public synchronized void start() {
+ if (state != JournalState.STOPPED) {
throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state);
}
- filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
- {
+ filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
- public Thread newThread(final Runnable r)
- {
+ public Thread newThread(final Runnable r) {
return new Thread(r, "JournalImpl::FilesExecutor");
}
});
- compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
- {
+ compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
- public Thread newThread(final Runnable r)
- {
+ public Thread newThread(final Runnable r) {
return new Thread(r, "JournalImpl::CompactorExecutor");
}
});
@@ -2398,28 +2064,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setJournalState(JournalState.STARTED);
}
- public synchronized void stop() throws Exception
- {
- if (state == JournalState.STOPPED)
- {
+ public synchronized void stop() throws Exception {
+ if (state == JournalState.STOPPED) {
throw new IllegalStateException("Journal is already stopped");
}
-
journalLock.writeLock().lock();
- try
- {
+ try {
lockAppend.lock();
- try
- {
+ try {
setJournalState(JournalState.STOPPED);
compactorExecutor.shutdown();
- if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
- {
+ if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
}
@@ -2427,27 +2087,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
filesRepository.setExecutor(null);
- if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
- {
+ if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
}
- try
- {
- for (CountDownLatch latch : latches)
- {
+ try {
+ for (CountDownLatch latch : latches) {
latch.countDown();
}
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
}
fileFactory.deactivateBuffer();
- if (currentFile != null && currentFile.getFile().isOpen())
- {
+ if (currentFile != null && currentFile.getFile().isOpen()) {
currentFile.getFile().close();
}
@@ -2457,50 +2112,41 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
currentFile = null;
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
- finally
- {
+ finally {
journalLock.writeLock().unlock();
}
}
- public int getNumberOfRecords()
- {
+ public int getNumberOfRecords() {
return records.size();
}
-
protected SequentialFile createControlFile(final List<JournalFile> files,
final List<JournalFile> newFiles,
- final Pair<String, String> cleanupRename) throws Exception
- {
+ final Pair<String, String> cleanupRename) throws Exception {
ArrayList<Pair<String, String>> cleanupList;
- if (cleanupRename == null)
- {
+ if (cleanupRename == null) {
cleanupList = null;
}
- else
- {
+ else {
cleanupList = new ArrayList<Pair<String, String>>();
cleanupList.add(cleanupRename);
}
return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList);
}
- protected void deleteControlFile(final SequentialFile controlFile) throws Exception
- {
+ protected void deleteControlFile(final SequentialFile controlFile) throws Exception {
controlFile.delete();
}
/**
* being protected as testcases can override this method
*/
- protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
- {
+ protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception {
// addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong
// order
@@ -2509,26 +2155,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final CountDownLatch done = newLatch(1);
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- for (JournalFile file : oldFiles)
- {
- try
- {
+ filesExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ for (JournalFile file : oldFiles) {
+ try {
filesRepository.addFreeFile(file, false);
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorReinitializingFile(e, file);
}
}
}
- finally
- {
+ finally {
done.countDown();
}
}
@@ -2539,8 +2178,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// what could cause a duplicate in case of a crash after the CTR is deleted and before the file is initialized
awaitLatch(done, -1);
- for (JournalFile file : newFiles)
- {
+ for (JournalFile file : newFiles) {
String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(), ".cmp");
file.getFile().renameTo(newName);
}
@@ -2551,8 +2189,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* @param name
* @return
*/
- protected static String renameExtensionFile(String name, final String extension)
- {
+ protected static String renameExtensionFile(String name, final String extension) {
name = name.substring(0, name.lastIndexOf(extension));
return name;
}
@@ -2560,23 +2197,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
/**
* This is an interception point for testcases, when the compacted files are written, before replacing the data structures
*/
- protected void onCompactStart() throws Exception
- {
+ protected void onCompactStart() throws Exception {
}
/**
* This is an interception point for testcases, when the compacted files are written, to be called
* as soon as the compactor gets a writeLock
*/
- protected void onCompactLockingTheJournal() throws Exception
- {
+ protected void onCompactLockingTheJournal() throws Exception {
}
/**
* This is an interception point for testcases, when the compacted files are written, before replacing the data structures
*/
- protected void onCompactDone()
- {
+ protected void onCompactDone() {
}
// Private
@@ -2599,35 +2
<TRUNCATED>