You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/06 11:53:44 UTC

[activemq-artemis] branch main updated: ARTEMIS-4029 Avoid OME When too many pages are cleared all at once

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 42d07458e9 ARTEMIS-4029 Avoid OME When too many pages are cleared all at once
42d07458e9 is described below

commit 42d07458e94f99d751bb736490f138d33e4a3d14
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 5 12:55:21 2022 -0400

    ARTEMIS-4029 Avoid OME When too many pages are cleared all at once
    
    The issue is that depaging should not add pages to the used-pages cache, as those pages were never actually intended to be read.
    Instead, depaging should create a new Page object (newPageObject) and bypass the RefCount-based caching.
---
 .../paging/cursor/impl/PageCursorProviderImpl.java |   2 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  12 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../tests/soak/paging/MegaCleanerPagingTest.java   | 154 +++++++++++++++++++++
 4 files changed, 167 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 68e8889064..7d784792d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -279,7 +279,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                      logger.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() + " as numberOfPages == " + pagingStore.getNumberOfPages() + " and currentPage.numberOfMessages = " + pagingStore.getCurrentPage().getNumberOfMessages());
                   }
                }
-            } catch (Exception ex) {
+            } catch (Throwable ex) {
                ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(pagingStore.getAddress(), ex);
                logger.warn(ex.getMessage(), ex);
                return;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index ddad68294c..f51fc4798c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -817,7 +817,17 @@ public class PagingStoreImpl implements PagingStore {
                }
             } else {
                logger.trace("firstPageId++ = beforeIncrement={}", firstPageId);
-               returnPage = usePage(firstPageId++);
+               long pageNR = firstPageId++;
+
+               // First we look for the page in the used-pages cache.
+               // If it is not there, we create a new Page object outside of the cache,
+               // as depaging should not introduce any extra entries into it.
+               Page usedPage = usePage(pageNR, false);
+               if (usedPage == null) {
+                  returnPage = newPageObject(pageNR);
+               } else {
+                  returnPage = usedPage;
+               }
             }
 
             if (!returnPage.getFile().exists()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 53dbe5b791..4a62d6352f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -398,7 +398,7 @@ public interface ActiveMQServerLogger {
    void timedOutStoppingPagingCursor(Executor executor);
 
    @LogMessage(id = 222023, value = "problem cleaning page address {}", level = LogMessage.Level.WARN)
-   void problemCleaningPageAddress(SimpleString address, Exception e);
+   void problemCleaningPageAddress(SimpleString address, Throwable e);
 
    @LogMessage(id = 222024, value = "Could not complete operations on IO context {}", level = LogMessage.Level.WARN)
    void problemCompletingOperations(OperationContext e);
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
new file mode 100644
index 0000000000..3bb7c6600a
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Page cleanup should still complete successfully when a very large number of pages has to be cleared all at once.
+ * */
+public class MegaCleanerPagingTest extends ActiveMQTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final int OK = 35; // Arbitrary exit code the spawned VM returns when the test succeeds.
+
+   @Test
+   public void testCleanup() throws Exception {
+      Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir()); // spawned VM so the heap can be capped for this test only
+      runAfter(process::destroyForcibly); // never leave the spawned VM running if waitFor times out or an assertion fails
+      logger.debug("process PID {}", process.pid());
+      Assert.assertTrue("spawned VM did not finish within 10 minutes", process.waitFor(10, TimeUnit.MINUTES));
+      Assert.assertEquals(OK, process.exitValue());
+   }
+
+   // Entry point of the separate VM, which is used so that memory can be limited for this test.
+   // arg[0] is used as the test directory; the VM exits with OK on success and with -1 on any failure.
+   public static void main(String[] arg) {
+      try {
+         MegaCleanerPagingTest spawnedTest = new MegaCleanerPagingTest();
+         spawnedTest.setTestDir(arg[0]);
+         spawnedTest.internalTest();
+         System.exit(OK);
+      } catch (Throwable failure) {
+         failure.printStackTrace();
+         logger.warn(failure.getMessage(), failure);
+         System.exit(-1);
+      }
+   }
+
+   /** Body run by main(): sends and consumes all messages with page cleanup disabled, then enables cleanup and checks AMQ222023 was not logged. */
+   public void internalTest() throws Throwable {
+      ActiveMQServer server = createServer(true, true);
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
+      server.start();
+
+      // Without the memory limit on this test, NUMBER_OF_MESSAGES would have to be something huge such as 500_000,
+      // and that would be a moving target whenever more memory is given to the JUnit runner.
+      // This is the main reason memory is limited for this test.
+      int NUMBER_OF_MESSAGES = 50_000;
+
+      String queueName = "testPageAndDepage";
+
+      server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+
+      final int SIZE = 10 * 1024;
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         producer.send(session.createTextMessage(createBuffer(i, SIZE)));
+         if (i % 1000 == 0) {
+            logger.debug("sent {} messages", i);
+            session.commit();
+         }
+      }
+      session.commit();
+
+      // Cleanup stays disabled while consuming; it is re-enabled only after every message was received and committed.
+      PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
+      store.disableCleanup();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(createBuffer(i, SIZE), message.getText());
+
+         if (i % 1000 == 0) {
+            logger.debug("received {} messages", i);
+            session.commit();
+         }
+      }
+      session.commit();
+      Assert.assertNull(consumer.receiveNoWait());
+      connection.close();
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+      store.enableCleanup();
+      store.getCursorProvider().scheduleCleanup(); // request a cleanup now that everything has been consumed
+      Wait.assertFalse(store::isPaging);
+      server.stop();
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // "problem cleaning page address" warning, the error associated with the OME
+   }
+
+
+   /** Returns "message N " (N = msgNumber) followed by filler text, appended until the length is at least {@code size}. */
+   String createBuffer(int msgNumber, int size) {
+      StringBuilder buffer = new StringBuilder().append("message ").append(msgNumber).append(' ');
+      while (buffer.length() < size) {
+         buffer.append(" Lorem Ipsum Whatever it's saying in there... ");
+      }
+      return buffer.toString();
+   }
+}
\ No newline at end of file