You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2018/11/06 13:26:54 UTC
activemq git commit: AMQ-7082 - Make sure that the recovery will only
mark pages as free if they were created in a previous execution
Repository: activemq
Updated Branches:
refs/heads/activemq-5.15.x c988a145a -> 45d7676bd
AMQ-7082 - Make sure that the recovery will only mark pages as free if they were created in a previous execution
(cherry picked from commit 0d34338919edee863bd71693ee30999d9d9d6ce9)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/45d7676b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/45d7676b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/45d7676b
Branch: refs/heads/activemq-5.15.x
Commit: 45d7676bd95a7e4594a80b025994ea3242c94586
Parents: c988a14
Author: Alan Protasio <al...@gmail.com>
Authored: Tue Nov 6 01:13:18 2018 -0800
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Nov 6 08:26:21 2018 -0500
----------------------------------------------------------------------
.../store/kahadb/disk/page/PageFile.java | 16 ++-
.../store/kahadb/disk/page/PageFileTest.java | 108 +++++++++++++++++--
2 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/45d7676b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index fe79a2d..7456dfa 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -425,19 +425,19 @@ public class PageFile {
getFreeFile().delete();
startWriter();
if (needsFreePageRecovery) {
- asyncFreePageRecovery();
+ asyncFreePageRecovery(nextFreePageId.get());
}
} else {
throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}
}
- private void asyncFreePageRecovery() {
+ private void asyncFreePageRecovery(final long lastRecoveryPage) {
Thread thread = new Thread("KahaDB Index Free Page Recovery") {
@Override
public void run() {
try {
- recoverFreePages();
+ recoverFreePages(lastRecoveryPage);
} catch (Throwable e) {
if (loaded.get()) {
LOG.warn("Error recovering index free page list", e);
@@ -450,7 +450,7 @@ public class PageFile {
thread.start();
}
- private void recoverFreePages() throws Exception {
+ private void recoverFreePages(final long lastRecoveryPage) throws Exception {
LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
SequenceSet newFreePages = new SequenceSet();
// need new pageFile instance to get unshared readFile
@@ -459,6 +459,11 @@ public class PageFile {
try {
for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
Page page = i.next();
+
+ if (page.getPageId() >= lastRecoveryPage) {
+ break;
+ }
+
if (page.getType() == Page.PAGE_FREE_TYPE) {
newFreePages.add(page.getPageId());
}
@@ -817,6 +822,9 @@ public class PageFile {
return toOffset(nextFreePageId.get());
}
+ public boolean isFreePage(long pageId) {
+ return freeList.contains(pageId);
+ }
/**
* @return the number of pages allocated in the PageFile
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/45d7676b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
index 72e8d7b..b8dcc92 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.store.kahadb.disk.page;
+import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -23,13 +29,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
-
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-
-import junit.framework.TestCase;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.LinkedList;
+import java.util.List;
@SuppressWarnings("rawtypes")
public class PageFileTest extends TestCase {
@@ -240,4 +241,97 @@ public class PageFileTest extends TestCase {
pf2.unload();
}
}
+ }
+
+ public void testBackgroundRecoveryIsThreadSafe() throws Exception {
+
+ PageFile pf = new PageFile(new File("target/test-data"), getName());
+ pf.delete();
+ pf.setEnableRecoveryFile(false);
+ pf.load();
+
+ Transaction tx = pf.tx();
+ tx.allocate(100000);
+ tx.commit();
+ LOG.info("Number of free pages:" + pf.getFreePageCount());
+ pf.flush();
+
+ //Simulate an unclean shutdown
+ final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+ pf2.setEnableRecoveryFile(false);
+ pf2.load();
+
+ Transaction tx2 = pf2.tx();
+ tx2.allocate(100000);
+ tx2.commit();
+ LOG.info("Number of free pages:" + pf2.getFreePageCount());
+
+ List<Transaction> transactions = new LinkedList<>();
+
+ Thread.sleep(500);
+ LOG.info("Creating Transactions");
+ for (int i = 0; i < 20; i++) {
+ Transaction txConcurrent = pf2.tx();
+ Page page = txConcurrent.allocate();
+ String t = "page:" + i;
+ page.set(t);
+ txConcurrent.store(page, StringMarshaller.INSTANCE, false);
+ txConcurrent.commit();
+ transactions.add(txConcurrent);
+ Thread.sleep(50);
+ }
+
+ assertTrue("We have 199980 free pages", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ pf2.flush();
+ long freePages = pf2.getFreePageCount();
+ LOG.info("free page count: " + freePages);
+ return freePages == 199980;
+ }
+ }, 12000000));
+
+ for (Transaction txConcurrent2: transactions) {
+ for (Page page : txConcurrent2) {
+ assertFalse(pf2.isFreePage(page.pageId));
+ }
+ }
+
+ }
+
+ public void testBackgroundWillNotMarkEaslyPagesAsFree() throws Exception {
+
+ PageFile pf = new PageFile(new File("target/test-data"), getName());
+ pf.delete();
+ pf.setEnableRecoveryFile(false);
+ pf.load();
+
+ Transaction tx = pf.tx();
+ tx.allocate(100000);
+ tx.commit();
+ LOG.info("Number of free pages:" + pf.getFreePageCount());
+ pf.flush();
+
+ //Simulate an unclean shutdown
+ final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+ pf2.setEnableRecoveryFile(false);
+ pf2.load();
+
+ Transaction tx2 = pf2.tx();
+ tx2.allocate(200);
+ tx2.commit();
+ LOG.info("Number of free pages:" + pf2.getFreePageCount());
+
+ Transaction tx3 = pf2.tx();
+ tx3.allocate(100);
+
+ assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ pf2.flush();
+ long freePages = pf2.getFreePageCount();
+ LOG.info("free page count: " + freePages);
+ return freePages == 100100;
+ }
+ }, 12000000));
}