Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2022/12/12 17:33:13 UTC

[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1044298291


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) {
       return this;
    }
 
+   public void reset() {
+      sizeUpdater.set(this, 0);
+      elementsUpdater.set(this, 0);
+   }
+
+   public void reset(long size, long elements) {
+      sizeUpdater.set(this, size);
+      elementsUpdater.set(this, elements);
+   }

Review Comment:
   This looks unused?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java:
##########
@@ -141,6 +143,7 @@ default long getGlobalMessages() {
     */
    void checkMemory(Runnable runWhenAvailable);
 
+   void counterSnashot();

Review Comment:
   Typo in method name: counterSnashot should presumably be counterSnapshot.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java:
##########
@@ -56,4 +62,8 @@ public interface PageCursorProvider {
     */
    void close(PageSubscription pageCursorImpl);
 
+   void startCounterRebuild();
+
+   void finishCounterRebuild();

Review Comment:
   Perhaps counterRebuildStarted and counterRebuildFinished? Those read more as a notification than as an instruction to start/stop, which is what the existing names suggest, and they would group the 3 counter-related methods together.



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -163,6 +173,21 @@ public final long addSize(final int delta) {
       return addSize(delta, false);
    }
 
+   public final void add(final int elements, final long size) {
+
+      long currentSize = sizeUpdater.addAndGet(this, size);
+      long currentElements = elementsUpdater.addAndGet(this, elements);
+
+      if (elements >= 0) {
+         assert size >= 0 : "If elements is positve, size must be positive";

Review Comment:
   Should it be checking that if one is 0 the other is also 0, and that if one is positive the other is too? Currently it would allow them to differ, with one 0 and the other not. Also, 0 isn't positive, so the assertion message doesn't match the `>= 0` condition.
   
   Or is the intent to allow the 'elements or size only' behaviour that the other methods facilitate?
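
   To make the first option concrete, here is a minimal sketch of a stricter check. The class and the `sameSign` helper are hypothetical names for illustration only and are not part of SizeAwareMetric; the sketch just assumes that "consistent" means both deltas are negative, both zero, or both positive.

```java
// Hypothetical illustration only; none of these names exist in the PR.
final class SignCheckSketch {

   // True when elements and size are both negative, both zero, or both positive.
   static boolean sameSign(int elements, long size) {
      return Integer.signum(elements) == Long.signum(size);
   }

   public static void main(String[] args) {
      System.out.println(sameSign(1, 100L)); // both positive
      System.out.println(sameSign(0, 10L));  // zero elements, non-zero size
      System.out.println(sameSign(1, 0L));   // passes the current '>= 0' assert, but differs in sign
   }
}
```

   If the 'elements or size only' usage is intended, a check like this would be too strict and the assertion message should probably be reworded instead.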



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) {
       return this;
    }
 
+   public void reset() {
+      sizeUpdater.set(this, 0);
+      elementsUpdater.set(this, 0);
+   }

Review Comment:
   Is this method ever used while other things may be updating the counts? If so, the size and element counts could get out of sync.
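
   A sketch of the interleaving I have in mind, replayed on a single thread so it is deterministic. The class below is a hypothetical stand-in, not the real SizeAwareMetric; it only assumes two independently updated fields, as in the diff.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical stand-in for a metric with two independently updated fields.
final class ResetInterleavingSketch {

   private static final AtomicLongFieldUpdater<ResetInterleavingSketch> SIZE =
      AtomicLongFieldUpdater.newUpdater(ResetInterleavingSketch.class, "size");
   private static final AtomicLongFieldUpdater<ResetInterleavingSketch> ELEMENTS =
      AtomicLongFieldUpdater.newUpdater(ResetInterleavingSketch.class, "elements");

   private volatile long size;
   private volatile long elements;

   long getSize() {
      return size;
   }

   long getElements() {
      return elements;
   }

   void add(int elementsDelta, long sizeDelta) {
      SIZE.addAndGet(this, sizeDelta);
      ELEMENTS.addAndGet(this, elementsDelta);
   }

   // Replays an ordering in which an add() lands between the two set() calls of a reset().
   static ResetInterleavingSketch replayInterleaving() {
      ResetInterleavingSketch metric = new ResetInterleavingSketch();
      metric.add(5, 500L);
      SIZE.set(metric, 0);      // first half of reset()
      metric.add(1, 100L);      // update arriving from "another thread"
      ELEMENTS.set(metric, 0);  // second half of reset()
      return metric;
   }

   public static void main(String[] args) {
      ResetInterleavingSketch metric = replayInterleaving();
      // The size of the late add survives the reset, but its element count does not.
      System.out.println("size=" + metric.getSize() + ", elements=" + metric.getElements());
   }
}
```

   If reset() is only ever called while nothing else touches the metric, this is a non-issue.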



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+
+   @Override
+   public boolean isRebuilding() {
+      return rebuilding;
+   }
+
+
+
+

Review Comment:
   Superfluous lines.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();

Review Comment:
   It doesn't seem like these need to be package-private variables.
   Constructors more typically go below the variables, which looks nicer.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -216,6 +243,11 @@ public void resumeCleanup() {
 
    protected void cleanup() {
 
+      if (!countersRebuilt) {
+         logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
+         return;
+      }

Review Comment:
   When is cleanup called? Should it be deferred and run once the rebuild is done? Or will another cleanup call come along promptly to replace the one that was ignored?
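
   If deferring is the right answer, one possible shape is to remember that a cleanup was skipped and run it once the rebuild completes. This is only a sketch under that assumption; the class, the `cleanupPending` flag, and the `cleanupRuns` counter are hypothetical and stand in for the real cursor provider logic.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of deferring a skipped cleanup; not the PageCursorProviderImpl code.
final class DeferredCleanupSketch {

   private boolean countersRebuilt;
   private final AtomicBoolean cleanupPending = new AtomicBoolean();
   private int cleanupRuns;

   synchronized void cleanup() {
      if (!countersRebuilt) {
         // Remember the request instead of dropping it.
         cleanupPending.set(true);
         return;
      }
      cleanupRuns++; // placeholder for the real cleanup work
   }

   synchronized void counterRebuildFinished() {
      countersRebuilt = true;
      if (cleanupPending.getAndSet(false)) {
         cleanup(); // run the cleanup that was skipped during the rebuild
      }
   }

   synchronized int getCleanupRuns() {
      return cleanupRuns;
   }

   public static void main(String[] args) {
      DeferredCleanupSketch provider = new DeferredCleanupSketch();
      provider.cleanup();                // skipped, but remembered
      provider.counterRebuildFinished(); // triggers the deferred cleanup
      System.out.println("cleanup runs: " + provider.getCleanupRuns());
   }
}
```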



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -860,6 +868,13 @@ public void processReload() throws Exception {
    public void stop() {
    }
 
+   @Override
+   public void counterSnapshot() {
+      if (counter != null) {
+         counter.snapshot();
+      }

Review Comment:
   Seems odd for this method to null-check when others such as isCounterPending() don't.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {

Review Comment:
   Having the 'Local' prefix on this class but not on the one above seems odd; they should follow a similar convention.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;

Review Comment:
   CopiedSubscription.this. is superfluous; it's a static class.
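
   For illustration, a minimal sketch with made-up names (not the PR's classes): in a static nested class there is no enclosing instance to disambiguate from, so a plain `this.` does the same job as the qualified form.

```java
// Hypothetical names; shows only that plain 'this.' suffices in a static nested class.
final class StaticNestedThisSketch {

   private static class Copied {
      final String name;

      Copied(String name) {
         this.name = name; // same effect as 'Copied.this.name = name'
      }
   }

   static String nameOf(String name) {
      return new Copied(name).name;
   }

   public static void main(String[] args) {
      System.out.println(nameOf("queue-1"));
   }
}
```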



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);

Review Comment:
   "got to the"?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
+                     }
+                     // this is the limit where we should count..
+                     // anything beyond this will be new data
+                     break;
+                  }
+               }
+               msg.initMessage(sm);
+               long[] routedQueues = msg.getQueueIDs();
+
+               if (logger.isTraceEnabled()) {
+                  logger.trace("reading message for rebuild cursor on {}, pg={}, messageNR={}, routedQueues={}, message={}, queueList={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg);

Review Comment:
   The log message has more placeholders than arguments: six placeholders but only five arguments are supplied, so the trailing "queueList={}" is never filled in.
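
   To illustrate the mismatch, here is a minimal self-contained sketch. `PlaceholderCheck` and `countPlaceholders` are hypothetical names used only for this example (they are not part of Artemis or SLF4J), and the counter ignores escaped placeholders, so treat it as a rough check rather than a faithful reimplementation of the logging library's formatter.

```java
final class PlaceholderCheck {

   // Counts "{}" pairs in an SLF4J-style format string.
   // Assumption: escaped placeholders such as "\\{}" are not handled in this sketch.
   static int countPlaceholders(String format) {
      int count = 0;
      int index = format.indexOf("{}");
      while (index >= 0) {
         count++;
         index = format.indexOf("{}", index + 2);
      }
      return count;
   }

   public static void main(String[] args) {
      // The format string quoted from the diff above.
      String format = "reading message for rebuild cursor on {}, pg={}, messageNR={}, routedQueues={}, message={}, queueList={}";
      // Arguments passed in the diff: address, page number, message number, routedQueues, msg.
      int supplied = 5;
      System.out.println(countPlaceholders(format) + " placeholders, " + supplied + " arguments");
   }
}
```

   Either dropping the last placeholder or passing the intended sixth argument would bring the two counts back in line; which one was meant is for the author to say.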



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;

Review Comment:
   Throwing would fail uniformly (an assertion only fires when assertions are enabled), and it would be more succinct, since it would not need a 3-line comment to explain it, twice (same for the method below).
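
   A rough sketch of what that could look like. `ConsumedPageSketch` and `LocalCopySketch` are hypothetical, trimmed-down stand-ins for the types in the diff, not the real Artemis classes, and the exception message is only an example.

```java
// Hypothetical, trimmed-down stand-in for the ConsumedPage interface in the diff.
interface ConsumedPageSketch {
   long getPageId();

   boolean isDone();
}

final class LocalCopySketch implements ConsumedPageSketch {
   boolean done;

   @Override
   public long getPageId() {
      // Fails the same way whether or not the JVM runs with assertions enabled (-ea).
      throw new UnsupportedOperationException("getPageId is not implemented on the local copy");
   }

   @Override
   public boolean isDone() {
      return done;
   }
}
```

   With this shape the method body documents itself, so the explanatory comment could go away in both places.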



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java:
##########
@@ -465,4 +419,17 @@ public void addAndGet(int count, long persistentSize) {
          SIZE_UPDATER.addAndGet(this, persistentSize);
       }
    }
+
+   @Override
+   public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
+      this.subscription = subscription;
+
+      if (subscription == null) {
+         this.pageExecutor = null;
+      } else {
+         this.pageExecutor = subscription.getPagingStore().getExecutor();
+         assert pageExecutor != null;

Review Comment:
   Should the executor be cleared when the subscription is null?

   Actually, is the executor ever used?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {

Review Comment:
   There look to be different ways of calling this, suggesting it could happen concurrently, which doesn't seem like it would be safe.
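
   One possible guard, sketched under assumptions: `GuardedRebuildSketch` is a hypothetical class, the counter increment stands in for the real counting work, and skipping an overlapping call (rather than queueing it) is only one of the options. Whether that is the right behaviour here depends on how the callers are meant to interact.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class GuardedRebuildSketch implements Runnable {

   // True while a rebuild is in progress.
   private final AtomicBoolean running = new AtomicBoolean(false);

   // Only written while the guard above is held.
   private int completedRuns;

   @Override
   public void run() {
      rebuild();
   }

   /** Returns true if this call performed the rebuild, false if another rebuild was already in progress. */
   public boolean rebuild() {
      if (!running.compareAndSet(false, true)) {
         // Another caller got here first; skip instead of running concurrently.
         return false;
      }
      try {
         completedRuns++; // placeholder for the real counting work
         return true;
      } finally {
         running.set(false);
      }
   }

   int completedRuns() {
      return completedRuns;
   }
}
```

   Making the method non-public and funnelling every caller through a single executor would be another way to get the same guarantee.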


