You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/06/07 17:05:19 UTC
[activemq-artemis] branch main updated: ARTEMIS-3261 Updating logic
to use only replaceable records on compacting verification
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0edf599 ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification
0edf599 is described below
commit 0edf599adca9365993579a96d717ed2f3c0d0af9
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jun 4 18:15:58 2021 -0400
ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification
---
.../cli/commands/tools/journal/CompactJournal.java | 4 +--
.../activemq/artemis/core/journal/Journal.java | 2 +-
.../core/journal/impl/FileWrapperJournal.java | 2 +-
.../core/journal/impl/JournalCompactor.java | 24 ++++++++++-----
.../artemis/core/journal/impl/JournalFile.java | 4 +++
.../artemis/core/journal/impl/JournalFileImpl.java | 12 ++++++++
.../artemis/core/journal/impl/JournalImpl.java | 36 +++++++++++++---------
.../artemis/core/journal/impl/JournalRecord.java | 5 ++-
.../core/journal/impl/JournalRecordProvider.java | 2 ++
.../core/journal/impl/JournalTransaction.java | 15 +++++----
.../unit/core/journal/impl/ReclaimerTest.java | 10 ++++++
11 files changed, 83 insertions(+), 33 deletions(-)
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
index 32aef5e..2b8dc4d 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
@@ -60,14 +60,14 @@ public final class CompactJournal extends LockAbstract {
final int poolFiles,
final int fileSize,
final IOCriticalErrorListener listener,
- int... replaceableRecords) throws Exception {
+ byte... replaceableRecords) throws Exception {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
if (historyFolder != null) {
journal.setHistoryFolder(historyFolder, -1, -1);
}
- for (int i : replaceableRecords) {
+ for (byte i : replaceableRecords) {
journal.replaceableRecord(i);
}
journal.setRemoveExtraFilesOnLoad(true);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 6618e8d..28aa2d2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -105,7 +105,7 @@ public interface Journal extends ActiveMQComponent {
appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
}
- default void replaceableRecord(int recordType) {
+ default void replaceableRecord(byte recordType) {
}
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 401797e..325e616 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -60,7 +60,7 @@ public final class FileWrapperJournal extends JournalBase {
protected volatile JournalFile currentFile;
@Override
- public void replaceableRecord(int recordType) {
+ public void replaceableRecord(byte recordType) {
journal.replaceableRecord(recordType);
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 8b70325..0108bf9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -144,15 +144,20 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
pendingCommands.add(new DeleteCompactCommand(id, usedFile));
}
+ @Override
+ public boolean isReplaceableRecord(byte recordType) {
+ return journal.isReplaceableRecord(recordType);
+ }
+
/**
* @param id
* @param usedFile
*/
- public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) {
+ public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
}
- pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
+ pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, userRecordType));
}
private void checkSize(final int size) throws Exception {
@@ -273,7 +278,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
checkSize(record.getEncodeSize(), info.compactCount);
- newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
+ newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.userRecordType);
writeEncoder(record);
}
@@ -433,7 +438,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
newTransaction.rollback(currentFile);
}
- public void replaceableRecord(int recordType) {
+ public void replaceableRecord(byte recordType) {
LongObjectHashMap<RunnableEx> longmap = new LongObjectHashMap();
pendingUpdates.put(recordType, longmap);
}
@@ -467,7 +472,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (newRecord == null) {
ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
} else {
- newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
+ newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), journal.isReplaceableRecord(info.userRecordType));
}
writeEncoder(updateRecord);
@@ -497,7 +502,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
writeEncoder(updateRecordTX);
- newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
+ newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.userRecordType);
}
/**
@@ -561,14 +566,17 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
private final long id;
+ private final byte userRecordType;
+
private final JournalFile usedFile;
private final int size;
- private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size) {
+ private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
this.id = id;
this.usedFile = usedFile;
this.size = size;
+ this.userRecordType = userRecordType;
}
@Override
@@ -577,7 +585,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (updateRecord == null) {
ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id);
} else {
- updateRecord.addUpdateFile(usedFile, size);
+ updateRecord.addUpdateFile(usedFile, size, journal.isReplaceableRecord(userRecordType));
}
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
index 9bd2b50..2649626 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
@@ -39,6 +39,10 @@ public interface JournalFile {
void decPosCount();
+ int getReplaceableCount();
+
+ void incReplaceableCount();
+
void incAddRecord();
int getAddRecord();
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
index 8c99f29..2e4a072 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
@@ -42,10 +42,12 @@ public class JournalFileImpl implements JournalFile {
this.reclaimable = reclaimable;
}
+ private static final AtomicIntegerFieldUpdater<JournalFileImpl> replaceableCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "replaceableCountField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField");
+ private volatile int replaceableCountField = 0;
private volatile int posCountField = 0;
private volatile int addRecordField = 0;
private volatile int liveBytesField = 0;
@@ -78,6 +80,16 @@ public class JournalFileImpl implements JournalFile {
}
@Override
+ public int getReplaceableCount() {
+ return replaceableCountUpdater.get(this);
+ }
+
+ @Override
+ public void incReplaceableCount() {
+ replaceableCountUpdater.incrementAndGet(this);
+ }
+
+ @Override
public boolean isPosReclaimCriteria() {
return posReclaimCriteria;
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index aac1554..383f2e9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -31,7 +31,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.GregorianCalendar;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -51,6 +50,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
+import io.netty.util.collection.ByteObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -114,6 +114,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
*
* To update this value, define a System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR VALUE
*
+ * We only calculate this against replaceable updates, on this case for redelivery counts and rescheduled redelivery in artemis server
+ *
* */
public static final double UPDATE_FACTOR;
private static final String BKP_EXTENSION = "bkp";
@@ -125,7 +127,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
double value;
try {
if (UPDATE_FACTOR_STR == null) {
- value = 100;
+ value = 10;
} else {
value = Double.parseDouble(UPDATE_FACTOR_STR);
}
@@ -323,20 +325,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();
- HashSet<Integer> replaceableRecords;
+ ByteObjectHashMap<Boolean> replaceableRecords;
/** This will declare a record type as being replaceable on updates.
* Certain update records only need the last value, and they could be replaceable during compacting.
* */
@Override
- public void replaceableRecord(int recordType) {
+ public void replaceableRecord(byte recordType) {
if (replaceableRecords == null) {
- replaceableRecords = new HashSet<>();
+ replaceableRecords = new ByteObjectHashMap<>();
}
- replaceableRecords.add(recordType);
+ replaceableRecords.put(recordType, Boolean.TRUE);
}
+ @Override
+ public boolean isReplaceableRecord(byte recordType) {
+ return replaceableRecords != null && replaceableRecords.containsKey(recordType);
+ }
+
+
private volatile JournalFile currentFile;
private volatile JournalState state = JournalState.STOPPED;
@@ -1108,10 +1116,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// computing the delete should be done after compacting is done
if (jrnRecord == null) {
if (compactor != null) {
- compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+ compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), recordType);
}
} else {
- jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
+ jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), isReplaceableRecord(recordType));
}
if (updateCallback != null) {
@@ -1290,7 +1298,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile);
}
- tx.addPositive(usedFile, id, encodeSize);
+ tx.addPositive(usedFile, id, encodeSize, recordType);
} catch (Throwable e) {
logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(null, tx, e);
@@ -1353,7 +1361,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile );
}
- tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
+ tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), recordType);
} catch (Throwable e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition(null, tx, e );
@@ -1987,7 +1995,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
if (replaceableRecords != null) {
- replaceableRecords.forEach((i) -> compactor.replaceableRecord(i));
+ replaceableRecords.forEach((k, v) -> compactor.replaceableRecord(k));
}
transactions.forEach((id, pendingTransaction) -> {
@@ -2124,7 +2132,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, isReplaceableRecord(info.userRecordType)); // +1 = compact
// count
}
}
@@ -2172,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.userRecordType); // +1 = compact
// count
}
@@ -2620,7 +2628,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
for (JournalFile file : dataFiles) {
totalLiveSize += file.getLiveSize();
- updateCount += file.getPosCount();
+ updateCount += file.getReplaceableCount();
addRecord += file.getAddRecord();
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
index c1d4796..0be90c2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
@@ -46,7 +46,7 @@ public class JournalRecord {
addFile.incAddRecord();
}
- void addUpdateFile(final JournalFile updateFile, final int bytes) {
+ void addUpdateFile(final JournalFile updateFile, final int bytes, boolean replaceableUpdate) {
checkNotDeleted();
if (bytes == 0) {
return;
@@ -66,6 +66,9 @@ public class JournalRecord {
fileUpdates.add(updateFile, bytes, 1);
updateFile.incPosCount();
updateFile.addSize(bytes);
+ if (replaceableUpdate) {
+ updateFile.incReplaceableCount();
+ }
}
void delete(final JournalFile file) {
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
index c9c92f4..6f9b40c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
@@ -30,4 +30,6 @@ public interface JournalRecordProvider {
JournalCompactor getCompactor();
ConcurrentLongHashMap<JournalRecord> getRecords();
+
+ boolean isReplaceableRecord(byte recordType);
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index ffc016a..0cfd369 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -202,7 +202,7 @@ public class JournalTransaction {
}
}
- public void addPositive(final JournalFile file, final long id, final int size) {
+ public void addPositive(final JournalFile file, final long id, final int size, final byte userRecordType) {
incCounter(file);
addFile(file);
@@ -211,7 +211,7 @@ public class JournalTransaction {
pos = new ArrayList<>();
}
- pos.add(new JournalUpdate(file, id, size));
+ pos.add(new JournalUpdate(file, id, size, userRecordType));
}
public void addNegative(final JournalFile file, final long id) {
@@ -223,7 +223,7 @@ public class JournalTransaction {
neg = new ArrayList<>();
}
- neg.add(new JournalUpdate(file, id, 0));
+ neg.add(new JournalUpdate(file, id, 0, (byte)0));
}
/**
@@ -254,13 +254,13 @@ public class JournalTransaction {
// This is a case where the transaction was opened after compacting was started,
// but the commit arrived while compacting was working
// We need to cache the counter update, so compacting will take the correct files when it is done
- compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size);
+ compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.userRecordType);
} else if (posFiles == null) {
posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
journal.getRecords().put(trUpdate.id, posFiles);
} else {
- posFiles.addUpdateFile(trUpdate.file, trUpdate.size);
+ posFiles.addUpdateFile(trUpdate.file, trUpdate.size, journal.isReplaceableRecord(trUpdate.userRecordType));
}
}
}
@@ -397,16 +397,19 @@ public class JournalTransaction {
int size;
+ final byte userRecordType;
+
/**
* @param file
* @param id
* @param size
*/
- private JournalUpdate(final JournalFile file, final long id, final int size) {
+ private JournalUpdate(final JournalFile file, final long id, final int size, final byte userRecordType) {
super();
this.file = file;
this.id = id;
this.size = size;
+ this.userRecordType = userRecordType;
}
/**
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
index 54f83da..a2b0997 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
@@ -745,6 +745,16 @@ public class ReclaimerTest extends ActiveMQTestBase {
}
@Override
+ public int getReplaceableCount() {
+ return 0;
+ }
+
+ @Override
+ public void incReplaceableCount() {
+
+ }
+
+ @Override
public void incAddRecord() {
}