You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/18 14:31:53 UTC
[activemq-artemis] branch master updated: ARTEMIS-2418 Race
conditions between cursor movement and page writing
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 99158cc ARTEMIS-2418 Race conditions between cursor movement and page writing
new 4d8e935 This closes #2743
99158cc is described below
commit 99158ccd3dc6a6803dc732a9dc0d743bd22e1b1e
Author: yang wei <wy...@gmail.com>
AuthorDate: Mon Jul 8 19:08:11 2019 +0800
ARTEMIS-2418 Race conditions between cursor movement and page writing
---
.../paging/cursor/impl/PageSubscriptionImpl.java | 43 ++---
.../extras/byteman/RaceOnCursorIteratorTest.java | 208 +++++++++++++++++++++
.../tests/integration/paging/PagingTest.java | 2 +-
3 files changed, 227 insertions(+), 26 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 6a272cf..3aec7ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -382,42 +382,35 @@ public final class PageSubscriptionImpl implements PageSubscription {
private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage();
- PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
-
- PageCache emptyCache = null;
- if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
- emptyCache = cache;
- saveEmptyPageAsConsumedPage(emptyCache);
- // The next message is beyond what's available at the current page, so we need to move to the next page
- cache = null;
- }
-
- // it will scan for the next available page
- while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) {
- emptyCache = cache;
- retPos = moveNextPage(retPos);
+ PageCache cache = null;
+ while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
cache = cursorProvider.getPageCache(retPos.getPageNr());
-
- if (cache != null) {
- saveEmptyPageAsConsumedPage(emptyCache);
+ /**
+ * In following cases, we should move to the next page
+ * case 1: cache == null means file might be deleted unexpectedly.
+ * case 2: cache is not live and contains no messages.
+ * case 3: cache is not live and next message is beyond what's available at the current page.
+ */
+ if (cache == null || (!cache.isLive() && (retPos.getMessageNr() >= cache.getNumberOfMessages() || cache.getNumberOfMessages() == 0))) {
+ // Save current empty page every time we move to next page
+ saveEmptyPageAsConsumedPage(cache);
+ retPos = moveNextPage(retPos);
+ cache = null;
+ } else {
+ // We need to break loop to get message if cache is live or the next message number is in the range of current page
+ break;
}
}
- if (cache == null) {
- saveEmptyPageAsConsumedPage(emptyCache);
-
- // it will be null in the case of the current writing page
- return null;
- } else {
+ if (cache != null) {
PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
if (serverMessage != null) {
return cursorProvider.newReference(retPos, serverMessage, this);
- } else {
- return null;
}
}
+ return null;
}
private PagePosition moveNextPage(final PagePosition pos) {
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java
new file mode 100644
index 0000000..f06a313
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class RaceOnCursorIteratorTest extends ActiveMQTestBase {
+
+ private static ServerLocator locator;
+
+ private static ActiveMQServer server;
+
+ private static ClientSessionFactory sf;
+
+ private static ClientSession session;
+
+ private static Queue queue;
+
+ private static final ReentrantReadWriteLock.ReadLock lock = new ReentrantReadWriteLock().readLock();
+
+ private static final int PAGE_MAX = 100 * 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ static boolean skipLivePageCache = false;
+
+ static boolean skipNullPageCache = false;
+
+ static boolean moveNextPageCalled = false;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ skipLivePageCache = false;
+ skipNullPageCache = false;
+ moveNextPageCalled = false;
+ locator = createInVMNonHALocator();
+
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setJournalSyncNonTransactional(false);
+
+ HashMap<String, AddressSettings> map = new HashMap<>();
+ AddressSettings value = new AddressSettings();
+ map.put(ADDRESS.toString(), value);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(false);
+ locator.setConsumerWindowSize(0);
+
+ sf = createSessionFactory(locator);
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ queue = server.locateQueue(ADDRESS);
+ queue.getPageSubscription().getPagingStore().startPaging();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ session.close();
+ sf.close();
+ locator.close();
+ server.stop();
+ super.tearDown();
+ }
+
+ public static void raceAddLivePageCache() throws Exception {
+ if (skipLivePageCache) {
+ createMessage(1);
+
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+ createMessage(2);
+ }
+ moveNextPageCalled = true;
+ }
+
+ public static void raceAddTwoCaches() throws Exception {
+ if (skipNullPageCache && moveNextPageCalled) {
+ createMessage(1);
+
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+ createMessage(2);
+ }
+ }
+
+ @Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "raceLiveCache",
+ targetClass = "org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl",
+ targetMethod = "moveNextPage",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnCursorIteratorTest.raceAddLivePageCache()")})
+ public void testSkipLivePageCache() {
+ skipLivePageCache = true;
+ // Simulate scenario #1 depicted in https://issues.apache.org/jira/browse/ARTEMIS-2418
+ PagedReference ref = queue.getPageSubscription().iterator().next();
+ assertTrue("first msg should not be " + (ref == null ? "null" : ref.getPagedMessage().getMessage().getMessageID()),
+ ref == null || ref.getPagedMessage().getMessage().getMessageID() == 1);
+ }
+
+ @Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "raceLiveCache",
+ targetClass = "org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl",
+ targetMethod = "moveNextPage",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnCursorIteratorTest.raceAddLivePageCache()"),
+ @BMRule(
+ name = "raceNullCache",
+ targetClass = "org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl",
+ targetMethod = "getPageCache",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnCursorIteratorTest.raceAddTwoCaches()")})
+ public void testSkipNullPageCache() throws Exception {
+ skipNullPageCache = true;
+ // Simulate scenario #2 depicted in https://issues.apache.org/jira/browse/ARTEMIS-2418
+ queue.getPageSubscription().getPagingStore().getCurrentPage().close();
+
+ PagedReference ref = queue.getPageSubscription().iterator().next();
+ assertTrue("first msg should not be " + (ref == null ? "null" : ref.getPagedMessage().getMessage().getMessageID()),
+ ref == null || ref.getPagedMessage().getMessage().getMessageID() == 1);
+ }
+
+ private static CoreMessage createMessage(final long id) throws Exception {
+ ActiveMQBuffer buffer = createRandomBuffer(0, 10);
+ PagingStore pagingStore = queue.getPageSubscription().getPagingStore();
+
+ CoreMessage msg = new CoreMessage(id, 50 + buffer.capacity());
+
+ msg.setAddress(ADDRESS);
+
+ msg.setContext(pagingStore);
+
+ msg.getBodyBuffer().resetReaderIndex();
+ msg.getBodyBuffer().resetWriterIndex();
+
+ msg.getBodyBuffer().writeBytes(buffer, buffer.capacity());
+
+ final RoutingContextImpl ctx = new RoutingContextImpl(null);
+ ctx.addQueue(ADDRESS, queue);
+ pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
+
+ return msg;
+ }
+
+ protected static ActiveMQBuffer createRandomBuffer(final long id, final int size) {
+ return RandomUtil.randomBuffer(size, id);
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index b12f25c..5dd5cc2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -5833,7 +5833,7 @@ public class PagingTest extends ActiveMQTestBase {
locator.close();
- Wait.assertEquals(1, store::getNumberOfPages);
+ Wait.assertEquals(2, store::getNumberOfPages);
} finally {
try {