You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by dp...@apache.org on 2012/04/19 17:29:44 UTC
svn commit: r1327998 - in /jackrabbit/oak/trunk/oak-mk/src:
main/java/org/apache/jackrabbit/mk/persistence/
main/java/org/apache/jackrabbit/mk/store/
test/java/org/apache/jackrabbit/mk/persistence/
test/java/org/apache/jackrabbit/mk/store/
Author: dpfister
Date: Thu Apr 19 15:29:43 2012
New Revision: 1327998
URL: http://svn.apache.org/viewvc?rev=1327998&view=rev
Log:
GC for revisions
- remove copying GC, replaced by mark&sweep
Removed:
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/CopyingGC.java
jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/
Modified:
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/persistence/GCPersistence.java
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/persistence/GCPersistenceTest.java
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/persistence/GCPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/persistence/GCPersistence.java?rev=1327998&r1=1327997&r2=1327998&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/persistence/GCPersistence.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/persistence/GCPersistence.java Thu Apr 19 15:29:43 2012
@@ -21,8 +21,8 @@ import org.apache.jackrabbit.mk.model.Id
/**
* Advanced persistence implementation offering GC support.
* <p>
- * The persistence implementation must ensure that objects written after {@link #start()}
- * was invoked are not swept.
+ * The persistence implementation must ensure that objects written between {@link #start()}
+ * and {@link #sweep()} are not swept; in other words, they must be marked implicitly.
*/
public interface GCPersistence extends Persistence {
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java?rev=1327998&r1=1327997&r2=1327998&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java Thu Apr 19 15:29:43 2012
@@ -18,7 +18,12 @@ package org.apache.jackrabbit.mk.store;
import java.io.Closeable;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,6 +38,7 @@ import org.apache.jackrabbit.mk.model.No
import org.apache.jackrabbit.mk.model.NodeStateDiff;
import org.apache.jackrabbit.mk.model.StoredCommit;
import org.apache.jackrabbit.mk.model.StoredNode;
+import org.apache.jackrabbit.mk.persistence.GCPersistence;
import org.apache.jackrabbit.mk.persistence.Persistence;
import org.apache.jackrabbit.mk.util.IOUtils;
import org.apache.jackrabbit.mk.util.SimpleLRUCache;
@@ -52,11 +58,33 @@ public class DefaultRevisionStore extend
private AtomicLong commitCounter;
private final ReentrantReadWriteLock headLock = new ReentrantReadWriteLock();
private final Persistence pm;
+ private final GCPersistence gcpm;
+ private int initialCacheSize;
private Map<Id, Object> cache;
+ /**
+ * GC run state constants.
+ */
+ private static final int NOT_ACTIVE = 0;
+ private static final int STARTING = 1;
+ private static final int MARKING = 2;
+ private static final int SWEEPING = 3;
+
+ /**
+ * GC run state.
+ */
+ private final AtomicInteger gcState = new AtomicInteger();
+
+ /**
+ * GC executor.
+ */
+ private ScheduledExecutorService gcExecutor;
+
public DefaultRevisionStore(Persistence pm) {
this.pm = pm;
+ this.gcpm = (pm instanceof GCPersistence) ? (GCPersistence) pm : null;
+
commitCounter = new AtomicLong();
}
@@ -65,7 +93,8 @@ public class DefaultRevisionStore extend
throw new IllegalStateException("already initialized");
}
- cache = Collections.synchronizedMap(SimpleLRUCache.<Id, Object>newInstance(determineInitialCacheSize()));
+ initialCacheSize = determineInitialCacheSize();
+ cache = Collections.synchronizedMap(SimpleLRUCache.<Id, Object>newInstance(initialCacheSize));
// make sure we've got a HEAD commit
head = pm.readHead();
@@ -84,11 +113,25 @@ public class DefaultRevisionStore extend
commitCounter.set(Long.parseLong(head.toString(), 16));
}
+ if (gcpm != null) {
+ gcExecutor = Executors.newScheduledThreadPool(1);
+ gcExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ doGC();
+ }
+ }, 60, 60, TimeUnit.SECONDS);
+ }
+
initialized = true;
}
-
+
public void close() {
verifyInitialized();
+
+ if (gcExecutor != null) {
+ gcExecutor.shutdown();
+ }
cache.clear();
@@ -103,7 +146,7 @@ public class DefaultRevisionStore extend
}
}
- protected int determineInitialCacheSize() {
+ protected static int determineInitialCacheSize() {
String val = System.getProperty(CACHE_SIZE);
return (val != null) ? Integer.parseInt(val) : DEFAULT_CACHE_SIZE;
}
@@ -336,4 +379,76 @@ public class DefaultRevisionStore extend
}
});
}
+
+ //----------------------------------------------------------------------- GC
+
+ void doGC() {
+ if (cache.size() < initialCacheSize) {
+ // GC unneeded
+ return;
+ }
+ if (!gcState.compareAndSet(NOT_ACTIVE, STARTING)) {
+ // already running
+ return;
+ }
+
+ gcpm.start();
+
+ gcState.set(MARKING);
+
+ try {
+ StoredCommit commit = getHeadCommit();
+ long tsLimit = commit.getCommitTS() - (60 * 60 * 1000);
+
+ for (;;) {
+ markCommit(commit);
+ Id id = commit.getParentId();
+ if (id == null) {
+ break;
+ }
+ commit = getCommit(id);
+ if (commit.getCommitTS() < tsLimit) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ /* unable to perform GC */
+ gcState.set(NOT_ACTIVE);
+ e.printStackTrace();
+ }
+
+ gcState.set(SWEEPING);
+
+ try {
+ gcpm.sweep();
+ cache.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ gcState.set(NOT_ACTIVE);
+ }
+ }
+
+ private void markCommit(StoredCommit commit)
+ throws Exception {
+
+ if (gcpm.markCommit(commit.getId())) {
+ return;
+ }
+
+ markNode(getNode(commit.getRootNodeId()));
+ }
+
+ private void markNode(StoredNode node)
+ throws Exception {
+
+ if (gcpm.markNode(node.getId())) {
+ return;
+ }
+ Iterator<ChildNode> iter = node.getChildNodeEntries(0, -1);
+ while (iter.hasNext()) {
+ ChildNode c = iter.next();
+ markNode(getNode(c.getId()));
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/persistence/GCPersistenceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/persistence/GCPersistenceTest.java?rev=1327998&r1=1327997&r2=1327998&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/persistence/GCPersistenceTest.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/persistence/GCPersistenceTest.java Thu Apr 19 15:29:43 2012
@@ -96,42 +96,28 @@ public class GCPersistenceTest {
MutableNode node = new MutableNode(null, "/");
Id id = pm.writeNode(node);
- pm.start();
- pm.markNode(id);
- pm.sweep();
-
- pm.readNode(new StoredNode(id, null));
- }
-
- @Test
- public void testOldNodeIsUnmarked() throws Exception {
- MutableNode node = new MutableNode(null, "/");
- Id id = pm.writeNode(node);
-
// small delay needed
Thread.sleep(100);
pm.start();
+
+ // old node must not be marked yet
assertTrue(pm.markNode(id));
+
+ pm.sweep();
+ pm.readNode(new StoredNode(id, null));
}
@Test
- public void testNewNodeIsMarked() throws Exception {
+ public void testNewNodeIsNotSwept() throws Exception {
pm.start();
MutableNode node = new MutableNode(null, "/");
Id id = pm.writeNode(node);
+ // new node must already be marked
assertFalse(pm.markNode(id));
- }
- @Test
- public void testNewNodeIsNotSwept() throws Exception {
- pm.start();
-
- MutableNode node = new MutableNode(null, "/");
- Id id = pm.writeNode(node);
-
pm.sweep();
pm.readNode(new StoredNode(id, null));
}