You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/07/25 12:39:04 UTC
activemq-artemis git commit: [ARTEMIS-1986]
PagingTest#testDeletePhysicalPages will fail if a record about deleting a
page is not saved in journal
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x b80c29759 -> 8cc497859
[ARTEMIS-1986] PagingTest#testDeletePhysicalPages will fail if a record about deleting a page is not saved in journal
(cherry picked from commit 993499daafbe546dbeae22febeeb5ad98b140396)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8cc49785
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8cc49785
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8cc49785
Branch: refs/heads/2.6.x
Commit: 8cc49785990574578385b3cfc50321bcb9e877fb
Parents: b80c297
Author: JiriOndrusek <jo...@redhat.com>
Authored: Mon Jul 23 14:17:24 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jul 25 08:38:56 2018 -0400
----------------------------------------------------------------------
.../tests/integration/paging/PagingTest.java | 148 +++++++++++++++++++
1 file changed, 148 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8cc49785/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 566142d..1436bea 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -1585,6 +1585,154 @@ public class PagingTest extends ActiveMQTestBase {
}
+ // 4 messages are send/received, it creates 2 pages, where for second page there is no delete completion record in journal
+ // server is restarted and 4 messages sent/received again. There should be no lost message.
+ @Test
+ public void testRestartWithCompleteAndDeletedPhysicalPage() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig();
+
+ final AtomicBoolean mainCleanup = new AtomicBoolean(true);
+
+ class InterruptedCursorProvider extends PageCursorProviderImpl {
+
+ InterruptedCursorProvider(PagingStore pagingStore,
+ StorageManager storageManager,
+ ArtemisExecutor executor,
+ int maxCacheSize) {
+ super(pagingStore, storageManager, executor, maxCacheSize);
+ }
+
+ @Override
+ public void cleanup() {
+ if (mainCleanup.get()) {
+ super.cleanup();
+ } else {
+ try {
+ pagingStore.unlock();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+ @Override
+ protected PagingStoreFactoryNIO getPagingStoreFactory() {
+ return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ @Override
+ public PageCursorProvider newCursorProvider(PagingStore store,
+ StorageManager storageManager,
+ AddressSettings addressSettings,
+ ArtemisExecutor executor) {
+ return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+ }
+ };
+ }
+
+ };
+
+ addServer(server);
+
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(MESSAGE_SIZE).
+ setMaxSizeBytes(2 * MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(true, true, 0);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message;
+
+ for (int i = 0; i < 4; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);
+
+ producer.send(message);
+ session.commit();
+
+ //last page (#2, whch contains only message #3) is marked as complete - is full - but no delete complete record is added
+ if (i == 3) {
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+ }
+
+ }
+
+ Assert.assertEquals(3, queue.getPageSubscription().getPagingStore().getCurrentWritingPage());
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ session.start();
+
+ for (int i = 0; i < 4; i++) {
+ message = consumer.receive(5000);
+ Assert.assertNotNull("Before restart - message " + i + " is empty.",message);
+ message.acknowledge();
+ }
+
+
+
+ server.stop();
+ mainCleanup.set(false);
+
+
+
+ // Deleting the paging data. Simulating a failure
+ // a dumb user, or anything that will remove the data
+ deleteDirectory(new File(getPageDir()));
+
+ logger.trace("Server restart");
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = createSessionFactory(locator);
+ session = sf.createSession(null, null, false, false, true, false, 0);
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < 4; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);
+
+
+ producer.send(message);
+ }
+ session.commit();
+
+ mainCleanup.set(true);
+
+ queue = server.locateQueue(ADDRESS);
+ queue.getPageSubscription().cleanupEntries(false);
+ queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+
+ consumer = session.createConsumer(ADDRESS);
+ session.start();
+
+ for (int i = 0; i < 4; i++) {
+ message = consumer.receive(5000);
+ Assert.assertNotNull("After restart - message " + i + " is empty.",message);
+ message.acknowledge();
+ }
+
+ server.stop();
+
+ }
+
@Test
public void testMissingTXEverythingAcked() throws Exception {
clearDataRecreateServerDirs();