You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by dp...@apache.org on 2012/04/25 13:38:27 UTC
svn commit: r1330214 - in /jackrabbit/oak/trunk/oak-mk/src:
main/java/org/apache/jackrabbit/mk/model/
main/java/org/apache/jackrabbit/mk/store/
test/java/org/apache/jackrabbit/mk/store/
Author: dpfister
Date: Wed Apr 25 11:38:27 2012
New Revision: 1330214
URL: http://svn.apache.org/viewvc?rev=1330214&view=rev
Log:
GC for revisions
- make GC concurrent, add test
Modified:
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java
jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java
jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java Wed Apr 25 11:38:27 2012
@@ -499,12 +499,12 @@ public class ChildNodeEntriesTree implem
//-------------------------------------------------------< implementation >
- protected void persistDirtyBuckets(RevisionStore store) throws Exception {
+ protected void persistDirtyBuckets(RevisionStore store, RevisionStore.PutToken token) throws Exception {
for (int i = 0; i < index.length; i++) {
if (index[i] instanceof Bucket) {
// dirty bucket
Bucket bucket = (Bucket) index[i];
- Id id = store.putCNEMap(bucket);
+ Id id = store.putCNEMap(token, bucket);
index[i] = new BucketInfo(id, bucket.getSize());
}
}
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java Wed Apr 25 11:38:27 2012
@@ -119,8 +119,9 @@ public class CommitBuilder {
}
}
+ RevisionStore.PutToken token = store.createPutToken();
Id rootNodeId =
- changeLog.isEmpty() ? baseCommit.getRootNodeId() : persistStagedNodes();
+ changeLog.isEmpty() ? baseCommit.getRootNodeId() : persistStagedNodes(token);
Id newRevId;
@@ -133,7 +134,7 @@ public class CommitBuilder {
StoredNode theirRoot = store.getRootNode(currentHead);
StoredNode ourRoot = store.getNode(rootNodeId);
- rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot);
+ rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot, token);
baseRevId = currentHead;
}
@@ -157,7 +158,7 @@ public class CommitBuilder {
newCommit.setChanges(diff.toString());
newCommit.setRootNodeId(rootNodeId);
newCommit.setBranchRootId(null);
- newRevId = store.putHeadCommit(newCommit);
+ newRevId = store.putHeadCommit(token, newCommit);
} finally {
store.unlockHead();
}
@@ -181,7 +182,7 @@ public class CommitBuilder {
} else {
newCommit.setBranchRootId(baseCommit.getBranchRootId());
}
- newRevId = store.putCommit(newCommit);
+ newRevId = store.putCommit(token, newCommit);
}
// reset instance
@@ -198,8 +199,9 @@ public class CommitBuilder {
throw new Exception("can only merge a private branch commit");
}
+ RevisionStore.PutToken token = store.createPutToken();
Id rootNodeId =
- changeLog.isEmpty() ? branchCommit.getRootNodeId() : persistStagedNodes();
+ changeLog.isEmpty() ? branchCommit.getRootNodeId() : persistStagedNodes(token);
Id newRevId;
@@ -211,7 +213,7 @@ public class CommitBuilder {
StoredNode theirRoot = store.getRootNode(currentHead);
StoredNode ourRoot = store.getNode(rootNodeId);
- rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot);
+ rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot, token);
if (store.getCommit(currentHead).getRootNodeId().equals(rootNodeId)) {
// the merge didn't cause any changes,
@@ -227,7 +229,7 @@ public class CommitBuilder {
newCommit.setChanges(diff);
newCommit.setRootNodeId(rootNodeId);
newCommit.setBranchRootId(null);
- newRevId = store.putHeadCommit(newCommit);
+ newRevId = store.putHeadCommit(token, newCommit);
} finally {
store.unlockHead();
}
@@ -304,7 +306,7 @@ public class CommitBuilder {
}
}
- Id /* new id of root node */ persistStagedNodes() throws Exception {
+ Id /* new id of root node */ persistStagedNodes(RevisionStore.PutToken token) throws Exception {
// sort paths in in depth-descending order
ArrayList<String> orderedPaths = new ArrayList<String>(staged.keySet());
Collections.sort(orderedPaths, new Comparator<String>() {
@@ -322,7 +324,7 @@ public class CommitBuilder {
Id rootNodeId = null;
for (String path : orderedPaths) {
// persist node
- Id id = store.putNode(staged.get(path));
+ Id id = store.putNode(token, staged.get(path));
if (PathUtils.denotesRoot(path)) {
rootNodeId = id;
} else {
@@ -345,7 +347,9 @@ public class CommitBuilder {
* @return id of merged root node
* @throws Exception
*/
- Id /* id of merged root node */ mergeTree(StoredNode baseRoot, StoredNode ourRoot, StoredNode theirRoot) throws Exception {
+ Id /* id of merged root node */ mergeTree(StoredNode baseRoot, StoredNode ourRoot, StoredNode theirRoot,
+ RevisionStore.PutToken token) throws Exception {
+
// as we're going to use the staging area for the merge process,
// we need to clear it first
staged.clear();
@@ -353,7 +357,7 @@ public class CommitBuilder {
// recursively merge 'our' changes with 'their' changes...
mergeNode(baseRoot, ourRoot, theirRoot, "/");
- return persistStagedNodes();
+ return persistStagedNodes(token);
}
void mergeNode(StoredNode baseNode, StoredNode ourNode, StoredNode theirNode, String path) throws Exception {
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java Wed Apr 25 11:38:27 2012
@@ -69,15 +69,15 @@ public class MutableNode extends Abstrac
//----------------------------------------------------------< PersistHook >
@Override
- public void prePersist(RevisionStore store) throws Exception {
+ public void prePersist(RevisionStore store, RevisionStore.PutToken token) throws Exception {
if (!childEntries.inlined()) {
// persist dirty buckets
- ((ChildNodeEntriesTree) childEntries).persistDirtyBuckets(store);
+ ((ChildNodeEntriesTree) childEntries).persistDirtyBuckets(store, token);
}
}
@Override
- public void postPersist(RevisionStore store) throws Exception {
+ public void postPersist(RevisionStore store, RevisionStore.PutToken token) throws Exception {
// there's nothing to do
}
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java Wed Apr 25 11:38:27 2012
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
+import java.util.WeakHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -47,8 +48,8 @@ import org.apache.jackrabbit.mk.util.Sim
* Default revision store implementation, passing calls to a {@code Persistence}
* and a {@code BlobStore}, respectively and providing caching.
*/
-public class DefaultRevisionStore extends AbstractRevisionStore
- implements Closeable {
+public class DefaultRevisionStore extends AbstractRevisionStore implements
+ Closeable {
public static final String CACHE_SIZE = "mk.cacheSize";
public static final int DEFAULT_CACHE_SIZE = 10000;
@@ -60,8 +61,8 @@ public class DefaultRevisionStore extend
private final Persistence pm;
protected final GCPersistence gcpm;
- /* avoid synthetic accessor */ int initialCacheSize;
- /* avoid synthetic accessor */ Map<Id, Object> cache;
+ /* avoid synthetic accessor */int initialCacheSize;
+ /* avoid synthetic accessor */Map<Id, Object> cache;
/**
* GC run state constants.
@@ -75,34 +76,46 @@ public class DefaultRevisionStore extend
* GC run state.
*/
private final AtomicInteger gcState = new AtomicInteger();
-
+
/**
* GC executor.
*/
private ScheduledExecutorService gcExecutor;
-
+
+ /**
+ * Put tokens (Key: token, Value: null).
+ */
+ private final Map<PutTokenImpl, Object> putTokens = Collections.synchronizedMap(new WeakHashMap<PutTokenImpl, Object>());
+
+ /**
+ * Mark lock for put tokens.
+ */
+ private final ReentrantReadWriteLock markLock = new ReentrantReadWriteLock();
+
public DefaultRevisionStore(Persistence pm) {
this.pm = pm;
this.gcpm = (pm instanceof GCPersistence) ? (GCPersistence) pm : null;
-
+
commitCounter = new AtomicLong();
}
-
+
public void initialize() throws Exception {
if (initialized) {
throw new IllegalStateException("already initialized");
}
initialCacheSize = determineInitialCacheSize();
- cache = Collections.synchronizedMap(SimpleLRUCache.<Id, Object>newInstance(initialCacheSize));
+ cache = Collections.synchronizedMap(SimpleLRUCache
+ .<Id, Object> newInstance(initialCacheSize));
// make sure we've got a HEAD commit
head = pm.readHead();
if (head == null || head.getBytes().length == 0) {
// assume virgin repository
- byte[] rawHead = Id.fromLong(commitCounter.incrementAndGet()).getBytes();
+ byte[] rawHead = Id.fromLong(commitCounter.incrementAndGet())
+ .getBytes();
head = new Id(rawHead);
-
+
Id rootNodeId = pm.writeNode(new MutableNode(this, "/"));
MutableCommit initialCommit = new MutableCommit();
initialCommit.setCommitTS(System.currentTimeMillis());
@@ -124,13 +137,13 @@ public class DefaultRevisionStore extend
}
}, 60, 60, TimeUnit.SECONDS);
}
-
+
initialized = true;
}
-
+
public void close() {
verifyInitialized();
-
+
if (gcExecutor != null) {
gcExecutor.shutdown();
}
@@ -153,41 +166,96 @@ public class DefaultRevisionStore extend
return (val != null) ? Integer.parseInt(val) : DEFAULT_CACHE_SIZE;
}
- //--------------------------------------------------------< RevisionStore >
+ // --------------------------------------------------------< RevisionStore >
+
+ /**
+ * Put token implementation.
+ */
+ static class PutTokenImpl extends PutToken {
+
+ private static int idCounter;
+ private int id;
+ private StoredNode lastModifiedNode;
+
+ public PutTokenImpl() {
+ this.id = ++idCounter;
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PutTokenImpl) {
+ return ((PutTokenImpl) obj).id == id;
+ }
+ return super.equals(obj);
+ }
+
+ public void updateLastModifed(StoredNode lastModifiedNode) {
+ this.lastModifiedNode = lastModifiedNode;
+ }
+
+ public StoredNode getLastModified() {
+ return lastModifiedNode;
+ }
+ }
+
+ public RevisionStore.PutToken createPutToken() {
+ return new PutTokenImpl();
+ }
- public Id putNode(MutableNode node) throws Exception {
+ public Id putNode(PutToken token, MutableNode node) throws Exception {
verifyInitialized();
PersistHook callback = null;
if (node instanceof PersistHook) {
callback = (PersistHook) node;
- callback.prePersist(this);
+ callback.prePersist(this, token);
}
- Id id = pm.writeNode(node);
-
- if (callback != null) {
- callback.postPersist(this);
- }
-
- cache.put(id, new StoredNode(id, node, this));
+ /*
+ * Make sure that a GC cycle can not sweep this newly persisted node
+ * before we have updated our token
+ */
+ markLock.readLock().lock();
- return id;
+ try {
+ Id id = pm.writeNode(node);
+
+ if (callback != null) {
+ callback.postPersist(this, token);
+ }
+
+ StoredNode snode = new StoredNode(id, node, this);
+ cache.put(id, snode);
+
+ PutTokenImpl pti = (PutTokenImpl) token;
+ pti.updateLastModifed(snode);
+ putTokens.put(pti, null);
+ return id;
+
+ } finally {
+ markLock.readLock().unlock();
+ }
}
- public Id putCNEMap(ChildNodeEntriesMap map) throws Exception {
+ public Id putCNEMap(PutToken token, ChildNodeEntriesMap map)
+ throws Exception {
verifyInitialized();
PersistHook callback = null;
if (map instanceof PersistHook) {
callback = (PersistHook) map;
- callback.prePersist(this);
+ callback.prePersist(this, token);
}
- Id id = pm.writeCNEMap(map);
+ Id id = pm.writeCNEMap(map);
- if (callback != null) {
- callback.postPersist(this);
+ if (callback != null) {
+ callback.postPersist(this, token);
}
cache.put(id, map);
@@ -199,29 +267,35 @@ public class DefaultRevisionStore extend
headLock.writeLock().lock();
}
- public Id putHeadCommit(MutableCommit commit) throws Exception {
+ public Id putHeadCommit(PutToken token, MutableCommit commit)
+ throws Exception {
verifyInitialized();
if (!headLock.writeLock().isHeldByCurrentThread()) {
- throw new IllegalStateException("putHeadCommit called without holding write lock.");
+ throw new IllegalStateException(
+ "putHeadCommit called without holding write lock.");
}
- Id id = writeCommit(commit);
+ Id id = writeCommit(token, commit);
setHeadCommitId(id);
+ putTokens.remove(token);
return id;
}
- public Id putCommit(MutableCommit commit) throws Exception {
+ public Id putCommit(PutToken token, MutableCommit commit) throws Exception {
verifyInitialized();
- return writeCommit(commit);
+ Id commitId = writeCommit(token, commit);
+ putTokens.remove(token);
+
+ return commitId;
}
public void unlockHead() {
headLock.writeLock().unlock();
}
- //-----------------------------------------------------< RevisionProvider >
+ // -----------------------------------------------------< RevisionProvider >
public StoredNode getNode(Id id) throws NotFoundException, Exception {
verifyInitialized();
@@ -239,7 +313,8 @@ public class DefaultRevisionStore extend
return node;
}
- public ChildNodeEntriesMap getCNEMap(Id id) throws NotFoundException, Exception {
+ public ChildNodeEntriesMap getCNEMap(Id id) throws NotFoundException,
+ Exception {
verifyInitialized();
ChildNodeEntriesMap map = (ChildNodeEntriesMap) cache.get(id);
@@ -268,7 +343,8 @@ public class DefaultRevisionStore extend
return commit;
}
- public StoredNode getRootNode(Id commitId) throws NotFoundException, Exception {
+ public StoredNode getRootNode(Id commitId) throws NotFoundException,
+ Exception {
return getNode(getCommit(commitId).getRootNodeId());
}
@@ -287,13 +363,14 @@ public class DefaultRevisionStore extend
}
}
- //-------------------------------------------------------< implementation >
+ // -------------------------------------------------------< implementation >
- private Id writeCommit(MutableCommit commit) throws Exception {
+ private Id writeCommit(RevisionStore.PutToken token, MutableCommit commit)
+ throws Exception {
PersistHook callback = null;
if (commit instanceof PersistHook) {
callback = (PersistHook) commit;
- callback.prePersist(this);
+ callback.prePersist(this, token);
}
Id id = commit.getId();
@@ -302,8 +379,8 @@ public class DefaultRevisionStore extend
}
pm.writeCommit(id, commit);
- if (callback != null) {
- callback.postPersist(this);
+ if (callback != null) {
+ callback.postPersist(this, token);
}
cache.put(id, new StoredCommit(id, commit));
return id;
@@ -321,10 +398,11 @@ public class DefaultRevisionStore extend
}
}
- //------------------------------------------------------------< overrides >
+ // ------------------------------------------------------------< overrides >
@Override
- public void compare(final NodeState before, final NodeState after, final NodeStateDiff diff) {
+ public void compare(final NodeState before, final NodeState after,
+ final NodeStateDiff diff) {
// OAK-46: Efficient diffing of large child node lists
Node beforeNode = ((StoredNodeAsState) before).unwrap();
@@ -337,8 +415,10 @@ public class DefaultRevisionStore extend
}
@Override
- public void propChanged(String propName, String oldValue, String newValue) {
- diff.propertyChanged(before.getProperty(propName), after.getProperty(propName));
+ public void propChanged(String propName, String oldValue,
+ String newValue) {
+ diff.propertyChanged(before.getProperty(propName),
+ after.getProperty(propName));
}
@Override
@@ -361,38 +441,55 @@ public class DefaultRevisionStore extend
@Override
public void childNodeChanged(ChildNode changed, Id newId) {
String name = changed.getName();
- diff.childNodeChanged(name, before.getChildNode(name), after.getChildNode(name));
+ diff.childNodeChanged(name, before.getChildNode(name),
+ after.getChildNode(name));
}
});
}
- //----------------------------------------------------------------------- GC
+ // -----------------------------------------------------------------------
+ // GC
/**
* Perform a garbage collection. If a garbage collection cycle is already
- * running, this method returns immediately.
+ * running, this method returns immediately.
*/
public void gc() {
if (gcpm == null || !gcState.compareAndSet(NOT_ACTIVE, STARTING)) {
// already running
return;
}
-
- gcpm.start();
-
- gcState.set(MARKING);
-
+
+ // Mark all nodes that belong to currently active puts
+ markLock.writeLock().lock();
+
try {
- doMark();
+ gcpm.start();
+ gcState.set(MARKING);
+
+ for (PutTokenImpl token : putTokens.keySet()) {
+ markNode(token.getLastModified());
+ }
+
} catch (Exception e) {
/* unable to perform GC */
gcState.set(NOT_ACTIVE);
e.printStackTrace();
return;
+ } finally {
+ markLock.writeLock().unlock();
}
+ try {
+ doMark();
+ } catch (Exception e) {
+ /* unable to perform GC */
+ gcState.set(NOT_ACTIVE);
+ e.printStackTrace();
+ return;
+ }
gcState.set(SWEEPING);
-
+
try {
gcpm.sweep();
cache.clear();
@@ -402,18 +499,19 @@ public class DefaultRevisionStore extend
gcState.set(NOT_ACTIVE);
}
}
-
+
/**
* Mark all commits and nodes in a garbage collection cycle. Can be
- * customized by subclasses. If this method throws an exception, the
- * cycle will be stopped without sweeping.
+ * customized by subclasses. If this method throws an exception, the cycle
+ * will be stopped without sweeping.
*
- * @throws Exception if an error occurs
+ * @throws Exception
+ * if an error occurs
*/
protected void doMark() throws Exception {
StoredCommit commit = getHeadCommit();
long tsLimit = commit.getCommitTS() - (60 * 60 * 1000);
-
+
for (;;) {
markCommit(commit);
Id id = commit.getParentId();
@@ -433,19 +531,26 @@ public class DefaultRevisionStore extend
}
}
- protected void markCommit(StoredCommit commit)
- throws Exception {
-
+ /**
+ * Mark a commit. This marks all nodes belonging to this commit as well.
+ *
+ * @param commit commit
+ * @throws Exception if an error occurs
+ */
+ protected void markCommit(StoredCommit commit) throws Exception {
if (!gcpm.markCommit(commit.getId())) {
return;
}
-
markNode(getNode(commit.getRootNodeId()));
}
-
- private void markNode(StoredNode node)
- throws Exception {
-
+
+ /**
+ * Mark a node. This marks all children as well.
+ *
+ * @param node node
+ * @throws Exception if an error occurs
+ */
+ private void markNode(StoredNode node) throws Exception {
if (!gcpm.markNode(node.getId())) {
return;
}
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java Wed Apr 25 11:38:27 2012
@@ -21,6 +21,6 @@ package org.apache.jackrabbit.mk.store;
*/
public interface PersistHook {
- void prePersist(RevisionStore store) throws Exception;
- void postPersist(RevisionStore store) throws Exception;
+ void prePersist(RevisionStore store, RevisionStore.PutToken token) throws Exception;
+ void postPersist(RevisionStore store, RevisionStore.PutToken token) throws Exception;
}
Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java Wed Apr 25 11:38:27 2012
@@ -26,8 +26,24 @@ import org.apache.jackrabbit.mk.model.Mu
*/
public interface RevisionStore extends RevisionProvider {
- Id /*id*/ putNode(MutableNode node) throws Exception;
- Id /*id*/ putCNEMap(ChildNodeEntriesMap map) throws Exception;
+ /**
+ * Token that must be created first before invoking any put operation.
+ */
+ public abstract class PutToken {
+
+ /* Prevent other implementations. */
+ PutToken() {}
+ }
+
+ /**
+ * Create a put token.
+ *
+ * @return put token
+ */
+ PutToken createPutToken();
+
+ Id /*id*/ putNode(PutToken token, MutableNode node) throws Exception;
+ Id /*id*/ putCNEMap(PutToken token, ChildNodeEntriesMap map) throws Exception;
/**
* Lock the head. Must be called prior to putting a new head commit.
@@ -41,12 +57,13 @@ public interface RevisionStore extends R
* Put a new head commit. Must be called while holding a
* lock on the head.
*
+ * @param token put token
* @param commit commit
* @return head commit id
* @throws Exception if an error occurs
* @see #lockHead()
*/
- Id /*id*/ putHeadCommit(MutableCommit commit) throws Exception;
+ Id /*id*/ putHeadCommit(PutToken token, MutableCommit commit) throws Exception;
/**
* Unlock the head.
@@ -62,9 +79,10 @@ public interface RevisionStore extends R
* does not affect the current head commit and therefore doesn't
* require a lock on the head.
*
+ * @param token put token
* @param commit commit
* @return new commit id
* @throws Exception if an error occurs
*/
- Id /*id*/ putCommit(MutableCommit commit) throws Exception;
+ Id /*id*/ putCommit(PutToken token, MutableCommit commit) throws Exception;
}
Modified: jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java Wed Apr 25 11:38:27 2012
@@ -18,6 +18,10 @@ package org.apache.jackrabbit.mk.store;
import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.jackrabbit.mk.api.MicroKernel;
import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
import org.apache.jackrabbit.mk.core.MicroKernelImpl;
@@ -25,6 +29,8 @@ import org.apache.jackrabbit.mk.core.Rep
import org.apache.jackrabbit.mk.json.fast.Jsop;
import org.apache.jackrabbit.mk.json.fast.JsopArray;
import org.apache.jackrabbit.mk.model.MutableCommit;
+import org.apache.jackrabbit.mk.model.StoredCommit;
+import org.apache.jackrabbit.mk.persistence.GCPersistence;
import org.apache.jackrabbit.mk.persistence.InMemPersistence;
import org.junit.After;
import org.junit.Before;
@@ -35,25 +41,32 @@ import org.junit.Test;
*/
public class DefaultRevisionStoreTest {
- private DefaultRevisionStore rs;
+ /* avoid synthetic accessor */ DefaultRevisionStore rs;
private MicroKernel mk;
@Before
public void setup() throws Exception {
- rs = new DefaultRevisionStore(new InMemPersistence()) {
+ rs = new DefaultRevisionStore(createPersistence()) {
@Override
protected void doMark() throws Exception {
+ StoredCommit commit = getHeadCommit();
+
// Keep head commit only
- markCommit(getHeadCommit());
+ markCommit(commit);
- MutableCommit headCommit = new MutableCommit(getHeadCommit());
+ MutableCommit headCommit = new MutableCommit(commit);
headCommit.setParentId(null);
gcpm.replaceCommit(headCommit.getId(), headCommit);
}
};
rs.initialize();
+
mk = new MicroKernelImpl(new Repository(rs, new MemoryBlobStore()));
}
+
+ protected GCPersistence createPersistence() throws Exception {
+ return new InMemPersistence();
+ }
@After
public void tearDown() throws Exception {
@@ -62,8 +75,13 @@ public class DefaultRevisionStoreTest {
}
}
+ /**
+ * Verify revision history works with garbage collection.
+ *
+ * @throws Exception if an error occurs
+ */
@Test
- public void testGC() {
+ public void testRevisionHistory() {
mk.commit("/", "+\"a\" : { \"c\":{}, \"d\":{} }", mk.getHeadRevision(), null);
mk.commit("/", "+\"b\" : {}", mk.getHeadRevision(), null);
mk.commit("/b", "+\"e\" : {}", mk.getHeadRevision(), null);
@@ -80,4 +98,35 @@ public class DefaultRevisionStoreTest {
String history = mk.getRevisionHistory(Long.MIN_VALUE, Integer.MIN_VALUE);
assertEquals(1, ((JsopArray) Jsop.parse(history)).size());
}
+
+ /**
+ * Verify garbage collection can run concurrently with commits.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Test
+ public void testConcurrentGC() throws Exception {
+ ScheduledExecutorService gcExecutor = Executors.newScheduledThreadPool(1);
+ gcExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ rs.gc();
+ }
+ }, 100, 20, TimeUnit.MILLISECONDS);
+
+ mk.commit("/", "+\"a\" : { \"b\" : { \"c\" : { \"d\" : {} } } }",
+ mk.getHeadRevision(), null);
+
+ try {
+ for (int i = 0; i < 20; i++) {
+ mk.commit("/a/b/c/d", "+\"e\" : {}", mk.getHeadRevision(), null);
+ Thread.sleep(10);
+ mk.commit("/a/b/c/d/e", "+\"f\" : {}", mk.getHeadRevision(), null);
+ Thread.sleep(30);
+ mk.commit("/a/b/c/d", "-\"e\"", mk.getHeadRevision(), null);
+ }
+ } finally {
+ gcExecutor.shutdown();
+ }
+ }
}