You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/15 20:24:06 UTC

[activemq-artemis] branch master updated: ARTEMIS-2425 Message loss due to writing incomplete page file

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new fa46647  ARTEMIS-2425 Message loss due to writing incomplete page file
     new 5feb212  This closes #2748
fa46647 is described below

commit fa46647818b61368600320e094037a7ab2ca3767
Author: yang wei <wy...@gmail.com>
AuthorDate: Fri Jul 12 18:34:36 2019 +0800

    ARTEMIS-2425 Message loss due to writing incomplete page file
---
 .../activemq/artemis/core/paging/impl/Page.java    |  5 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  9 +++
 .../unit/core/paging/impl/PagingStoreImplTest.java | 77 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 96ed2ba..5f98b4c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -259,8 +259,9 @@ public final class Page implements Comparable<Page> {
          if (fileBuffer != null) {
             fileFactory.releaseBuffer(fileBuffer);
          }
-         if (file.position() != fileSize) {
-            file.position(fileSize);
+         size.lazySet(processedBytes);
+         if (file.position() != processedBytes) {
+            file.position(processedBytes);
          }
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 7853b81..96e1817 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -468,6 +468,15 @@ public class PagingStoreImpl implements PagingStore {
                   currentPage = page;
 
                   cursorProvider.addPageCache(pageCache);
+
+                  /**
+                   * The page file might be incomplete in two cases: 1) the last message is incomplete; 2) the disk is damaged.
+                   * In case 1 we could keep writing to the file, but in case 2 we had better not, because old data might be overwritten.
+                   * Here we open a new page so that the incomplete page is preserved for recovery if needed.
+                   */
+                  if (page.getSize() != page.getFile().size()) {
+                     openNewPage();
+                  }
                }
 
                // We will not mark it for paging if there's only a single empty file
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 90725a2..9cc23a1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -726,6 +726,74 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testWriteIncompletePage() throws Exception { // ARTEMIS-2425: messages paged after an incomplete page file must still be read back in order
+      clearDataRecreateServerDirs();
+      SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1);
+
+      PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+      final int MAX_SIZE = 1024 * 1024;
+
+      AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
+
+      storeImpl.start();
+
+      Assert.assertEquals(0, storeImpl.getNumberOfPages());
+
+      // Mark the store as paging before any message is written
+      storeImpl.startPaging();
+
+      Page page = storeImpl.getCurrentPage();
+
+      int num1 = 20;
+      for (int i = 0; i < num1; i++) {
+         writePageMessage(storeImpl, i);
+      }
+      // Simulate an incomplete page: save the file position, page one extra message (id 30), then rewind to the saved position and overwrite 10 bytes there with 'Z'
+      long position = page.getFile().position();
+      writePageMessage(storeImpl, 30);
+      page.getFile().position(position);
+      ByteBuffer buffer = ByteBuffer.allocate(10);
+      for (int i = 0; i < buffer.capacity(); i++) {
+         buffer.put((byte) 'Z');
+      }
+      buffer.rewind();
+      page.getFile().writeDirect(buffer, true);
+      storeImpl.stop();
+
+      // Restart the store on top of the tampered page file and page num2 more messages (ids num1 .. num1 + num2 - 1)
+      storeImpl.start();
+      int num2 = 10;
+      for (int i = 0; i < num2; i++) {
+         writePageMessage(storeImpl, i + num1);
+      }
+
+      // Simulate a broker restart by stopping and starting the store once more
+      storeImpl.stop();
+      storeImpl.start();
+
+      long msgsRead = 0;
+
+      while (msgsRead < num1 + num2) { // keep depaging until num1 + num2 messages have been read
+         page = storeImpl.depage();
+         assertNotNull("no page after read " + msgsRead + " msg", page);
+         page.open();
+         List<PagedMessage> messages = page.read(new NullStorageManager());
+
+         for (PagedMessage pgmsg : messages) {
+            Message msg = pgmsg.getMessage();
+            Assert.assertEquals(msgsRead, msg.getMessageID()); // ids must come back in order 0, 1, 2, ...
+            Assert.assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue()); // "count" is set to the id by writePageMessage
+            msgsRead++;
+         }
+      }
+
+      storeImpl.stop();
+   }
+
    /**
     * @return
     */
@@ -747,6 +815,15 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       };
    }
 
+   protected void writePageMessage(final PagingStore storeImpl,
+                                  final long id) throws Exception { // builds one message for the given id and pages it into storeImpl
+      Message msg = createMessage(id, storeImpl, PagingStoreImplTest.destinationTestName, createRandomBuffer(id, 10));
+      msg.putLongProperty("count", id); // "count" mirrors the id; testWriteIncompletePage compares the two when reading back
+
+      final RoutingContextImpl ctx2 = new RoutingContextImpl(null); // fresh context per call; supplies the transaction and context listing passed to page()
+      storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock); // NOTE(review): 'lock' is declared outside this hunk - presumably a test-class field
+   }
+
    private CoreMessage createMessage(final long id,
                                        final PagingStore store,
                                        final SimpleString destination,