You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/05/23 18:42:17 UTC
[activemq-artemis] 05/07: fixing tests
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch new-paging
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 8d58243b2dddab4c737e2c2431a76c5ca9fb1ca2
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu May 19 14:45:32 2022 -0400
fixing tests
---
.../paging/cursor/impl/PageCursorProviderImpl.java | 3 +-
.../paging/cursor/impl/PageSubscriptionImpl.java | 140 +++++++++++----------
.../tests/integration/paging/PagingTest.java | 12 +-
3 files changed, 86 insertions(+), 69 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index de9efb5ab2..f0c1c9828f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -266,7 +266,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
return;
}
- if (pagingStore.getNumberOfPages() == 0) {
+ if (!pagingStore.isPaging()) {
+ logger.trace("Paging Store was not paging, so no reason to retry the cleanup");
return;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index fad96de84a..c3d4b895a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -432,90 +432,95 @@ public final class PageSubscriptionImpl implements PageSubscription {
if (completeDelete) {
counter.delete();
}
- if (logger.isTraceEnabled()) {
- logger.trace("cleanupEntries", new Exception("trace"));
- }
- Transaction tx = new TransactionImpl(store);
+ logger.trace(">>>>>>> cleanupEntries");
+ try {
+ Transaction tx = new TransactionImpl(store);
- boolean persist = false;
+ boolean persist = false;
- final ArrayList<PageCursorInfo> completedPages = new ArrayList<>();
+ final ArrayList<PageCursorInfo> completedPages = new ArrayList<>();
- // First get the completed pages using a lock
- synchronized (consumedPages) {
- // lastAckedPosition = null means no acks were done yet, so we are not ready to cleanup
- if (lastAckedPosition == null) {
- return;
- }
+ // First get the completed pages using a lock
+ synchronized (consumedPages) {
+ // lastAckedPosition = null means no acks were done yet, so we are not ready to cleanup
+ if (lastAckedPosition == null) {
+ return;
+ }
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) {
- PageCursorInfo info = entry.getValue();
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) {
+ PageCursorInfo info = entry.getValue();
- if (info.isDone() && !info.isPendingDelete()) {
- Page currentPage = pageStore.getCurrentPage();
+ if (info.isDone() && !info.isPendingDelete()) {
+ Page currentPage = pageStore.getCurrentPage();
- if (currentPage != null && entry.getKey() == pageStore.getCurrentPage().getPageId()) {
- logger.tracef("We can't clear page %s 's the current page", entry.getKey());
- } else {
- info.setPendingDelete();
- completedPages.add(entry.getValue());
+ if (currentPage != null && entry.getKey() == pageStore.getCurrentPage().getPageId()) {
+ logger.tracef("We can't clear page %s 's the current page", entry.getKey());
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.tracef("cleanup marking page %s as complete", info.pageId);
+ }
+ info.setPendingDelete();
+ completedPages.add(entry.getValue());
+ }
}
}
}
- }
-
- for (PageCursorInfo infoPG : completedPages) {
- // HORNETQ-1017: There are a few cases where a pending transaction might set a big hole on the page system
- // where we need to ignore these pages in case of a restart.
- // for that reason when we delete complete ACKs we store a single record per page file that will
- // be removed once the page file is deleted
- // notice also that this record is added as part of the same transaction where the information is deleted.
- // In case of a TX Failure (a crash on the server) this will be recovered on the next cleanup once the
- // server is restarted.
- // first will mark the page as complete
- if (isPersistent()) {
- PagePosition completePage = new PagePositionImpl(infoPG.getPageId(), infoPG.getNumberOfMessages());
- infoPG.setCompleteInfo(completePage);
- store.storePageCompleteTransactional(tx.getID(), this.getId(), completePage);
- if (!persist) {
- persist = true;
- tx.setContainsPersistent();
- }
- }
- // it will delete the page ack records
- for (PagePosition pos : infoPG.acks.values()) {
- if (pos.getRecordID() >= 0) {
- store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+ for (PageCursorInfo infoPG : completedPages) {
+ // HORNETQ-1017: There are a few cases where a pending transaction might set a big hole on the page system
+ // where we need to ignore these pages in case of a restart.
+ // for that reason when we delete complete ACKs we store a single record per page file that will
+ // be removed once the page file is deleted
+ // notice also that this record is added as part of the same transaction where the information is deleted.
+ // In case of a TX Failure (a crash on the server) this will be recovered on the next cleanup once the
+ // server is restarted.
+ // first will mark the page as complete
+ if (isPersistent()) {
+ PagePosition completePage = new PagePositionImpl(infoPG.getPageId(), infoPG.getNumberOfMessages());
+ infoPG.setCompleteInfo(completePage);
+ store.storePageCompleteTransactional(tx.getID(), this.getId(), completePage);
if (!persist) {
- // only need to set it once
- tx.setContainsPersistent();
persist = true;
+ tx.setContainsPersistent();
}
}
- }
- infoPG.acks.clear();
- infoPG.removedReferences.clear();
- }
+ // it will delete the page ack records
+ for (PagePosition pos : infoPG.acks.values()) {
+ if (pos.getRecordID() >= 0) {
+ store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+ if (!persist) {
+ // only need to set it once
+ tx.setContainsPersistent();
+ persist = true;
+ }
+ }
+ }
- tx.addOperation(new TransactionOperationAbstract() {
+ infoPG.acks.clear();
+ infoPG.removedReferences.clear();
+ }
- @Override
- public void afterCommit(final Transaction tx1) {
- pageStore.execute(new Runnable() {
+ tx.addOperation(new TransactionOperationAbstract() {
+
+ @Override
+ public void afterCommit(final Transaction tx1) {
+ pageStore.execute(new Runnable() {
- @Override
- public void run() {
- if (!completeDelete) {
- cursorProvider.scheduleCleanup();
+ @Override
+ public void run() {
+ if (!completeDelete) {
+ cursorProvider.scheduleCleanup();
+ }
}
- }
- });
- }
- });
+ });
+ }
+ });
- tx.commit();
+ tx.commit();
+ } finally {
+ logger.trace("<<<<<< cleanupEntries");
+ }
}
@@ -1008,6 +1013,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
*/
private void onPageDone(final PageCursorInfo info) {
if (autoCleanup) {
+ if (logger.isTraceEnabled()) {
+ logger.tracef("onPageDone page %s", info.getPageId());
+ }
scheduleCleanupCheck();
}
}
@@ -1108,7 +1116,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get());
}
- return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
+ // in cases where the file was damaged it is possible to get more confirmed records than we actually had messages
+ // for that case we set confirmed.get() >= getNumberOfMessages instead of ==
+ return completePage != null || (confirmed.get() >= getNumberOfMessages() && pendingTX.get() == 0);
}
public boolean isPendingDelete() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 655d19c3bd..36e178969b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -816,9 +816,11 @@ public class PagingTest extends ActiveMQTestBase {
queue.getPagingStore().forceAnotherPage(); // forcing an empty file, just to make it more challenging
+ int page = 1;
for (int i = 0; i < numberOfMessages; i++) {
if (i % 10 == 0 && i > 0) {
queue.getPagingStore().forceAnotherPage();
+ page++;
}
message = session.createMessage(true);
@@ -827,6 +829,7 @@ public class PagingTest extends ActiveMQTestBase {
bodyLocal.writeBytes(body);
message.putIntProperty("i", i);
+ message.putIntProperty("page", page);
producer.send(message);
}
@@ -895,6 +898,9 @@ public class PagingTest extends ActiveMQTestBase {
sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
+ logger.info("*******************************************************************************************************************************");
+ logger.info("Creating consumer");
+
consumer = session.createConsumer(ADDRESS);
session.start();
@@ -910,7 +916,7 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertNull(msgClient);
session.commit();
- Wait.assertFalse(queue.getPagingStore()::isPaging);
+ Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
}
@@ -6860,13 +6866,13 @@ public class PagingTest extends ActiveMQTestBase {
PageCursorProviderAccessor.cleanup(store.getCursorProvider());
- Wait.assertTrue(store::isPaging, 5000, 100);
+ Wait.assertFalse(store::isPaging, 5000, 100);
sf.close();
locator.close();
- Wait.assertEquals(0, store::getNumberOfPages);
+ Wait.assertEquals(1L, store::getNumberOfPages, 5000, 100);
} finally {
try {