You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/07/12 18:22:42 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1958 Artemis may not be able to delete pages when there are some empty page files

ARTEMIS-1958 Artemis may not be able to delete pages when there are some empty page files


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5ec22340
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5ec22340
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5ec22340

Branch: refs/heads/master
Commit: 5ec2234010dfad8c7ff2e49f2505ea44dba9388a
Parents: 8d9ec3e
Author: 17103355 <17...@cnsuning.com>
Authored: Thu Jun 28 20:14:50 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 12 14:11:16 2018 -0400

----------------------------------------------------------------------
 .../cursor/impl/PageSubscriptionImpl.java       | 21 +++++
 .../tests/integration/paging/PagingTest.java    | 93 ++++++++++++++++++++
 2 files changed, 114 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5ec22340/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index e1c9537..5fec9a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -383,19 +383,29 @@ final class PageSubscriptionImpl implements PageSubscription {
 
       PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
 
+      PageCache emptyCache = null;
       if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
+         emptyCache = cache;
+         saveEmptyPageAsConsumedPage(emptyCache);
          // The next message is beyond what's available at the current page, so we need to move to the next page
          cache = null;
       }
 
       // it will scan for the next available page
       while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) {
+         emptyCache = cache;
          retPos = moveNextPage(retPos);
 
          cache = cursorProvider.getPageCache(retPos.getPageNr());
+
+         if (cache != null) {
+            saveEmptyPageAsConsumedPage(emptyCache);
+         }
       }
 
       if (cache == null) {
+         saveEmptyPageAsConsumedPage(emptyCache);
+
          // it will be null in the case of the current writing page
          return null;
       } else {
@@ -787,6 +797,17 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    }
 
+   private void saveEmptyPageAsConsumedPage(final PageCache cache) { // registers a zero-message page in consumedPages; no-op for a null or non-empty cache
+      if (cache != null && cache.getNumberOfMessages() == 0) {
+         synchronized (consumedPages) { // lookup and insert happen under the same monitor
+            PageCursorInfo pageInfo = consumedPages.get(cache.getPageId());
+            if (pageInfo == null) { // an entry already registered for this page id is left untouched
+               consumedPages.put(cache.getPageId(), new PageCursorInfo(cache.getPageId(), cache.getNumberOfMessages(), cache));
+            }
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5ec22340/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index bc09fa1..566142d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -5405,6 +5405,99 @@ public class PagingTest extends ActiveMQTestBase {
       internalTestMultiFilters(false);
    }
 
+   @Test
+   public void testPageEmptyFile() throws Exception { // ARTEMIS-1958: forces extra pages around a batch of messages, consumes everything, then expects a single page to remain
+      boolean persistentMessages = true;
+
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 100;
+
+      try {
+         ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+         store.forceAnotherPage(); // called before anything is sent; presumably leaves a page file with no messages - confirm against PagingStore
+         store.forceAnotherPage(); // second forced page, still before the first send
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++) { // send numberOfMessages messages, each with a messageSize-byte zeroed body
+            message = session.createMessage(persistentMessages);
+
+            ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            producer.send(message);
+         }
+
+         session.commit(); // explicit commit of the sends
+
+         Queue queue = server.locateQueue(PagingTest.ADDRESS);
+         Assert.assertEquals(numberOfMessages, queue.getMessageCount());
+
+         store.forceAnotherPage(); // one more forced page after the sends and before consuming starts
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++) { // every message sent must be received (5000 ms receive timeout each) and is acknowledged
+            message = consumer.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+         }
+
+         session.commit(); // explicit commit of the acknowledgements
+
+         assertNull(consumer.receiveImmediate()); // nothing may be left on the queue
+
+         consumer.close();
+
+         store.getCursorProvider().cleanup(); // explicit cleanup pass once everything has been acknowledged
+
+         Assert.assertEquals(0, queue.getMessageCount());
+
+         long timeout = System.currentTimeMillis() + 5000;
+         while (store.isPaging() && timeout > System.currentTimeMillis()) { // poll for up to 5 seconds until the store reports it is no longer paging
+            Thread.sleep(100);
+         }
+
+         store.getCursorProvider().cleanup(); // second cleanup pass after the wait loop
+
+         sf.close();
+
+         locator.close();
+
+         Assert.assertEquals(1, store.getNumberOfPages()); // key assertion: exactly one page is expected to remain
+
+      } finally {
+         try {
+            server.stop();
+         } catch (Throwable ignored) { // best-effort shutdown: a failure while stopping is deliberately discarded
+         }
+      }
+   }
+
    public void internalTestMultiFilters(boolean browsing) throws Throwable {
       clearDataRecreateServerDirs();