You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/03/18 16:05:00 UTC

[activemq-artemis] branch master updated: ARTEMIS-2662 Make Page to be resilient to invalid position

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf8bcc  ARTEMIS-2662 Make Page to be resilient to invalid position
     new 727af5b  This closes #3026
7bf8bcc is described below

commit 7bf8bcc7fefb2ae5742bb10963781109fee8d495
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Mar 17 15:06:02 2020 -0400

    ARTEMIS-2662 Make Page to be resilient to invalid position
    
    In case of an invalid position on getMessage, Page should be more resilient:
    instead of throwing a RuntimeException, it retries the lookup from the beginning of the page file.
---
 .../activemq/artemis/core/paging/impl/Page.java    | 14 +++++++++-
 .../artemis/core/server/ActiveMQServerLogger.java  |  5 ++++
 .../core/paging/cursor/impl/PageReaderTest.java    | 31 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index bdaf99e..315d566 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -126,6 +126,11 @@ public final class Page implements Comparable<Page> {
       try {
          if (readFileBuffer == null) {
             readProcessedBytes = startOffset;
+
+            if (startOffset > fileSize) {
+               return readMessage(0, 0, targetMessageNumber);
+            }
+
             file.position(readProcessedBytes);
             readFileBuffer = fileFactory.allocateDirectBuffer(Math.min(fileSize - readProcessedBytes, MIN_CHUNK_SIZE));
             //the wrapper is reused to avoid unnecessary allocations
@@ -225,7 +230,14 @@ public final class Page implements Comparable<Page> {
          throw e;
       }
       resetReadMessageStatus();
-      throw new RuntimeException("target message no." + targetMessageNumber + " not found from start offset " + startOffset + " and start message number " + startMessageNumber);
+
+      ActiveMQServerLogger.LOGGER.pageLookupError(this.pageId, targetMessageNumber, startOffset, startMessageNumber);
+
+      if (startOffset > 0) {
+         return readMessage(0, 0, targetMessageNumber);
+      } else {
+         return null;
+      }
    }
 
    public synchronized List<PagedMessage> read() throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 4909c0b..5024c82 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1664,6 +1664,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void federationBindingsLookupError(@Cause Throwable e, SimpleString address);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222288, value = "Page {0}, message {1} could not be found on offset {2}, with starting message {3}. This represents a logic error or inconsistency on the data, and the system will try once again from the beggining of the page file.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void pageLookupError(int pageNr, int messageNr, int offset, int startNr);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
index 93cf5bd..2ad18e5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -96,6 +97,36 @@ public class PageReaderTest extends ActiveMQTestBase {
       pageReader.close();
    }
 
+
+   @Test
+   public void testForceInvalidPosition() throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      try {
+         recreateDirectory(getTestDir());
+         int num = 2;
+         int[] offsets = createPage(num);
+         PageReader pageReader = getPageReader();
+
+         PagedMessage[] pagedMessages = pageReader.getMessages();
+         assertEquals(pagedMessages.length, num);
+
+         PagePosition pagePosition = new PagePositionImpl(10, 0, 50);
+         PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition);
+         assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize());
+         PagePosition nextPagePosition = new PagePositionImpl(10, 1, 5000);
+         PagedMessage pagedMessage = pageReader.getMessage(nextPagePosition);
+         assertNotNull(pagedMessage);
+         assertEquals(pagedMessage.getMessage().getMessageID(), 1);
+         assertEquals(pagedMessages[1].getMessage().getMessageID(), 1);
+         pageReader.close();
+         Assert.assertTrue("Logging did not throw warn expected", AssertionLoggerHandler.findText("AMQ222288"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+         AssertionLoggerHandler.clear();
+      }
+   }
+
    @Test
    public void testPageReadMessageBeyondPage() throws Exception {
       recreateDirectory(getTestDir());