You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/14 20:32:03 UTC

[activemq-artemis] branch main updated: NO-JIRA Adding a test to validating paging reload

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f63e6be82 NO-JIRA Adding a test to validating paging reload
3f63e6be82 is described below

commit 3f63e6be82ffdf4f62ac29d8125f4d891e21c503
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Oct 14 16:31:35 2022 -0400

    NO-JIRA Adding a test to validating paging reload
    
    no issues found as part of this test. Just adding a new test.
---
 .../tests/soak/paging/MegaCleanerPagingTest.java   | 179 +++++++++++++++++++--
 1 file changed, 165 insertions(+), 14 deletions(-)

diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
index 4f0ccf3e2c..32cae433f6 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
@@ -25,6 +25,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -61,35 +62,49 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
 
    @Test
    public void testCleanup() throws Throwable {
-      testCleanup(false);
+      if (DIRECT_CALL) {
+         internalTestRegular();
+      } else {
+         remoteCall("internalTestRegular");
+      }
    }
 
    @Test
    public void testCleanupMidstream() throws Throwable {
-      testCleanup(true);
-   }
-
-   public void testCleanup(boolean midstream) throws Throwable {
-
       if (DIRECT_CALL) {
-         internalTest(midstream);
+         internalTestMidstream();
       } else {
-         // Using a spawn to limit memory consumption to the test
-         Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir(), "" + midstream);
-         logger.debug("process PID {}", process.pid());
-         Assert.assertTrue(process.waitFor(10, TimeUnit.MINUTES));
-         Assert.assertEquals(OK, process.exitValue());
+         remoteCall("internalTestMidstream");
       }
    }
 
+
+   @Test
+   public void testRestart() throws Throwable {
+      remoteCall("populate");
+      System.out.println("Resuming...");
+      remoteCall("resume");
+   }
+
+   private void remoteCall(String methodName) throws Exception {
+      // Using a spawn to limit memory consumption to the test
+      Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir(), methodName);
+      logger.debug("process PID {}", process.pid());
+      Assert.assertTrue(process.waitFor(10, TimeUnit.MINUTES));
+      Assert.assertEquals(OK, process.exitValue());
+   }
+
    // I am using a separate VM to limit memory..
    // and the test will pass the parameters needed for the JUnit Class to be able to proceed
    public static void main(String[] arg) {
       try {
          MegaCleanerPagingTest megaCleanerPagingTest = new MegaCleanerPagingTest();
          megaCleanerPagingTest.setTestDir(arg[0]);
-         boolean midstream = Boolean.parseBoolean(arg[1]);
-         megaCleanerPagingTest.internalTest(midstream);
+         String methodName = arg[1];
+
+
+         Method method = megaCleanerPagingTest.getClass().getMethod(methodName);
+         method.invoke(megaCleanerPagingTest);
          System.exit(OK);
       } catch (Throwable e) {
          e.printStackTrace();
@@ -98,6 +113,142 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
       }
    }
 
+   public void internalTestMidstream() throws Throwable {
+      internalTest(true);
+   }
+
+   public void internalTestRegular() throws Throwable {
+      internalTest(false);
+   }
+
+   public void populate() throws Throwable {
+      ActiveMQServer server = createServer(true, true);
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
+      server.start();
+
+      // if I didn't limit the memory on this test, the NUMBER_OF_MESSAGES would have to be something huge such as 500_000, and that could be a moving target
+      // if more memory is set to the JUNIT Runner.
+      // This is the main reason we limit memory on this test
+      int NUMBER_OF_MESSAGES = 100_000;
+
+      String queueName = "testPageAndDepage";
+
+      server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+      Assert.assertNotNull(serverQueue);
+      serverQueue.getPagingStore().startPaging();
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616?consumerWindowSize=0");
+      Assert.assertEquals(0, ((ActiveMQConnectionFactory)cf).getServerLocator().getConsumerWindowSize());
+
+      final int SIZE = 10 * 1024;
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         producer.send(session.createTextMessage(createBuffer(i, SIZE)));
+         if (i % 1000 == 0) {
+            logger.debug("sent {} messages", i);
+            session.commit();
+         }
+      }
+      session.commit();
+
+
+      PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
+      store.disableCleanup();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES / 2; i++) {
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(createBuffer(i, SIZE), message.getText());
+
+         if (i % 1000 == 0) {
+            logger.debug("received {} messages", i);
+            session.commit();
+         }
+      }
+      session.commit();
+      connection.close();
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+
+      server.stop();
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
+   }
+
+
+   public void resume() throws Throwable {
+      ActiveMQServer server = createServer(true, true);
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
+      server.start();
+
+      // if I didn't limit the memory on this test, the NUMBER_OF_MESSAGES would have to be something huge such as 500_000, and that could be a moving target
+      // if more memory is set to the JUNIT Runner.
+      // This is the main reason we limit memory on this test
+      int NUMBER_OF_MESSAGES = 100_000;
+
+      String queueName = "testPageAndDepage";
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      Queue queue = session.createQueue(queueName);
+
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+      Assert.assertNotNull(serverQueue);
+      serverQueue.getPagingStore().startPaging();
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616?consumerWindowSize=0");
+      Assert.assertEquals(0, ((ActiveMQConnectionFactory)cf).getServerLocator().getConsumerWindowSize());
+
+      final int SIZE = 10 * 1024;
+      session.commit();
+
+
+      PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
+      store.disableCleanup();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = NUMBER_OF_MESSAGES / 2; i < NUMBER_OF_MESSAGES; i++) {
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(createBuffer(i, SIZE), message.getText());
+
+         if (i % 1000 == 0) {
+            logger.debug("received {} messages", i);
+            session.commit();
+         }
+      }
+      session.commit();
+      Assert.assertNull(consumer.receiveNoWait());
+      connection.close();
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+      store.getCursorProvider().resumeCleanup();
+
+      server.stop();
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
+   }
+
    public void internalTest(boolean midstream) throws Throwable {
       ActiveMQServer server = createServer(true, true);
       server.getConfiguration().clearAddressSettings();