You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/03/18 16:05:00 UTC
[activemq-artemis] branch master updated: ARTEMIS-2662 Make Page to
be resilient to invalid position
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf8bcc ARTEMIS-2662 Make Page to be resilient to invalid position
new 727af5b This closes #3026
7bf8bcc is described below
commit 7bf8bcc7fefb2ae5742bb10963781109fee8d495
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Mar 17 15:06:02 2020 -0400
ARTEMIS-2662 Make Page to be resilient to invalid position
in case of an invalid position on getMessage, it should instead of runtimeException
be more resilient and fix itself.
---
.../activemq/artemis/core/paging/impl/Page.java | 14 +++++++++-
.../artemis/core/server/ActiveMQServerLogger.java | 5 ++++
.../core/paging/cursor/impl/PageReaderTest.java | 31 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index bdaf99e..315d566 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -126,6 +126,11 @@ public final class Page implements Comparable<Page> {
try {
if (readFileBuffer == null) {
readProcessedBytes = startOffset;
+
+ if (startOffset > fileSize) {
+ return readMessage(0, 0, targetMessageNumber);
+ }
+
file.position(readProcessedBytes);
readFileBuffer = fileFactory.allocateDirectBuffer(Math.min(fileSize - readProcessedBytes, MIN_CHUNK_SIZE));
//the wrapper is reused to avoid unnecessary allocations
@@ -225,7 +230,14 @@ public final class Page implements Comparable<Page> {
throw e;
}
resetReadMessageStatus();
- throw new RuntimeException("target message no." + targetMessageNumber + " not found from start offset " + startOffset + " and start message number " + startMessageNumber);
+
+ ActiveMQServerLogger.LOGGER.pageLookupError(this.pageId, targetMessageNumber, startOffset, startMessageNumber);
+
+ if (startOffset > 0) {
+ return readMessage(0, 0, targetMessageNumber);
+ } else {
+ return null;
+ }
}
public synchronized List<PagedMessage> read() throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 4909c0b..5024c82 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1664,6 +1664,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void federationBindingsLookupError(@Cause Throwable e, SimpleString address);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222288, value = "Page {0}, message {1} could not be found on offset {2}, with starting message {3}. This represents a logic error or inconsistency on the data, and the system will try once again from the beggining of the page file.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void pageLookupError(int pageNr, int messageNr, int offset, int startNr);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
index 93cf5bd..2ad18e5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
@@ -96,6 +97,36 @@ public class PageReaderTest extends ActiveMQTestBase {
pageReader.close();
}
+
+ @Test
+ public void testForceInvalidPosition() throws Exception {
+
+ AssertionLoggerHandler.startCapture();
+ try {
+ recreateDirectory(getTestDir());
+ int num = 2;
+ int[] offsets = createPage(num);
+ PageReader pageReader = getPageReader();
+
+ PagedMessage[] pagedMessages = pageReader.getMessages();
+ assertEquals(pagedMessages.length, num);
+
+ PagePosition pagePosition = new PagePositionImpl(10, 0, 50);
+ PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition);
+ assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize());
+ PagePosition nextPagePosition = new PagePositionImpl(10, 1, 5000);
+ PagedMessage pagedMessage = pageReader.getMessage(nextPagePosition);
+ assertNotNull(pagedMessage);
+ assertEquals(pagedMessage.getMessage().getMessageID(), 1);
+ assertEquals(pagedMessages[1].getMessage().getMessageID(), 1);
+ pageReader.close();
+ Assert.assertTrue("Logging did not throw warn expected", AssertionLoggerHandler.findText("AMQ222288"));
+ } finally {
+ AssertionLoggerHandler.stopCapture();
+ AssertionLoggerHandler.clear();
+ }
+ }
+
@Test
public void testPageReadMessageBeyondPage() throws Exception {
recreateDirectory(getTestDir());