You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2018/11/06 13:26:54 UTC

activemq git commit: AMQ-7082 - Make sure that the recovery will only mark pages as free if they were created in a previous execution

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x c988a145a -> 45d7676bd


AMQ-7082 - Make sure that the recovery will only mark pages as free if they were created in a previous execution

(cherry picked from commit 0d34338919edee863bd71693ee30999d9d9d6ce9)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/45d7676b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/45d7676b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/45d7676b

Branch: refs/heads/activemq-5.15.x
Commit: 45d7676bd95a7e4594a80b025994ea3242c94586
Parents: c988a14
Author: Alan Protasio <al...@gmail.com>
Authored: Tue Nov 6 01:13:18 2018 -0800
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Nov 6 08:26:21 2018 -0500

----------------------------------------------------------------------
 .../store/kahadb/disk/page/PageFile.java        |  16 ++-
 .../store/kahadb/disk/page/PageFileTest.java    | 108 +++++++++++++++++--
 2 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/45d7676b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index fe79a2d..7456dfa 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -425,19 +425,19 @@ public class PageFile {
             getFreeFile().delete();
             startWriter();
             if (needsFreePageRecovery) {
-                asyncFreePageRecovery();
+                asyncFreePageRecovery(nextFreePageId.get());
             }
         } else {
             throw new IllegalStateException("Cannot load the page file when it is already loaded.");
         }
     }
 
-    private void asyncFreePageRecovery() {
+    private void asyncFreePageRecovery(final long lastRecoveryPage) {
         Thread thread = new Thread("KahaDB Index Free Page Recovery") {
             @Override
             public void run() {
                 try {
-                    recoverFreePages();
+                    recoverFreePages(lastRecoveryPage);
                 } catch (Throwable e) {
                     if (loaded.get()) {
                         LOG.warn("Error recovering index free page list", e);
@@ -450,7 +450,7 @@ public class PageFile {
         thread.start();
     }
 
-    private void recoverFreePages() throws Exception {
+    private void recoverFreePages(final long lastRecoveryPage) throws Exception {
         LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
         SequenceSet newFreePages = new SequenceSet();
         // need new pageFile instance to get unshared readFile
@@ -459,6 +459,11 @@ public class PageFile {
         try {
             for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
                 Page page = i.next();
+
+                if (page.getPageId() >= lastRecoveryPage) {
+                    break;
+                }
+
                 if (page.getType() == Page.PAGE_FREE_TYPE) {
                     newFreePages.add(page.getPageId());
                 }
@@ -817,6 +822,9 @@ public class PageFile {
         return toOffset(nextFreePageId.get());
     }
 
+    public boolean isFreePage(long pageId) {
+        return freeList.contains(pageId);
+    }
     /**
      * @return the number of pages allocated in the PageFile
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/45d7676b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
index 72e8d7b..b8dcc92 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.store.kahadb.disk.page;
 
+import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -23,13 +29,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashSet;
-
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-
-import junit.framework.TestCase;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.LinkedList;
+import java.util.List;
 
 @SuppressWarnings("rawtypes")
 public class PageFileTest extends TestCase {
@@ -240,4 +241,97 @@ public class PageFileTest extends TestCase {
             pf2.unload();
         }
     }
+    }
+
+    public void testBackgroundRecoveryIsThreadSafe() throws Exception {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnableRecoveryFile(false);
+        pf.load();
+
+        Transaction tx = pf.tx();
+        tx.allocate(100000);
+        tx.commit();
+        LOG.info("Number of free pages:" + pf.getFreePageCount());
+        pf.flush();
+
+        //Simulate an unclean shutdown
+        final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+        pf2.setEnableRecoveryFile(false);
+        pf2.load();
+
+        Transaction tx2 = pf2.tx();
+        tx2.allocate(100000);
+        tx2.commit();
+        LOG.info("Number of free pages:" + pf2.getFreePageCount());
+
+        List<Transaction> transactions = new LinkedList<>();
+
+        Thread.sleep(500);
+        LOG.info("Creating Transactions");
+        for (int i = 0; i < 20; i++) {
+            Transaction txConcurrent = pf2.tx();
+            Page page = txConcurrent.allocate();
+            String t = "page:" + i;
+            page.set(t);
+            txConcurrent.store(page, StringMarshaller.INSTANCE, false);
+            txConcurrent.commit();
+            transactions.add(txConcurrent);
+            Thread.sleep(50);
+        }
+
+        assertTrue("We have 199980 free pages", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                pf2.flush();
+                long freePages = pf2.getFreePageCount();
+                LOG.info("free page count: " + freePages);
+                return  freePages == 199980;
+            }
+        }, 12000000));
+
+        for (Transaction txConcurrent2: transactions) {
+            for (Page page : txConcurrent2) {
+                assertFalse(pf2.isFreePage(page.pageId));
+            }
+        }
+
+    }
+
+    public void testBackgroundWillNotMarkEaslyPagesAsFree() throws Exception {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnableRecoveryFile(false);
+        pf.load();
+
+        Transaction tx = pf.tx();
+        tx.allocate(100000);
+        tx.commit();
+        LOG.info("Number of free pages:" + pf.getFreePageCount());
+        pf.flush();
+
+        //Simulate an unclean shutdown
+        final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+        pf2.setEnableRecoveryFile(false);
+        pf2.load();
+
+        Transaction tx2 = pf2.tx();
+        tx2.allocate(200);
+        tx2.commit();
+        LOG.info("Number of free pages:" + pf2.getFreePageCount());
+
+        Transaction tx3 = pf2.tx();
+        tx3.allocate(100);
+
+        assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                pf2.flush();
+                long freePages = pf2.getFreePageCount();
+                LOG.info("free page count: " + freePages);
+                return  freePages == 100100;
+            }
+        }, 12000000));
 }