You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/04/26 20:45:42 UTC

[activemq-artemis] branch main updated: ARTEMIS-3261 Expanding verification to journal compacting counters

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 42405fe  ARTEMIS-3261 Expanding verification to journal compacting counters
42405fe is described below

commit 42405fedcfebb436a87c80a76237e20250c5450c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Apr 26 14:30:07 2021 -0400

    ARTEMIS-3261 Expanding verification to journal compacting counters
    
    After testing a production outage situation I still encountered issues on deciding the journal should be compacted.
    This is addressing these issues.
---
 .../artemis/core/journal/impl/JournalFile.java     |   4 +
 .../artemis/core/journal/impl/JournalFileImpl.java |  31 +++--
 .../artemis/core/journal/impl/JournalImpl.java     |  57 ++++++++-
 .../artemis/core/journal/impl/JournalRecord.java   |   2 +
 .../impl/VerifyUpdateFactorSystemProperty.java     |  42 +++++++
 tests/config/logging.properties                    |   3 +-
 tests/smoke-tests/pom.xml                          |  18 +++
 .../servers/infinite-redelivery/broker.xml         |  78 ++++++++++++
 .../servers/infinite-redelivery/logging.properties |  89 +++++++++++++
 .../infinite/InfiniteRedeliverySmokeTest.java      | 138 +++++++++++++++++++++
 .../unit/core/journal/impl/ReclaimerTest.java      |  10 ++
 11 files changed, 460 insertions(+), 12 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
index c9973d2..48aa232 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
@@ -32,6 +32,10 @@ public interface JournalFile {
 
    void decPosCount();
 
+   void incAddRecord();
+
+   int getAddRecord();
+
    void addSize(int bytes);
 
    void decSize(int bytes);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
index 857d54a..8c5e439 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
@@ -20,6 +20,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.jboss.logging.Logger;
@@ -34,9 +35,13 @@ public class JournalFileImpl implements JournalFile {
 
    private long offset;
 
-   private final AtomicInteger posCount = new AtomicInteger(0);
+   private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField");
+   private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField");
+   private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField");
 
-   private final AtomicInteger liveBytes = new AtomicInteger(0);
+   private volatile int posCountField = 0;
+   private volatile int addRecordField = 0;
+   private volatile int liveBytesField = 0;
 
    // Flags to be used by determine if the journal file can be reclaimed
    private boolean posReclaimCriteria = false;
@@ -62,7 +67,7 @@ public class JournalFileImpl implements JournalFile {
 
    @Override
    public int getPosCount() {
-      return posCount.intValue();
+      return posCountUpdater.get(this);
    }
 
    @Override
@@ -135,12 +140,22 @@ public class JournalFileImpl implements JournalFile {
 
    @Override
    public void incPosCount() {
-      posCount.incrementAndGet();
+      posCountUpdater.incrementAndGet(this);
+   }
+
+   @Override
+   public void incAddRecord() {
+      addRecordUpdate.incrementAndGet(this);
+   }
+
+   @Override
+   public int getAddRecord() {
+      return addRecordUpdate.get(this);
    }
 
    @Override
    public void decPosCount() {
-      posCount.decrementAndGet();
+      posCountUpdater.decrementAndGet(this);
    }
 
    public long getOffset() {
@@ -191,17 +206,17 @@ public class JournalFileImpl implements JournalFile {
 
    @Override
    public void addSize(final int bytes) {
-      liveBytes.addAndGet(bytes);
+      liveBytesUpdater.addAndGet(this, bytes);
    }
 
    @Override
    public void decSize(final int bytes) {
-      liveBytes.addAndGet(-bytes);
+      liveBytesUpdater.addAndGet(this, -bytes);
    }
 
    @Override
    public int getLiveSize() {
-      return liveBytes.get();
+      return liveBytesUpdater.get(this);
    }
 
    @Override
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 624094d..d6ee403 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -91,15 +91,45 @@ import static org.apache.activemq.artemis.core.journal.impl.Reclaimer.scan;
  * <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
  */
 public class JournalImpl extends JournalBase implements TestableJournal, JournalRecordProvider {
+   private static final Logger logger = Logger.getLogger(JournalImpl.class);
+
+
+   /**
+    * This is the factor used to decide on compacting: the number of updates tolerated for every ADD record.
+    *
+    * When there are more than UPDATE_FACTOR updates for every ADD we should issue a compacting event.
+    *
+    * I don't foresee users needing to configure this value. However, if that ever happens, there is a system property for it.
+    *
+    * With that being said, if you need this, please raise an issue explaining why, so we may eventually add it to broker.xml once a real
+    * use case determines the configuration that should be exposed there.
+    *
+    * To change this value, define the System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR_VALUE
+    * (the default is 100, which is also used when the supplied value cannot be parsed).
+    * */
+   public static final double UPDATE_FACTOR;
+
+   static {
+      String UPDATE_FACTOR_STR = System.getProperty(JournalImpl.class.getName() + ".UPDATE_FACTOR");
+      double value;
+      try {
+         if (UPDATE_FACTOR_STR == null) {
+            value = 100;
+         } else {
+            value = Double.parseDouble(UPDATE_FACTOR_STR);
+         }
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         value = 100;
+      }
 
-   // Constants -----------------------------------------------------
+      UPDATE_FACTOR = value;
+   }
 
    public static final int FORMAT_VERSION = 2;
 
    private static final int[] COMPATIBLE_VERSIONS = new int[]{1};
 
-   // Static --------------------------------------------------------
-   private static final Logger logger = Logger.getLogger(JournalImpl.class);
 
    // The sizes of primitive types
 
@@ -2312,10 +2342,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       long totalLiveSize = 0;
 
+      long updateCount = 0, addRecord = 0;
+
       for (JournalFile file : dataFiles) {
          totalLiveSize += file.getLiveSize();
+         updateCount += file.getPosCount();
+         addRecord += file.getAddRecord();
       }
 
+
+      if (dataFiles.length > compactMinFiles && addRecord > 0 && updateCount > 0) {
+         double updateFactor = (double) updateCount / addRecord; // cast first: long / long would truncate the ratio before it is widened to double
+
+         if (updateFactor > UPDATE_FACTOR) { // more than UPDATE_FACTOR position counts for every add record: schedule compacting
+            if (logger.isDebugEnabled()) {
+               logger.debug("There are " + addRecord + " records, with " + updateCount + " towards them. UpdateCound / AddCount = " + updateFactor + ", being greater than " + UPDATE_FACTOR + " meaning we have to schedule compacting");
+            }
+            return true;
+         } else {
+            if (logger.isDebugEnabled()) {
+               logger.debug("There are " + addRecord + " records, with " + updateCount + " towards them. UpdateCound / AddCount = " + updateFactor + ", which is lower than " + UPDATE_FACTOR + " meaning we are ok to leave these records");
+            }
+         }
+      }
+
+
       long totalBytes = dataFiles.length * (long) fileSize;
 
       long compactMargin = (long) (totalBytes * compactPercentage);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
index af00489..c1d4796 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java
@@ -42,6 +42,8 @@ public class JournalRecord {
       addFile.incPosCount();
 
       addFile.addSize(size);
+
+      addFile.incAddRecord();
    }
 
    void addUpdateFile(final JournalFile updateFile, final int bytes) {
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/journal/impl/VerifyUpdateFactorSystemProperty.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/journal/impl/VerifyUpdateFactorSystemProperty.java
new file mode 100644
index 0000000..1972be3
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/journal/impl/VerifyUpdateFactorSystemProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.journal.impl;
+
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class VerifyUpdateFactorSystemProperty {
+
+   public static void main(String[] arg) {
+
+      try {
+         Assert.assertEquals(33.0, JournalImpl.UPDATE_FACTOR, 0);
+         System.exit(0);
+      } catch (Throwable e) {
+         e.printStackTrace();
+         System.exit(100);
+      }
+   }
+
+   @Test
+   public void testValidateUpdateRecordProperty() throws Exception {
+      Process process = SpawnedVMSupport.spawnVM(VerifyUpdateFactorSystemProperty.class.getName(), new String[]{"-D" + JournalImpl.class.getName() + ".UPDATE_FACTOR=33.0"}, new String[]{});
+      Assert.assertEquals(0, process.waitFor());
+   }
+
+}
diff --git a/tests/config/logging.properties b/tests/config/logging.properties
index 2469a68..14e40a9 100644
--- a/tests/config/logging.properties
+++ b/tests/config/logging.properties
@@ -17,7 +17,7 @@
 
 # Additional logger names to configure (root logger is always configured)
 # Root logger option
-loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message
+loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.smoke,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message
 
 # Root logger level
 logger.level=INFO
@@ -33,6 +33,7 @@ logger.org.apache.activemq.artemis.tests.integration.level=DEBUG
 logger.org.apache.activemq.artemis.tests.level=DEBUG
 logger.org.apache.activemq.artemis.tests.unit.level=DEBUG
 logger.org.apache.activemq.artemis.jms.tests.level=DEBUG
+logger.org.apache.activemq.artemis.tests.smoke.level=DEBUG
 
 
 # Root logger handlers
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index d9e93b4..fbe28d8 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -765,6 +765,24 @@
                      <configuration>${basedir}/target/classes/servers/MaxQueueResourceTest</configuration>
                   </configuration>
                </execution>
+
+               <!-- used on InfiniteRedeliverySmokeTest  -->
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>createBrokerInfiniteRedelivery</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>A</user>
+                     <password>A</password>
+                     <noWeb>true</noWeb>
+                     <instance>${basedir}/target/infinite-redelivery</instance>
+                     <configuration>${basedir}/target/classes/servers/infinite-redelivery</configuration>
+                  </configuration>
+               </execution>
+
             </executions>
          <dependencies>
                <dependency>
diff --git a/tests/smoke-tests/src/main/resources/servers/infinite-redelivery/broker.xml b/tests/smoke-tests/src/main/resources/servers/infinite-redelivery/broker.xml
new file mode 100644
index 0000000..e15494e
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/infinite-redelivery/broker.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+   <core xmlns="urn:activemq:core">
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <journal-pool-files>10</journal-pool-files>
+      <journal-min-files>2</journal-min-files>
+      <journal-compact-min-files>3</journal-compact-min-files>
+      <persist-delivery-count-before-delivery>false</persist-delivery-count-before-delivery>
+
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
+      </acceptors>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="#">
+            <permission roles="guest" type="createDurableQueue"/>
+            <permission roles="guest" type="deleteDurableQueue"/>
+            <permission roles="guest" type="createNonDurableQueue"/>
+            <permission roles="guest" type="deleteNonDurableQueue"/>
+            <permission roles="guest" type="consume"/>
+            <permission roles="guest" type="send"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!--override the max-delivery-attempts and dead letter address for the example queue-->
+         <address-setting match="testQueue">
+            <dead-letter-address>deadLetterQueue</dead-letter-address>
+            <max-delivery-attempts>-1</max-delivery-attempts>
+            <redelivery-delay>1</redelivery-delay>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="deadLetterQueue">
+            <anycast>
+               <queue name="deadLetterQueue"/>
+            </anycast>
+         </address>
+         <address name="testQueue">
+            <anycast>
+               <queue name="testQueue"/>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git a/tests/smoke-tests/src/main/resources/servers/infinite-redelivery/logging.properties b/tests/smoke-tests/src/main/resources/servers/infinite-redelivery/logging.properties
new file mode 100644
index 0000000..4448152
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/infinite-redelivery/logging.properties
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional logger names to configure (root logger is always configured)
+# Root logger option
+loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.core.journal.impl.JournalImpl
+
+# Root logger level
+logger.level=INFO
+# ActiveMQ Artemis logger levels
+logger.org.apache.activemq.artemis.core.server.level=INFO
+logger.org.apache.activemq.artemis.journal.level=INFO
+logger.org.apache.activemq.artemis.utils.level=INFO
+
+# DEBUG on this level will give you a lot of information on when compacting is happening
+logger.org.apache.activemq.artemis.core.journal.impl.JournalImpl.level=DEBUG
+
+# if you have issues with CriticalAnalyzer, setting this as TRACE would give you extra troubleshooting information.
+# but do not use it regularly as it would incur in some extra CPU usage for this diagnostic.
+logger.org.apache.activemq.artemis.utils.critical.level=INFO
+
+logger.org.apache.activemq.artemis.jms.level=INFO
+logger.org.apache.activemq.artemis.integration.bootstrap.level=INFO
+logger.org.eclipse.jetty.level=WARN
+# Root logger handlers
+logger.handlers=FILE,CONSOLE
+
+# to enable audit change the level to INFO
+logger.org.apache.activemq.audit.base.level=ERROR
+logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.base.useParentHandlers=false
+
+logger.org.apache.activemq.audit.resource.level=ERROR
+logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.resource.useParentHandlers=false
+
+logger.org.apache.activemq.audit.message.level=ERROR
+logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.message.useParentHandlers=false
+
+# Console handler configuration
+handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
+handler.CONSOLE.properties=autoFlush
+handler.CONSOLE.level=DEBUG
+handler.CONSOLE.autoFlush=true
+handler.CONSOLE.formatter=PATTERN
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.FILE.level=DEBUG
+handler.FILE.properties=suffix,append,autoFlush,fileName
+handler.FILE.suffix=.yyyy-MM-dd
+handler.FILE.append=true
+handler.FILE.autoFlush=true
+handler.FILE.fileName=${artemis.instance}/log/artemis.log
+handler.FILE.formatter=PATTERN
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=%d %-5p [%c] %s%E%n
+
+#Audit logger
+handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.AUDIT_FILE.level=INFO
+handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
+handler.AUDIT_FILE.suffix=.yyyy-MM-dd
+handler.AUDIT_FILE.append=true
+handler.AUDIT_FILE.autoFlush=true
+handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
+handler.AUDIT_FILE.formatter=AUDIT_PATTERN
+
+formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.AUDIT_PATTERN.properties=pattern
+formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java
new file mode 100644
index 0000000..86b641e
--- /dev/null
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.infinite;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfiniteRedeliverySmokeTest extends SmokeTestBase {
+
+   private static final Logger logger = Logger.getLogger(InfiniteRedeliverySmokeTest.class);
+
+   public static final String SERVER_NAME_0 = "infinite-redelivery";
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      startServer(SERVER_NAME_0, 0, 30000);
+   }
+
+   @Test
+   public void testValidateRedeliveries() throws Exception {
+      ConnectionFactory factory = new ActiveMQConnectionFactory();
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue queue = session.createQueue("testQueue");
+      MessageProducer producer = session.createProducer(queue);
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+
+      TextMessage message = session.createTextMessage("this is a test");
+      for (int i = 0; i < 5000; i++) {
+         producer.send(message);
+      }
+      session.commit();
+
+      connection.start();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      File journalLocation = new File(getServerLocation(SERVER_NAME_0) + "/data/journal");
+      SequentialFileFactory fileFactory = new NIOSequentialFileFactory(journalLocation, 1);
+
+      for (int i = 0; i < 500; i++) {
+         if (i % 10 == 0) logger.debug("Redelivery " + i);
+         for (int j = 0; j < 5000; j++) {
+            Assert.assertNotNull(consumer.receive(5000));
+         }
+         session.rollback();
+
+         int numberOfFiles = fileFactory.listFiles("amq").size();
+
+         // it should be actually 10, However if a future rule changes it to allow removing files I'm ok with that
+         Assert.assertTrue("there are not enough files on journal", numberOfFiles >= 2);
+         // it should be max 10 actually, I'm just leaving some space for future changes,
+         // as the real test I'm after here is the broker should clean itself up
+         Wait.assertTrue("there are too many files created", () -> fileFactory.listFiles("amq").size() <= 20);
+
+      }
+   }
+
+   /** Repeatedly sends and rolls back batches of persistent messages, verifying the journal file count stays bounded. */
+   @Test
+   public void testValidateJournalOnRollbackSend() throws Exception {
+      ConnectionFactory factory = new ActiveMQConnectionFactory();
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue queue = session.createQueue("testQueue");
+      MessageProducer producer = session.createProducer(queue);
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      File journalLocation = new File(getServerLocation(SERVER_NAME_0) + "/data/journal");
+      SequentialFileFactory fileFactory = new NIOSequentialFileFactory(journalLocation, 1);
+      TextMessage message = session.createTextMessage("This is a test");
+      producer.send(message); // we will always have one message behind
+      connection.start();
+      MessageConsumer consumer = session.createConsumer(queue);
+      for (int i = 0; i < 500; i++) {
+         if (i % 10 == 0) logger.debug("Rollback send " + i);
+         for (int j = 0; j < 5000; j++) {
+            producer.send(message);
+         }
+         // every 100th iteration the batch is committed and consumed; all other iterations roll the sends back
+         if (i % 100 == 0) {
+            session.commit();
+            for (int c = 0; c < 5000; c++) {
+               Assert.assertNotNull(consumer.receive(5000));
+            }
+            session.commit();
+            Assert.assertNotNull(consumer.receive(5000)); // there's one message behind
+            session.rollback(); // we will keep the one message behind
+         } else {
+            session.rollback();
+         }
+         int numberOfFiles = fileFactory.listFiles("amq").size();
+         // it should be actually 10, However if a future rule changes it to allow removing files I'm ok with that
+         Assert.assertTrue("there are not enough files on journal", numberOfFiles >= 2);
+         // it should be max 10 actually, I'm just leaving some space for future changes,
+         // as the real test I'm after here is the broker should clean itself up
+         // poll through Wait instead of asserting on the numberOfFiles snapshot above, which may be stale once the Wait condition passes
+         Wait.assertTrue("there are too many files created", () -> fileFactory.listFiles("amq").size() <= 20);
+      }
+   }
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
index 4f04ba5..54f83da 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java
@@ -745,6 +745,16 @@ public class ReclaimerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void incAddRecord() {
+
+      }
+
+      @Override
+      public int getAddRecord() {
+         return 0;
+      }
+
+      @Override
       public void incNegCount(final JournalFile file) {
          incNegCount(file, 1);
       }