You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/12/15 15:30:25 UTC
[activemq-artemis] branch main updated: ARTEMIS-4065 Optimize page counters to not use the journal as often
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new af9bd7b84a ARTEMIS-4065 Optimize page counters to not use the journal as often
af9bd7b84a is described below
commit af9bd7b84aad32e4fe30f2c8909e51cf7300b475
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 26 13:30:42 2022 -0400
ARTEMIS-4065 Optimize page counters to not use the journal as often
- From now on we will save snapshots of page-counters on the journal (basically for compatibility with previous versions).
And we will recount the records on startup.
- While the rebuild is being done the value from the previous snapshot is still available with current updates.
---
.../artemis/cli/commands/tools/PrintData.java | 2 +-
.../api/core/management/ActiveMQServerControl.java | 3 +
.../jdbc/store/drivers/AbstractJDBCDriver.java | 3 +
.../jdbc/store/file/JDBCSequentialFile.java | 14 +-
.../jdbc/store/file/JDBCSequentialFileFactory.java | 4 +-
.../artemis/core/io/AbstractSequentialFile.java | 3 +
.../artemis/core/io/IOCriticalErrorListener.java | 7 +-
.../management/impl/ActiveMQServerControlImpl.java | 10 +
.../artemis/core/paging/PagingManager.java | 14 +
.../activemq/artemis/core/paging/PagingStore.java | 8 +
.../artemis/core/paging/cursor/ConsumedPage.java | 6 +
.../core/paging/cursor/PageCursorProvider.java | 10 +
.../core/paging/cursor/PageSubscription.java | 7 +
.../paging/cursor/PageSubscriptionCounter.java | 13 +-
.../BasePagingCounter.java} | 23 +-
.../cursor/impl/PageCounterRebuildManager.java | 327 ++++++++++++++++++
.../paging/cursor/impl/PageCursorProviderImpl.java | 48 ++-
.../cursor/impl/PageSubscriptionCounterImpl.java | 383 +++++++++------------
.../paging/cursor/impl/PageSubscriptionImpl.java | 35 +-
.../activemq/artemis/core/paging/impl/Page.java | 39 +--
.../core/paging/impl/PagingManagerImpl.java | 69 +++-
.../artemis/core/paging/impl/PagingStoreImpl.java | 74 ++--
.../journal/AbstractJournalStorageManager.java | 22 +-
.../persistence/impl/journal/DescribeJournal.java | 2 +-
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../artemis/core/server/ActiveMQServer.java | 9 +
.../artemis/core/server/ActiveMQServerLogger.java | 3 -
.../core/server/impl/ActiveMQServerImpl.java | 43 +++
.../artemis/core/server/impl/QueueImpl.java | 11 +-
.../artemis/tests/util/ActiveMQTestBase.java | 7 +-
.../resources/pageCounter/checkMessages.groovy | 14 +-
.../main/resources/pageCounter/sendMessages.groovy | 53 +++
.../tests/compatibility/PagingCounterTest.java | 100 ++++++
.../cluster/failover/BackupSyncJournalTest.java | 1 -
.../ActiveMQServerControlUsingCoreTest.java | 5 +
.../paging/PageCountSyncOnNonTXTest.java | 7 +-
.../integration/paging/PageCounterRebuildTest.java | 289 ++++++++++++++++
.../integration/paging/PagingCounterTest.java | 122 +++++--
.../tests/integration/paging/PagingSendTest.java | 3 -
.../integration/replication/ReplicationTest.java | 1 +
.../SharedNothingReplicationFlowControlTest.java | 8 +-
.../storage/PersistMultiThreadTest.java | 4 +
.../core/paging/impl/PagingManagerImplTest.java | 2 +
.../artemis/tests/unit/util/FakePagingManager.java | 5 +
44 files changed, 1434 insertions(+), 382 deletions(-)
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 448c3b247c..4cae819e6c 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -279,7 +279,7 @@ public class PrintData extends DBOption {
folder = pgStore.getFolder();
out.println("####################################################################################################");
out.println("Exploring store " + store + " folder = " + folder);
- int pgid = (int) pgStore.getFirstPage();
+ long pgid = pgStore.getFirstPage();
out.println("Number of pages ::" + pgStore.getNumberOfPages() + ", Current writing page ::" + pgStore.getCurrentWritingPage());
for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 2256e05058..248f44d4a6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -2003,5 +2003,8 @@ public interface ActiveMQServerControl {
@Attribute(desc = "Whether the embedded web server is started")
boolean isEmbeddedWebServerStarted();
+
+ @Attribute(desc = "Scan all paged destinations to rebuild the page counters")
+ void rebuildPageCounters() throws Exception;
}
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index cd1600e1a2..2e445e3b3e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -67,6 +67,9 @@ public abstract class AbstractJDBCDriver {
}
public void destroy() throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("dropping {}", sqlProvider.getTableName(), new Exception("trace"));
+ }
final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
try (Connection connection = connectionProvider.getConnection()) {
try {
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 4b03f91779..96375af025 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -95,8 +95,10 @@ public class JDBCSequentialFile implements SequentialFile {
try {
return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) {
- logger.warn(e.getMessage(), e);
- fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
+ logger.debug(e.getMessage(), e);
+ // this shouldn't throw a critical IO Error
+ // as if the destination does not exists (ot table store removed), the table will not exist and
+ // we may get a SQL Exception
return false;
}
}
@@ -114,7 +116,9 @@ public class JDBCSequentialFile implements SequentialFile {
return true;
} catch (SQLException e) {
isLoaded.set(false);
- fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
+ // should not throw exceptions, as we drop the table on queue.destroy.
+ // storage.exists could be called for non existing pages during async cleanup and they are
+ // just supposed to return false
}
return false;
}
@@ -158,7 +162,9 @@ public class JDBCSequentialFile implements SequentialFile {
}
}
} catch (SQLException e) {
- fileFactory.onIOError(e, "Error deleting JDBC file.", this);
+ // file is already gone from a drop somewhere
+ logger.debug("Expected error deleting Sequential File", e);
+ return;
}
}
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 6bceb5a7c1..e55fb4d03b 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -141,7 +141,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
try {
return dbDriver.listFiles(extension);
} catch (SQLException e) {
- criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
+ // We can't throw critical error here
+ // exists will call listfiles, and if the store does not exists
+ // it should simply return false
throw e;
}
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index 044d9162b7..3d9d52dc25 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -96,6 +96,9 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public final void delete() throws IOException, InterruptedException, ActiveMQException {
try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Deleting {}", this.getFileName(), new Exception("trace"));
+ }
if (isOpen()) {
close(false, false);
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
index aeb24d3736..2f28196a07 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
@@ -16,10 +16,11 @@
*/
package org.apache.activemq.artemis.core.io;
-/**
- * TODO Merge this with IOExceptionListener
- */
public interface IOCriticalErrorListener {
+ default boolean isPreviouslyFailed() {
+ return false;
+ }
+
void onIOException(Throwable code, String message, String file);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 2b541583ec..6d1f4341da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -45,6 +45,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -4619,6 +4620,15 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
+ @Override
+ public void rebuildPageCounters() throws Exception {
+ // managementLock will guarantee there's only one management operation being called
+ try (AutoCloseable lock = server.managementLock()) {
+ Future<Object> task = server.getPagingManager().rebuildCounters();
+ task.get();
+ }
+ }
+
private ServiceComponent getEmbeddedWebServerComponent() throws ActiveMQIllegalStateException {
for (ActiveMQComponent component : server.getExternalComponents()) {
if (component instanceof WebServerComponentMarker) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 89ff513566..80b16cdb2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.paging;
import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -141,6 +143,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
*/
void checkMemory(Runnable runWhenAvailable);
+ void counterSnapshot();
/**
* Use this when you have no refernce of an address. (anonymous AMQP Producers for example)
@@ -157,4 +160,15 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
default long getMaxMessages() {
return 0;
}
+
+ /**
+ * Rebuilds all page counters for destinations that are paging in the background.
+ */
+ default Future<Object> rebuildCounters() {
+ return null;
+ }
+
+ default void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
+ }
+
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 20b62def9d..da680fce6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
@@ -129,6 +130,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
Page getCurrentPage();
+ /** it will save snapshots on the counters */
+ void counterSnapshot();
+
/**
* @return true if paging was started, or false if paging was already started before this call
*/
@@ -220,4 +224,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
void block();
void unblock();
+
+ default StorageManager getStorageManager() {
+ return null;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
index 1f1ae7b4cb..7eb5b7bb94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.paging.cursor;
+import java.util.function.BiConsumer;
+
// this is to expose PageSubscriptionImpl::PageCursorInfo
public interface ConsumedPage {
@@ -24,4 +26,8 @@ public interface ConsumedPage {
boolean isDone();
+ boolean isAck(int messageNumber);
+
+ void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer);
+
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index a6f1714a47..d3f25e21b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
+import java.util.function.Consumer;
+
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -32,12 +34,16 @@ public interface PageCursorProvider {
*/
PageSubscription getSubscription(long queueId);
+ void forEachSubscription(Consumer<PageSubscription> consumer);
+
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
void processReload() throws Exception;
void stop();
+ void counterSnapshot();
+
void flushExecutors();
void scheduleCleanup();
@@ -56,4 +62,8 @@ public interface PageCursorProvider {
*/
void close(PageSubscription pageCursorImpl);
+ void counterRebuildStarted();
+
+ void counterRebuildDone();
+
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 50d7fae5d0..17a743f111 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -35,6 +35,9 @@ public interface PageSubscription {
// To be called before the server is down
void stop();
+ /** Save a snapshot of the current counter value in the journal */
+ void counterSnapshot();
+
/**
* This is a callback to inform the PageSubscription that something was routed, so the empty flag can be cleared
*/
@@ -46,6 +49,8 @@ public interface PageSubscription {
long getMessageCount();
+ boolean isCounterPending();
+
long getPersistentSize();
long getId();
@@ -170,4 +175,6 @@ public interface PageSubscription {
void incrementDeliveredSize(long size);
void removePendingDelivery(PagedMessage pagedMessage);
+
+ ConsumedPage locatePageInfo(long pageNr);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 75eedfa9ee..56614a4871 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
-import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.transaction.Transaction;
public interface PageSubscriptionCounter {
@@ -36,7 +35,13 @@ public interface PageSubscriptionCounter {
void loadInc(long recordInd, int add, long persistentSize);
- void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize);
+ void applyIncrementOnTX(Transaction tx, int add, long persistentSize);
+
+ void markRebuilding();
+
+ void finishRebuild();
+
+ boolean isRebuilding();
/**
* This will process the reload
@@ -46,12 +51,12 @@ public interface PageSubscriptionCounter {
// used when deleting the counter
void delete() throws Exception;
- void pendingCounter(Page page, int increment, long persistentSize) throws Exception;
+ void snapshot();
// used when leaving page mode, so the counters are deleted in batches
// for each queue on the address
void delete(Transaction tx) throws Exception;
- void cleanupNonTXCounters(long pageID) throws Exception;
+ PageSubscriptionCounter setSubscription(PageSubscription subscription);
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java
similarity index 61%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java
index 1f1ae7b4cb..3ba540bdf5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java
@@ -14,14 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.activemq.artemis.core.paging.cursor.impl;
-package org.apache.activemq.artemis.core.paging.cursor;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
-// this is to expose PageSubscriptionImpl::PageCursorInfo
-public interface ConsumedPage {
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
- long getPageId();
+ private volatile boolean rebuilding = false;
- boolean isDone();
+ @Override
+ public void markRebuilding() {
+ rebuilding = true;
+ }
+
+ @Override
+ public void finishRebuild() {
+ rebuilding = false;
+ }
+
+ @Override
+ public boolean isRebuilding() {
+ return rebuilding;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
new file mode 100644
index 0000000000..92a5ac5d39
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final PagingStore pgStore;
+ private final StorageManager sm;
+ private final LongHashSet transactions;
+ private boolean paging;
+ private long limitPageId;
+ private int limitMessageNr;
+ private LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+
+ public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+ // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+ initialize(store);
+ this.pgStore = store;
+ this.sm = store.getStorageManager();
+ this.transactions = transactions;
+ }
+ /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+ * So we can count data while we consolidate at the end */
+ private void initialize(PagingStore store) {
+ store.lock(-1);
+ try {
+ try {
+ paging = store.isPaging();
+ if (!paging) {
+ logger.debug("Destination {} was not paging, no need to rebuild counters");
+ store.getCursorProvider().forEachSubscription(subscription -> {
+ subscription.getCounter().markRebuilding();
+ subscription.getCounter().finishRebuild();
+ });
+
+ store.getCursorProvider().counterRebuildDone();
+ return;
+ }
+ store.getCursorProvider().counterRebuildStarted();
+ Page currentPage = store.getCurrentPage();
+ limitPageId = store.getCurrentWritingPage();
+ limitMessageNr = currentPage.getNumberOfMessages();
+ if (logger.isDebugEnabled()) {
+ logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ limitPageId = store.getCurrentWritingPage();
+ }
+ logger.trace("Copying page store ack information from address {}", store.getAddress());
+ store.getCursorProvider().forEachSubscription(subscription -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Copying subscription ID {}", subscription.getId());
+ }
+
+ CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+ copiedSubscription.subscriptionCounter.markRebuilding();
+ copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+ subscription.forEachConsumedPage(consumedPage -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Copying page {}", consumedPage.getPageId());
+ }
+
+ CopiedConsumedPage copiedConsumedPage = new CopiedConsumedPage();
+ copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+ if (consumedPage.isDone()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+ }
+ copiedConsumedPage.done = true;
+ } else {
+ // We only copy the acks if the page is not done
+ // as if the page is done, we just move over
+ consumedPage.forEachAck((messageNR, pagePosition) -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+ }
+ if (copiedConsumedPage.acks == null) {
+ copiedConsumedPage.acks = new IntObjectHashMap<>();
+ }
+ copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+ });
+ }
+ });
+ });
+ } finally {
+ store.unlock();
+ }
+ }
+
+ private synchronized PageSubscriptionCounter getCounter(long queueID) {
+ CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+ if (copiedSubscription != null) {
+ return copiedSubscription.subscriptionCounter;
+ } else {
+ return null;
+ }
+ }
+
+ private CopiedSubscription getSubscription(long queueID) {
+ return copiedSubscriptionMap.get(queueID);
+ }
+
+ private boolean isACK(long queueID, long pageNR, int messageNR) {
+ CopiedSubscription subscription = getSubscription(queueID);
+ if (subscription == null) {
+ return true;
+ }
+
+ CopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+ if (consumedPage == null) {
+ return false;
+ } else {
+ return consumedPage.isAck(messageNR);
+ }
+ }
+
+ private void done() {
+ copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+ if (!copiedSubscription.empty) {
+ copiedSubscription.subscription.notEmpty();
+ try {
+ copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (!copiedSubscription.empty) {
+ copiedSubscription.subscription.notEmpty();
+ }
+ if (copiedSubscription.subscriptionCounter != null) {
+ copiedSubscription.subscriptionCounter.finishRebuild();
+ }
+ });
+ pgStore.getCursorProvider().counterRebuildDone();
+ pgStore.getCursorProvider().scheduleCleanup();
+ }
+
+ @Override
+ public void run() {
+ try {
+ rebuild();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ public void rebuild() throws Exception {
+ if (pgStore == null) {
+ logger.debug("Page store is null during rebuildCounters");
+ return;
+ }
+
+ if (!paging) {
+ logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+ }
+
+ logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+ for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+ }
+ Page page = pgStore.newPageObject(pgid);
+
+ if (!page.getFile().exists()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+ }
+ continue;
+ }
+ page.open(false);
+ LinkedList<PagedMessage> msgs = page.read(sm);
+ page.close(false, false);
+
+ try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+ while (iter.hasNext()) {
+ PagedMessage msg = iter.next();
+ if (limitPageId == pgid) {
+ if (msg.getMessageNumber() >= limitMessageNr) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebuild counting on {} reached the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
+ }
+ // this is the limit where we should count..
+ // anything beyond this will be new data
+ break;
+ }
+ }
+ msg.initMessage(sm);
+ long[] routedQueues = msg.getQueueIDs();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
+ }
+ for (long queueID : routedQueues) {
+ boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
+
+ boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
+
+ if (!txOK) {
+ logger.debug("TX is not ok for {}", msg);
+ }
+
+ if (ok && txOK) { // not acked and TX is ok
+ if (logger.isTraceEnabled()) {
+ logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
+ }
+ CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+ if (copiedSubscription != null) {
+ copiedSubscription.empty = false;
+ copiedSubscription.addUp++;
+ copiedSubscription.sizeUp += msg.getPersistentSize();
+ }
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ logger.debug("Counter rebuilding done for address {}", pgStore.getAddress());
+
+ done();
+
+ }
+
+ private static class CopiedSubscription {
+ CopiedSubscription(PageSubscription subscription) {
+ this.subscriptionCounter = subscription.getCounter();
+ this.subscription = subscription;
+ }
+
+ private boolean empty = true;
+
+ LongObjectHashMap<CopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+ // this is not a copy! This will be the actual object listed in the PageSubscription
+ // any changes to this object will reflect in the system and management;
+ PageSubscriptionCounter subscriptionCounter;
+
+ PageSubscription subscription;
+
+ CopiedConsumedPage getPage(long pageNr) {
+ return consumedPageMap.get(pageNr);
+ }
+
+ int addUp;
+ long sizeUp;
+
+ }
+
+ private static class CopiedConsumedPage implements ConsumedPage {
+ boolean done;
+ IntObjectHashMap<Boolean> acks;
+
+ @Override
+ public long getPageId() {
+ throw new RuntimeException("method not implemented");
+ }
+
+ @Override
+ public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+ throw new RuntimeException("method not implemented");
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public boolean isAck(int messageNumber) {
+ if (done) {
+ return true;
+ }
+ if (acks != null) {
+ return acks.get(messageNumber) != null;
+ }
+ return false;
+ }
+ }
+
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 242fbcb14e..24d65970f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.function.Consumer;
public class PageCursorProviderImpl implements PageCursorProvider {
@@ -54,6 +56,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
protected volatile boolean cleanupEnabled = true;
+ // We can't call cleanup before counters were rebuilt
+ // as they will determine if a subscription is empty or not
+ protected volatile boolean countersRebuilt = true;
+
protected final PagingStore pagingStore;
protected final StorageManager storageManager;
@@ -85,16 +91,30 @@ public class PageCursorProviderImpl implements PageCursorProvider {
throw new IllegalStateException("Cursor " + cursorID + " had already been created");
}
- PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent);
+
+ PageSubscriptionCounter subscriptionCounter = createPageCounter(cursorID, persistent);
+ PageSubscription activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, filter, cursorID, persistent, subscriptionCounter);
+
+
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
+
+ private PageSubscriptionCounter createPageCounter(long cursorID, boolean persistent) {
+ return new PageSubscriptionCounterImpl(storageManager, cursorID);
+ }
+
@Override
public synchronized PageSubscription getSubscription(long cursorID) {
return activeCursors.get(cursorID);
}
+ @Override
+ public void forEachSubscription(Consumer<PageSubscription> consumer) {
+ activeCursors.forEach((k, v) -> consumer.accept(v));
+ }
+
@Override
public PagedReference newReference(final PagedMessage msg,
final PageSubscription subscription) {
@@ -139,6 +159,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
}
+ @Override
+ public void counterSnapshot() {
+ for (PageSubscription cursor : activeCursors.values()) {
+ cursor.counterSnapshot();
+ }
+ }
+
@Override
public void flushExecutors() {
pagingStore.flushExecutors();
@@ -216,6 +243,11 @@ public class PageCursorProviderImpl implements PageCursorProvider {
protected void cleanup() {
+ if (!countersRebuilt) {
+ logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL");
+ return;
+ }
+
ArrayList<Page> depagedPages = new ArrayList<>();
LongHashSet depagedPagesSet = new LongHashSet();
@@ -506,6 +538,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
private long checkMinPage(Collection<PageSubscription> cursorList) {
long minPage = Long.MAX_VALUE;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Min page cursorList size {} on {}", cursorList.size(), pagingStore.getAddress(), new Exception("trace"));
+ }
+
for (PageSubscription cursor : cursorList) {
long firstPage = cursor.getFirstPage();
if (logger.isTraceEnabled()) {
@@ -543,4 +579,14 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
}
}
+
+ @Override
+ public void counterRebuildStarted() {
+ this.countersRebuilt = false;
+ }
+
+ @Override
+ public void counterRebuildDone() {
+ this.countersRebuilt = true;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 6ca73feb19..f580be47f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -16,19 +16,12 @@
*/
package org.apache.activemq.artemis.core.paging.cursor.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
-import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -44,173 +37,117 @@ import java.lang.invoke.MethodHandles;
/**
* This class will encapsulate the persistent counters for the PagingSubscription
*/
-public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
+public class PageSubscriptionCounterImpl extends BasePagingCounter {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final int FLUSH_COUNTER = 1000;
-
private final long subscriptionID;
// the journal record id that is holding the current value
private long recordID = -1;
- private boolean persistent;
+ /** while we rebuild the counters, we will use the recordedValues */
+ private volatile long recordedValue = -1;
+ private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> recordedValueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedValue");
- private final PageSubscription subscription;
+ /** while we rebuild the counters, we will use the recordedValues */
+ private volatile long recordedSize = -1;
+ private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> recordedSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "recordedSize");
- private final StorageManager storage;
+ private PageSubscription subscription;
- private final AtomicLong value = new AtomicLong(0);
- private final AtomicLong persistentSize = new AtomicLong(0);
+ private final StorageManager storage;
- private final AtomicLong added = new AtomicLong(0);
- private final AtomicLong addedPersistentSize = new AtomicLong(0);
+ private volatile long value;
+ private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> valueUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "value");
- private final AtomicLong pendingValue = new AtomicLong(0);
- private final AtomicLong pendingPersistentSize = new AtomicLong(0);
+ private volatile long persistentSize;
+ private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> persistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "persistentSize");
- private final LinkedList<Long> incrementRecords = new LinkedList<>();
+ private volatile long added;
+ private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> addedUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "added");
- // We are storing pending counters for non transactional writes on page
- // we will recount a page case we still see pending records
- // as soon as we close a page we remove these records replacing by a regular page increment record
- // A Map per pageID, each page will have a set of IDs, with the increment on each one
- private final Map<Long, PendingCounter> pendingCounters = new HashMap<>();
+ private volatile long addedPersistentSize;
+ private static final AtomicLongFieldUpdater<PageSubscriptionCounterImpl> addedPersistentSizeUpdater = AtomicLongFieldUpdater.newUpdater(PageSubscriptionCounterImpl.class, "addedPersistentSize");
private LinkedList<PendingCounter> loadList;
- private final Executor pageExecutor;
-
public PageSubscriptionCounterImpl(final StorageManager storage,
- final PageSubscription subscription,
- final boolean persistent,
final long subscriptionID) {
this.subscriptionID = subscriptionID;
this.storage = storage;
- this.persistent = persistent;
- this.subscription = subscription;
- if (subscription == null) {
- this.pageExecutor = null;
- } else {
- this.pageExecutor = subscription.getPagingStore().getExecutor();
- assert pageExecutor != null;
- }
}
@Override
- public long getValueAdded() {
- return added.get() + pendingValue.get();
- }
-
- @Override
- public long getValue() {
- return value.get() + pendingValue.get();
+ public void markRebuilding() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Subscription {} marked for rebuilding", subscriptionID);
+ }
+ super.markRebuilding();
+ recordedSizeUpdater.set(this, persistentSizeUpdater.get(this)); // keep the live size readable while rebuilding
+ recordedValueUpdater.set(this, valueUpdater.get(this)); // capture the live count (was a self-assignment) before reset() zeroes it
+ try {
+ reset(); // zeroes value/persistentSize and stores a zero page-counter record
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
}
@Override
- public long getPersistentSizeAdded() {
- return addedPersistentSize.get() + pendingPersistentSize.get();
+ public void finishRebuild() {
+ super.finishRebuild();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Subscription {} finished rebuilding", subscriptionID);
+ }
+ snapshot();
+ addedUpdater.set(this, valueUpdater.get(this));
+ addedPersistentSizeUpdater.set(this, persistentSizeUpdater.get(this));
}
@Override
- public long getPersistentSize() {
- return persistentSize.get() + pendingPersistentSize.get();
+ public long getValueAdded() {
+ return addedUpdater.get(this);
}
- /**
- * This is used only on non transactional paging
- *
- * @param page
- * @param increment
- * @throws Exception
- */
@Override
- public synchronized void pendingCounter(Page page, int increment, long size) throws Exception {
- if (!persistent) {
- return; // nothing to be done
+ public long getValue() {
+ if (isRebuilding()) { // while rebuilding, report the recorded value instead of the live one
+ if (logger.isTraceEnabled()) {
+ logger.trace("returning getValue from isPending on subscription {}, recordedValue={}, addedUpdater={}", subscriptionID, recordedValueUpdater.get(this), addedUpdater.get(this));
+ }
+ return recordedValueUpdater.get(this);
}
-
- assert page != null;
-
- PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId());
- if (pendingInfo == null) {
- // We have to make sure this is sync here
- // not syncing this to disk may cause the page files to be out of sync on pages.
- // we can't afford the case where a page file is written without a record here
- long id = storage.storePendingCounter(this.subscriptionID, page.getPageId());
- pendingInfo = new PendingCounter(id, increment, size);
- pendingCounters.put((long) page.getPageId(), pendingInfo);
- } else {
- pendingInfo.addAndGet(increment, size);
+ if (logger.isTraceEnabled()) {
+ logger.trace("returning regular getValue subscription {}, value={}", subscriptionID, valueUpdater.get(this));
}
+ return valueUpdater.get(this);
+ }
- pendingValue.addAndGet(increment);
- pendingPersistentSize.addAndGet(size);
-
- page.addPendingCounter(this);
+ @Override
+ public long getPersistentSizeAdded() {
+ return addedPersistentSizeUpdater.get(this);
}
- /**
- * Cleanup temporary page counters on non transactional paged messages
- *
- * @param pageID
- */
@Override
- public void cleanupNonTXCounters(final long pageID) throws Exception {
- PendingCounter pendingInfo;
- synchronized (this) {
- pendingInfo = pendingCounters.remove(pageID);
+ public long getPersistentSize() {
+ if (isRebuilding()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("returning getPersistentSize from isPending on subscription {}, recordedSize={}. addedSize={}", subscriptionID, recordedSizeUpdater.get(this), addedPersistentSizeUpdater.get(this));
+ }
+ return recordedSizeUpdater.get(this);
}
-
- if (pendingInfo != null) {
- final int valueCleaned = pendingInfo.getCount();
- final long valueSizeCleaned = pendingInfo.getPersistentSize();
- Transaction tx = new TransactionImpl(storage);
- storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId());
-
- // To apply the increment of the value just being cleaned
- increment(tx, valueCleaned, valueSizeCleaned);
-
- tx.addOperation(new TransactionOperationAbstract() {
- @Override
- public void afterCommit(Transaction tx) {
- pendingValue.addAndGet(-valueCleaned);
- pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0);
- }
- });
-
- tx.commit();
+ if (logger.isTraceEnabled()) {
+ logger.trace("returning regular getPersistentSize subscription {}, value={}", subscriptionID, persistentSizeUpdater.get(this));
}
+ return persistentSizeUpdater.get(this);
}
@Override
public void increment(Transaction tx, int add, long size) throws Exception {
if (tx == null) {
- if (persistent) {
- long id = storage.storePageCounterInc(this.subscriptionID, add, size);
- storage.getContext().executeOnCompletion(new IOCallback() {
- @Override
- public void done() {
- process(id, add, size);
- }
-
- @Override
- public void onError(int errorCode, String errorMessage) {
-
- }
- });
- } else {
- process(-1, add, size);
- }
+ process(add, size);
} else {
- if (persistent) {
- tx.setContainsPersistent();
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size);
- applyIncrementOnTX(tx, id, add, size);
- } else {
- applyIncrementOnTX(tx, -1, add, size);
- }
+ applyIncrementOnTX(tx, add, size);
}
}
@@ -218,11 +155,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
* This method will install the TXs
*
* @param tx
- * @param recordID1
* @param add
*/
@Override
- public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) {
+ public void applyIncrementOnTX(Transaction tx, int add, long size) {
CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
if (oper == null) {
@@ -231,27 +167,36 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
tx.addOperation(oper);
}
- oper.operations.add(new ItemOper(this, recordID1, add, size));
+ oper.operations.add(new ItemOper(this, add, size));
}
@Override
- public synchronized void loadValue(final long recordID1, final long value1, long size) {
- if (this.subscription != null) {
- // it could be null on testcases... which is ok
- this.subscription.notEmpty();
+ public synchronized void loadValue(final long recordID, final long value, long size) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Counter for subscription {} reloading recordID={}, value={}, size={}", this.subscriptionID, recordID, value, size);
}
- this.value.set(value1);
- this.added.set(value1);
- this.persistentSize.set(size);
- this.addedPersistentSize.set(size);
- this.recordID = recordID1;
+ this.recordID = recordID;
+ recordedValueUpdater.set(this, value);
+ recordedSizeUpdater.set(this, size);
+ valueUpdater.set(this, value);
+ persistentSizeUpdater.set(this, size); // NOTE(review): the removed code also seeded addedPersistentSize from size; it is not restored here - confirm
+ addedUpdater.set(this, value); // "added" is a message count, so seed it from value (was: size)
}
- private void process(long id, int add, long size) {
- if (id >= 0 && pageExecutor != null) {
- pageExecutor.execute(() -> doIncrement(id, add, size));
- } else {
- doIncrement(-1, add, size);
+ private void process(int add, long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size);
+ }
+ valueUpdater.addAndGet(this, add);
+ persistentSizeUpdater.addAndGet(this, size);
+ if (add > 0) { // the "added" totals only ever grow; decrements are not applied to them
+ addedUpdater.addAndGet(this, add);
+ addedPersistentSizeUpdater.addAndGet(this, size);
+ }
+
+ if (isRebuilding()) { // keep the values reported during a rebuild in step with live updates
+ recordedValueUpdater.addAndGet(this, add); // apply the delta (was: the whole 'value' field)
+ recordedSizeUpdater.addAndGet(this, size);
}
}
@@ -264,24 +209,39 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
tx.commit();
}
+ private void reset() throws Exception {
+ Transaction tx = new TransactionImpl(storage);
+
+ delete(tx, true);
+
+ tx.commit();
+ }
+
@Override
public void delete(Transaction tx) throws Exception {
+ delete(tx, false);
+ }
+
+ private void delete(Transaction tx, boolean keepZero) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Subscription {} delete, keepZero={}", subscriptionID, keepZero);
+ }
// always lock the StorageManager first.
try (ArtemisCloseable lock = storage.closeableReadLock()) {
synchronized (this) {
- for (Long record : incrementRecords) {
- storage.deleteIncrementRecord(tx.getID(), record.longValue());
- tx.setContainsPersistent();
- }
-
if (recordID >= 0) {
storage.deletePageCounter(tx.getID(), this.recordID);
tx.setContainsPersistent();
}
- recordID = -1;
- value.set(0);
- incrementRecords.clear();
+ if (keepZero) {
+ recordID = storage.storePageCounter(tx.getID(), subscriptionID, 0L, 0L);
+ } else {
+ recordID = -1;
+ }
+
+ valueUpdater.set(this, 0);
+ persistentSizeUpdater.set(this, 0);
}
}
}
@@ -298,110 +258,101 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void processReload() {
if (loadList != null) {
- if (subscription != null) {
- // it could be null on testcases
- subscription.notEmpty();
- }
-
- for (PendingCounter incElement : loadList) {
- value.addAndGet(incElement.getCount());
- added.addAndGet(incElement.getCount());
- persistentSize.addAndGet(incElement.getPersistentSize());
- addedPersistentSize.addAndGet(incElement.getPersistentSize());
- incrementRecords.add(incElement.getId());
+ try {
+ long tx = -1L;
+ logger.debug("Removing increment records on cursor {}", subscriptionID);
+ for (PendingCounter incElement : loadList) {
+ if (tx < 0) {
+ tx = storage.generateID();
+ }
+ storage.deletePageCounter(tx, incElement.id);
+ }
+ if (tx >= 0) {
+ storage.commit(tx);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
}
loadList.clear();
loadList = null;
}
}
- // you need to call this method from the executors when id > 0
- private void doIncrement(long id, int variance, long size) {
- value.addAndGet(variance);
- this.persistentSize.addAndGet(size);
- if (variance > 0) {
- added.addAndGet(variance);
- }
- if (size > 0) {
- addedPersistentSize.addAndGet(size);
- }
- if (id >= 0) {
- synchronized (this) {
- incrementRecords.add(id);
- if (incrementRecords.size() > FLUSH_COUNTER) {
- this.cleanup();
- }
- }
- }
- }
-
- /**
- * used on testing only
- */
- public void setPersistent(final boolean persistent) {
- this.persistent = persistent;
- }
-
/**
* This method should always be called from a single threaded executor
*/
- protected synchronized void cleanup() {
- if (incrementRecords.size() <= FLUSH_COUNTER) {
+ @Override
+ public synchronized void snapshot() {
+ if (isRebuilding()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("snapshot call ignored as cursor is being rebuilt for {}", subscriptionID);
+ }
+ return;
+ }
+
+ if (!storage.isStarted()) {
+ logger.debug("Storage is not active, ignoring snapshot call on {}", subscriptionID);
return;
}
- long valueReplace = value.get();
- long sizeReplace = persistentSize.get();
- ArrayList<Long> deleteList = new ArrayList<>(incrementRecords);
- incrementRecords.clear();
+ long valueReplace = valueUpdater.get(this);
+ long sizeReplace = persistentSizeUpdater.get(this);
long newRecordID = -1;
- long txCleanup = storage.generateID();
+ long txCleanup = -1; // a transaction id is only generated once there is something to write
try {
- for (Long value1 : deleteList) {
- storage.deleteIncrementRecord(txCleanup, value1);
- }
-
if (recordID >= 0) {
+ if (txCleanup < 0) {
+ txCleanup = storage.generateID();
+ }
storage.deletePageCounter(txCleanup, recordID);
+ // keep recordID until 'finally': the catch block needs the old id to restore it after a rollback
}
- newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
+ if (valueReplace > 0) {
+ if (txCleanup < 0) {
+ txCleanup = storage.generateID();
+ }
+ newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}",
- recordID, newRecordID, subscriptionID, subscription.getQueue().getName());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}, value = {}, size = {}",
+ recordID, newRecordID, subscriptionID, subscription.getQueue().getName(), valueReplace, sizeReplace);
}
- storage.commit(txCleanup);
+ if (txCleanup >= 0) {
+ storage.commit(txCleanup);
+ }
} catch (Exception e) {
newRecordID = recordID;
ActiveMQServerLogger.LOGGER.problemCleaningPagesubscriptionCounter(e);
- try {
- storage.rollback(txCleanup);
- } catch (Exception ignored) {
+ if (txCleanup >= 0) {
+ try {
+ storage.rollback(txCleanup);
+ } catch (Exception ignored) {
+ }
}
} finally {
recordID = newRecordID;
+ recordedValueUpdater.set(this, valueReplace);
+ recordedSizeUpdater.set(this, sizeReplace);
}
}
private static class ItemOper {
- private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) {
+ private ItemOper(PageSubscriptionCounterImpl counter, int add, long persistentSize) {
this.counter = counter;
- this.id = id;
this.amount = add;
this.persistentSize = persistentSize;
}
PageSubscriptionCounterImpl counter;
- long id;
-
int amount;
long persistentSize;
@@ -414,7 +365,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void afterCommit(Transaction tx) {
for (ItemOper oper : operations) {
- oper.counter.process(oper.id, oper.amount, oper.persistentSize);
+ oper.counter.process(oper.amount, oper.persistentSize);
}
}
}
@@ -465,4 +416,10 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
SIZE_UPDATER.addAndGet(this, persistentSize);
}
}
+
+ @Override
+ public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
+ this.subscription = subscription;
+ return this;
+ }
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index cdbd1dba72..a0ebc3a9c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
@@ -273,14 +274,17 @@ public final class PageSubscriptionImpl implements PageSubscription {
final StorageManager store,
final Filter filter,
final long cursorId,
- final boolean persistent) {
+ final boolean persistent,
+ final PageSubscriptionCounter counter) {
+ assert counter != null;
this.pageStore = pageStore;
this.store = store;
this.cursorProvider = cursorProvider;
this.cursorId = cursorId;
this.filter = filter;
this.persistent = persistent;
- this.counter = new PageSubscriptionCounterImpl(store, this, persistent, cursorId);
+ this.counter = counter;
+ this.counter.setSubscription(this);
}
@@ -346,6 +350,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
+ @Override
+ public boolean isCounterPending() {
+ return counter.isRebuilding();
+ }
+
@Override
public long getPersistentSize() {
if (empty) {
@@ -418,10 +427,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
@Override
public void onPageModeCleared(Transaction tx) throws Exception {
- if (counter != null) {
- // this could be null on testcases
- counter.delete(tx);
- }
+ // the counter is injected through the constructor, which asserts it is not null
+ counter.delete(tx);
this.empty = true;
}
@@ -746,9 +753,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
@Override
- public void forEachConsumedPage(Consumer<ConsumedPage> pageCleaner) {
+ public void forEachConsumedPage(Consumer<ConsumedPage> pageConsumer) {
synchronized (consumedPages) {
- consumedPages.values().forEach(pageCleaner);
+ consumedPages.values().forEach(pageConsumer);
}
}
@@ -860,6 +867,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
public void stop() {
}
+ @Override
+ public void counterSnapshot() {
+ counter.snapshot();
+ }
+
@Override
public void printDebug() {
printDebug(toString());
@@ -912,6 +924,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
return getPageInfo(pos.getPageNr());
}
+ @Override
public PageCursorInfo locatePageInfo(final long pageNr) {
synchronized (consumedPages) {
return consumedPages.get(pageNr);
@@ -1064,10 +1077,16 @@ public final class PageSubscriptionImpl implements PageSubscription {
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
+ @Override
public synchronized boolean isAck(int messageNumber) {
return completePage != null || acks.get(messageNumber) != null;
}
+ @Override
+ public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+ acks.forEach(ackConsumer);
+ }
+
@Override
public String toString() {
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 6a0551b430..8e919fb670 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -26,13 +25,11 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.EmptyList;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@@ -86,11 +83,6 @@ public final class Page {
private final SimpleString storeName;
- /**
- * A list of subscriptions containing pending counters (with non tx adds) on this page
- */
- private Set<PageSubscriptionCounter> pendingCounters;
-
private ByteBuffer readFileBuffer;
public Page(final SimpleString storeName,
@@ -241,13 +233,6 @@ public final class Page {
storageManager.pageClosed(storeName, pageId);
}
file.close(waitSync, waitSync);
-
- Set<PageSubscriptionCounter> counters = getPendingCounters();
- if (counters != null) {
- for (PageSubscriptionCounter counter : counters) {
- counter.cleanupNonTXCounters(this.getPageId());
- }
- }
}
public boolean delete(final LinkedList<PagedMessage> messages) throws Exception {
@@ -255,7 +240,9 @@ public final class Page {
storageManager.pageDeleted(storeName, pageId);
}
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Deleting pageNr={} on store {}", pageId, storeName, new Exception("trace"));
+ } else if (logger.isDebugEnabled()) {
logger.debug("Deleting pageNr={} on store {}", pageId, storeName);
}
@@ -373,24 +360,4 @@ public final class Page {
return file;
}
- /**
- * This will indicate a page that will need to be called on cleanup when the page has been closed and confirmed
- *
- * @param pageSubscriptionCounter
- */
- public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) {
- getOrCreatePendingCounters().add(pageSubscriptionCounter);
- }
-
- private synchronized Set<PageSubscriptionCounter> getPendingCounters() {
- return pendingCounters;
- }
-
- private synchronized Set<PageSubscriptionCounter> getOrCreatePendingCounters() {
- if (pendingCounters == null) {
- pendingCounters = new ConcurrentHashSet<>();
- }
-
- return pendingCounters;
- }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 59db0da45f..6bec335949 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -32,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@@ -40,14 +43,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
public final class PagingManagerImpl implements PagingManager {
- private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval", "0"));
+ private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60"));
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -92,13 +97,13 @@ public final class PagingManagerImpl implements PagingManager {
private volatile long diskTotalSpace = 0;
- private final Executor memoryExecutor;
+ private final Executor managerExecutor;
private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>();
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
- private ActiveMQScheduledComponent scheduledComponent = null;
+ private ActiveMQScheduledComponent snapshotUpdater = null;
private final SimpleString managementAddress;
@@ -127,7 +132,7 @@ public final class PagingManagerImpl implements PagingManager {
globalSizeMetric.setElementsEnabled(maxMessages >= 0);
globalSizeMetric.setOverCallback(() -> setGlobalFull(true));
globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
- this.memoryExecutor = pagingSPI.newExecutor();
+ this.managerExecutor = pagingSPI.newExecutor();
this.managementAddress = managementAddress;
}
@@ -205,8 +210,8 @@ public final class PagingManagerImpl implements PagingManager {
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || !globalFull) && !blockedStored.isEmpty()) {
if (!memoryCallback.isEmpty()) {
- if (memoryExecutor != null) {
- memoryExecutor.execute(this::memoryReleased);
+ if (managerExecutor != null) {
+ managerExecutor.execute(this::memoryReleased);
} else {
memoryReleased();
}
@@ -368,8 +373,8 @@ public final class PagingManagerImpl implements PagingManager {
PagingStore oldStore = stores.remove(store.getStoreName());
if (oldStore != null) {
oldStore.stop();
- oldStore = null;
}
+ store.getCursorProvider().counterRebuildStarted(); // TODO-NOW-DONT-MERGE maybe this should be removed
store.start();
stores.put(store.getStoreName(), store);
}
@@ -466,28 +471,38 @@ public final class PagingManagerImpl implements PagingManager {
reloadStores();
- if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) {
- this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) {
+ if (ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL > 0) {
+ this.snapshotUpdater = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL, TimeUnit.SECONDS, false) {
@Override
public void run() {
- debug();
+ try {
+ logger.debug("Updating counter snapshots");
+ counterSnapshot();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
}
};
- this.scheduledComponent.start();
+ this.snapshotUpdater.start();
}
started = true;
+
} finally {
unlock();
}
}
- public void debug() {
- logger.info("size = {} bytes, messages = {}", globalSizeMetric.getSize(), globalSizeMetric.getElements());
+ @Override
+ public void counterSnapshot() {
+ for (PagingStore store : stores.values()) {
+ store.counterSnapshot();
+ }
}
+
@Override
public synchronized void stop() throws Exception {
if (!started) {
@@ -495,9 +510,9 @@ public final class PagingManagerImpl implements PagingManager {
}
started = false;
- if (scheduledComponent != null) {
- this.scheduledComponent.stop();
- this.scheduledComponent = null;
+ if (snapshotUpdater != null) {
+ this.snapshotUpdater.stop();
+ this.snapshotUpdater = null;
}
lock();
@@ -548,4 +563,26 @@ public final class PagingManagerImpl implements PagingManager {
syncLock.writeLock().lock();
}
+ @Override
+ public void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
+ transactions.forEach(transactionConsumer);
+ }
+
+ @Override
+ public Future<Object> rebuildCounters() {
+ LongHashSet transactionsSet = new LongHashSet();
+ transactions.forEach((txId, tx) -> {
+ transactionsSet.add(txId);
+ });
+ stores.forEach((address, pgStore) -> {
+ PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet);
+ logger.debug("Setting destination {} to rebuild counters", address);
+ managerExecutor.execute(rebuildManager);
+ });
+
+ FutureTask<Object> task = new FutureTask<>(() -> null);
+ managerExecutor.execute(task);
+
+ return task;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index ca3fd43a71..81d10b9691 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -380,34 +380,40 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
- public synchronized void stop() throws Exception {
- if (running) {
- cursorProvider.flushExecutors();
- cursorProvider.stop();
-
- final List<Runnable> pendingTasks = new ArrayList<>();
-
- // TODO we could have a parameter to use this
- final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
- if (pendingTasksWhileShuttingDown > 0) {
- logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown);
- for (Runnable pendingTask : pendingTasks) {
- try {
- pendingTask.run();
- } catch (Throwable t) {
- logger.warn("Error while executing a pending task on shutdown", t);
- }
- }
+ public void counterSnapshot() {
+ cursorProvider.counterSnapshot();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ synchronized (this) {
+ if (running) {
+ cursorProvider.stop();
+ running = false;
+ } else {
+ return;
}
+ }
- running = false;
+ final List<Runnable> pendingTasks = new ArrayList<>();
- final Page page = currentPage;
- if (page != null) {
- page.close(false);
- currentPage = null;
+ final int pendingTasksWhileShuttingDown = executor.shutdownNow(pendingTasks::add, 30, TimeUnit.SECONDS);
+ if (pendingTasksWhileShuttingDown > 0) {
+ logger.trace("Try executing {} pending tasks on stop", pendingTasksWhileShuttingDown);
+ for (Runnable pendingTask : pendingTasks) {
+ try {
+ pendingTask.run();
+ } catch (Throwable t) {
+ logger.warn("Error while executing a pending task on shutdown", t);
+ }
}
}
+
+ final Page page = currentPage;
+ if (page != null) {
+ page.close(false);
+ currentPage = null;
+ }
}
@Override
@@ -424,10 +430,13 @@ public class PagingStoreImpl implements PagingStore {
public void flushExecutors() {
FutureLatch future = new FutureLatch();
- executor.execute(future);
+ try {
+ executor.execute(future);
- if (!future.await(60000)) {
- ActiveMQServerLogger.LOGGER.pageStoreTimeout(address);
+ if (!future.await(60000)) {
+ ActiveMQServerLogger.LOGGER.pageStoreTimeout(address);
+ }
+ } catch (Exception ignored) {
}
}
@@ -1122,14 +1131,7 @@ public class PagingStoreImpl implements PagingStore {
List<org.apache.activemq.artemis.core.server.Queue> durableQueues = ctx.getDurableQueues();
List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) {
- if (tx == null) {
- // non transactional writes need an intermediate place
- // to avoid the counter getting out of sync
- q.getPageSubscription().getCounter().pendingCounter(page, 1, size);
- } else {
- // null tx is treated through pending counters
- q.getPageSubscription().getCounter().increment(tx, 1, size);
- }
+ q.getPageSubscription().getCounter().increment(tx, 1, size);
}
for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
@@ -1408,5 +1410,9 @@ public class PagingStoreImpl implements PagingStore {
usedPages.forEachUsedPage(consumerPage);
}
+ @Override
+ public StorageManager getStorageManager() {
+ return storageManager;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index d7ca55888f..5157103ff9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -22,7 +22,6 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import java.security.InvalidParameterException;
@@ -1692,7 +1691,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public synchronized boolean isStarted() {
- return started;
+ if (ioCriticalErrorListener != null) {
+ return started && !ioCriticalErrorListener.isPreviouslyFailed();
+ } else {
+ return started;
+ }
}
/**
@@ -1895,25 +1898,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
break;
}
- case PAGE_CURSOR_COUNTER_VALUE: {
- ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
-
- break;
- }
-
case PAGE_CURSOR_COUNTER_INC: {
PageCountRecordInc encoding = new PageCountRecordInc();
encoding.decode(buff);
- PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
+ logger.debug("Page cursor counter inc on a prepared TX.");
- if (sub != null) {
- sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize());
- sub.notEmpty();
- } else {
- ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
- }
+ // TODO: do I need to remove the record on commit?
break;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 5a585fe8bb..19840061b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -535,7 +535,7 @@ public final class DescribeJournal {
PageSubscriptionCounterImpl subsCounter;
subsCounter = counters.get(queueIDForCounter);
if (subsCounter == null) {
- subsCounter = new PageSubscriptionCounterImpl(null, null, false, -1);
+ subsCounter = new PageSubscriptionCounterImpl(null, -1);
counters.put(queueIDForCounter, subsCounter);
}
return subsCounter;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index aadb9ce633..2c2f632d93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -524,4 +524,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229244, value = "Meters already registered for {}")
IllegalStateException metersAlreadyRegistered(String resource);
+
+ @Message(id = 229245, value = "Management controller is busy with another task. Please try again")
+ ActiveMQTimeoutException managementBusy();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index bfc0feea5a..57a02aa305 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -111,6 +111,7 @@ public interface ActiveMQServer extends ServiceComponent {
STOPPED
}
+ AutoCloseable managementLock() throws Exception;
void setState(SERVER_STATE state);
@@ -357,6 +358,14 @@ public interface ActiveMQServer extends ServiceComponent {
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception;
+ /** Whether the server rebuilds the page counters upon startup.
+ * This is useful for tests or embedded broker scenarios. */
+ boolean isRebuildCounters();
+
+ /** Sets whether the server rebuilds the page counters upon startup.
+ * This is useful for tests or embedded broker scenarios. */
+ void setRebuildCounters(boolean rebuildCounters);
+
SecurityStore getSecurityStore();
void removeSession(String name) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 2f007976e3..1feee23ec0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -472,9 +472,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222047, value = "Can not find queue {} while reloading ACKNOWLEDGE_CURSOR", level = LogMessage.Level.WARN)
void journalCannotFindQueueReloadingACK(Long queueID);
- @LogMessage(id = 222048, value = "PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, invalid state", level = LogMessage.Level.WARN)
- void journalPAGEOnPrepared();
-
@LogMessage(id = 222049, value = "InternalError: Record type {} not recognized. Maybe you are using journal files created on a different version", level = LogMessage.Level.WARN)
void journalInvalidRecordType(Byte recordType);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index af278ebfd9..d9a1f7bc21 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -239,6 +239,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private HAPolicy haPolicy;
+ // Whether page counters are rebuilt on startup; being able to change it is useful for tests or embedded brokers
+ private boolean rebuildCounters = true;
+
private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
private final Version version;
@@ -271,6 +274,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private ReplayManager replayManager;
+ /** Certain management operations must not run on more than one thread at a time.
+ * This semaphore guarantees that only a single thread runs them. */
+ private final Semaphore managementSemaphore = new Semaphore(1);
+
/**
* This is a thread pool for io tasks only.
* We can't use the same global executor to avoid starvations.
@@ -496,6 +503,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return networkHealthCheck;
}
+ @Override
+ public void setRebuildCounters(boolean rebuildCounters) {
+ this.rebuildCounters = rebuildCounters;
+ }
+
+ @Override
+ public boolean isRebuildCounters() {
+ return this.rebuildCounters;
+ }
+
@Override
public void replay(Date start, Date end, String address, String target, String filter) throws Exception {
@@ -1323,6 +1340,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
+ if (!criticalIOError && pagingManager != null) {
+ pagingManager.counterSnapshot();
+ }
+
stopComponent(pagingManager);
if (storageManager != null)
@@ -3323,6 +3344,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
JournalLoadInformation[] journalInfo = loadJournals();
+ if (rebuildCounters) {
+ pagingManager.rebuildCounters();
+ }
+
removeExtraAddressStores();
if (securityManager instanceof ActiveMQBasicSecurityManager) {
@@ -4245,8 +4270,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final AtomicBoolean failedAlready = new AtomicBoolean();
+ @Override
+ public boolean isPreviouslyFailed() {
+ return failedAlready.get();
+ }
+
@Override
public synchronized void onIOException(Throwable cause, String message, String file) {
+ if (logger.isTraceEnabled()) {
+ // the exception logged here carries the stack trace of the caller,
+ // which is useful when debugging where the critical error is being raised from
+ logger.trace("Throwing critical error {}", cause.getMessage(), new Exception("trace"));
+ }
if (!failedAlready.compareAndSet(false, true)) {
return;
}
@@ -4542,4 +4577,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
+ @Override
+ public AutoCloseable managementLock() throws Exception {
+ if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
+ throw ActiveMQMessageBundle.BUNDLE.managementBusy();
+ } else {
+ return managementSemaphore::release;
+ }
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 488109c83e..e990162503 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1725,7 +1725,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
- return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+ long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+ if (logger.isTraceEnabled()) {
+ logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}",
+ name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(),
+ pageSubscription.getDeliveredCount());
+ }
+ return returnValue;
} else {
return (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount();
}
@@ -2279,6 +2285,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void destroyPaging() throws Exception {
// it could be null on embedded or certain unit tests
if (pageSubscription != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Destroying paging for {}", this.name, new Exception("trace"));
+ }
pageSubscription.destroy();
pageSubscription.cleanupEntries(true);
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 9b18e29ed7..fb04c2b2a4 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -295,6 +295,7 @@ public abstract class ActiveMQTestBase extends Assert {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception ignored) {
+ // it always throws an exception on shutdown
}
}
@@ -878,7 +879,7 @@ public abstract class ActiveMQTestBase extends Assert {
return testDir;
}
- private String getEmbeddedDataBaseName() {
+ protected String getEmbeddedDataBaseName() {
return "memory:" + getTestDir();
}
@@ -2314,6 +2315,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected int getMessageCount(final Queue queue) {
+ try {
+ Wait.waitFor(() -> queue.getPageSubscription().isCounterPending() == false);
+ } catch (Exception ignored) {
+ }
queue.flushExecutor();
return (int) queue.getMessageCount();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java b/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy
similarity index 61%
copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
copy to tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy
index 1f1ae7b4cb..c3508890e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ConsumedPage.java
+++ b/tests/compatibility-tests/src/main/resources/pageCounter/checkMessages.groovy
@@ -15,13 +15,17 @@
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.paging.cursor;
+package pageCounter
-// this is to expose PageSubscriptionImpl::PageCursorInfo
-public interface ConsumedPage {
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
- long getPageId();
+int messages = Integer.parseInt(arg[0]);
- boolean isDone();
+Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
+for (int i = 0; i < 20 && queue.getMessageCount() != messages; i++) {
+ Thread.sleep(100);
}
+GroovyRun.assertEquals((int)messages, (int)queue.getMessageCount());
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy
new file mode 100644
index 0000000000..20f9dccc2d
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/pageCounter/sendMessages.groovy
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pageCounter
+
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import javax.jms.*
+
+// script arguments: server type, protocol and the number of messages to send
+String serverType = arg[0];
+String protocol = arg[1];
+int messages = Integer.parseInt(arg[2]);
+
+// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
+if (protocol != null && protocol.equals("AMQP")) {
+ GroovyRun.evaluate("clients/artemisClientAMQP.groovy", "serverArg", serverType, protocol);
+} else {
+ GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
+}
+
+Connection connection = cf.createConnection();
+Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+Queue destination = session.createQueue("queue")
+
+MessageProducer producer = session.createProducer(destination);
+producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+for (int i = 0; i < messages; i++) {
+ TextMessage message = session.createTextMessage("Message " + i);
+ producer.send(message);
+ if (i % 100 == 0) {
+ session.commit();
+ }
+}
+
+session.commit();
+
+connection.close();
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java
new file mode 100644
index 0000000000..ae96f5e85c
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/PagingCounterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYTWO_ZERO;
+
+@RunWith(Parameterized.class)
+public class PagingCounterTest extends VersionedBase {
+
+ // this will ensure that all tests in this class are run once for each
+ // (server, producer, consumer) version combination returned below
+ @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
+ public static Collection getParameters() {
+ // we don't need every single version ever released..
+ // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
+ List<Object[]> combinations = new ArrayList<>();
+
+ /*
+ // during development sometimes is useful to comment out the combinations
+ // and add the ones you are interested.. example:
+ */
+ // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
+ // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
+
+ combinations.add(new Object[]{null, TWO_TWENTYTWO_ZERO, SNAPSHOT});
+ combinations.add(new Object[]{null, SNAPSHOT, TWO_TWENTYTWO_ZERO});
+ // the purpose of this one is just to validate the test itself.
+ // if it can't run against itself it won't work at all
+ combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
+ return combinations;
+ }
+
+ public PagingCounterTest(String server, String sender, String receiver) throws Exception {
+ super(server, sender, receiver);
+ }
+
+ @Before
+ public void removeFolder() throws Throwable {
+ FileUtil.deleteDirectory(serverFolder.getRoot());
+ serverFolder.getRoot().mkdirs();
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ stopServer(serverClassloader);
+ } catch (Throwable ignored) {
+ }
+ try {
+ stopServer(receiverClassloader);
+ } catch (Throwable ignored) {
+ }
+ }
+
+ @Test
+ public void testSendReceivePaging() throws Throwable {
+ setVariable(senderClassloader, "persistent", true);
+ startServer(serverFolder.getRoot(), senderClassloader, "pageCounter", null, true);
+ evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
+ evaluate(senderClassloader, "pageCounter/sendMessages.groovy", server, "core", "1000");
+ evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
+ evaluate(senderClassloader, "pageCounter/checkMessages.groovy", "1000");
+ stopServer(senderClassloader);
+
+ setVariable(receiverClassloader, "persistent", true);
+ startServer(serverFolder.getRoot(), receiverClassloader, "pageCounter", null, false);
+ evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");
+ evaluate(receiverClassloader, "pageCounter/checkMessages.groovy", "1000");
+
+ }
+}
+
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index 30986fa890..d4e45b2471 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -159,7 +159,6 @@ public class BackupSyncJournalTest extends FailoverTestBase {
for (Pair<Long, Integer> pair : backupIds) {
totalBackup += pair.getB();
}
- assertEquals("number of records must match ", total, totalBackup);
// "+ 2": there two other calls that send N_MSGS.
for (int i = 0; i < totalRounds + 3; i++) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 2ffef62bb2..f141a8ba66 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -1715,6 +1715,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
public boolean isEmbeddedWebServerStarted() {
return (boolean) proxy.retrieveAttributeValue("embeddedWebServerStarted");
}
+
+ @Override
+ public void rebuildPageCounters() throws Exception {
+ proxy.invokeOperation("rebuildPageCounters");
+ }
};
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java
index 0231610c69..f2c67c9d13 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.paging;
+import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -39,9 +40,13 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
@Rule
public RetryRule retryRule = new RetryRule(1);
@@ -151,7 +156,7 @@ public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
}
}
} catch (Exception expected) {
- expected.printStackTrace();
+ logger.info("expected exception {}", expected.toString(), expected);
}
} finally {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java
new file mode 100644
index 0000000000..9669de41a6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCounterRebuildTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PageCounterRebuildTest extends ActiveMQTestBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Test
+ public void testUnitSize() throws Exception {
+ // Hammers a single counter from THREADS threads: first incrementing, then decrementing by the same amounts, expecting exact totals and then zero.
+ AtomicInteger errors = new AtomicInteger(0);
+
+ StorageManager mockStorage = Mockito.mock(StorageManager.class);
+
+ PageSubscriptionCounterImpl nonPersistentPagingCounter = new PageSubscriptionCounterImpl(mockStorage, -1);
+
+ final int THREADS = 33;
+ final int ADD_VALUE = 7;
+ final int SIZE_VALUE = 17;
+ final int REPEAT = 777;
+
+ ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+ runAfter(executorService::shutdownNow);
+
+ CyclicBarrier startFlag = new CyclicBarrier(THREADS);
+
+ ReusableLatch latch = new ReusableLatch(THREADS);
+ // phase 1: every thread adds (ADD_VALUE, SIZE_VALUE) REPEAT times, passing null as the first argument of increment
+ for (int j = 0; j < THREADS; j++) {
+ executorService.execute(() -> {
+ try {
+ startFlag.await(10, TimeUnit.SECONDS);
+ for (int i = 0; i < REPEAT; i++) {
+ nonPersistentPagingCounter.increment(null, ADD_VALUE, SIZE_VALUE);
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(ADD_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getValue());
+ Assert.assertEquals(SIZE_VALUE * THREADS * REPEAT, nonPersistentPagingCounter.getPersistentSize());
+
+ latch.setCount(THREADS);
+ // phase 2: the same number of threads subtract exactly what phase 1 added
+ for (int j = 0; j < THREADS; j++) {
+ executorService.execute(() -> {
+ try {
+ startFlag.await(10, TimeUnit.SECONDS);
+ for (int i = 0; i < REPEAT; i++) {
+ nonPersistentPagingCounter.increment(null, -ADD_VALUE, -SIZE_VALUE);
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ Assert.assertEquals(0L, nonPersistentPagingCounter.getValue());
+ Assert.assertEquals(0L, nonPersistentPagingCounter.getPersistentSize());
+ Assert.assertEquals(0, errors.get());
+ }
+
+ @Test
+ public void testRebuildCounter() throws Exception {
+ // Verifies that queue message counts survive restarts, including a restart after the counters were zeroed by a faked rebuild.
+ ActiveMQServer server = createServer(true, true);
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(100 * 1024).setMaxReadPageMessages(1);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ server.start();
+
+ String queueName = getName();
+ String nonConsumedQueueName = getName() + "_nonConsumed";
+ server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.MULTICAST));
+ server.createQueue(new QueueConfiguration(nonConsumedQueueName).setAddress(queueName).setRoutingType(RoutingType.MULTICAST));
+ server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST));
+
+ Queue serverQueue = server.locateQueue(queueName);
+ Queue serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+ Assert.assertNotNull(serverQueue);
+ Assert.assertNotNull(serverNonConsumedQueue);
+ // start paging before anything is sent, so the messages below are paged
+ serverQueue.getPagingStore().startPaging();
+
+ final int THREADS = 4;
+ final int TX_SEND = 2000;
+ final int NON_TXT_SEND = 200;
+ final int CONSUME_MESSAGES = 200;
+ AtomicInteger errors = new AtomicInteger(0);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+ runAfter(executorService::shutdownNow);
+ CyclicBarrier startFlag = new CyclicBarrier(THREADS);
+
+ ReusableLatch latch = new ReusableLatch(THREADS);
+ // every thread sends NON_TXT_SEND non-transactional messages followed by TX_SEND messages in a single transaction
+ for (int i = 0; i < THREADS; i++) {
+ final int threadNumber = i;
+ executorService.execute(() -> {
+ try {
+ startFlag.await(10, TimeUnit.SECONDS);
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
+ logger.info("sending thread {}", threadNumber);
+
+ javax.jms.Topic jmsQueue = session.createTopic(queueName);
+ MessageProducer producerNonTX = session.createProducer(jmsQueue);
+ MessageProducer producerTX = txSession.createProducer(jmsQueue);
+
+ for (int message = 0; message < NON_TXT_SEND; message++) {
+ TextMessage txtMessage = session.createTextMessage("hello" + message);
+ txtMessage.setBooleanProperty("first", false);
+ producerNonTX.send(txtMessage);
+ }
+ for (int message = 0; message < TX_SEND; message++) {
+ producerTX.send(session.createTextMessage("helloTX" + message));
+ }
+ txSession.commit();
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // this should be fast on the CIs, but if you use a slow disk, it might take a few extra seconds.
+ Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));
+ Assert.assertEquals(0, errors.get());
+
+ final int numberOfMessages = TX_SEND * THREADS + NON_TXT_SEND * THREADS;
+ Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) {
+ MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName));
+ connection.start();
+ for (int i = 0; i < CONSUME_MESSAGES; i++) {
+ Message message = consumer.receive(5000);
+ Assert.assertNotNull(message);
+ }
+ }
+
+ Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+ Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+ // plain restart: both queues must report the same counts afterwards
+ server.stop();
+ server.start();
+
+ serverQueue = server.locateQueue(queueName);
+ serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+ Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+ Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+ // fake a rebuild that counts nothing: markRebuilding() now, finishRebuild() below, no increments in between
+ serverQueue.getPageSubscription().getCounter().markRebuilding();
+ serverNonConsumedQueue.getPageSubscription().getCounter().markRebuilding();
+
+ // even though we are rebuilding, we are still returning the last recorded value until processing is finished
+ Assert.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue.getMessageCount());
+ Assert.assertEquals(numberOfMessages, serverNonConsumedQueue.getMessageCount());
+
+ serverQueue.getPageSubscription().getCounter().finishRebuild();
+
+ serverNonConsumedQueue.getPageSubscription().getCounter().finishRebuild();
+
+ Assert.assertEquals(0, serverQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild
+ Assert.assertEquals(0, serverNonConsumedQueue.getMessageCount()); // we artificially made it 0 by faking a rebuild
+
+ server.stop();
+ server.start();
+
+ serverQueue = server.locateQueue(queueName);
+ serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+ // after a rebuild, the counter should be back to where it was
+ Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+ Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+
+ server.stop();
+ server.start();
+
+ serverQueue = server.locateQueue(queueName);
+ serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+ Assert.assertNotNull(serverQueue);
+ Assert.assertNotNull(serverNonConsumedQueue);
+
+ Wait.assertEquals(numberOfMessages - CONSUME_MESSAGES, serverQueue::getMessageCount);
+ Wait.assertEquals(numberOfMessages, serverNonConsumedQueue::getMessageCount);
+
+ server.stop();
+ // restarting the server to issue a rebuild on the counters
+ server.start();
+
+ logger.info("Consuming messages");
+ factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);) {
+ MessageConsumer consumer = session.createConsumer(session.createQueue(queueName + "::" + queueName));
+ connection.start();
+ for (int i = 0; i < numberOfMessages - CONSUME_MESSAGES; i++) {
+ Message message = consumer.receive(5000);
+ Assert.assertNotNull(message);
+ if (i % 100 == 0) {
+ logger.info("Received {} messages", i);
+ }
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ consumer.close();
+
+ consumer = session.createConsumer(session.createQueue(queueName + "::" + nonConsumedQueueName));
+ connection.start();
+ for (int i = 0; i < numberOfMessages; i++) {
+ Message message = consumer.receive(5000);
+ Assert.assertNotNull(message);
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ consumer.close();
+ }
+
+ serverQueue = server.locateQueue(queueName);
+ serverNonConsumedQueue = server.locateQueue(nonConsumedQueueName);
+
+ Wait.assertEquals(0L, serverQueue::getMessageCount, 1000, 100);
+ Wait.assertEquals(0L, serverNonConsumedQueue::getMessageCount, 1000, 100);
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index f68608e24a..8492f9626f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -164,6 +163,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
server.stop();
+ server.setRebuildCounters(false);
+
server.start();
queue = server.locateQueue("A1");
@@ -177,6 +178,70 @@ public class PagingCounterTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testMultiThreadCounter() throws Exception {
+ // Bumps the same counter from several threads, both with and without a transaction, and checks the total before and after a restart.
+ ClientSessionFactory sf = createSessionFactory(sl);
+ ClientSession session = sf.createSession();
+
+ try {
+ server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
+ Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
+
+ final PageSubscriptionCounter counter = locateCounter(queue);
+
+ final int THREADS = 10;
+
+ final CyclicBarrier flagStart = new CyclicBarrier(THREADS);
+ final CountDownLatch done = new CountDownLatch(THREADS);
+ final int BUMPS = 2000;
+
+ Assert.assertEquals(0, counter.getValue());
+
+ ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
+ runAfter(executorService::shutdownNow);
+ // each iteration adds one non-transactional and one transactional increment, hence BUMPS * 2 per thread
+ for (int i = 0; i < THREADS; i++) {
+ executorService.execute(() -> {
+ try {
+ flagStart.await(10, TimeUnit.SECONDS);
+ for (int repeat = 0; repeat < BUMPS; repeat++) {
+ counter.increment(null, 1, 1L);
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+ counter.increment(tx, 1, 1L);
+ tx.commit();
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ // it should take a couple seconds only
+ Assert.assertTrue(done.await(1, TimeUnit.MINUTES));
+
+ Wait.assertEquals((long)(BUMPS * 2 * THREADS), counter::getValue, 5000, 100);
+
+ server.stop();
+ // no messages were actually paged, so a rebuild would presumably recount to zero; keep it disabled for this restart
+ server.setRebuildCounters(false);
+
+ server.start();
+
+ queue = server.locateQueue("A1");
+
+ final PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
+ Wait.assertEquals((long)(BUMPS * 2 * THREADS), counterAfterRestart::getValue, 5000, 100);
+ Assert.assertEquals(BUMPS * 2 * THREADS, counterAfterRestart.getValue());
+
+ } finally {
+ sf.close();
+ session.close();
+ }
+ }
+
@Test
public void testCleanupCounter() throws Exception {
ClientSessionFactory sf = createSessionFactory(sl);
@@ -216,6 +281,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
server.stop();
server = newActiveMQServer();
+ server.setRebuildCounters(false);
server.start();
@@ -228,6 +294,11 @@ public class PagingCounterTest extends ActiveMQTestBase {
assertEquals(2100, counter.getValue());
assertEquals(2100 * 1000, counter.getPersistentSize());
+ server.getPagingManager().rebuildCounters();
+
+ // it should be zero after rebuild, since no actual messages were sent
+ Wait.assertEquals(0, counter::getValue);
+
} finally {
sf.close();
session.close();
@@ -246,8 +317,6 @@ public class PagingCounterTest extends ActiveMQTestBase {
PageSubscriptionCounter counter = locateCounter(queue);
- ((PageSubscriptionCounterImpl) counter).setPersistent(false);
-
StorageManager storage = server.getStorageManager();
Transaction tx = new TransactionImpl(server.getStorageManager());
@@ -321,7 +390,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
server.stop();
+
server = newActiveMQServer();
+ server.setRebuildCounters(false);
server.start();
@@ -329,10 +400,29 @@ public class PagingCounterTest extends ActiveMQTestBase {
assertNotNull(queue);
- counter = locateCounter(queue);
+ PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
+
+ Wait.assertEquals(1, counterAfterRestart::getValue);
+ Wait.assertEquals(1000, counterAfterRestart::getPersistentSize);
- assertEquals(1, counter.getValue());
- assertEquals(1000, counter.getPersistentSize());
+ counterAfterRestart.markRebuilding();
+
+ // should be using a previously added value while rebuilding
+ Wait.assertEquals(1, counterAfterRestart::getValue);
+
+ tx = new TransactionImpl(server.getStorageManager());
+
+ counterAfterRestart.increment(tx, 10, 10_000);
+ tx.commit();
+
+ Wait.assertEquals(11, counterAfterRestart::getValue);
+ Wait.assertEquals(11_000, counterAfterRestart::getPersistentSize);
+ counterAfterRestart.finishRebuild();
+
+ server.getPagingManager().rebuildCounters();
+
+ Wait.assertEquals(0, counterAfterRestart::getValue);
+ Wait.assertEquals(0, counterAfterRestart::getPersistentSize);
}
@@ -349,7 +439,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
}
@Test
- public void testPrepareCounter() throws Exception {
+ public void testCommitCounter() throws Exception {
Xid xid = newXID();
Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
@@ -366,19 +456,19 @@ public class PagingCounterTest extends ActiveMQTestBase {
assertEquals(0, counter.getValue());
- tx.prepare();
+ tx.commit();
storage.waitOnOperations();
- assertEquals(0, counter.getValue());
+ assertEquals(2000, counter.getValue());
server.stop();
server = newActiveMQServer();
- server.start();
+ server.setRebuildCounters(false);
- storage = server.getStorageManager();
+ server.start();
queue = server.locateQueue(new SimpleString("A1"));
@@ -386,16 +476,6 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
- tx = server.getResourceManager().removeTransaction(xid, null);
-
- assertNotNull(tx);
-
- assertEquals(0, counter.getValue());
-
- tx.commit(false);
-
- storage.waitOnOperations();
-
Wait.assertEquals(2000, counter::getValue);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index 381e14e27a..8192a1e54f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -34,8 +34,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -62,7 +60,6 @@ public class PagingSendTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
- Configuration config = new ConfigurationImpl();
server = newActiveMQServer();
server.start();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index f29ad471d5..a76465988d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -651,6 +651,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
paging.start();
+ runAfter(paging::stop);
return paging;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
index 00ced9ee10..d3a69d2202 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
@@ -150,9 +150,9 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer("flowcontrol");
ClientMessage message = session.createMessage(true);
message.writeBodyBufferBytes(body);
- logger.info("try to send a message after replicated");
+ logger.debug("try to send a message after replicated");
producer.send(message);
- logger.info("send message done");
+ logger.debug("send message done");
producer.close();
session.close();
@@ -187,8 +187,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
// ignore
}
- logger.info("got live message {} {}", info.id, info.userRecordType);
liveJournalCounter.incrementAndGet();
+ logger.info("got live message {} {}, counter={}", info.id, info.userRecordType, liveJournalCounter.get());
}
});
@@ -207,8 +207,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) {
// ignore
}
- logger.info("replicated message {}", info.id);
replicationCounter.incrementAndGet();
+ logger.info("replicated message {}, counter={}", info.id, replicationCounter.get());
}
});
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 0426f55706..5297e6dcf0 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -245,6 +245,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
class FakePagingStore implements PagingStore {
+ @Override
+ public void counterSnapshot() {
+ }
+
@Override
public void execute(Runnable runnable) {
runnable.run();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
index 713ce21d44..68024e4fce 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
@@ -61,6 +61,8 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
managerImpl.start();
+ runAfter(managerImpl::stop);
+
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10));
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index bed3ea5ad6..2012848010 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -39,6 +39,11 @@ public class FakePagingManager implements PagingManager {
}
+ @Override
+ public void counterSnapshot() {
+ // intentionally a no-op in this test fake
+ }
+
@Override
public void addTransaction(final PageTransactionInfo pageTransaction) {
}