You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2022/10/21 04:36:42 UTC

[GitHub] [activemq-artemis] clebertsuconic opened a new pull request, #4265: ARTEMIS-4065 Non Persistent Page Counters

clebertsuconic opened a new pull request, #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265

   Instead of relying on the journal to store the value of the counters, and have it eventually getting out of sync we should just use real counters from reading the pages on startup.
   
   In a test performed, reading 700 files didn't take more than 1 minute in a modest laptop. And besides the data will be available while the rebuild is being done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1044298291


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) {
       return this;
    }
 
+   public void reset() {
+      sizeUpdater.set(this, 0);
+      elementsUpdater.set(this, 0);
+   }
+
+   public void reset(long size, long elements) {
+      sizeUpdater.set(this, size);
+      elementsUpdater.set(this, elements);
+   }

Review Comment:
   This looks unused?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java:
##########
@@ -141,6 +143,7 @@ default long getGlobalMessages() {
     */
    void checkMemory(Runnable runWhenAvailable);
 
+   void counterSnashot();

Review Comment:
   typo in method name



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java:
##########
@@ -56,4 +62,8 @@ public interface PageCursorProvider {
     */
    void close(PageSubscription pageCursorImpl);
 
+   void startCounterRebuild();
+
+   void finishCounterRebuild();

Review Comment:
   Perhaps counterRebuildStarted and counterRebuildFinished? More of a notification than an instruction to start/stop than the existing names, and groups the 3 counter-related methods together.



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -163,6 +173,21 @@ public final long addSize(final int delta) {
       return addSize(delta, false);
    }
 
+   public final void add(final int elements, final long size) {
+
+      long currentSize = sizeUpdater.addAndGet(this, size);
+      long currentElements = elementsUpdater.addAndGet(this, elements);
+
+      if (elements >= 0) {
+         assert size >= 0 : "If elements is positve, size must be positive";

Review Comment:
   
   
   Should it be checking that if one is 0 the other is also 0, and if one is positive the other is? Currently it would allow them to differ, one 0 and the other not. 0 also isnt positive.
   
   Or is the intent to allow doing the 'elements or size only' behaviour the other methods facilitate?



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean elementsEnabled) {
       return this;
    }
 
+   public void reset() {
+      sizeUpdater.set(this, 0);
+      elementsUpdater.set(this, 0);
+   }

Review Comment:
   Is this method ever used while other things may be updating the counts? If so they could get out of sync.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+
+   @Override
+   public boolean isRebuilding() {
+      return rebuilding;
+   }
+
+
+
+

Review Comment:
   Superfluous lines.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();

Review Comment:
   Doesnt seem like these need to be package private variables.
   Constructors more typically go below the variables, looks nicer.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -216,6 +243,11 @@ public void resumeCleanup() {
 
    protected void cleanup() {
 
+      if (!countersRebuilt) {
+         logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
+         return;
+      }

Review Comment:
   When is cleanup called? Should it be deferred and run once the rebuild is done? Or will another cleanup call come along promptly to replace the one that was ignored?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -860,6 +868,13 @@ public void processReload() throws Exception {
    public void stop() {
    }
 
+   @Override
+   public void counterSnapshot() {
+      if (counter != null) {
+         counter.snapshot();
+      }

Review Comment:
   Seems odd for this method to null check when others such as isCounterPending() dont.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {

Review Comment:
   Having the 'Local' prefix on this class but not the one above seems odd, they should follow a similar convention.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;

Review Comment:
   CopiedSubscription.this. is superfluous, its a static class.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);

Review Comment:
   "got to the"?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
+                     }
+                     // this is the limit where we should count..
+                     // anything beyond this will be new data
+                     break;
+                  }
+               }
+               msg.initMessage(sm);
+               long[] routedQueues = msg.getQueueIDs();
+
+               if (logger.isTraceEnabled()) {
+                  logger.trace("reading message for rebuild cursor on {}, pg={}, messageNR={}, routedQueues={}, message={}, queueList={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg);

Review Comment:
   log message has more placeholders in it than variables are actually provided



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;

Review Comment:
   Throwing would more uniformly fail, and be more succinct from not needing a 3 line comment to explain it...twice (same below).



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java:
##########
@@ -465,4 +419,17 @@ public void addAndGet(int count, long persistentSize) {
          SIZE_UPDATER.addAndGet(this, persistentSize);
       }
    }
+
+   @Override
+   public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
+      this.subscription = subscription;
+
+      if (subscription == null) {
+         this.pageExecutor = null;
+      } else {
+         this.pageExecutor = subscription.getPagingStore().getExecutor();
+         assert pageExecutor != null;

Review Comment:
   Should the executor be cleared on null? 
   
   Actually, is the executor ever actually used?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {

Review Comment:
   There look to be different ways of calling this, suggesting it could happen concurrently, which doesnt seem like it would be safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046194019


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;

Review Comment:
   I wanted to play safe and not throw the error in a production server. I'm just changing it to throw RuntimeException now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1047490734


##########
tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy:
##########
@@ -0,0 +1,31 @@
+package pageCounter
+
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   this is just a copy from the others groovy script



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#issuecomment-1352479731

   I had a few successful run with this PR over the full CI now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#issuecomment-1286449451

   I stil have both versions of the counters in the codebase. I am going to remove the other implementation of the PageCounter and only keep the non persistent.
   
   
   This PR is for a draft only.. and I will finish after fixing the testsuite
   
   (the built server would work fine though)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#issuecomment-1348640602

   @gemmellr I will do some testing to make sure it's solid and I should merge it.. unless you have any other concerns.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046380089


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {

Review Comment:
   not any more.. thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1047652737


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java:
##########
@@ -522,6 +522,9 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
    @Message(id = 229243, value = "Embedded web server restart failed")
    ActiveMQException embeddedWebServerRestartFailed(Exception e);
 
-   @Message(id = 229244, value = "Meters already registered for {}")
+   @Message(id = 229244, value = "Management controller is busy with another task. Please try again")
+   ActiveMQTimeoutException managementBusy();
+
+   @Message(id = 229245, value = "Meters already registered for {}")

Review Comment:
   git-fu merge/rebase ... thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1048642684


##########
tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy:
##########
@@ -0,0 +1,31 @@
+package pageCounter
+
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   I believe I did it this way because at the version where these were written years ago, it was mandatory to have the package statement on the first line.
   
   
   I"m not 100% sure at this point if this was an actual issue or just my perception. I certainly remember having issues with the comment before the package.
   
   But now since this is clear, I will change the others.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1048642982


##########
tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy:
##########
@@ -0,0 +1,31 @@
+package pageCounter
+
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   @gemmellr  ^^
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046200326


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java:
##########
@@ -465,4 +419,17 @@ public void addAndGet(int count, long persistentSize) {
          SIZE_UPDATER.addAndGet(this, persistentSize);
       }
    }
+
+   @Override
+   public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
+      this.subscription = subscription;
+
+      if (subscription == null) {
+         this.pageExecutor = null;
+      } else {
+         this.pageExecutor = subscription.getPagingStore().getExecutor();
+         assert pageExecutor != null;

Review Comment:
   not any more... the previous version did.. removing it.. thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046255018


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -216,6 +243,11 @@ public void resumeCleanup() {
 
    protected void cleanup() {
 
+      if (!countersRebuilt) {
+         logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
+         return;
+      }

Review Comment:
   PageCounterRebuilderManager is calling this on the done callback:
   
   pgStore.getCursorProvider().scheduleCleanup();



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046380265


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {

Review Comment:
   only one way to call this now.. I removed the other calls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046173412


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -163,6 +173,21 @@ public final long addSize(final int delta) {
       return addSize(delta, false);
    }
 
+   public final void add(final int elements, final long size) {
+
+      long currentSize = sizeUpdater.addAndGet(this, size);
+      long currentElements = elementsUpdater.addAndGet(this, elements);
+
+      if (elements >= 0) {
+         assert size >= 0 : "If elements is positve, size must be positive";

Review Comment:
   @gemmellr thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1047200127


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);

Review Comment:
   Phrasing seemed fine, just noting the t was missing in 'got' :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1047425947


##########
artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java:
##########
@@ -2314,6 +2314,10 @@ protected int getMessageCount(final PostOffice postOffice, final String address)
    }
 
    protected int getMessageCount(final Queue queue) {
+      try {
+         Wait.waitFor(() -> queue.getPageSubscription().isCounterPending() == false);
+      } catch (Exception ignroed) {

Review Comment:
   typo



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java:
##########
@@ -548,4 +567,26 @@ public void lock() {
       syncLock.writeLock().lock();
    }
 
+   @Override
+   public void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
+      transactions.forEach(transactionConsumer);
+   }
+
+   @Override
+   public Future<Object> rebuildCounters() {
+      LongHashSet transactionsSet = new LongHashSet();
+      transactions.forEach((txId, tx) -> {
+         transactionsSet.add(txId);
+      });
+      stores.forEach((address, pgStore) -> {
+         PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet);
+         logger.debug("Setting destination {} to rebuild counters", pgStore.getAddress());

Review Comment:
   Is the address arg passed to the lambda the same as pgStore.getAddress() returns? If so use it instead?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java:
##########
@@ -151,7 +156,7 @@ public void run() {
                }
             }
          } catch (Exception expected) {
-            expected.printStackTrace();
+            logger.warn(expected.toString(), expected);

Review Comment:
   Debug? If its expected it hardly seems a warning. Message would be good to say 'Got expected exception...' so it doesnt look like an unexpected one.



##########
tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy:
##########
@@ -0,0 +1,53 @@
+package pageCounter
+
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Ditto



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java:
##########
@@ -466,19 +471,25 @@ public void start() throws Exception {
 
          reloadStores();
 
-         if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) {
-            this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) {
+         if (ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL > 0) {
+            this.snapshotUpdater = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL, TimeUnit.SECONDS, false) {
                @Override
                public void run() {
-                  debug();

Review Comment:
   The method is still present, perhaps unused, can/should it go too?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java:
##########
@@ -522,6 +522,9 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
    @Message(id = 229243, value = "Embedded web server restart failed")
    ActiveMQException embeddedWebServerRestartFailed(Exception e);
 
-   @Message(id = 229244, value = "Meters already registered for {}")
+   @Message(id = 229244, value = "Management controller is busy with another task. Please try again")
+   ActiveMQTimeoutException managementBusy();
+
+   @Message(id = 229245, value = "Meters already registered for {}")

Review Comment:
   Changes an existing ID while also adding a new one, seems odd?



##########
tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy:
##########
@@ -0,0 +1,31 @@
+package pageCounter
+
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Header is in weird place, cant it go at/nearer top?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1001354901


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/NonPersistentPagingCounterImpl.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import java.lang.invoke.MethodHandles;
+import java.util.LinkedList;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperation;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
+import org.apache.activemq.artemis.utils.SizeAwareMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class will encapsulate the persistent counters for the PagingSubscription, however without using the journal.
+ * Upon restart the system will always rebuild the counters.
+ */
+public class NonPersistentPagingCounterImpl implements PageSubscriptionCounter {

Review Comment:
   just a FYI for anyone reviewing, the other implementation of PageSubscriptionCounter will be removed..
   
   
   And this one will probably be renamed. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic merged pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic merged PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1048600629


##########
tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy:
##########
@@ -0,0 +1,31 @@
+package pageCounter
+
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Thats precisely why it needed fixed, so it doesnt get copied around further hehe :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1047201079


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -216,6 +243,11 @@ public void resumeCleanup() {
 
    protected void cleanup() {
 
+      if (!countersRebuilt) {
+         logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
+         return;
+      }

Review Comment:
   Ah, didnt see that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
gemmellr commented on PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#issuecomment-1348645949

   @clebertsuconic still need to look over the rest, plus whatever changed since yesterday.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1001355865


##########
tests/config/log4j2-tests-config.properties:
##########
@@ -29,6 +29,16 @@ logger.curator.level=WARN
 logger.zookeeper.name=org.apache.zookeeper
 logger.zookeeper.level=WARN
 
+logger.test2.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager

Review Comment:
   these changes will be gone of course before I make it final.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046173105


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -163,6 +173,21 @@ public final long addSize(final int delta) {
       return addSize(delta, false);
    }
 
+   public final void add(final int elements, final long size) {
+
+      long currentSize = sizeUpdater.addAndGet(this, size);
+      long currentElements = elementsUpdater.addAndGet(this, elements);
+
+      if (elements >= 0) {
+         assert size >= 0 : "If elements is positve, size must be positive";

Review Comment:
   In a previous version of my changes, I was going to use SizeAwareMetric in the PageCounters in replacement of the values in there.
   
   As I progressed with the changes I did not use any of it...
   
   
   I am going to revert all changes in SizeAwareMetric on this PR.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1046259017


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+      // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+               copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);

Review Comment:
   @gemmellr my broken English kind of typo:
   
   "reached the last message at"
   
   
   thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] clebertsuconic commented on pull request #4265: ARTEMIS-4065 Non Persistent Page Counters

Posted by GitBox <gi...@apache.org>.
clebertsuconic commented on PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#issuecomment-1337515536

   I updated this PR with a new commit. From now on we save snapshots of the counter upon shutdown.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org