You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/11/07 11:33:00 UTC
activemq git commit: AMQ-7082 We should make sure that pages managed
during recovery are not recovered in error variation of patch from Alan
Protasio closes #317
Repository: activemq
Updated Branches:
refs/heads/master 81062fde8 -> 85859fd8d
AMQ-7082 We should make sure that pages managed during recovery are not recovered in error
variation of patch from Alan Protasio <al...@gmail.com> closes #317
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85859fd8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85859fd8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85859fd8
Branch: refs/heads/master
Commit: 85859fd8dc22e3251de294c23d84c12b29d4fe81
Parents: 81062fd
Author: gtully <ga...@gmail.com>
Authored: Wed Nov 7 11:29:14 2018 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Nov 7 11:29:14 2018 +0000
----------------------------------------------------------------------
.../store/kahadb/disk/page/PageFile.java | 34 +++---
.../store/kahadb/disk/util/SequenceSet.java | 24 ++++
.../store/kahadb/disk/page/PageFileTest.java | 112 +++++++++++++++++++
.../store/kahadb/disk/util/SequenceSetTest.java | 22 ++++
4 files changed, 178 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index 7456dfa..5b898f2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -135,7 +135,10 @@ public class PageFile {
// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
+
private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
+ private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();
+
private final AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file.
@@ -146,8 +149,6 @@ public class PageFile {
private boolean useLFRUEviction = false;
private float LFUEvictionFactor = 0.2f;
- private boolean needsFreePageRecovery = false;
-
/**
* Use to keep track of updated pages which have not yet been committed.
*/
@@ -412,7 +413,7 @@ public class PageFile {
} else {
LOG.debug(toString() + ", Recovering page file...");
nextTxid.set(redoRecoveryUpdates());
- needsFreePageRecovery = true;
+ trackingFreeDuringRecovery.set(new SequenceSet());
}
if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
@@ -424,7 +425,7 @@ public class PageFile {
storeMetaData();
getFreeFile().delete();
startWriter();
- if (needsFreePageRecovery) {
+ if (trackingFreeDuringRecovery.get() != null) {
asyncFreePageRecovery(nextFreePageId.get());
}
} else {
@@ -478,8 +479,6 @@ public class PageFile {
// allow flush (with index lock held) to merge eventually
recoveredFreeList.lazySet(newFreePages);
}
- // all set for clean shutdown
- needsFreePageRecovery = false;
}
private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
@@ -518,7 +517,7 @@ public class PageFile {
}
metaData.setLastTxId(nextTxid.get() - 1);
- if (needsFreePageRecovery) {
+ if (trackingFreeDuringRecovery.get() != null) {
// async recovery incomplete, will have to try again
metaData.setCleanShutdown(false);
} else {
@@ -567,14 +566,16 @@ public class PageFile {
throw new IOException("Page file already stopped: checkpointing is not allowed");
}
- SequenceSet toMerge = recoveredFreeList.get();
- if (toMerge != null) {
+ SequenceSet recovered = recoveredFreeList.get();
+ if (recovered != null) {
recoveredFreeList.lazySet(null);
- Sequence seq = toMerge.getHead();
- while (seq != null) {
- freeList.add(seq);
- seq = seq.getNext();
- }
+ SequenceSet inUse = trackingFreeDuringRecovery.get();
+ recovered.remove(inUse);
+ freeList.merge(recovered);
+
+ // all set for clean shutdown
+ trackingFreeDuringRecovery.set(null);
+ inUse.clear();
}
// Setup a latch that gets notified when all buffered writes hits the disk.
@@ -961,6 +962,11 @@ public class PageFile {
public void freePage(long pageId) {
freeList.add(pageId);
removeFromCache(pageId);
+
+ SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
+ if (trackFreeDuringRecovery != null) {
+ trackFreeDuringRecovery.add(pageId);
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
index fac831b..e589e84 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java
@@ -94,6 +94,30 @@ public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Lo
}
}
+ public void merge(SequenceSet sequenceSet) {
+ Sequence node = sequenceSet.getHead();
+
+ while (node != null) {
+ add(node);
+ node = node.getNext();
+ }
+ }
+
+ public void remove(SequenceSet sequenceSet) {
+ Sequence node = sequenceSet.getHead();
+
+ while (node != null) {
+ remove(node);
+ node = node.getNext();
+ }
+ }
+
+ public void remove(Sequence value) {
+ for(long i=value.first; i<value.last+1; i++) {
+ remove(i);
+ }
+ }
+
/**
*
* @param value
http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
index db1ecf3..e2e4ec5 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
@@ -18,7 +18,11 @@ package org.apache.activemq.store.kahadb.disk.page;
import junit.framework.TestCase;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +35,7 @@ import java.io.OutputStream;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("rawtypes")
public class PageFileTest extends TestCase {
@@ -355,4 +360,111 @@ public class PageFileTest extends TestCase {
}
}, 12000000));
}
+
+ public void testBackgroundWillMarkUsedPagesAsFreeInTheBeginning() throws Exception {
+ final int numberOfPages = 100000;
+ final AtomicBoolean recoveryEnd = new AtomicBoolean();
+
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Recovered pageFile free list")) {
+ recoveryEnd.set(true);
+ }
+ }
+ };
+
+ org.apache.log4j.Logger log4jLogger =
+ org.apache.log4j.Logger.getLogger(PageFile.class);
+ log4jLogger.addAppender(appender);
+ log4jLogger.setLevel(Level.DEBUG);
+
+ PageFile pf = new PageFile(new File("target/test-data"), getName());
+ pf.delete();
+ pf.setEnableRecoveryFile(false);
+ pf.load();
+
+ List<Long> pagesToFree = new LinkedList<>();
+
+ LOG.info("Creating Transactions");
+ for (int i = 0; i < numberOfPages; i++) {
+ Transaction tx = pf.tx();
+ Page page = tx.allocate();
+ String t = "page:" + i;
+ page.set(t);
+ tx.store(page, StringMarshaller.INSTANCE, false);
+ tx.commit();
+
+ if (i >= numberOfPages / 2) {
+ pagesToFree.add(page.getPageId());
+ }
+ }
+
+ pf.flush();
+
+ LOG.info("Number of free pages:" + pf.getFreePageCount());
+
+ //Simulate an unclean shutdown
+ final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+ pf2.setEnableRecoveryFile(false);
+ pf2.load();
+
+ LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount());
+
+ Transaction tx = pf2.tx();
+
+ for (Long pageId : pagesToFree) {
+ tx.free(tx.load(pageId, StringMarshaller.INSTANCE));
+ tx.commit();
+ }
+
+ LOG.info("RecoveredPageFile: Number of free pages Before Reusing:" + pf2.getFreePageCount());
+ List<Transaction> transactions = new LinkedList<>();
+
+ int totalFreePages = numberOfPages / 2;
+ int totalPages = numberOfPages;
+
+ for (int i = 0; i < 20; i++) {
+ tx = pf2.tx();
+ Page page = tx.allocate();
+ String t = "page:" + i;
+ page.set(t);
+ tx.store(page, StringMarshaller.INSTANCE, false);
+ tx.commit();
+ transactions.add(tx);
+
+ // Free pages was already recovered
+ if (page.getPageId() < numberOfPages) {
+ totalFreePages--;
+ } else {
+ totalPages++;
+ }
+ }
+
+ LOG.info("RecoveredPageFile: Number of free pages After Reusing:" + pf2.getFreePageCount());
+
+ assertTrue("Recovery Finished", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ pf2.flush();
+ long freePages = pf2.getFreePageCount();
+ LOG.info("free page count: " + freePages);
+ return recoveryEnd.get();
+ }
+ }, 100000));
+
+ LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount());
+
+ for (Transaction txConcurrent: transactions) {
+ for (Page page : txConcurrent) {
+ assertFalse(pf2.isFreePage(page.pageId));
+ }
+ }
+
+ // Make sure we dont have leaking pages.
+ assertEquals(pf2.getFreePageCount(), totalFreePages);
+ assertEquals(pf2.getPageCount(), totalPages);
+
+ assertEquals("pages freed during recovery should be reused", numberOfPages, totalPages);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/85859fd8/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
index a25b4e7..7df8351 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java
@@ -139,6 +139,28 @@ public class SequenceSetTest {
set.remove(10);
assertEquals(3, set.size());
assertEquals(97, set.rangeSize());
+
+ SequenceSet toRemove = new SequenceSet();
+ toRemove.add(new Sequence(0, 100));
+
+ set.remove(toRemove);
+ assertEquals(0, set.size());
+ assertEquals(0, set.rangeSize());
+
+ }
+
+ @Test
+ public void testMerge() {
+ SequenceSet set = new SequenceSet();
+ set.add(new Sequence(0, 100));
+
+ SequenceSet set2 = new SequenceSet();
+ set.add(new Sequence(50, 150));
+
+ set.merge(set2);
+ assertEquals(151, set.rangeSize());
+ assertEquals(1, set.size());
+
}
@Test