You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/12/15 15:30:25 UTC

[activemq-artemis] branch main updated: ARTEMIS-4065 Optimize page counters to not use the journal as often

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new af9bd7b84a ARTEMIS-4065 Optimize page counters to not use the journal as often
af9bd7b84a is described below

commit af9bd7b84aad32e4fe30f2c8909e51cf7300b475
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 26 13:30:42 2022 -0400

    ARTEMIS-4065 Optimize page counters to not use the journal as often
    
    - From now on we will save snapshots of page-counters on the journal (basically for compatibility with previous verions).
      And we will recount the records on startup.
    
    - While the rebuild is being done the value from the previous snapshot is still available with current updates.
---
 .../artemis/cli/commands/tools/PrintData.java      |   2 +-
 .../api/core/management/ActiveMQServerControl.java |   3 +
 .../jdbc/store/drivers/AbstractJDBCDriver.java     |   3 +
 .../jdbc/store/file/JDBCSequentialFile.java        |  14 +-
 .../jdbc/store/file/JDBCSequentialFileFactory.java |   4 +-
 .../artemis/core/io/AbstractSequentialFile.java    |   3 +
 .../artemis/core/io/IOCriticalErrorListener.java   |   7 +-
 .../management/impl/ActiveMQServerControlImpl.java |  10 +
 .../artemis/core/paging/PagingManager.java         |  14 +
 .../activemq/artemis/core/paging/PagingStore.java  |   8 +
 .../artemis/core/paging/cursor/ConsumedPage.java   |   6 +
 .../core/paging/cursor/PageCursorProvider.java     |  10 +
 .../core/paging/cursor/PageSubscription.java       |   7 +
 .../paging/cursor/PageSubscriptionCounter.java     |  13 +-
 .../BasePagingCounter.java}                        |  23 +-
 .../cursor/impl/PageCounterRebuildManager.java     | 327 ++++++++++++++++++
 .../paging/cursor/impl/PageCursorProviderImpl.java |  48 ++-
 .../cursor/impl/PageSubscriptionCounterImpl.java   | 383 +++++++++------------
 .../paging/cursor/impl/PageSubscriptionImpl.java   |  35 +-
 .../activemq/artemis/core/paging/impl/Page.java    |  39 +--
 .../core/paging/impl/PagingManagerImpl.java        |  69 +++-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  74 ++--
 .../journal/AbstractJournalStorageManager.java     |  22 +-
 .../persistence/impl/journal/DescribeJournal.java  |   2 +-
 .../artemis/core/server/ActiveMQMessageBundle.java |   3 +
 .../artemis/core/server/ActiveMQServer.java        |   9 +
 .../artemis/core/server/ActiveMQServerLogger.java  |   3 -
 .../core/server/impl/ActiveMQServerImpl.java       |  43 +++
 .../artemis/core/server/impl/QueueImpl.java        |  11 +-
 .../artemis/tests/util/ActiveMQTestBase.java       |   7 +-
 .../resources/pageCounter/checkMessages.groovy     |  14 +-
 .../main/resources/pageCounter/sendMessages.groovy |  53 +++
 .../tests/compatibility/PagingCounterTest.java     | 100 ++++++
 .../cluster/failover/BackupSyncJournalTest.java    |   1 -
 .../ActiveMQServerControlUsingCoreTest.java        |   5 +
 .../paging/PageCountSyncOnNonTXTest.java           |   7 +-
 .../integration/paging/PageCounterRebuildTest.java | 289 ++++++++++++++++
 .../integration/paging/PagingCounterTest.java      | 122 +++++--
 .../tests/integration/paging/PagingSendTest.java   |   3 -
 .../integration/replication/ReplicationTest.java   |   1 +
 .../SharedNothingReplicationFlowControlTest.java   |   8 +-
 .../storage/PersistMultiThreadTest.java            |   4 +
 .../core/paging/impl/PagingManagerImplTest.java    |   2 +
 .../artemis/tests/unit/util/FakePagingManager.java |   5 +
 44 files changed, 1434 insertions(+), 382 deletions(-)

diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 448c3b247c..4cae819e6c 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -279,7 +279,7 @@ public class PrintData extends DBOption {
             folder = pgStore.getFolder();
             out.println("####################################################################################################");
             out.println("Exploring store " + store + " folder = " + folder);
-            int pgid = (int) pgStore.getFirstPage();
+            long pgid = pgStore.getFirstPage();
 
             out.println("Number of pages ::" + pgStore.getNumberOfPages() + ", Current writing page ::" + pgStore.getCurrentWritingPage());
             for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 2256e05058..248f44d4a6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -2003,5 +2003,8 @@ public interface ActiveMQServerControl {
 
    @Attribute(desc = "Whether the embedded web server is started")
    boolean isEmbeddedWebServerStarted();
+
+   @Attribute(desc = "Scan all paged destinations to rebuild the page counters")
+   void rebuildPageCounters() throws Exception;
 }
 
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index cd1600e1a2..2e445e3b3e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -67,6 +67,9 @@ public abstract class AbstractJDBCDriver {
    }
 
    public void destroy() throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("dropping {}", sqlProvider.getTableName(), new Exception("trace"));
+      }
       final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
       try (Connection connection = connectionProvider.getConnection()) {
          try {
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 4b03f91779..96375af025 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -95,8 +95,10 @@ public class JDBCSequentialFile implements SequentialFile {
       try {
          return fileFactory.listFiles(extension).contains(filename);
       } catch (Exception e) {
-         logger.warn(e.getMessage(), e);
-         fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
+         logger.debug(e.getMessage(), e);
+         // this shouldn't throw a critical IO Error
+         // as if the destination does not exists (ot table store removed), the table will not exist and
+         // we may get a SQL Exception
          return false;
       }
    }
@@ -114,7 +116,9 @@ public class JDBCSequentialFile implements SequentialFile {
          return true;
       } catch (SQLException e) {
          isLoaded.set(false);
-         fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
+         // should not throw exceptions, as we drop the table on queue.destroy.
+         // storage.exists could be called for non existing pages during async cleanup and they are
+         // just supposed to return false
       }
       return false;
    }
@@ -158,7 +162,9 @@ public class JDBCSequentialFile implements SequentialFile {
             }
          }
       } catch (SQLException e) {
-         fileFactory.onIOError(e, "Error deleting JDBC file.", this);
+         // file is already gone from a drop somewhere
+         logger.debug("Expected error deleting Sequential File", e);
+         return;
       }
    }
 
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 6bceb5a7c1..e55fb4d03b 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -141,7 +141,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
       try {
          return dbDriver.listFiles(extension);
       } catch (SQLException e) {
-         criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
+         // We can't throw critical error here
+         // exists will call listfiles, and if the store does not exists
+         // it should simply return false
          throw e;
       }
    }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index 044d9162b7..3d9d52dc25 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -96,6 +96,9 @@ public abstract class AbstractSequentialFile implements SequentialFile {
    @Override
    public final void delete() throws IOException, InterruptedException, ActiveMQException {
       try {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Deleting {}", this.getFileName(), new Exception("trace"));
+         }
          if (isOpen()) {
             close(false, false);
          }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
index aeb24d3736..2f28196a07 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
@@ -16,10 +16,11 @@
  */
 package org.apache.activemq.artemis.core.io;
 
-/**
- * TODO Merge this with IOExceptionListener
- */
 public interface IOCriticalErrorListener {
 
+   default boolean isPreviouslyFailed() {
+      return false;
+   }
+
    void onIOException(Throwable code, String message, String file);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 2b541583ec..6d1f4341da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -45,6 +45,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -4619,6 +4620,15 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       }
    }
 
+   @Override
+   public void rebuildPageCounters() throws Exception {
+      // managementLock will guarantee there's only one management operation being called
+      try (AutoCloseable lock = server.managementLock()) {
+         Future<Object> task = server.getPagingManager().rebuildCounters();
+         task.get();
+      }
+   }
+
    private ServiceComponent getEmbeddedWebServerComponent() throws ActiveMQIllegalStateException {
       for (ActiveMQComponent component : server.getExternalComponents()) {
          if (component instanceof WebServerComponentMarker) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 89ff513566..80b16cdb2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.core.paging;
 
 import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -141,6 +143,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
     */
    void checkMemory(Runnable runWhenAvailable);
 
+   void counterSnapshot();
 
    /**
     * Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
@@ -157,4 +160,15 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
    default long getMaxMessages() {
       return 0;
    }
+
+   /**
+    * Rebuilds all page counters for destinations that are paging in the background.
+    */
+   default Future<Object> rebuildCounters() {
+      return null;
+   }
+
+   default void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
+   }
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 20b62def9d..da680fce6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.RouteContextList;
@@ -129,6 +130,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
 
    Page getCurrentPage();
 
+   /** it will save snapshots on the counters */
+   void counterSnapshot();
+
    /**
     * @return true if paging was started, or false if paging was already started before this call
     */
@@ -220,4 +224,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
    void block();
 
    void unblock();
+
+   default StorageManager getStorageManager() {
+      return null;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
index 1f1ae7b4cb..7eb5b7bb94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.core.paging.cursor;
 
+import java.util.function.BiConsumer;
+
 // this is to expose PageSubscriptionImpl::PageCursorInfo
 public interface ConsumedPage {
 
@@ -24,4 +26,8 @@ public interface ConsumedPage {
 
    boolean isDone();
 
+   boolean isAck(int messageNumber);
+
+   void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer);
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index a6f1714a47..d3f25e21b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
+import java.util.function.Consumer;
+
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 
@@ -32,12 +34,16 @@ public interface PageCursorProvider {
     */
    PageSubscription getSubscription(long queueId);
 
+   void forEachSubscription(Consumer<PageSubscription> consumer);
+
    PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
 
    void processReload() throws Exception;
 
    void stop();
 
+   void counterSnapshot();
+
    void flushExecutors();
 
    void scheduleCleanup();
@@ -56,4 +62,8 @@ public interface PageCursorProvider {
     */
    void close(PageSubscription pageCursorImpl);
 
+   void counterRebuildStarted();
+
+   void counterRebuildDone();
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 50d7fae5d0..17a743f111 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -35,6 +35,9 @@ public interface PageSubscription {
    // To be called before the server is down
    void stop();
 
+   /** Save a snapshot of the current counter value in the journal */
+   void counterSnapshot();
+
    /**
     * This is a callback to inform the PageSubscription that something was routed, so the empty flag can be cleared
     */
@@ -46,6 +49,8 @@ public interface PageSubscription {
 
    long getMessageCount();
 
+   boolean isCounterPending();
+
    long getPersistentSize();
 
    long getId();
@@ -170,4 +175,6 @@ public interface PageSubscription {
    void incrementDeliveredSize(long size);
 
    void removePendingDelivery(PagedMessage pagedMessage);
+
+   ConsumedPage locatePageInfo(long pageNr);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 75eedfa9ee..56614a4871 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
-import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
 public interface PageSubscriptionCounter {
@@ -36,7 +35,13 @@ public interface PageSubscriptionCounter {
 
    void loadInc(long recordInd, int add, long persistentSize);
 
-   void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize);
+   void applyIncrementOnTX(Transaction tx, int add, long persistentSize);
+
+   void markRebuilding();
+
+   void finishRebuild();
+
+   boolean isRebuilding();
 
    /**
     * This will process the reload
@@ -46,12 +51,12 @@ public interface PageSubscriptionCounter {
    // used when deleting the counter
    void delete() throws Exception;
 
-   void pendingCounter(Page page, int increment, long persistentSize) throws Exception;
+   void snapshot();
 
    // used when leaving page mode, so the counters are deleted in batches
    // for each queue on the address
    void delete(Transaction tx) throws Exception;
 
-   void cleanupNonTXCounters(long pageID) throws Exception;
+   PageSubscriptionCounter setSubscription(PageSubscription subscription);
 
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java
similarity index 61%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java
index 1f1ae7b4cb..3ba540bdf5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java
@@ -14,14 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.activemq.artemis.core.paging.cursor.impl;
 
-package org.apache.activemq.artemis.core.paging.cursor;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 
-// this is to expose PageSubscriptionImpl::PageCursorInfo
-public interface ConsumedPage {
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
 
-   long getPageId();
+   private volatile  boolean rebuilding = false;
 
-   boolean isDone();
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+   @Override
+   public boolean isRebuilding() {
+      return rebuilding;
+   }
 
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
new file mode 100644
index 0000000000..92a5ac5d39
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+   private boolean paging;
+   private long limitPageId;
+   private int limitMessageNr;
+   private LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().counterRebuildDone();
+               return;
+            }
+            store.getCursorProvider().counterRebuildStarted();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               CopiedConsumedPage copiedConsumedPage = new CopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      CopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().counterRebuildDone();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} reached the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
+                     }
+                     // this is the limit where we should count..
+                     // anything beyond this will be new data
+                     break;
+                  }
+               }
+               msg.initMessage(sm);
+               long[] routedQueues = msg.getQueueIDs();
+
+               if (logger.isTraceEnabled()) {
+                  logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
+               }
+               for (long queueID : routedQueues) {
+                  boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
+
+                  boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
+
+                  if (!txOK) {
+                     logger.debug("TX is not ok for {}", msg);
+                  }
+
+                  if (ok && txOK) { // not acked and TX is ok
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
+                     }
+                     CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+                     if (copiedSubscription != null) {
+                        copiedSubscription.empty = false;
+                        copiedSubscription.addUp++;
+                        copiedSubscription.sizeUp += msg.getPersistentSize();
+                     }
+                  } else {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
+                     }
+                  }
+               }
+            }
+         }
+      }
+
+      logger.debug("Counter rebuilding done for address {}", pgStore.getAddress());
+
+      done();
+
+   }
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         this.subscriptionCounter = subscription.getCounter();
+         this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<CopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      CopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class CopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         throw new RuntimeException("method not implemented");
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         throw new RuntimeException("method not implemented");
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 242fbcb14e..24d65970f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.collections.LongHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.function.Consumer;
 
 public class PageCursorProviderImpl implements PageCursorProvider {
 
@@ -54,6 +56,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    protected volatile boolean cleanupEnabled = true;
 
+   // We can't call cleanup before counters were rebuilt
+   // as they will determine if a subscription is empty or not
+   protected volatile boolean countersRebuilt = true;
+
    protected final PagingStore pagingStore;
 
    protected final StorageManager storageManager;
@@ -85,16 +91,30 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          throw new IllegalStateException("Cursor " + cursorID + " had already been created");
       }
 
-      PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent);
+
+      PageSubscriptionCounter subscriptionCounter = createPageCounter(cursorID, persistent);
+      PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent, subscriptionCounter);
+
+
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
 
+
+   private PageSubscriptionCounter createPageCounter(long cursorID, boolean persistent) {
+      return new PageSubscriptionCounterImpl(storageManager, cursorID);
+   }
+
    @Override
    public synchronized PageSubscription getSubscription(long cursorID) {
       return activeCursors.get(cursorID);
    }
 
+   @Override
+   public void forEachSubscription(Consumer<PageSubscription> consumer) {
+      activeCursors.forEach((k, v) -> consumer.accept(v));
+   }
+
    @Override
    public PagedReference newReference(final PagedMessage msg,
                                       final PageSubscription subscription) {
@@ -139,6 +159,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
       }
    }
 
+   @Override
+   public void counterSnapshot() {
+      for (PageSubscription cursor : activeCursors.values()) {
+         cursor.counterSnapshot();
+      }
+   }
+
    @Override
    public void flushExecutors() {
       pagingStore.flushExecutors();
@@ -216,6 +243,11 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    protected void cleanup() {
 
+      if (!countersRebuilt) {
+         logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
+         return;
+      }
+
       ArrayList<Page> depagedPages = new ArrayList<>();
       LongHashSet depagedPagesSet = new LongHashSet();
 
@@ -506,6 +538,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    private long checkMinPage(Collection<PageSubscription> cursorList) {
       long minPage = Long.MAX_VALUE;
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("Min page cursorList size {} on {}", cursorList.size(), pagingStore.getAddress(), new Exception("trace"));
+      }
+
       for (PageSubscription cursor : cursorList) {
          long firstPage = cursor.getFirstPage();
          if (logger.isTraceEnabled()) {
@@ -543,4 +579,14 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          }
       }
    }
+
+   @Override
+   public void counterRebuildStarted() {
+      this.countersRebuilt = false;
+   }
+
+   @Override
+   public void counterRebuildDone() {
+      this.countersRebuilt = true;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 6ca73feb19..f580be47f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -16,19 +16,12 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
-import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -44,173 +37,117 @@ import java.lang.invoke.MethodHandles;
 /**
  * This class will encapsulate the persistent counters for the PagingSubscription
  */
-public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
+public class PageSubscriptionCounterImpl extends BasePagingCounter {
 
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   private static final int FLUSH_COUNTER = 1000;
-
    private final long subscriptionID;
 
    // the journal record id that is holding the current value
    private long recordID = -1;
 
-   private boolean persistent;
+   /** while we rebuild the counters, we will use the recordedValues */
+   private volatile long recordedValue = -1;
+   private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> recordedValueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedValue");
 
-   private final PageSubscription subscription;
+   /** while we rebuild the counters, we will use the recordedValues */
+   private volatile long recordedSize = -1;
+   private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> recordedSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedSize");
 
-   private final StorageManager storage;
+   private PageSubscription subscription;
 
-   private final AtomicLong value = new AtomicLong(0);
-   private final AtomicLong persistentSize = new AtomicLong(0);
+   private final StorageManager storage;
 
-   private final AtomicLong added = new AtomicLong(0);
-   private final AtomicLong addedPersistentSize = new AtomicLong(0);
+   private volatile long value;
+   private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> valueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "value");
 
-   private final AtomicLong pendingValue = new AtomicLong(0);
-   private final AtomicLong pendingPersistentSize = new AtomicLong(0);
+   private volatile long persistentSize;
+   private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> persistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "persistentSize");
 
-   private final LinkedList<Long> incrementRecords = new LinkedList<>();
+   private volatile long added;
+   private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> addedUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "added");
 
-   // We are storing pending counters for non transactional writes on page
-   // we will recount a page case we still see pending records
-   // as soon as we close a page we remove these records replacing by a regular page increment record
-   // A Map per pageID, each page will have a set of IDs, with the increment on each one
-   private final Map<Long, PendingCounter> pendingCounters = new HashMap<>();
+   private volatile long addedPersistentSize;
+   private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> addedPersistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "addedPersistentSize");
 
    private LinkedList<PendingCounter> loadList;
 
-   private final Executor pageExecutor;
-
    public PageSubscriptionCounterImpl(final StorageManager storage,
-                                      final PageSubscription subscription,
-                                      final boolean persistent,
                                       final long subscriptionID) {
       this.subscriptionID = subscriptionID;
       this.storage = storage;
-      this.persistent = persistent;
-      this.subscription = subscription;
-      if (subscription == null) {
-         this.pageExecutor = null;
-      } else {
-         this.pageExecutor = subscription.getPagingStore().getExecutor();
-         assert pageExecutor != null;
-      }
    }
 
    @Override
-   public long getValueAdded() {
-      return added.get() + pendingValue.get();
-   }
-
-   @Override
-   public long getValue() {
-      return value.get() + pendingValue.get();
+   public void markRebuilding() {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Subscription {} marked for rebuilding", subscriptionID);
+      }
+      super.markRebuilding();
+      recordedSizeUpdater.set(this, persistentSizeUpdater.get(this));
+      recordedValueUpdater.set(this, recordedValueUpdater.get(this));
+      try {
+         reset();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
    }
 
    @Override
-   public long getPersistentSizeAdded() {
-      return addedPersistentSize.get() + pendingPersistentSize.get();
+   public void finishRebuild() {
+      super.finishRebuild();
+      if (logger.isDebugEnabled()) {
+         logger.debug("Subscription {} finished rebuilding", subscriptionID);
+      }
+      snapshot();
+      addedUpdater.set(this, valueUpdater.get(this));
+      addedPersistentSizeUpdater.set(this, persistentSizeUpdater.get(this));
    }
 
    @Override
-   public long getPersistentSize() {
-      return persistentSize.get() + pendingPersistentSize.get();
+   public long getValueAdded() {
+      return addedUpdater.get(this);
    }
 
-   /**
-    * This is used only on non transactional paging
-    *
-    * @param page
-    * @param increment
-    * @throws Exception
-    */
    @Override
-   public synchronized void pendingCounter(Page page, int increment, long size) throws Exception {
-      if (!persistent) {
-         return; // nothing to be done
+   public long getValue() {
+      if (isRebuilding()) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("returning getValue from isPending on subscription {}, recordedValue={}, addedUpdater={}", recordedValueUpdater.get(this), addedUpdater.get(this));
+         }
+         return recordedValueUpdater.get(this);
       }
-
-      assert page != null;
-
-      PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId());
-      if (pendingInfo == null) {
-         // We have to make sure this is sync here
-         // not syncing this to disk may cause the page files to be out of sync on pages.
-         // we can't afford the case where a page file is written without a record here
-         long id = storage.storePendingCounter(this.subscriptionID, page.getPageId());
-         pendingInfo = new PendingCounter(id, increment, size);
-         pendingCounters.put((long) page.getPageId(), pendingInfo);
-      } else {
-         pendingInfo.addAndGet(increment, size);
+      if (logger.isTraceEnabled()) {
+         logger.trace("returning regular getValue subscription {}, value={}", subscriptionID, valueUpdater.get(this));
       }
+      return valueUpdater.get(this);
+   }
 
-      pendingValue.addAndGet(increment);
-      pendingPersistentSize.addAndGet(size);
-
-      page.addPendingCounter(this);
+   @Override
+   public long getPersistentSizeAdded() {
+      return addedPersistentSizeUpdater.get(this);
    }
 
-   /**
-    * Cleanup temporary page counters on non transactional paged messages
-    *
-    * @param pageID
-    */
    @Override
-   public void cleanupNonTXCounters(final long pageID) throws Exception {
-      PendingCounter pendingInfo;
-      synchronized (this) {
-         pendingInfo = pendingCounters.remove(pageID);
+   public long getPersistentSize() {
+      if (isRebuilding()) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("returning getPersistentSize from isPending on subscription {}, recordedSize={}. addedSize={}", subscriptionID, recordedSizeUpdater.get(this), addedPersistentSizeUpdater.get(this));
+         }
+         return recordedSizeUpdater.get(this);
       }
-
-      if (pendingInfo != null) {
-         final int valueCleaned = pendingInfo.getCount();
-         final long valueSizeCleaned = pendingInfo.getPersistentSize();
-         Transaction tx = new TransactionImpl(storage);
-         storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId());
-
-         // To apply the increment of the value just being cleaned
-         increment(tx, valueCleaned, valueSizeCleaned);
-
-         tx.addOperation(new TransactionOperationAbstract() {
-            @Override
-            public void afterCommit(Transaction tx) {
-               pendingValue.addAndGet(-valueCleaned);
-               pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0);
-            }
-         });
-
-         tx.commit();
+      if (logger.isTraceEnabled()) {
+         logger.trace("returning regular getPersistentSize subscription {}, value={}", subscriptionID, persistentSizeUpdater.get(this));
       }
+      return persistentSizeUpdater.get(this);
    }
 
    @Override
    public void increment(Transaction tx, int add, long size) throws Exception {
       if (tx == null) {
-         if (persistent) {
-            long id = storage.storePageCounterInc(this.subscriptionID, add, size);
-            storage.getContext().executeOnCompletion(new IOCallback() {
-               @Override
-               public void done() {
-                  process(id, add, size);
-               }
-
-               @Override
-               public void onError(int errorCode, String errorMessage) {
-
-               }
-            });
-         } else {
-            process(-1, add, size);
-         }
+         process(add, size);
       } else {
-         if (persistent) {
-            tx.setContainsPersistent();
-            long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size);
-            applyIncrementOnTX(tx, id, add, size);
-         } else {
-            applyIncrementOnTX(tx, -1, add, size);
-         }
+         applyIncrementOnTX(tx, add, size);
       }
    }
 
@@ -218,11 +155,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
     * This method will install the TXs
     *
     * @param tx
-    * @param recordID1
     * @param add
     */
    @Override
-   public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) {
+   public void applyIncrementOnTX(Transaction tx, int add, long size) {
       CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
 
       if (oper == null) {
@@ -231,27 +167,36 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
          tx.addOperation(oper);
       }
 
-      oper.operations.add(new ItemOper(this, recordID1, add, size));
+      oper.operations.add(new ItemOper(this, add, size));
    }
 
    @Override
-   public synchronized void loadValue(final long recordID1, final long value1, long size) {
-      if (this.subscription != null) {
-         // it could be null on testcases... which is ok
-         this.subscription.notEmpty();
+   public synchronized void loadValue(final long recordID, final long value, long size) {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Counter for subscription {} reloading recordID={}, value={}, size={}", this.subscriptionID, recordID, value, size);
       }
-      this.value.set(value1);
-      this.added.set(value1);
-      this.persistentSize.set(size);
-      this.addedPersistentSize.set(size);
-      this.recordID = recordID1;
+      this.recordID = recordID;
+      recordedValueUpdater.set(this, value);
+      recordedSizeUpdater.set(this, size);
+      valueUpdater.set(this, value);
+      persistentSizeUpdater.set(this, size);
+      addedUpdater.set(this, size);
    }
 
-   private void process(long id, int add, long size) {
-      if (id >= 0 && pageExecutor != null) {
-         pageExecutor.execute(() -> doIncrement(id, add, size));
-      } else {
-         doIncrement(-1, add, size);
+   private void process(int add, long size) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size);
+      }
+      valueUpdater.addAndGet(this, add);
+      persistentSizeUpdater.addAndGet(this, size);
+      if (add > 0) {
+         addedUpdater.addAndGet(this, add);
+         addedPersistentSizeUpdater.addAndGet(this, size);
+      }
+
+      if (isRebuilding()) {
+         recordedValueUpdater.addAndGet(this, value);
+         recordedSizeUpdater.addAndGet(this, size);
       }
    }
 
@@ -264,24 +209,39 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       tx.commit();
    }
 
+   private void reset() throws Exception {
+      Transaction tx = new TransactionImpl(storage);
+
+      delete(tx, true);
+
+      tx.commit();
+   }
+
    @Override
    public void delete(Transaction tx) throws Exception {
+      delete(tx, false);
+   }
+
+   private void delete(Transaction tx, boolean keepZero) throws Exception {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Subscription {} delete, keepZero={}", subscriptionID, keepZero);
+      }
       // always lock the StorageManager first.
       try (ArtemisCloseable lock = storage.closeableReadLock()) {
          synchronized (this) {
-            for (Long record : incrementRecords) {
-               storage.deleteIncrementRecord(tx.getID(), record.longValue());
-               tx.setContainsPersistent();
-            }
-
             if (recordID >= 0) {
                storage.deletePageCounter(tx.getID(), this.recordID);
                tx.setContainsPersistent();
             }
 
-            recordID = -1;
-            value.set(0);
-            incrementRecords.clear();
+            if (keepZero) {
+               recordID = storage.storePageCounter(tx.getID(), subscriptionID, 0L, 0L);
+            } else {
+               recordID = -1;
+            }
+
+            valueUpdater.set(this, 0);
+            persistentSizeUpdater.set(this, 0);
          }
       }
    }
@@ -298,110 +258,101 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    @Override
    public void processReload() {
       if (loadList != null) {
-         if (subscription != null) {
-            // it could be null on testcases
-            subscription.notEmpty();
-         }
-
-         for (PendingCounter incElement : loadList) {
-            value.addAndGet(incElement.getCount());
-            added.addAndGet(incElement.getCount());
-            persistentSize.addAndGet(incElement.getPersistentSize());
-            addedPersistentSize.addAndGet(incElement.getPersistentSize());
-            incrementRecords.add(incElement.getId());
+         try {
+            long tx = -1L;
+            logger.debug("Removing increment records on cursor {}", subscriptionID);
+            for (PendingCounter incElement : loadList) {
+               if (tx < 0) {
+                  tx = storage.generateID();
+               }
+               storage.deletePageCounter(tx, incElement.id);
+            }
+            if (tx >= 0) {
+               storage.commit(tx);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
          }
          loadList.clear();
          loadList = null;
       }
    }
 
-   // you need to call this method from the executors when id > 0
-   private void doIncrement(long id, int variance, long size) {
-      value.addAndGet(variance);
-      this.persistentSize.addAndGet(size);
-      if (variance > 0) {
-         added.addAndGet(variance);
-      }
-      if (size > 0) {
-         addedPersistentSize.addAndGet(size);
-      }
-      if (id >= 0) {
-         synchronized (this) {
-            incrementRecords.add(id);
-            if (incrementRecords.size() > FLUSH_COUNTER) {
-               this.cleanup();
-            }
-         }
-      }
-   }
-
-   /**
-    * used on testing only
-    */
-   public void setPersistent(final boolean persistent) {
-      this.persistent = persistent;
-   }
-
    /**
     * This method should always be called from a single threaded executor
     */
-   protected synchronized void cleanup() {
-      if (incrementRecords.size() <= FLUSH_COUNTER) {
+   @Override
+   public synchronized void snapshot() {
+      if (isRebuilding()) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("snapshot call ignored as cursor is being rebuilt for {}", subscriptionID);
+         }
+         return;
+      }
+
+      if (!storage.isStarted()) {
+         logger.debug("Storage is not active, ignoring snapshot call on {}", subscriptionID);
          return;
       }
 
-      long valueReplace = value.get();
-      long sizeReplace = persistentSize.get();
-      ArrayList<Long> deleteList = new ArrayList<>(incrementRecords);
-      incrementRecords.clear();
+      long valueReplace = valueUpdater.get(this);
+      long sizeReplace = persistentSizeUpdater.get(this);
 
       long newRecordID = -1;
 
-      long txCleanup = storage.generateID();
+      long txCleanup = -1;
 
       try {
-         for (Long value1 : deleteList) {
-            storage.deleteIncrementRecord(txCleanup, value1);
-         }
-
          if (recordID >= 0) {
+            if (txCleanup < 0) {
+               txCleanup = storage.generateID();
+            }
             storage.deletePageCounter(txCleanup, recordID);
+            recordID = -1;
          }
 
-         newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
+         if (valueReplace > 0) {
+            if (txCleanup < 0) {
+               txCleanup = storage.generateID();
+            }
+            newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
+         }
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}",
-                         recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
+         if (logger.isDebugEnabled()) {
+            logger.debug("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}, value = {}, size = {}",
+                         recordID, newRecordID, subscriptionID, subscription.getQueue().getName(), valueReplace, sizeReplace);
          }
 
-         storage.commit(txCleanup);
+         if (txCleanup >= 0) {
+            storage.commit(txCleanup);
+         }
       } catch (Exception e) {
          newRecordID = recordID;
 
          ActiveMQServerLogger.LOGGER.problemCleaningPagesubscriptionCounter(e);
-         try {
-            storage.rollback(txCleanup);
-         } catch (Exception ignored) {
+         if (txCleanup >= 0) {
+            try {
+               storage.rollback(txCleanup);
+            } catch (Exception ignored) {
+            }
          }
       } finally {
          recordID = newRecordID;
+         recordedValueUpdater.set(this, valueReplace);
+         recordedSizeUpdater.set(this, sizeReplace);
       }
    }
 
    private static class ItemOper {
 
-      private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) {
+      private ItemOper(PageSubscriptionCounterImpl counter, int add, long persistentSize) {
          this.counter = counter;
-         this.id = id;
          this.amount = add;
          this.persistentSize = persistentSize;
       }
 
       PageSubscriptionCounterImpl counter;
 
-      long id;
-
       int amount;
 
       long persistentSize;
@@ -414,7 +365,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       @Override
       public void afterCommit(Transaction tx) {
          for (ItemOper oper : operations) {
-            oper.counter.process(oper.id, oper.amount, oper.persistentSize);
+            oper.counter.process(oper.amount, oper.persistentSize);
          }
       }
    }
@@ -465,4 +416,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
          SIZE_UPDATER.addAndGet(this, persistentSize);
       }
    }
+
+   @Override
+   public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
+      this.subscription = subscription;
+      return this;
+   }
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index cdbd1dba72..a0ebc3a9c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.ToIntFunction;
@@ -273,14 +274,17 @@ public final class PageSubscriptionImpl implements PageSubscription {
                         final StorageManager store,
                         final Filter filter,
                         final long cursorId,
-                        final boolean persistent) {
+                        final boolean persistent,
+                        final PageSubscriptionCounter counter) {
+      assert counter != null;
       this.pageStore = pageStore;
       this.store = store;
       this.cursorProvider = cursorProvider;
       this.cursorId = cursorId;
       this.filter = filter;
       this.persistent = persistent;
-      this.counter = new PageSubscriptionCounterImpl(store, this, persistent, cursorId);
+      this.counter = counter;
+      this.counter.setSubscription(this);
    }
 
 
@@ -346,6 +350,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
       }
    }
 
+   @Override
+   public boolean isCounterPending() {
+      return counter.isRebuilding();
+   }
+
    @Override
    public long getPersistentSize() {
       if (empty) {
@@ -418,10 +427,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
    @Override
    public void onPageModeCleared(Transaction tx) throws Exception {
-      if (counter != null) {
-         // this could be null on testcases
-         counter.delete(tx);
-      }
+      // this could be null on testcases
+      counter.delete(tx);
       this.empty = true;
    }
 
@@ -746,9 +753,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
    }
 
    @Override
-   public void forEachConsumedPage(Consumer<ConsumedPage> pageCleaner) {
+   public void forEachConsumedPage(Consumer<ConsumedPage> pageConsumer) {
       synchronized (consumedPages) {
-         consumedPages.values().forEach(pageCleaner);
+         consumedPages.values().forEach(pageConsumer);
       }
    }
 
@@ -860,6 +867,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
    public void stop() {
    }
 
+   @Override
+   public void counterSnapshot() {
+      counter.snapshot();
+   }
+
    @Override
    public void printDebug() {
       printDebug(toString());
@@ -912,6 +924,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
       return getPageInfo(pos.getPageNr());
    }
 
+   @Override
    public PageCursorInfo locatePageInfo(final long pageNr) {
       synchronized (consumedPages) {
          return consumedPages.get(pageNr);
@@ -1064,10 +1077,16 @@ public final class PageSubscriptionImpl implements PageSubscription {
       // expressions
       private final AtomicInteger confirmed = new AtomicInteger(0);
 
+      @Override
       public synchronized boolean isAck(int messageNumber) {
          return completePage != null || acks.get(messageNumber) != null;
       }
 
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         acks.forEach(ackConsumer);
+      }
+
       @Override
       public String toString() {
          try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 6a0551b430..8e919fb670 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.paging.impl;
 
 import java.nio.ByteBuffer;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
@@ -26,13 +25,11 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.collections.EmptyList;
 import org.apache.activemq.artemis.utils.collections.LinkedList;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@@ -86,11 +83,6 @@ public final class Page  {
 
    private final SimpleString storeName;
 
-   /**
-    * A list of subscriptions containing pending counters (with non tx adds) on this page
-    */
-   private Set<PageSubscriptionCounter> pendingCounters;
-
    private ByteBuffer readFileBuffer;
 
    public Page(final SimpleString storeName,
@@ -241,13 +233,6 @@ public final class Page  {
          storageManager.pageClosed(storeName, pageId);
       }
       file.close(waitSync, waitSync);
-
-      Set<PageSubscriptionCounter> counters = getPendingCounters();
-      if (counters != null) {
-         for (PageSubscriptionCounter counter : counters) {
-            counter.cleanupNonTXCounters(this.getPageId());
-         }
-      }
    }
 
    public boolean delete(final LinkedList<PagedMessage> messages) throws Exception {
@@ -255,7 +240,9 @@ public final class Page  {
          storageManager.pageDeleted(storeName, pageId);
       }
 
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Deleting pageNr={} on store {}", pageId, storeName, new Exception("trace"));
+      } else if (logger.isDebugEnabled()) {
          logger.debug("Deleting pageNr={} on store {}", pageId, storeName);
       }
 
@@ -373,24 +360,4 @@ public final class Page  {
       return file;
    }
 
-   /**
-    * This will indicate a page that will need to be called on cleanup when the page has been closed and confirmed
-    *
-    * @param pageSubscriptionCounter
-    */
-   public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) {
-      getOrCreatePendingCounters().add(pageSubscriptionCounter);
-   }
-
-   private synchronized Set<PageSubscriptionCounter> getPendingCounters() {
-      return pendingCounters;
-   }
-
-   private synchronized Set<PageSubscriptionCounter> getOrCreatePendingCounters() {
-      if (pendingCounters == null) {
-         pendingCounters = new ConcurrentHashSet<>();
-      }
-
-      return pendingCounters;
-   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 59db0da45f..6bec335949 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -32,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@@ -40,14 +43,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.SizeAwareMetric;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
 import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
 
 public final class PagingManagerImpl implements PagingManager {
 
-   private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval", "0"));
+   private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60"));
 
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -92,13 +97,13 @@ public final class PagingManagerImpl implements PagingManager {
 
    private volatile long diskTotalSpace = 0;
 
-   private final Executor memoryExecutor;
+   private final Executor managerExecutor;
 
    private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
 
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
 
-   private ActiveMQScheduledComponent scheduledComponent = null;
+   private ActiveMQScheduledComponent snapshotUpdater = null;
 
    private final SimpleString managementAddress;
 
@@ -127,7 +132,7 @@ public final class PagingManagerImpl implements PagingManager {
       globalSizeMetric.setElementsEnabled(maxMessages >= 0);
       globalSizeMetric.setOverCallback(() -> setGlobalFull(true));
       globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
-      this.memoryExecutor = pagingSPI.newExecutor();
+      this.managerExecutor = pagingSPI.newExecutor();
       this.managementAddress = managementAddress;
    }
 
@@ -205,8 +210,8 @@ public final class PagingManagerImpl implements PagingManager {
    protected void checkMemoryRelease() {
       if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) {
          if (!memoryCallback.isEmpty()) {
-            if (memoryExecutor != null) {
-               memoryExecutor.execute(this::memoryReleased);
+            if (managerExecutor != null) {
+               managerExecutor.execute(this::memoryReleased);
             } else {
                memoryReleased();
             }
@@ -368,8 +373,8 @@ public final class PagingManagerImpl implements PagingManager {
             PagingStore oldStore = stores.remove(store.getStoreName());
             if (oldStore != null) {
                oldStore.stop();
-               oldStore = null;
             }
+            store.getCursorProvider().counterRebuildStarted(); // TODO-NOW-DONT-MERGE maybe this should be removed
             store.start();
             stores.put(store.getStoreName(), store);
          }
@@ -466,28 +471,38 @@ public final class PagingManagerImpl implements PagingManager {
 
          reloadStores();
 
-         if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) {
-            this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) {
+         if (ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL > 0) {
+            this.snapshotUpdater = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL, TimeUnit.SECONDS, false) {
                @Override
                public void run() {
-                  debug();
+                  try {
+                     logger.debug("Updating counter snapshots");
+                     counterSnapshot();
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                  }
                }
             };
 
-            this.scheduledComponent.start();
+            this.snapshotUpdater.start();
 
          }
 
          started = true;
+
       } finally {
          unlock();
       }
    }
 
-   public void debug() {
-      logger.info("size = {} bytes, messages = {}", globalSizeMetric.getSize(), globalSizeMetric.getElements());
+   @Override
+   public void counterSnapshot() {
+      for (PagingStore store : stores.values()) {
+         store.counterSnapshot();
+      }
    }
 
+
    @Override
    public synchronized void stop() throws Exception {
       if (!started) {
@@ -495,9 +510,9 @@ public final class PagingManagerImpl implements PagingManager {
       }
       started = false;
 
-      if (scheduledComponent != null) {
-         this.scheduledComponent.stop();
-         this.scheduledComponent = null;
+      if (snapshotUpdater != null) {
+         this.snapshotUpdater.stop();
+         this.snapshotUpdater = null;
       }
 
       lock();
@@ -548,4 +563,26 @@ public final class PagingManagerImpl implements PagingManager {
       syncLock.writeLock().lock();
    }
 
+   @Override
+   public void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
+      transactions.forEach(transactionConsumer);
+   }
+
+   @Override
+   public Future<Object> rebuildCounters() {
+      LongHashSet transactionsSet = new LongHashSet();
+      transactions.forEach((txId, tx) -> {
+         transactionsSet.add(txId);
+      });
+      stores.forEach((address, pgStore) -> {
+         PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet);
+         logger.debug("Setting destination {} to rebuild counters", address);
+         managerExecutor.execute(rebuildManager);
+      });
+
+      FutureTask<Object> task = new FutureTask<>(() -> null);
+      managerExecutor.execute(task);
+
+      return task;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index ca3fd43a71..81d10b9691 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -380,34 +380,40 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
-   public synchronized void stop() throws Exception {
-      if (running) {
-         cursorProvider.flushExecutors();
-         cursorProvider.stop();
-
-         final List<Runnable> pendingTasks = new ArrayList<>();
-
-         // TODO we could have a parameter to use this
-         final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
-         if (pendingTasksWhileShuttingDown > 0) {
-            logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown);
-            for (Runnable pendingTask : pendingTasks) {
-               try {
-                  pendingTask.run();
-               } catch (Throwable t) {
-                  logger.warn("Error while executing a pending task on shutdown", t);
-               }
-            }
+   public void counterSnapshot() {
+      cursorProvider.counterSnapshot();
+   }
+
+   @Override
+   public void stop() throws Exception {
+      synchronized (this) {
+         if (running) {
+            cursorProvider.stop();
+            running = false;
+         } else {
+            return;
          }
+      }
 
-         running = false;
+      final List<Runnable> pendingTasks = new ArrayList<>();
 
-         final Page page = currentPage;
-         if (page != null) {
-            page.close(false);
-            currentPage = null;
+      final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
+      if (pendingTasksWhileShuttingDown > 0) {
+         logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown);
+         for (Runnable pendingTask : pendingTasks) {
+            try {
+               pendingTask.run();
+            } catch (Throwable t) {
+               logger.warn("Error while executing a pending task on shutdown", t);
+            }
          }
       }
+
+      final Page page = currentPage;
+      if (page != null) {
+         page.close(false);
+         currentPage = null;
+      }
    }
 
    @Override
@@ -424,10 +430,13 @@ public class PagingStoreImpl implements PagingStore {
    public void flushExecutors() {
       FutureLatch future = new FutureLatch();
 
-      executor.execute(future);
+      try {
+         executor.execute(future);
 
-      if (!future.await(60000)) {
-         ActiveMQServerLogger.LOGGER.pageStoreTimeout(address);
+         if (!future.await(60000)) {
+            ActiveMQServerLogger.LOGGER.pageStoreTimeout(address);
+         }
+      } catch (Exception ignored) {
       }
    }
 
@@ -1122,14 +1131,7 @@ public class PagingStoreImpl implements PagingStore {
       List<org.apache.activemq.artemis.core.server.Queue> durableQueues = ctx.getDurableQueues();
       List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
       for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) {
-         if (tx == null) {
-            // non transactional writes need an intermediate place
-            // to avoid the counter getting out of sync
-            q.getPageSubscription().getCounter().pendingCounter(page, 1, size);
-         } else {
-            // null tx is treated through pending counters
-            q.getPageSubscription().getCounter().increment(tx, 1, size);
-         }
+         q.getPageSubscription().getCounter().increment(tx, 1, size);
       }
 
       for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
@@ -1408,5 +1410,9 @@ public class PagingStoreImpl implements PagingStore {
       usedPages.forEachUsedPage(consumerPage);
    }
 
+   @Override
+   public StorageManager getStorageManager() {
+      return storageManager;
+   }
 
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index d7ca55888f..5157103ff9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -22,7 +22,6 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
 
 import java.security.InvalidParameterException;
@@ -1692,7 +1691,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
    @Override
    public synchronized boolean isStarted() {
-      return started;
+      if (ioCriticalErrorListener != null) {
+         return started && !ioCriticalErrorListener.isPreviouslyFailed();
+      } else {
+         return started;
+      }
    }
 
    /**
@@ -1895,25 +1898,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                }
                break;
             }
-            case PAGE_CURSOR_COUNTER_VALUE: {
-               ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
-
-               break;
-            }
-
             case PAGE_CURSOR_COUNTER_INC: {
                PageCountRecordInc encoding = new PageCountRecordInc();
 
                encoding.decode(buff);
 
-               PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
+               logger.debug("Page cursor counter inc on a prepared TX.");
 
-               if (sub != null) {
-                  sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize());
-                  sub.notEmpty();
-               } else {
-                  ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
-               }
+               // TODO: do I need to remove the record on commit?
 
                break;
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 5a585fe8bb..19840061b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -535,7 +535,7 @@ public final class DescribeJournal {
       PageSubscriptionCounterImpl subsCounter;
       subsCounter = counters.get(queueIDForCounter);
       if (subsCounter == null) {
-         subsCounter = new PageSubscriptionCounterImpl(null, null, false, -1);
+         subsCounter = new PageSubscriptionCounterImpl(null, -1);
          counters.put(queueIDForCounter, subsCounter);
       }
       return subsCounter;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index aadb9ce633..2c2f632d93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -524,4 +524,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 229244, value = "Meters already registered for {}")
    IllegalStateException metersAlreadyRegistered(String resource);
+
+   @Message(id = 229245, value = "Management controller is busy with another task. Please try again")
+   ActiveMQTimeoutException managementBusy();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index bfc0feea5a..57a02aa305 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -111,6 +111,7 @@ public interface ActiveMQServer extends ServiceComponent {
       STOPPED
    }
 
+   AutoCloseable managementLock() throws Exception;
 
    void setState(SERVER_STATE state);
 
@@ -357,6 +358,14 @@ public interface ActiveMQServer extends ServiceComponent {
                                Map<SimpleString, RoutingType> prefixes,
                                String securityDomain) throws Exception;
 
+   /** should the server rebuild page counters upon startup.
+    *  this will be useful on testing or an embedded broker scenario */
+   boolean isRebuildCounters();
+
+   /** should the server rebuild page counters upon startup.
+    *  this will be useful on testing or an embedded broker scenario */
+   void setRebuildCounters(boolean rebuildCounters);
+
    SecurityStore getSecurityStore();
 
    void removeSession(String name) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 2f007976e3..1feee23ec0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -472,9 +472,6 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222047, value = "Can not find queue {} while reloading ACKNOWLEDGE_CURSOR", level = LogMessage.Level.WARN)
    void journalCannotFindQueueReloadingACK(Long queueID);
 
-   @LogMessage(id = 222048, value = "PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, invalid state", level = LogMessage.Level.WARN)
-   void journalPAGEOnPrepared();
-
    @LogMessage(id = 222049, value = "InternalError: Record type {} not recognized. Maybe you are using journal files created on a different version", level = LogMessage.Level.WARN)
    void journalInvalidRecordType(Byte recordType);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index af278ebfd9..d9a1f7bc21 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -239,6 +239,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private HAPolicy haPolicy;
 
+   // This will be useful on tests or embedded
+   private boolean rebuildCounters = true;
+
    private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
 
    private final Version version;
@@ -271,6 +274,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private ReplayManager replayManager;
 
+   /** Certain management operations shouldn't use more than one thread.
+    *  this semaphore is used to guarantee a single thread used. */
+   private final Semaphore managementSemaphore = new Semaphore(1);
+
    /**
     * This is a thread pool for io tasks only.
     * We can't use the same global executor to avoid starvations.
@@ -496,6 +503,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return networkHealthCheck;
    }
 
+   @Override
+   public void setRebuildCounters(boolean rebuildCounters) {
+      this.rebuildCounters = rebuildCounters;
+   }
+
+   @Override
+   public boolean isRebuildCounters() {
+      return this.rebuildCounters;
+   }
+
 
    @Override
    public void replay(Date start, Date end, String address, String target, String filter) throws Exception {
@@ -1323,6 +1340,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
+      if (!criticalIOError && pagingManager != null) {
+         pagingManager.counterSnapshot();
+      }
+
       stopComponent(pagingManager);
 
       if (storageManager != null)
@@ -3323,6 +3344,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       JournalLoadInformation[] journalInfo = loadJournals();
 
+      if (rebuildCounters) {
+         pagingManager.rebuildCounters();
+      }
+
       removeExtraAddressStores();
 
       if (securityManager instanceof ActiveMQBasicSecurityManager) {
@@ -4245,8 +4270,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       private final AtomicBoolean failedAlready = new AtomicBoolean();
 
+      @Override
+      public boolean isPreviouslyFailed() {
+         return failedAlready.get();
+      }
+
       @Override
       public synchronized void onIOException(Throwable cause, String message, String file) {
+         if (logger.isTraceEnabled()) {
+            // the purpose of this is to find where the critical error is being called at
+            // useful for when debugging where the critical error is being called at
+            logger.trace("Throwing critical error {}", cause.getMessage(), new Exception("trace"));
+         }
          if (!failedAlready.compareAndSet(false, true)) {
             return;
          }
@@ -4542,4 +4577,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
    }
 
+   @Override
+   public AutoCloseable managementLock() throws Exception {
+      if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
+         throw ActiveMQMessageBundle.BUNDLE.managementBusy();
+      } else {
+         return managementSemaphore::release;
+      }
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 488109c83e..e990162503 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1725,7 +1725,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (pageSubscription != null) {
          // messageReferences will have depaged messages which we need to discount from the counter as they are
          // counted on the pageSubscription as well
-         return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+         long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+         if (logger.isTraceEnabled()) {
+            logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}",
+                         name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(),
+                         pageSubscription.getDeliveredCount());
+         }
+         return returnValue;
       } else {
          return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount();
       }
@@ -2279,6 +2285,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void destroyPaging() throws Exception {
       // it could be null on embedded or certain unit tests
       if (pageSubscription != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Destroying paging for {}", this.name, new Exception("trace"));
+         }
          pageSubscription.destroy();
          pageSubscription.cleanupEntries(true);
       }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 9b18e29ed7..fb04c2b2a4 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -295,6 +295,7 @@ public abstract class ActiveMQTestBase extends Assert {
       try {
          DriverManager.getConnection("jdbc:derby:;shutdown=true");
       } catch (Exception ignored) {
+         // it always throws an exception on shutdown
       }
    }
 
@@ -878,7 +879,7 @@ public abstract class ActiveMQTestBase extends Assert {
       return testDir;
    }
 
-   private String getEmbeddedDataBaseName() {
+   protected String getEmbeddedDataBaseName() {
       return "memory:" + getTestDir();
    }
 
@@ -2314,6 +2315,10 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected int getMessageCount(final Queue queue) {
+      try {
+         Wait.waitFor(() -> queue.getPageSubscription().isCounterPending() == false);
+      } catch (Exception ignored) {
+      }
       queue.flushExecutor();
       return (int) queue.getMessageCount();
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy
similarity index 61%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
copy to tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy
index 1f1ae7b4cb..c3508890e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
+++ b/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy
@@ -15,13 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.artemis.core.paging.cursor;
+package pageCounter
 
-// this is to expose PageSubscriptionImpl::PageCursorInfo
-public interface ConsumedPage {
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
 
-   long getPageId();
+int messages = Integer.parseInt(arg[0]);
 
-   boolean isDone();
+Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
+for (int i = 0; i < 20 && queue.getMessageCount() != messages; i++) {
+    Thread.sleep(100);
 
 }
+GroovyRun.assertEquals((int)messages, (int)queue.getMessageCount());
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy
new file mode 100644
index 0000000000..20f9dccc2d
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pageCounter
+
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+// starts an artemis server
+String serverType = arg[0];
+String protocol = arg[1];
+int messages = Integer.parseInt(arg[2]);
+
+// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
+if (protocol != null && protocol.equals("AMQP")) {
+    GroovyRun.evaluate("clients/artemisClientAMQP.groovy", "serverArg", serverType, protocol);
+} else {
+    GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
+}
+
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+Queue destination = session.createQueue("queue")
+
+MessageProducer producer = session.createProducer(destination);
+producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+for (int i = 0; i < messages; i++) {
+    TextMessage message = session.createTextMessage("Message " + i);
+    producer.send(message);
+    if (i % 100 == 0) {
+        session.commit();
+    }
+}
+
+session.commit();
+
+connection.close();
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java
new file mode 100644
index 0000000000..ae96f5e85c
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYTWO_ZERO;
+
+@RunWith(Parameterized.class)
+public class PagingCounterTest extends VersionedBase {
+
+   // this will ensure that all tests in this class are run twice,
+   // once with "true" passed to the class' constructor and once with "false"
+   @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
+   public static Collection getParameters() {
+      // we don't need every single version ever released..
+      // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
+      List<Object[]> combinations = new ArrayList<>();
+
+      /*
+      // during development sometimes is useful to comment out the combinations
+      // and add the ones you are interested.. example:
+       */
+      //      combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
+      //      combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
+
+      combinations.add(new Object[]{null, TWO_TWENTYTWO_ZERO, SNAPSHOT});
+      combinations.add(new Object[]{null, SNAPSHOT, TWO_TWENTYTWO_ZERO});
+      // the purpose on this one is just to validate the test itself.
+      /// if it can't run against itself it won't work at all
+      combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
+      return combinations;
+   }
+
+   public PagingCounterTest(String server, String sender, String receiver) throws Exception {
+      super(server, sender, receiver);
+   }
+
+   @Before
+   public void removeFolder() throws Throwable {
+      FileUtil.deleteDirectory(serverFolder.getRoot());
+      serverFolder.getRoot().mkdirs();
+   }
+
+   @After
+   public void tearDown() {
+      try {
+         stopServer(serverClassloader);
+      } catch (Throwable ignored) {
+      }
+      try {
+         stopServer(receiverClassloader);
+      } catch (Throwable ignored) {
+      }
+   }
+
+   @Test
+   public void testSendReceivePaging() throws Throwable {
+      setVariable(senderClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), senderClassloader, "pageCounter", null, true);
+      evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
+      evaluate(senderClassloader, "pageCounter/sendMessages.groovy", server, "core", "1000");
+      evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
+      evaluate(senderClassloader, "pageCounter/checkMessages.groovy", "1000");
+      stopServer(senderClassloader);
+
+      setVariable(receiverClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), receiverClassloader, "pageCounter", null, false);
+      evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");
+      evaluate(receiverClassloader, "pageCounter/checkMessages.groovy", "1000");
+
+   }
+}
+
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index 30986fa890..d4e45b2471 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -159,7 +159,6 @@ public class BackupSyncJournalTest extends FailoverTestBase {
       for (Pair<Long, Integer> pair : backupIds) {
          totalBackup += pair.getB();
       }
-      assertEquals("number of records must match ", total, totalBackup);
 
       // "+ 2": there two other calls that send N_MSGS.
       for (int i = 0; i < totalRounds + 3; i++) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 2ffef62bb2..f141a8ba66 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -1715,6 +1715,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          public boolean isEmbeddedWebServerStarted() {
             return (boolean) proxy.retrieveAttributeValue("embeddedWebServerStarted");
          }
+
+         @Override
+         public void rebuildPageCounters() throws Exception {
+            proxy.invokeOperation("rebuildPageCounters");
+         }
       };
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java
index 0231610c69..f2c67c9d13 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.paging;
 
+import java.lang.invoke.MethodHandles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -39,9 +40,13 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    @Rule
    public RetryRule retryRule = new RetryRule(1);
 
@@ -151,7 +156,7 @@ public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
                }
             }
          } catch (Exception expected) {
-            expected.printStackTrace();
+            logger.info("expected exception {}", expected.toString(), expected);
          }
 
       } finally {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java
new file mode 100644
index 0000000000..9669de41a6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PageCounterRebuildTest extends ActiveMQTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Test
+   public void testUnitSize() throws Exception {
+      AtomicInteger errors = new AtomicInteger(0);
+
+      StorageManager mockStorage = Mockito.mock(StorageManager.class);
+
+      PageSubscriptionCounterImpl nonPersistentPagingCounter = new PageSubscriptionCounterImpl(mockStorage, -1);
+
+      final int THREADS = 33;
+      final int ADD_VALUE = 7;
+      final int SIZE_VALUE = 17;
+      final int REPEAT = 777;
+
+      ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+      runAfter(executorService::shutdownNow);
+
+      CyclicBarrier startFlag = new CyclicBarrier(THREADS);
+
+      ReusableLatch latch = new ReusableLatch(THREADS);
+
+      for (int j = 0; j < THREADS; j++) {
+         executorService.execute(() -> {
+            try {
+               startFlag.await(10, TimeUnit.SECONDS);
+               for (int i = 0; i < REPEAT; i++) {
+                  nonPersistentPagingCounter.increment(null, ADD_VALUE, SIZE_VALUE);
+               }
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               latch.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(ADD_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getValue());
+      Assert.assertEquals(SIZE_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getPersistentSize());
+
+
+      latch.setCount(THREADS);
+
+      for (int j = 0; j < THREADS; j++) {
+         executorService.execute(() -> {
+            try {
+               startFlag.await(10, TimeUnit.SECONDS);
+               for (int i = 0; i < REPEAT; i++) {
+                  nonPersistentPagingCounter.increment(null, -ADD_VALUE, -SIZE_VALUE);
+               }
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               latch.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0L, nonPersistentPagingCounter.getValue());
+      Assert.assertEquals(0L, nonPersistentPagingCounter.getPersistentSize());
+      Assert.assertEquals(0, errors.get());
+   }
+
+   @Test
+   public void testRebuildCounter() throws Exception {
+      ActiveMQServer server = createServer(true, true);
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(100 * 1024).setMaxReadPageMessages(1);
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+      server.start();
+
+      String queueName = getName();
+      String nonConsumedQueueName = getName() + "_nonConsumed";
+      server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.MULTICAST));
+      server.createQueue(new QueueConfiguration(nonConsumedQueueName).setAddress(queueName).setRoutingType(RoutingType.MULTICAST));
+      server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST));
+
+      Queue serverQueue = server.locateQueue(queueName);
+      Queue serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+      Assert.assertNotNull(serverQueue);
+      Assert.assertNotNull(serverNonConsumedQueue);
+
+      serverQueue.getPagingStore().startPaging();
+
+      final int THREADS = 4;
+      final int TX_SEND = 2000;
+      final int NON_TXT_SEND = 200;
+      final int CONSUME_MESSAGES = 200;
+      AtomicInteger errors = new AtomicInteger(0);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+      runAfter(executorService::shutdownNow);
+
+      CyclicBarrier startFlag = new CyclicBarrier(THREADS);
+
+      ReusableLatch latch = new ReusableLatch(THREADS);
+
+      for (int i = 0; i < THREADS; i++) {
+         final int threadNumber = i;
+         executorService.execute(() -> {
+            try {
+               startFlag.await(10, TimeUnit.SECONDS);
+               ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+               try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
+
+                  logger.info("sending thread {}", threadNumber);
+
+                  javax.jms.Topic jmsQueue = session.createTopic(queueName);
+                  MessageProducer producerNonTX = session.createProducer(jmsQueue);
+                  MessageProducer producerTX = txSession.createProducer(jmsQueue);
+
+                  for (int message = 0; message < NON_TXT_SEND; message++) {
+                     TextMessage txtMessage = session.createTextMessage("hello" + message);
+                     txtMessage.setBooleanProperty("first", false);
+                     producerNonTX.send(session.createTextMessage("hello" + message));
+                  }
+                  for (int message = 0; message < TX_SEND; message++) {
+                     producerTX.send(session.createTextMessage("helloTX" + message));
+                  }
+                  txSession.commit();
+               }
+
+            } catch (Throwable e) {
+               errors.incrementAndGet();
+            } finally {
+               latch.countDown();
+            }
+         });
+      }
+
+      // this should be fast on the CIs, but if you use a slow disk, it might take a few extra seconds.
+      Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
+
+      final int numberOfMessages = TX_SEND * THREADS + NON_TXT_SEND * THREADS;
+      Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) {
+         MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName));
+         connection.start();
+         for (int i = 0; i < CONSUME_MESSAGES; i++) {
+            Message message = consumer.receive(5000);
+            Assert.assertNotNull(message);
+         }
+      }
+
+      Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+      Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+
+      server.stop();
+      server.start();
+
+      serverQueue = server.locateQueue(queueName);
+      serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+      Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+      Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+
+      serverQueue.getPageSubscription().getCounter().markRebuilding();
+      serverNonConsumedQueue.getPageSubscription().getCounter().markRebuilding();
+
+      // if though we are rebuilding, we are still returning based on the last recorded value until processing is finished
+      Assert.assertEquals(8600, serverQueue.getMessageCount());
+      Assert.assertEquals(8800, serverNonConsumedQueue.getMessageCount());
+
+      serverQueue.getPageSubscription().getCounter().finishRebuild();
+
+      serverNonConsumedQueue.getPageSubscription().getCounter().finishRebuild();
+
+      Assert.assertEquals(0, serverQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild
+      Assert.assertEquals(0, serverNonConsumedQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild
+
+      server.stop();
+      server.start();
+
+      serverQueue = server.locateQueue(queueName);
+      serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+      // after a rebuild, the counter should be back to where it was
+      Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+      Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+
+      server.stop();
+      server.start();
+
+      serverQueue = server.locateQueue(queueName);
+      serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+      Assert.assertNotNull(serverQueue);
+      Assert.assertNotNull(serverNonConsumedQueue);
+
+      Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+      Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+
+      server.stop();
+      // restarting the server to issue a rebuild on the counters
+      server.start();
+
+      logger.info("Consuming messages");
+      factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) {
+         MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName));
+         connection.start();
+         for (int i = 0; i < numberOfMessages - CONSUME_MESSAGES; i++) {
+            Message message = consumer.receive(5000);
+            Assert.assertNotNull(message);
+            if (i % 100 == 0) {
+               logger.info("Received {} messages", i);
+            }
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+         consumer.close();
+
+         consumer = session.createConsumer(session.createQueue(queueName + "::" + nonConsumedQueueName));
+         connection.start();
+         for (int i = 0; i < numberOfMessages; i++) {
+            Message message = consumer.receive(5000);
+            Assert.assertNotNull(message);
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+         consumer.close();
+      }
+
+      serverQueue = server.locateQueue(queueName);
+      serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+      Wait.assertEquals(0L, serverQueue::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, serverNonConsumedQueue::getMessageCount, 1000, 100);
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index f68608e24a..8492f9626f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -164,6 +163,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          server.stop();
 
+         server.setRebuildCounters(false);
+
          server.start();
 
          queue = server.locateQueue("A1");
@@ -177,6 +178,70 @@ public class PagingCounterTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testMultiThreadCounter() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(sl);
+      ClientSession session = sf.createSession();
+
+      try {
+         server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
+         Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
+
+         final PageSubscriptionCounter counter = locateCounter(queue);
+
+         final int THREADS = 10;
+
+         final CyclicBarrier flagStart = new CyclicBarrier(THREADS);
+         final CountDownLatch done = new CountDownLatch(THREADS);
+
+         final int BUMPS = 2000;
+
+         Assert.assertEquals(0, counter.getValue());
+
+         ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+         runAfter(executorService::shutdownNow);
+
+         for (int i = 0; i < THREADS; i++) {
+            executorService.execute(() -> {
+               try {
+                  flagStart.await(10, TimeUnit.SECONDS);
+                  for (int repeat = 0; repeat < BUMPS; repeat++) {
+                     counter.increment(null, 1, 1L);
+                     Transaction tx = new TransactionImpl(server.getStorageManager());
+                     counter.increment(tx, 1, 1L);
+                     tx.commit();
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               } finally {
+                  done.countDown();
+               }
+            });
+         }
+
+         // it should take a couple seconds only
+         done.await(1, TimeUnit.MINUTES);
+
+         Wait.assertEquals((long)(BUMPS * 2 * THREADS), counter::getValue, 5000, 100);
+
+         server.stop();
+
+         server.setRebuildCounters(false);
+
+         server.start();
+
+         queue = server.locateQueue("A1");
+
+         final PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
+         Wait.assertEquals((long)(BUMPS * 2 * THREADS), counterAfterRestart::getValue, 5000, 100);
+         Assert.assertEquals(BUMPS * 2 * THREADS, counterAfterRestart.getValue());
+
+      } finally {
+         sf.close();
+         session.close();
+      }
+   }
+
    @Test
    public void testCleanupCounter() throws Exception {
       ClientSessionFactory sf = createSessionFactory(sl);
@@ -216,6 +281,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          server.stop();
 
          server = newActiveMQServer();
+         server.setRebuildCounters(false);
 
          server.start();
 
@@ -228,6 +294,11 @@ public class PagingCounterTest extends ActiveMQTestBase {
          assertEquals(2100, counter.getValue());
          assertEquals(2100 * 1000, counter.getPersistentSize());
 
+         server.getPagingManager().rebuildCounters();
+
+         // it should be zero after rebuild, since no actual messages were sent
+         Wait.assertEquals(0, counter::getValue);
+
       } finally {
          sf.close();
          session.close();
@@ -246,8 +317,6 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          PageSubscriptionCounter counter = locateCounter(queue);
 
-         ((PageSubscriptionCounterImpl) counter).setPersistent(false);
-
          StorageManager storage = server.getStorageManager();
 
          Transaction tx = new TransactionImpl(server.getStorageManager());
@@ -321,7 +390,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       server.stop();
 
+
       server = newActiveMQServer();
+      server.setRebuildCounters(false);
 
       server.start();
 
@@ -329,10 +400,29 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       assertNotNull(queue);
 
-      counter = locateCounter(queue);
+      PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
+
+      Wait.assertEquals(1, counterAfterRestart::getValue);
+      Wait.assertEquals(1000, counterAfterRestart::getPersistentSize);
 
-      assertEquals(1, counter.getValue());
-      assertEquals(1000, counter.getPersistentSize());
+      counterAfterRestart.markRebuilding();
+
+      // should be using a previously added value while rebuilding
+      Wait.assertEquals(1, counterAfterRestart::getValue);
+
+      tx = new TransactionImpl(server.getStorageManager());
+
+      counterAfterRestart.increment(tx, 10, 10_000);
+      tx.commit();
+
+      Wait.assertEquals(11, counterAfterRestart::getValue);
+      Wait.assertEquals(11_000, counterAfterRestart::getPersistentSize);
+      counterAfterRestart.finishRebuild();
+
+      server.getPagingManager().rebuildCounters();
+
+      Wait.assertEquals(0, counterAfterRestart::getValue);
+      Wait.assertEquals(0, counterAfterRestart::getPersistentSize);
 
    }
 
@@ -349,7 +439,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testPrepareCounter() throws Exception {
+   public void testCommitCounter() throws Exception {
       Xid xid = newXID();
 
       Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
@@ -366,19 +456,19 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       assertEquals(0, counter.getValue());
 
-      tx.prepare();
+      tx.commit();
 
       storage.waitOnOperations();
 
-      assertEquals(0, counter.getValue());
+      assertEquals(2000, counter.getValue());
 
       server.stop();
 
       server = newActiveMQServer();
 
-      server.start();
+      server.setRebuildCounters(false);
 
-      storage = server.getStorageManager();
+      server.start();
 
       queue = server.locateQueue(new SimpleString("A1"));
 
@@ -386,16 +476,6 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       counter = locateCounter(queue);
 
-      tx = server.getResourceManager().removeTransaction(xid, null);
-
-      assertNotNull(tx);
-
-      assertEquals(0, counter.getValue());
-
-      tx.commit(false);
-
-      storage.waitOnOperations();
-
       Wait.assertEquals(2000, counter::getValue);
 
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index 381e14e27a..8192a1e54f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -34,8 +34,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -62,7 +60,6 @@ public class PagingSendTest extends ActiveMQTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      Configuration config = new ConfigurationImpl();
       server = newActiveMQServer();
 
       server.start();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index f29ad471d5..a76465988d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -651,6 +651,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
 
       paging.start();
+      runAfter(paging::stop);
       return paging;
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
index 00ced9ee10..d3a69d2202 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
@@ -150,9 +150,9 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
                ClientProducer producer = session.createProducer("flowcontrol");
                ClientMessage message = session.createMessage(true);
                message.writeBodyBufferBytes(body);
-               logger.info("try to send a message after replicated");
+               logger.debug("try to send a message after replicated");
                producer.send(message);
-               logger.info("send message done");
+               logger.debug("send message done");
                producer.close();
                session.close();
 
@@ -187,8 +187,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
             if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
                // ignore
             }
-            logger.info("got live message {} {}", info.id, info.userRecordType);
             liveJournalCounter.incrementAndGet();
+            logger.info("got live message {} {}, counter={}", info.id, info.userRecordType, liveJournalCounter.get());
          }
       });
 
@@ -207,8 +207,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
             if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
                // ignore
             }
-            logger.info("replicated message {}", info.id);
             replicationCounter.incrementAndGet();
+            logger.info("replicated message {}, counter={}", info.id, replicationCounter.get());
          }
       });
 
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 0426f55706..5297e6dcf0 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -245,6 +245,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
 
    class FakePagingStore implements PagingStore {
 
+      @Override
+      public void counterSnapshot() {
+      }
+
       @Override
       public void execute(Runnable runnable) {
          runnable.run();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
index 713ce21d44..68024e4fce 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
@@ -61,6 +61,8 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
 
       managerImpl.start();
 
+      runAfter(managerImpl::stop);
+
       PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
 
       ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10));
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index bed3ea5ad6..2012848010 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -39,6 +39,11 @@ public class FakePagingManager implements PagingManager {
 
    }
 
+   @Override
+   public void counterSnapshot() {
+
+   }
+
    @Override
    public void addTransaction(final PageTransactionInfo pageTransaction) {
    }