You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/11/07 11:33:00 UTC

activemq git commit: AMQ-7082 We should make sure that pages managed during recovery are not recovered in error variation of patch from Alan Protasio closes #317

Repository: activemq
Updated Branches:
  refs/heads/master 81062fde8 -> 85859fd8d


AMQ-7082 We should make sure that pages managed during recovery are not recovered in error
variation of patch from Alan Protasio <al...@gmail.com> closes #317


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85859fd8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85859fd8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85859fd8

Branch: refs/heads/master
Commit: 85859fd8dc22e3251de294c23d84c12b29d4fe81
Parents: 81062fd
Author: gtully <ga...@gmail.com>
Authored: Wed Nov 7 11:29:14 2018 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Nov 7 11:29:14 2018 +0000

----------------------------------------------------------------------
 .../store/kahadb/disk/page/PageFile.java        |  34 +++---
 .../store/kahadb/disk/util/SequenceSet.java     |  24 ++++
 .../store/kahadb/disk/page/PageFileTest.java    | 112 +++++++++++++++++++
 .../store/kahadb/disk/util/SequenceSetTest.java |  22 ++++
 4 files changed, 178 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index 7456dfa..5b898f2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -135,7 +135,10 @@ public class PageFile {
     // Keeps track of free pages.
     private final AtomicLong nextFreePageId = new AtomicLong();
     private SequenceSet freeList = new SequenceSet();
+
     private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
+    private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();
+
     private final AtomicLong nextTxid = new AtomicLong();
 
     // Persistent settings stored in the page file.
@@ -146,8 +149,6 @@ public class PageFile {
     private boolean useLFRUEviction = false;
     private float LFUEvictionFactor = 0.2f;
 
-    private boolean needsFreePageRecovery = false;
-
     /**
      * Use to keep track of updated pages which have not yet been committed.
      */
@@ -412,7 +413,7 @@ public class PageFile {
             } else {
                 LOG.debug(toString() + ", Recovering page file...");
                 nextTxid.set(redoRecoveryUpdates());
-                needsFreePageRecovery = true;
+                trackingFreeDuringRecovery.set(new SequenceSet());
             }
 
             if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
@@ -424,7 +425,7 @@ public class PageFile {
             storeMetaData();
             getFreeFile().delete();
             startWriter();
-            if (needsFreePageRecovery) {
+            if (trackingFreeDuringRecovery.get() != null) {
                 asyncFreePageRecovery(nextFreePageId.get());
             }
         } else {
@@ -478,8 +479,6 @@ public class PageFile {
             // allow flush (with index lock held) to merge eventually
             recoveredFreeList.lazySet(newFreePages);
         }
-        // all set for clean shutdown
-        needsFreePageRecovery = false;
     }
 
     private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
@@ -518,7 +517,7 @@ public class PageFile {
             }
 
             metaData.setLastTxId(nextTxid.get() - 1);
-            if (needsFreePageRecovery) {
+            if (trackingFreeDuringRecovery.get() != null) {
                 // async recovery incomplete, will have to try again
                 metaData.setCleanShutdown(false);
             } else {
@@ -567,14 +566,16 @@ public class PageFile {
             throw new IOException("Page file already stopped: checkpointing is not allowed");
         }
 
-        SequenceSet toMerge = recoveredFreeList.get();
-        if (toMerge != null) {
+        SequenceSet recovered = recoveredFreeList.get();
+        if (recovered != null) {
             recoveredFreeList.lazySet(null);
-            Sequence seq = toMerge.getHead();
-            while (seq != null) {
-                freeList.add(seq);
-                seq = seq.getNext();
-            }
+            SequenceSet inUse = trackingFreeDuringRecovery.get();
+            recovered.remove(inUse);
+            freeList.merge(recovered);
+
+            // all set for clean shutdown
+            trackingFreeDuringRecovery.set(null);
+            inUse.clear();
         }
 
         // Setup a latch that gets notified when all buffered writes hits the disk.
@@ -961,6 +962,11 @@ public class PageFile {
     public void freePage(long pageId) {
         freeList.add(pageId);
         removeFromCache(pageId);
+
+        SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
+        if (trackFreeDuringRecovery != null) {
+            trackFreeDuringRecovery.add(pageId);
+        }
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
index fac831b..e589e84 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
@@ -94,6 +94,30 @@ public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Lo
         }
     }
 
+    public void merge(SequenceSet sequenceSet) {
+        Sequence node = sequenceSet.getHead();
+
+        while (node != null) {
+            add(node);
+            node = node.getNext();
+        }
+    }
+
+    public void remove(SequenceSet sequenceSet) {
+        Sequence node = sequenceSet.getHead();
+
+        while (node != null) {
+            remove(node);
+            node = node.getNext();
+        }
+    }
+
+    public void remove(Sequence value) {
+        for(long i=value.first; i<value.last+1; i++) {
+            remove(i);
+        }
+    }
+
     /**
      *
      * @param value

http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
index db1ecf3..e2e4ec5 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
@@ -18,7 +18,11 @@ package org.apache.activemq.store.kahadb.disk.page;
 
 import junit.framework.TestCase;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +35,7 @@ import java.io.OutputStream;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @SuppressWarnings("rawtypes")
 public class PageFileTest extends TestCase {
@@ -355,4 +360,111 @@ public class PageFileTest extends TestCase {
             }
         }, 12000000));
     }
+
+    public void testBackgroundWillMarkUsedPagesAsFreeInTheBeginning() throws Exception {
+        final int numberOfPages = 100000;
+        final AtomicBoolean recoveryEnd = new AtomicBoolean();
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Recovered pageFile free list")) {
+                    recoveryEnd.set(true);
+                }
+            }
+        };
+
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(PageFile.class);
+        log4jLogger.addAppender(appender);
+        log4jLogger.setLevel(Level.DEBUG);
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnableRecoveryFile(false);
+        pf.load();
+
+        List<Long> pagesToFree = new LinkedList<>();
+
+        LOG.info("Creating Transactions");
+        for (int i = 0; i < numberOfPages; i++) {
+            Transaction tx = pf.tx();
+            Page page = tx.allocate();
+            String t = "page:" + i;
+            page.set(t);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+            tx.commit();
+
+            if (i >= numberOfPages / 2) {
+                pagesToFree.add(page.getPageId());
+            }
+        }
+
+        pf.flush();
+
+        LOG.info("Number of free pages:" + pf.getFreePageCount());
+
+        //Simulate an unclean shutdown
+        final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+        pf2.setEnableRecoveryFile(false);
+        pf2.load();
+
+        LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount());
+
+        Transaction tx = pf2.tx();
+
+        for (Long pageId : pagesToFree) {
+            tx.free(tx.load(pageId,  StringMarshaller.INSTANCE));
+            tx.commit();
+        }
+
+        LOG.info("RecoveredPageFile: Number of free pages Before Reusing:" + pf2.getFreePageCount());
+        List<Transaction> transactions = new LinkedList<>();
+
+        int totalFreePages = numberOfPages / 2;
+        int totalPages = numberOfPages;
+
+        for (int i = 0; i < 20; i++) {
+            tx = pf2.tx();
+            Page page = tx.allocate();
+            String t = "page:" + i;
+            page.set(t);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+            tx.commit();
+            transactions.add(tx);
+
+            // Free pages was already recovered
+            if (page.getPageId() < numberOfPages) {
+                totalFreePages--;
+            } else {
+                totalPages++;
+            }
+        }
+
+        LOG.info("RecoveredPageFile: Number of free pages After Reusing:" + pf2.getFreePageCount());
+
+        assertTrue("Recovery Finished", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                pf2.flush();
+                long freePages = pf2.getFreePageCount();
+                LOG.info("free page count: " + freePages);
+                return  recoveryEnd.get();
+            }
+        }, 100000));
+
+        LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount());
+
+        for (Transaction txConcurrent: transactions) {
+            for (Page page : txConcurrent) {
+                assertFalse(pf2.isFreePage(page.pageId));
+            }
+        }
+
+        // Make sure we dont have leaking pages.
+        assertEquals(pf2.getFreePageCount(), totalFreePages);
+        assertEquals(pf2.getPageCount(), totalPages);
+
+        assertEquals("pages freed during recovery should be reused", numberOfPages, totalPages);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
index a25b4e7..7df8351 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
@@ -139,6 +139,28 @@ public class SequenceSetTest {
         set.remove(10);
         assertEquals(3, set.size());
         assertEquals(97, set.rangeSize());
+
+        SequenceSet toRemove = new SequenceSet();
+        toRemove.add(new Sequence(0, 100));
+
+        set.remove(toRemove);
+        assertEquals(0, set.size());
+        assertEquals(0, set.rangeSize());
+
+    }
+
+    @Test
+    public void testMerge() {
+        SequenceSet set = new SequenceSet();
+        set.add(new Sequence(0, 100));
+
+        SequenceSet set2 = new SequenceSet();
+        set.add(new Sequence(50, 150));
+
+        set.merge(set2);
+        assertEquals(151, set.rangeSize());
+        assertEquals(1, set.size());
+
     }
 
     @Test