You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by dp...@apache.org on 2012/04/26 15:36:10 UTC
svn commit: r1330847 - in /jackrabbit/oak/trunk/oak-mk/src:
main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
Author: dpfister
Date: Thu Apr 26 13:36:10 2012
New Revision: 1330847
URL: http://svn.apache.org/viewvc?rev=1330847&view=rev
Log:
GC for revisions
- support branch & merge
Modified:
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java?rev=1330847&r1=1330846&r2=1330847&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java Thu Apr 26 13:36:10 2012
@@ -20,6 +20,8 @@ import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
import java.util.WeakHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -82,17 +84,22 @@ public class DefaultRevisionStore extend
* GC executor.
*/
private ScheduledExecutorService gcExecutor;
-
+
/**
- * Put tokens (Key: token, Value: null).
+ * Active put tokens (Key: token, Value: null).
*/
private final Map<PutTokenImpl, Object> putTokens = Collections.synchronizedMap(new WeakHashMap<PutTokenImpl, Object>());
/**
- * Mark lock for put tokens.
+ * Read-write lock for put tokens.
*/
- private final ReentrantReadWriteLock markLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock tokensLock = new ReentrantReadWriteLock();
+ /**
+ * Active branches (Key: branch root id, Value: branch head).
+ */
+ private final Map<Id,Id> branches = Collections.synchronizedMap(new TreeMap<Id, Id>());
+
public DefaultRevisionStore(Persistence pm) {
this.pm = pm;
this.gcpm = (pm instanceof GCPersistence) ? (GCPersistence) pm : null;
@@ -227,7 +234,7 @@ public class DefaultRevisionStore extend
* Make sure that a GC cycle can not sweep this newly persisted node
* before we have updated our token
*/
- markLock.readLock().lock();
+ tokensLock.readLock().lock();
try {
Id id = pm.writeNode(node);
@@ -245,7 +252,7 @@ public class DefaultRevisionStore extend
return id;
} finally {
- markLock.readLock().unlock();
+ tokensLock.readLock().unlock();
}
}
@@ -284,7 +291,11 @@ public class DefaultRevisionStore extend
Id id = writeCommit(token, commit);
setHeadCommitId(id);
+
putTokens.remove(token);
+ if (branchRootId != null) {
+ branches.remove(branchRootId);
+ }
return id;
}
@@ -294,7 +305,12 @@ public class DefaultRevisionStore extend
Id commitId = writeCommit(token, commit);
putTokens.remove(token);
-
+
+ Id branchRootId = commit.getBranchRootId();
+ if (branchRootId != null) {
+ branches.put(branchRootId, commitId);
+ }
+
return commitId;
}
@@ -468,15 +484,27 @@ public class DefaultRevisionStore extend
}
try {
- markUncommittedPuts();
- markBranches();
- markCommits();
+ markUncommittedNodes();
+ Id firstBranchRootId = markBranches();
+ Id firstCommitId = markCommits();
+
+ if (firstBranchRootId != null && firstBranchRootId.compareTo(firstCommitId) < 0) {
+ firstCommitId = firstBranchRootId;
+ }
+ StoredCommit commit = getCommit(firstCommitId);
+ if (commit.getParentId() != null) {
+ MutableCommit firstCommit = new MutableCommit(commit);
+ firstCommit.setParentId(null);
+ gcpm.replaceCommit(firstCommit.getId(), firstCommit);
+ }
+
} catch (Exception e) {
/* unable to perform GC */
gcState.set(NOT_ACTIVE);
e.printStackTrace();
return;
}
+
gcState.set(SWEEPING);
try {
@@ -490,13 +518,13 @@ public class DefaultRevisionStore extend
}
/**
- * Mark uncommitted puts.
+ * Mark nodes that have already been put but not committed yet.
*
* @throws Exception
* if an error occurs
*/
- private void markUncommittedPuts() throws Exception {
- markLock.writeLock().lock();
+ private void markUncommittedNodes() throws Exception {
+ tokensLock.writeLock().lock();
try {
gcpm.start();
@@ -506,18 +534,43 @@ public class DefaultRevisionStore extend
markNode(token.getLastModified());
}
} finally {
- markLock.writeLock().unlock();
+ tokensLock.writeLock().unlock();
}
}
/**
* Mark branches.
*
+ * @return first branch root id that needs to be preserved, or {@code null}
* @throws Exception
* if an error occurs
*/
- private void markBranches() throws Exception {
- // TODO
+ private Id markBranches() throws Exception {
+ /* Mark all branch commits */
+ for (Entry<Id, Id> entry : branches.entrySet()) {
+ Id branchRootId = entry.getKey();
+ Id branchHeadId = entry.getValue();
+ while (!branchHeadId.equals(branchRootId)) {
+ StoredCommit commit = getCommit(branchHeadId);
+ markCommit(commit);
+ branchHeadId = commit.getParentId();
+ }
+ }
+ /* Mark all master commits till the first branch root id */
+ if (!branches.isEmpty()) {
+ Id firstBranchRootId = branches.keySet().iterator().next();
+ StoredCommit commit = getHeadCommit();
+
+ for (;;) {
+ markCommit(commit);
+ if (commit.getId().equals(firstBranchRootId)) {
+ break;
+ }
+ commit = getCommit(commit.getParentId());
+ }
+ return firstBranchRootId;
+ }
+ return null;
}
/**
@@ -525,10 +578,11 @@ public class DefaultRevisionStore extend
* customized by subclasses. If this method throws an exception, the cycle
* will be stopped without sweeping.
*
+ * @return first commit id that will be preserved
* @throws Exception
* if an error occurs
*/
- protected void markCommits() throws Exception {
+ protected Id markCommits() throws Exception {
StoredCommit commit = getHeadCommit();
long tsLimit = commit.getCommitTS() - (60 * 60 * 1000);
@@ -544,11 +598,7 @@ public class DefaultRevisionStore extend
}
commit = parentCommit;
}
- if (commit.getParentId() != null) {
- MutableCommit firstCommit = new MutableCommit(commit);
- firstCommit.setParentId(null);
- gcpm.replaceCommit(firstCommit.getId(), firstCommit);
- }
+ return commit.getId();
}
/**
Modified: jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java?rev=1330847&r1=1330846&r2=1330847&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java Thu Apr 26 13:36:10 2012
@@ -28,13 +28,12 @@ import org.apache.jackrabbit.mk.core.Mic
import org.apache.jackrabbit.mk.core.Repository;
import org.apache.jackrabbit.mk.json.fast.Jsop;
import org.apache.jackrabbit.mk.json.fast.JsopArray;
-import org.apache.jackrabbit.mk.model.MutableCommit;
+import org.apache.jackrabbit.mk.model.Id;
import org.apache.jackrabbit.mk.model.StoredCommit;
import org.apache.jackrabbit.mk.persistence.GCPersistence;
import org.apache.jackrabbit.mk.persistence.InMemPersistence;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
/**
@@ -49,15 +48,11 @@ public class DefaultRevisionStoreTest {
public void setup() throws Exception {
rs = new DefaultRevisionStore(createPersistence()) {
@Override
- protected void markCommits() throws Exception {
- StoredCommit commit = getHeadCommit();
-
+ protected Id markCommits() throws Exception {
// Keep head commit only
+ StoredCommit commit = getHeadCommit();
markCommit(commit);
-
- MutableCommit headCommit = new MutableCommit(commit);
- headCommit.setParentId(null);
- gcpm.replaceCommit(headCommit.getId(), headCommit);
+ return commit.getId();
}
};
rs.initialize();
@@ -105,7 +100,6 @@ public class DefaultRevisionStoreTest {
*
* @throws Exception if an error occurs
*/
- @Ignore
@Test
public void testBranchMerge() throws Exception {
mk.commit("/", "+\"a\" : { \"b\":{}, \"c\":{} }", mk.getHeadRevision(), null);
@@ -118,6 +112,11 @@ public class DefaultRevisionStoreTest {
branchRevisionId = mk.commit("/a", "+\"f\" : {}", branchRevisionId, null);
mk.merge(branchRevisionId, null);
+
+ rs.gc();
+
+ String history = mk.getRevisionHistory(Long.MIN_VALUE, Integer.MIN_VALUE);
+ assertEquals(1, ((JsopArray) Jsop.parse(history)).size());
}
/**
@@ -150,4 +149,36 @@ public class DefaultRevisionStoreTest {
gcExecutor.shutdown();
}
}
+
+ /**
+ * Verify garbage collection can run concurrently with commits.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testConcurrentMergeGC() throws Exception {
+ ScheduledExecutorService gcExecutor = Executors.newScheduledThreadPool(1);
+ gcExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ rs.gc();
+ }
+ }, 100, 20, TimeUnit.MILLISECONDS);
+
+ mk.commit("/", "+\"a\" : { \"b\" : { \"c\" : { \"d\" : {} } } }",
+ mk.getHeadRevision(), null);
+
+ try {
+ for (int i = 0; i < 20; i++) {
+ String branchId = mk.branch(mk.getHeadRevision());
+ branchId = mk.commit("/a/b/c/d", "+\"e\" : {}", branchId, null);
+ Thread.sleep(10);
+ branchId = mk.commit("/a/b/c/d", "-\"e\"", branchId, null);
+ Thread.sleep(30);
+ mk.merge(branchId, null);
+ }
+ } finally {
+ gcExecutor.shutdown();
+ }
+ }
}