You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/18 14:31:53 UTC

[activemq-artemis] branch master updated: ARTEMIS-2418 Race conditions between cursor movement and page writing

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 99158cc  ARTEMIS-2418 Race conditions between cursor movement and page writing
     new 4d8e935  This closes #2743
99158cc is described below

commit 99158ccd3dc6a6803dc732a9dc0d743bd22e1b1e
Author: yang wei <wy...@gmail.com>
AuthorDate: Mon Jul 8 19:08:11 2019 +0800

    ARTEMIS-2418 Race conditions between cursor movement and page writing
---
 .../paging/cursor/impl/PageSubscriptionImpl.java   |  43 ++---
 .../extras/byteman/RaceOnCursorIteratorTest.java   | 208 +++++++++++++++++++++
 .../tests/integration/paging/PagingTest.java       |   2 +-
 3 files changed, 227 insertions(+), 26 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 6a272cf..3aec7ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -382,42 +382,35 @@ public final class PageSubscriptionImpl implements PageSubscription {
    private PagedReference internalGetNext(final PagePosition pos) {
       PagePosition retPos = pos.nextMessage();
 
-      PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
-
-      PageCache emptyCache = null;
-      if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
-         emptyCache = cache;
-         saveEmptyPageAsConsumedPage(emptyCache);
-         // The next message is beyond what's available at the current page, so we need to move to the next page
-         cache = null;
-      }
-
-      // it will scan for the next available page
-      while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) {
-         emptyCache = cache;
-         retPos = moveNextPage(retPos);
+      PageCache cache = null;
 
+      while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
          cache = cursorProvider.getPageCache(retPos.getPageNr());
-
-         if (cache != null) {
-            saveEmptyPageAsConsumedPage(emptyCache);
+         /**
+          * In the following cases, we should move to the next page:
+          * case 1: cache == null, which means the file might have been deleted unexpectedly.
+          * case 2: cache is not live and contains no messages.
+          * case 3: cache is not live and the next message is beyond what's available at the current page.
+          */
+         if (cache == null || (!cache.isLive() && (retPos.getMessageNr() >= cache.getNumberOfMessages() || cache.getNumberOfMessages() == 0))) {
+            // Save the current empty page every time we move to the next page
+            saveEmptyPageAsConsumedPage(cache);
+            retPos = moveNextPage(retPos);
+            cache = null;
+         } else {
+            // Break out of the loop to get the message if the cache is live or the next message number is within the range of the current page
+            break;
          }
       }
 
-      if (cache == null) {
-         saveEmptyPageAsConsumedPage(emptyCache);
-
-         // it will be null in the case of the current writing page
-         return null;
-      } else {
+      if (cache != null) {
          PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
 
          if (serverMessage != null) {
             return cursorProvider.newReference(retPos, serverMessage, this);
-         } else {
-            return null;
          }
       }
+      return null;
    }
 
    private PagePosition moveNextPage(final PagePosition pos) {
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java
new file mode 100644
index 0000000..f06a313
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class RaceOnCursorIteratorTest extends ActiveMQTestBase {
+
+   private static ServerLocator locator;
+
+   private static ActiveMQServer server;
+
+   private static ClientSessionFactory sf;
+
+   private static ClientSession session;
+
+   private static Queue queue;
+
+   private static final ReentrantReadWriteLock.ReadLock lock = new ReentrantReadWriteLock().readLock();
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   static boolean skipLivePageCache = false;
+
+   static boolean skipNullPageCache = false;
+
+   static boolean moveNextPageCalled = false;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      skipLivePageCache = false;
+      skipNullPageCache = false;
+      moveNextPageCalled = false;
+      locator = createInVMNonHALocator();
+
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setJournalSyncNonTransactional(false);
+
+      HashMap<String, AddressSettings> map = new HashMap<>();
+      AddressSettings value = new AddressSettings();
+      map.put(ADDRESS.toString(), value);
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+
+      server.start();
+
+      locator = createInVMNonHALocator();
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(false);
+      locator.setConsumerWindowSize(0);
+
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(false, true, true);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      queue = server.locateQueue(ADDRESS);
+      queue.getPageSubscription().getPagingStore().startPaging();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      session.close();
+      sf.close();
+      locator.close();
+      server.stop();
+      super.tearDown();
+   }
+
+   public static void raceAddLivePageCache() throws Exception {
+      if (skipLivePageCache) {
+         createMessage(1);
+
+         queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+         createMessage(2);
+      }
+      moveNextPageCalled = true;
+   }
+
+   public static void raceAddTwoCaches() throws Exception {
+      if (skipNullPageCache && moveNextPageCalled) {
+         createMessage(1);
+
+         queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+         createMessage(2);
+      }
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "raceLiveCache",
+         targetClass = "org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl",
+         targetMethod = "moveNextPage",
+         targetLocation = "EXIT",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnCursorIteratorTest.raceAddLivePageCache()")})
+   public void testSkipLivePageCache() {
+      skipLivePageCache = true;
+      // Simulate scenario #1 depicted in https://issues.apache.org/jira/browse/ARTEMIS-2418
+      PagedReference ref = queue.getPageSubscription().iterator().next();
+      assertTrue("first msg should not be " + (ref == null ? "null" : ref.getPagedMessage().getMessage().getMessageID()),
+                 ref == null || ref.getPagedMessage().getMessage().getMessageID() == 1);
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "raceLiveCache",
+         targetClass = "org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl",
+         targetMethod = "moveNextPage",
+         targetLocation = "EXIT",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnCursorIteratorTest.raceAddLivePageCache()"),
+         @BMRule(
+            name = "raceNullCache",
+            targetClass = "org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl",
+            targetMethod = "getPageCache",
+            targetLocation = "EXIT",
+            action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnCursorIteratorTest.raceAddTwoCaches()")})
+   public void testSkipNullPageCache() throws Exception {
+      skipNullPageCache = true;
+      // Simulate scenario #2 depicted in https://issues.apache.org/jira/browse/ARTEMIS-2418
+      queue.getPageSubscription().getPagingStore().getCurrentPage().close();
+
+      PagedReference ref = queue.getPageSubscription().iterator().next();
+      assertTrue("first msg should not be " + (ref == null ? "null" : ref.getPagedMessage().getMessage().getMessageID()),
+         ref == null || ref.getPagedMessage().getMessage().getMessageID() == 1);
+   }
+
+   private static CoreMessage createMessage(final long id) throws Exception {
+      ActiveMQBuffer buffer = createRandomBuffer(0, 10);
+      PagingStore pagingStore = queue.getPageSubscription().getPagingStore();
+
+      CoreMessage msg = new CoreMessage(id, 50 + buffer.capacity());
+
+      msg.setAddress(ADDRESS);
+
+      msg.setContext(pagingStore);
+
+      msg.getBodyBuffer().resetReaderIndex();
+      msg.getBodyBuffer().resetWriterIndex();
+
+      msg.getBodyBuffer().writeBytes(buffer, buffer.capacity());
+
+      final RoutingContextImpl ctx = new RoutingContextImpl(null);
+      ctx.addQueue(ADDRESS, queue);
+      pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
+
+      return msg;
+   }
+
+   protected static ActiveMQBuffer createRandomBuffer(final long id, final int size) {
+      return RandomUtil.randomBuffer(size, id);
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index b12f25c..5dd5cc2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -5833,7 +5833,7 @@ public class PagingTest extends ActiveMQTestBase {
 
          locator.close();
 
-         Wait.assertEquals(1, store::getNumberOfPages);
+         Wait.assertEquals(2, store::getNumberOfPages);
 
       } finally {
          try {