You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/15 20:24:06 UTC
[activemq-artemis] branch master updated: ARTEMIS-2425 Message loss
due to writing incomplete page file
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new fa46647 ARTEMIS-2425 Message loss due to writing incomplete page file
new 5feb212 This closes #2748
fa46647 is described below
commit fa46647818b61368600320e094037a7ab2ca3767
Author: yang wei <wy...@gmail.com>
AuthorDate: Fri Jul 12 18:34:36 2019 +0800
ARTEMIS-2425 Message loss due to writing incomplete page file
---
.../activemq/artemis/core/paging/impl/Page.java | 5 +-
.../artemis/core/paging/impl/PagingStoreImpl.java | 9 +++
.../unit/core/paging/impl/PagingStoreImplTest.java | 77 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 2 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 96ed2ba..5f98b4c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -259,8 +259,9 @@ public final class Page implements Comparable<Page> {
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
}
- if (file.position() != fileSize) {
- file.position(fileSize);
+ size.lazySet(processedBytes);
+ if (file.position() != processedBytes) {
+ file.position(processedBytes);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 7853b81..96e1817 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -468,6 +468,15 @@ public class PagingStoreImpl implements PagingStore {
currentPage = page;
cursorProvider.addPageCache(pageCache);
+
+ /**
+ * The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged.
+ * In case 1 we can keep writing the file. But in case 2 we'd better not bcs old data might be overwritten.
+ * Here we open a new page so the incomplete page would be reserved for recovery if needed.
+ */
+ if (page.getSize() != page.getFile().size()) {
+ openNewPage();
+ }
}
// We will not mark it for paging if there's only a single empty file
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 90725a2..9cc23a1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -726,6 +726,74 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testWriteIncompletePage() throws Exception {
+ clearDataRecreateServerDirs();
+ SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1);
+
+ PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+ final int MAX_SIZE = 1024 * 1024;
+
+ AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
+
+ storeImpl.start();
+
+ Assert.assertEquals(0, storeImpl.getNumberOfPages());
+
+ // Marked the store to be paged
+ storeImpl.startPaging();
+
+ Page page = storeImpl.getCurrentPage();
+
+ int num1 = 20;
+ for (int i = 0; i < num1; i++) {
+ writePageMessage(storeImpl, i);
+ }
+ // simulate uncompleted page
+ long position = page.getFile().position();
+ writePageMessage(storeImpl, 30);
+ page.getFile().position(position);
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ for (int i = 0; i < buffer.capacity(); i++) {
+ buffer.put((byte) 'Z');
+ }
+ buffer.rewind();
+ page.getFile().writeDirect(buffer, true);
+ storeImpl.stop();
+
+ // write uncompleted page
+ storeImpl.start();
+ int num2 = 10;
+ for (int i = 0; i < num2; i++) {
+ writePageMessage(storeImpl, i + num1);
+ }
+
+ // simulate broker restart
+ storeImpl.stop();
+ storeImpl.start();
+
+ long msgsRead = 0;
+
+ while (msgsRead < num1 + num2) {
+ page = storeImpl.depage();
+ assertNotNull("no page after read " + msgsRead + " msg", page);
+ page.open();
+ List<PagedMessage> messages = page.read(new NullStorageManager());
+
+ for (PagedMessage pgmsg : messages) {
+ Message msg = pgmsg.getMessage();
+ Assert.assertEquals(msgsRead, msg.getMessageID());
+ Assert.assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue());
+ msgsRead++;
+ }
+ }
+
+ storeImpl.stop();
+ }
+
/**
* @return
*/
@@ -747,6 +815,15 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
};
}
+ protected void writePageMessage(final PagingStore storeImpl,
+ final long id) throws Exception {
+ Message msg = createMessage(id, storeImpl, PagingStoreImplTest.destinationTestName, createRandomBuffer(id, 10));
+ msg.putLongProperty("count", id);
+
+ final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
+ storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock);
+ }
+
private CoreMessage createMessage(final long id,
final PagingStore store,
final SimpleString destination,