You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/05/23 18:42:17 UTC

[activemq-artemis] 05/07: fixing tests

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch new-paging
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 8d58243b2dddab4c737e2c2431a76c5ca9fb1ca2
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu May 19 14:45:32 2022 -0400

    fixing tests
---
 .../paging/cursor/impl/PageCursorProviderImpl.java |   3 +-
 .../paging/cursor/impl/PageSubscriptionImpl.java   | 140 +++++++++++----------
 .../tests/integration/paging/PagingTest.java       |  12 +-
 3 files changed, 86 insertions(+), 69 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index de9efb5ab2..f0c1c9828f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -266,7 +266,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                   return;
                }
 
-               if (pagingStore.getNumberOfPages() == 0) {
+               if (!pagingStore.isPaging()) {
+                  logger.trace("Paging Store was not paging, so no reason to retry the cleanup");
                   return;
                }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index fad96de84a..c3d4b895a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -432,90 +432,95 @@ public final class PageSubscriptionImpl implements PageSubscription {
       if (completeDelete) {
          counter.delete();
       }
-      if (logger.isTraceEnabled()) {
-         logger.trace("cleanupEntries", new Exception("trace"));
-      }
-      Transaction tx = new TransactionImpl(store);
+      logger.trace(">>>>>>> cleanupEntries");
+      try {
+         Transaction tx = new TransactionImpl(store);
 
-      boolean persist = false;
+         boolean persist = false;
 
-      final ArrayList<PageCursorInfo> completedPages = new ArrayList<>();
+         final ArrayList<PageCursorInfo> completedPages = new ArrayList<>();
 
-      // First get the completed pages using a lock
-      synchronized (consumedPages) {
-         // lastAckedPosition = null means no acks were done yet, so we are not ready to cleanup
-         if (lastAckedPosition == null) {
-            return;
-         }
+         // First get the completed pages using a lock
+         synchronized (consumedPages) {
+            // lastAckedPosition = null means no acks were done yet, so we are not ready to cleanup
+            if (lastAckedPosition == null) {
+               return;
+            }
 
-         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) {
-            PageCursorInfo info = entry.getValue();
+            for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) {
+               PageCursorInfo info = entry.getValue();
 
-            if (info.isDone() && !info.isPendingDelete()) {
-               Page currentPage = pageStore.getCurrentPage();
+               if (info.isDone() && !info.isPendingDelete()) {
+                  Page currentPage = pageStore.getCurrentPage();
 
-               if (currentPage != null && entry.getKey() == pageStore.getCurrentPage().getPageId()) {
-                  logger.tracef("We can't clear page %s 's the current page", entry.getKey());
-               } else {
-                  info.setPendingDelete();
-                  completedPages.add(entry.getValue());
+                  if (currentPage != null && entry.getKey() == pageStore.getCurrentPage().getPageId()) {
+                     logger.tracef("We can't clear page %s 's the current page", entry.getKey());
+                  } else {
+                     if (logger.isTraceEnabled()) {
+                        logger.tracef("cleanup marking page %s as complete", info.pageId);
+                     }
+                     info.setPendingDelete();
+                     completedPages.add(entry.getValue());
+                  }
                }
             }
          }
-      }
-
-      for (PageCursorInfo infoPG : completedPages) {
-         // HORNETQ-1017: There are a few cases where a pending transaction might set a big hole on the page system
-         //               where we need to ignore these pages in case of a restart.
-         //               for that reason when we delete complete ACKs we store a single record per page file that will
-         //               be removed once the page file is deleted
-         //               notice also that this record is added as part of the same transaction where the information is deleted.
-         //               In case of a TX Failure (a crash on the server) this will be recovered on the next cleanup once the
-         //               server is restarted.
-         // first will mark the page as complete
-         if (isPersistent()) {
-            PagePosition completePage = new PagePositionImpl(infoPG.getPageId(), infoPG.getNumberOfMessages());
-            infoPG.setCompleteInfo(completePage);
-            store.storePageCompleteTransactional(tx.getID(), this.getId(), completePage);
-            if (!persist) {
-               persist = true;
-               tx.setContainsPersistent();
-            }
-         }
 
-         // it will delete the page ack records
-         for (PagePosition pos : infoPG.acks.values()) {
-            if (pos.getRecordID() >= 0) {
-               store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+         for (PageCursorInfo infoPG : completedPages) {
+            // HORNETQ-1017: There are a few cases where a pending transaction might set a big hole on the page system
+            //               where we need to ignore these pages in case of a restart.
+            //               for that reason when we delete complete ACKs we store a single record per page file that will
+            //               be removed once the page file is deleted
+            //               notice also that this record is added as part of the same transaction where the information is deleted.
+            //               In case of a TX Failure (a crash on the server) this will be recovered on the next cleanup once the
+            //               server is restarted.
+            // first will mark the page as complete
+            if (isPersistent()) {
+               PagePosition completePage = new PagePositionImpl(infoPG.getPageId(), infoPG.getNumberOfMessages());
+               infoPG.setCompleteInfo(completePage);
+               store.storePageCompleteTransactional(tx.getID(), this.getId(), completePage);
                if (!persist) {
-                  // only need to set it once
-                  tx.setContainsPersistent();
                   persist = true;
+                  tx.setContainsPersistent();
                }
             }
-         }
 
-         infoPG.acks.clear();
-         infoPG.removedReferences.clear();
-      }
+            // it will delete the page ack records
+            for (PagePosition pos : infoPG.acks.values()) {
+               if (pos.getRecordID() >= 0) {
+                  store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+                  if (!persist) {
+                     // only need to set it once
+                     tx.setContainsPersistent();
+                     persist = true;
+                  }
+               }
+            }
 
-      tx.addOperation(new TransactionOperationAbstract() {
+            infoPG.acks.clear();
+            infoPG.removedReferences.clear();
+         }
 
-         @Override
-         public void afterCommit(final Transaction tx1) {
-            pageStore.execute(new Runnable() {
+         tx.addOperation(new TransactionOperationAbstract() {
+
+            @Override
+            public void afterCommit(final Transaction tx1) {
+               pageStore.execute(new Runnable() {
 
-               @Override
-               public void run() {
-                  if (!completeDelete) {
-                     cursorProvider.scheduleCleanup();
+                  @Override
+                  public void run() {
+                     if (!completeDelete) {
+                        cursorProvider.scheduleCleanup();
+                     }
                   }
-               }
-            });
-         }
-      });
+               });
+            }
+         });
 
-      tx.commit();
+         tx.commit();
+      } finally {
+         logger.trace("<<<<<< cleanupEntries");
+      }
 
    }
 
@@ -1008,6 +1013,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
     */
    private void onPageDone(final PageCursorInfo info) {
       if (autoCleanup) {
+         if (logger.isTraceEnabled()) {
+            logger.tracef("onPageDone page %s", info.getPageId());
+         }
          scheduleCleanupCheck();
       }
    }
@@ -1108,7 +1116,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
             logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get());
 
          }
-         return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
+         // in cases where the file was damaged it is possible to get more confirmed records than we actually had messages
+         // for that case we set confirmed.get() >= getNumberOfMessages instead of ==
+         return completePage != null || (confirmed.get() >= getNumberOfMessages() && pendingTX.get() == 0);
       }
 
       public boolean isPendingDelete() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 655d19c3bd..36e178969b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -816,9 +816,11 @@ public class PagingTest extends ActiveMQTestBase {
 
       queue.getPagingStore().forceAnotherPage(); // forcing an empty file, just to make it more challenging
 
+      int page = 1;
       for (int i = 0; i < numberOfMessages; i++) {
          if (i % 10 == 0 && i > 0) {
             queue.getPagingStore().forceAnotherPage();
+            page++;
          }
          message = session.createMessage(true);
 
@@ -827,6 +829,7 @@ public class PagingTest extends ActiveMQTestBase {
          bodyLocal.writeBytes(body);
 
          message.putIntProperty("i", i);
+         message.putIntProperty("page", page);
 
          producer.send(message);
       }
@@ -895,6 +898,9 @@ public class PagingTest extends ActiveMQTestBase {
       sf = createSessionFactory(locator);
       session = sf.createSession(false, true, true);
 
+      logger.info("*******************************************************************************************************************************");
+      logger.info("Creating consumer");
+
       consumer = session.createConsumer(ADDRESS);
       session.start();
 
@@ -910,7 +916,7 @@ public class PagingTest extends ActiveMQTestBase {
       Assert.assertNull(msgClient);
       session.commit();
 
-      Wait.assertFalse(queue.getPagingStore()::isPaging);
+      Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
    }
 
 
@@ -6860,13 +6866,13 @@ public class PagingTest extends ActiveMQTestBase {
 
          PageCursorProviderAccessor.cleanup(store.getCursorProvider());
 
-         Wait.assertTrue(store::isPaging, 5000, 100);
+         Wait.assertFalse(store::isPaging, 5000, 100);
 
          sf.close();
 
          locator.close();
 
-         Wait.assertEquals(0, store::getNumberOfPages);
+         Wait.assertEquals(1L, store::getNumberOfPages, 5000, 100);
 
       } finally {
          try {