You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/06/07 17:05:19 UTC

[activemq-artemis] branch main updated: ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0edf599  ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification
0edf599 is described below

commit 0edf599adca9365993579a96d717ed2f3c0d0af9
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jun 4 18:15:58 2021 -0400

    ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification
---
 .../cli/commands/tools/journal/CompactJournal.java |  4 +--
 .../activemq/artemis/core/journal/Journal.java     |  2 +-
 .../core/journal/impl/FileWrapperJournal.java      |  2 +-
 .../core/journal/impl/JournalCompactor.java        | 24 ++++++++++-----
 .../artemis/core/journal/impl/JournalFile.java     |  4 +++
 .../artemis/core/journal/impl/JournalFileImpl.java | 12 ++++++++
 .../artemis/core/journal/impl/JournalImpl.java     | 36 +++++++++++++---------
 .../artemis/core/journal/impl/JournalRecord.java   |  5 ++-
 .../core/journal/impl/JournalRecordProvider.java   |  2 ++
 .../core/journal/impl/JournalTransaction.java      | 15 +++++----
 .../unit/core/journal/impl/ReclaimerTest.java      | 10 ++++++
 11 files changed, 83 insertions(+), 33 deletions(-)

diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
index 32aef5e..2b8dc4d 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java
@@ -60,14 +60,14 @@ public final class CompactJournal extends LockAbstract {
                                final int poolFiles,
                                final int fileSize,
                                final IOCriticalErrorListener listener,
-                               int... replaceableRecords) throws Exception {
+                               byte... replaceableRecords) throws Exception {
       NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
 
       JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
       if (historyFolder != null) {
          journal.setHistoryFolder(historyFolder, -1, -1);
       }
-      for (int i : replaceableRecords) {
+      for (byte i : replaceableRecords) {
          journal.replaceableRecord(i);
       }
       journal.setRemoveExtraFilesOnLoad(true);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 6618e8d..28aa2d2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -105,7 +105,7 @@ public interface Journal extends ActiveMQComponent {
       appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
    }
 
-   default void replaceableRecord(int recordType) {
+   default void replaceableRecord(byte recordType) {
    }
 
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 401797e..325e616 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -60,7 +60,7 @@ public final class FileWrapperJournal extends JournalBase {
    protected volatile JournalFile currentFile;
 
    @Override
-   public void replaceableRecord(int recordType) {
+   public void replaceableRecord(byte recordType) {
       journal.replaceableRecord(recordType);
    }
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 8b70325..0108bf9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -144,15 +144,20 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       pendingCommands.add(new DeleteCompactCommand(id, usedFile));
    }
 
+   @Override
+   public boolean isReplaceableRecord(byte recordType) {
+      return journal.isReplaceableRecord(recordType);
+   }
+
    /**
     * @param id
     * @param usedFile
     */
-   public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) {
+   public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
       if (logger.isTraceEnabled()) {
          logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
       }
-      pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
+      pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, userRecordType));
    }
 
    private void checkSize(final int size) throws Exception {
@@ -273,7 +278,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
       checkSize(record.getEncodeSize(), info.compactCount);
 
-      newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
+      newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.userRecordType);
 
       writeEncoder(record);
    }
@@ -433,7 +438,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       newTransaction.rollback(currentFile);
    }
 
-   public void replaceableRecord(int recordType) {
+   public void replaceableRecord(byte recordType) {
       LongObjectHashMap<RunnableEx> longmap = new LongObjectHashMap();
       pendingUpdates.put(recordType, longmap);
    }
@@ -467,7 +472,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
       if (newRecord == null) {
          ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
       } else {
-         newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
+         newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), journal.isReplaceableRecord(info.userRecordType));
       }
 
       writeEncoder(updateRecord);
@@ -497,7 +502,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
       writeEncoder(updateRecordTX);
 
-      newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
+      newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.userRecordType);
    }
 
    /**
@@ -561,14 +566,17 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
       private final long id;
 
+      private final byte userRecordType;
+
       private final JournalFile usedFile;
 
       private final int size;
 
-      private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size) {
+      private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
          this.id = id;
          this.usedFile = usedFile;
          this.size = size;
+         this.userRecordType = userRecordType;
       }
 
       @Override
@@ -577,7 +585,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
          if (updateRecord == null) {
             ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id);
          } else {
-            updateRecord.addUpdateFile(usedFile, size);
+            updateRecord.addUpdateFile(usedFile, size, journal.isReplaceableRecord(userRecordType));
          }
       }
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
index 9bd2b50..2649626 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
@@ -39,6 +39,10 @@ public interface JournalFile {
 
    void decPosCount();
 
+   int getReplaceableCount();
+
+   void incReplaceableCount();
+
    void incAddRecord();
 
    int getAddRecord();
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
index 8c99f29..2e4a072 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
@@ -42,10 +42,12 @@ public class JournalFileImpl implements JournalFile {
       this.reclaimable = reclaimable;
    }
 
+   private static final AtomicIntegerFieldUpdater<JournalFileImpl> replaceableCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "replaceableCountField");
    private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField");
    private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField");
    private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField");
 
+   private volatile int replaceableCountField = 0;
    private volatile int posCountField = 0;
    private volatile int addRecordField = 0;
    private volatile int liveBytesField = 0;
@@ -78,6 +80,16 @@ public class JournalFileImpl implements JournalFile {
    }
 
    @Override
+   public int getReplaceableCount() {
+      return replaceableCountUpdater.get(this);
+   }
+
+   @Override
+   public void incReplaceableCount() {
+      replaceableCountUpdater.incrementAndGet(this);
+   }
+
+   @Override
    public boolean isPosReclaimCriteria() {
       return posReclaimCriteria;
    }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index aac1554..383f2e9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -31,7 +31,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.GregorianCalendar;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -51,6 +50,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
 
+import io.netty.util.collection.ByteObjectHashMap;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -114,6 +114,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
     *
     * To update this value, define a System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR VALUE
     *
+    * We only calculate this against replaceable updates, on this case for redelivery counts and rescheduled redelivery in artemis server
+    *
     * */
    public static final double UPDATE_FACTOR;
    private static final String BKP_EXTENSION = "bkp";
@@ -125,7 +127,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       double value;
       try {
          if (UPDATE_FACTOR_STR == null) {
-            value = 100;
+            value = 10;
          } else {
             value = Double.parseDouble(UPDATE_FACTOR_STR);
          }
@@ -323,20 +325,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
    private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();
 
-   HashSet<Integer> replaceableRecords;
+   ByteObjectHashMap<Boolean> replaceableRecords;
 
 
    /** This will declare a record type as being replaceable on updates.
     * Certain update records only need the last value, and they could be replaceable during compacting.
     * */
    @Override
-   public void replaceableRecord(int recordType) {
+   public void replaceableRecord(byte recordType) {
       if (replaceableRecords == null) {
-         replaceableRecords = new HashSet<>();
+         replaceableRecords = new ByteObjectHashMap<>();
       }
-      replaceableRecords.add(recordType);
+      replaceableRecords.put(recordType, Boolean.TRUE);
    }
 
+   @Override
+   public boolean isReplaceableRecord(byte recordType) {
+      return replaceableRecords != null && replaceableRecords.containsKey(recordType);
+   }
+
+
    private volatile JournalFile currentFile;
 
    private volatile JournalState state = JournalState.STOPPED;
@@ -1108,10 +1116,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                // computing the delete should be done after compacting is done
                if (jrnRecord == null) {
                   if (compactor != null) {
-                     compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+                     compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), recordType);
                   }
                } else {
-                  jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
+                  jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), isReplaceableRecord(recordType));
                }
 
                if (updateCallback != null) {
@@ -1290,7 +1298,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                   usedFile);
                }
 
-               tx.addPositive(usedFile, id, encodeSize);
+               tx.addPositive(usedFile, id, encodeSize, recordType);
             } catch (Throwable e) {
                logger.error("appendAddRecordTransactional:" + e, e);
                setErrorCondition(null, tx, e);
@@ -1353,7 +1361,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                           usedFile );
                }
 
-               tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
+               tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), recordType);
             } catch (Throwable e ) {
                logger.error("appendUpdateRecordTransactional:" +  e.getMessage(), e );
                setErrorCondition(null, tx, e );
@@ -1987,7 +1995,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
 
          if (replaceableRecords != null) {
-            replaceableRecords.forEach((i) -> compactor.replaceableRecord(i));
+            replaceableRecords.forEach((k, v) -> compactor.replaceableRecord(k));
          }
 
          transactions.forEach((id, pendingTransaction) -> {
@@ -2124,7 +2132,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   // have been deleted
                   // just leaving some updates in this file
 
-                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, isReplaceableRecord(info.userRecordType)); // +1 = compact
                   // count
                }
             }
@@ -2172,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   transactions.put(transactionID, tnp);
                }
 
-               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.userRecordType); // +1 = compact
                // count
             }
 
@@ -2620,7 +2628,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       for (JournalFile file : dataFiles) {
          totalLiveSize += file.getLiveSize();
-         updateCount += file.getPosCount();
+         updateCount += file.getReplaceableCount();
          addRecord += file.getAddRecord();
       }
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
index c1d4796..0be90c2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
@@ -46,7 +46,7 @@ public class JournalRecord {
       addFile.incAddRecord();
    }
 
-   void addUpdateFile(final JournalFile updateFile, final int bytes) {
+   void addUpdateFile(final JournalFile updateFile, final int bytes, boolean replaceableUpdate) {
       checkNotDeleted();
       if (bytes == 0) {
          return;
@@ -66,6 +66,9 @@ public class JournalRecord {
       fileUpdates.add(updateFile, bytes, 1);
       updateFile.incPosCount();
       updateFile.addSize(bytes);
+      if (replaceableUpdate) {
+         updateFile.incReplaceableCount();
+      }
    }
 
    void delete(final JournalFile file) {
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
index c9c92f4..6f9b40c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java
@@ -30,4 +30,6 @@ public interface JournalRecordProvider {
    JournalCompactor getCompactor();
 
    ConcurrentLongHashMap<JournalRecord> getRecords();
+
+   boolean isReplaceableRecord(byte recordType);
 }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index ffc016a..0cfd369 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -202,7 +202,7 @@ public class JournalTransaction {
       }
    }
 
-   public void addPositive(final JournalFile file, final long id, final int size) {
+   public void addPositive(final JournalFile file, final long id, final int size, final byte userRecordType) {
       incCounter(file);
 
       addFile(file);
@@ -211,7 +211,7 @@ public class JournalTransaction {
          pos = new ArrayList<>();
       }
 
-      pos.add(new JournalUpdate(file, id, size));
+      pos.add(new JournalUpdate(file, id, size, userRecordType));
    }
 
    public void addNegative(final JournalFile file, final long id) {
@@ -223,7 +223,7 @@ public class JournalTransaction {
          neg = new ArrayList<>();
       }
 
-      neg.add(new JournalUpdate(file, id, 0));
+      neg.add(new JournalUpdate(file, id, 0, (byte)0));
    }
 
    /**
@@ -254,13 +254,13 @@ public class JournalTransaction {
                   // This is a case where the transaction was opened after compacting was started,
                   // but the commit arrived while compacting was working
                   // We need to cache the counter update, so compacting will take the correct files when it is done
-                  compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size);
+                  compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.userRecordType);
                } else if (posFiles == null) {
                   posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
 
                   journal.getRecords().put(trUpdate.id, posFiles);
                } else {
-                  posFiles.addUpdateFile(trUpdate.file, trUpdate.size);
+                  posFiles.addUpdateFile(trUpdate.file, trUpdate.size, journal.isReplaceableRecord(trUpdate.userRecordType));
                }
             }
          }
@@ -397,16 +397,19 @@ public class JournalTransaction {
 
       int size;
 
+      final byte userRecordType;
+
       /**
        * @param file
        * @param id
        * @param size
        */
-      private JournalUpdate(final JournalFile file, final long id, final int size) {
+      private JournalUpdate(final JournalFile file, final long id, final int size, final byte userRecordType) {
          super();
          this.file = file;
          this.id = id;
          this.size = size;
+         this.userRecordType = userRecordType;
       }
 
       /**
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
index 54f83da..a2b0997 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
@@ -745,6 +745,16 @@ public class ReclaimerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public int getReplaceableCount() {
+         return 0;
+      }
+
+      @Override
+      public void incReplaceableCount() {
+
+      }
+
+      @Override
       public void incAddRecord() {
 
       }