You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/14 20:32:03 UTC
[activemq-artemis] branch main updated: NO-JIRA Adding a test to validate paging reload
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3f63e6be82 NO-JIRA Adding a test to validate paging reload
3f63e6be82 is described below
commit 3f63e6be82ffdf4f62ac29d8125f4d891e21c503
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Oct 14 16:31:35 2022 -0400
NO-JIRA Adding a test to validate paging reload
No issues were found as part of this test; this commit just adds a new test.
---
.../tests/soak/paging/MegaCleanerPagingTest.java | 179 +++++++++++++++++++--
1 file changed, 165 insertions(+), 14 deletions(-)
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
index 4f0ccf3e2c..32cae433f6 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/MegaCleanerPagingTest.java
@@ -25,6 +25,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -61,35 +62,49 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
@Test
public void testCleanup() throws Throwable {
- testCleanup(false);
+ if (DIRECT_CALL) {
+ internalTestRegular();
+ } else {
+ remoteCall("internalTestRegular");
+ }
}
@Test
public void testCleanupMidstream() throws Throwable {
- testCleanup(true);
- }
-
- public void testCleanup(boolean midstream) throws Throwable {
-
if (DIRECT_CALL) {
- internalTest(midstream);
+ internalTestMidstream();
} else {
- // Using a spawn to limit memory consumption to the test
- Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir(), "" + midstream);
- logger.debug("process PID {}", process.pid());
- Assert.assertTrue(process.waitFor(10, TimeUnit.MINUTES));
- Assert.assertEquals(OK, process.exitValue());
+ remoteCall("internalTestMidstream");
}
}
+
+ @Test
+ public void testRestart() throws Throwable {
+ remoteCall("populate");
+ System.out.println("Resuming...");
+ remoteCall("resume");
+ }
+
+ private void remoteCall(String methodName) throws Exception {
+ // Using a spawn to limit memory consumption to the test
+ Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir(), methodName);
+ logger.debug("process PID {}", process.pid());
+ Assert.assertTrue(process.waitFor(10, TimeUnit.MINUTES));
+ Assert.assertEquals(OK, process.exitValue());
+ }
+
// I am using a separate VM to limit memory..
// and the test will pass the parameters needed for the JUnit Class to be able to proceed
public static void main(String[] arg) {
try {
MegaCleanerPagingTest megaCleanerPagingTest = new MegaCleanerPagingTest();
megaCleanerPagingTest.setTestDir(arg[0]);
- boolean midstream = Boolean.parseBoolean(arg[1]);
- megaCleanerPagingTest.internalTest(midstream);
+ String methodName = arg[1];
+
+
+ Method method = megaCleanerPagingTest.getClass().getMethod(methodName);
+ method.invoke(megaCleanerPagingTest);
System.exit(OK);
} catch (Throwable e) {
e.printStackTrace();
@@ -98,6 +113,142 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
}
}
+ public void internalTestMidstream() throws Throwable {
+ internalTest(true);
+ }
+
+ public void internalTestRegular() throws Throwable {
+ internalTest(false);
+ }
+
+ public void populate() throws Throwable {
+ ActiveMQServer server = createServer(true, true);
+ server.getConfiguration().clearAddressSettings();
+ server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
+ server.start();
+
+ // if I didn't limit the memory on this test, the NUMBER_OF_MESSAGES would have to be something huge such as 500_000, and that could be a moving target
+ // if more memory is set to the JUNIT Runner.
+ // This is the main reason we limit memory on this test
+ int NUMBER_OF_MESSAGES = 100_000;
+
+ String queueName = "testPageAndDepage";
+
+ server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+ Assert.assertNotNull(serverQueue);
+ serverQueue.getPagingStore().startPaging();
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616?consumerWindowSize=0");
+ Assert.assertEquals(0, ((ActiveMQConnectionFactory)cf).getServerLocator().getConsumerWindowSize());
+
+ final int SIZE = 10 * 1024;
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage(createBuffer(i, SIZE)));
+ if (i % 1000 == 0) {
+ logger.debug("sent {} messages", i);
+ session.commit();
+ }
+ }
+ session.commit();
+
+
+ PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
+ store.disableCleanup();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES / 2; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(createBuffer(i, SIZE), message.getText());
+
+ if (i % 1000 == 0) {
+ logger.debug("received {} messages", i);
+ session.commit();
+ }
+ }
+ session.commit();
+ connection.close();
+
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+
+ server.stop();
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
+ }
+
+
+ public void resume() throws Throwable {
+ ActiveMQServer server = createServer(true, true);
+ server.getConfiguration().clearAddressSettings();
+ server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
+ server.start();
+
+ // if I didn't limit the memory on this test, the NUMBER_OF_MESSAGES would have to be something huge such as 500_000, and that could be a moving target
+ // if more memory is set to the JUNIT Runner.
+ // This is the main reason we limit memory on this test
+ int NUMBER_OF_MESSAGES = 100_000;
+
+ String queueName = "testPageAndDepage";
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+ Assert.assertNotNull(serverQueue);
+ serverQueue.getPagingStore().startPaging();
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616?consumerWindowSize=0");
+ Assert.assertEquals(0, ((ActiveMQConnectionFactory)cf).getServerLocator().getConsumerWindowSize());
+
+ final int SIZE = 10 * 1024;
+ session.commit();
+
+
+ PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
+ store.disableCleanup();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ for (int i = NUMBER_OF_MESSAGES / 2; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(createBuffer(i, SIZE), message.getText());
+
+ if (i % 1000 == 0) {
+ logger.debug("received {} messages", i);
+ session.commit();
+ }
+ }
+ session.commit();
+ Assert.assertNull(consumer.receiveNoWait());
+ connection.close();
+
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+ store.getCursorProvider().resumeCleanup();
+
+ server.stop();
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
+ }
+
public void internalTest(boolean midstream) throws Throwable {
ActiveMQServer server = createServer(true, true);
server.getConfiguration().clearAddressSettings();