You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by dp...@apache.org on 2012/04/25 13:38:27 UTC

svn commit: r1330214 - in /jackrabbit/oak/trunk/oak-mk/src: main/java/org/apache/jackrabbit/mk/model/ main/java/org/apache/jackrabbit/mk/store/ test/java/org/apache/jackrabbit/mk/store/

Author: dpfister
Date: Wed Apr 25 11:38:27 2012
New Revision: 1330214

URL: http://svn.apache.org/viewvc?rev=1330214&view=rev
Log:
GC for revisions
- make GC concurrent, add test

Modified:
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java
    jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/ChildNodeEntriesTree.java Wed Apr 25 11:38:27 2012
@@ -499,12 +499,12 @@ public class ChildNodeEntriesTree implem
 
     //-------------------------------------------------------< implementation >
     
-    protected void persistDirtyBuckets(RevisionStore store) throws Exception {
+    protected void persistDirtyBuckets(RevisionStore store, RevisionStore.PutToken token) throws Exception {
         for (int i = 0; i < index.length; i++) {
             if (index[i] instanceof Bucket) {
                 // dirty bucket
                 Bucket bucket = (Bucket) index[i];
-                Id id = store.putCNEMap(bucket);
+                Id id = store.putCNEMap(token, bucket);
                 index[i] = new BucketInfo(id, bucket.getSize());
             }
         }

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/CommitBuilder.java Wed Apr 25 11:38:27 2012
@@ -119,8 +119,9 @@ public class CommitBuilder {
             }
         }
 
+        RevisionStore.PutToken token = store.createPutToken();
         Id rootNodeId =
-                changeLog.isEmpty() ? baseCommit.getRootNodeId() : persistStagedNodes();
+                changeLog.isEmpty() ? baseCommit.getRootNodeId() : persistStagedNodes(token);
 
         Id newRevId;
 
@@ -133,7 +134,7 @@ public class CommitBuilder {
                     StoredNode theirRoot = store.getRootNode(currentHead);
                     StoredNode ourRoot = store.getNode(rootNodeId);
 
-                    rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot);
+                    rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot, token);
 
                     baseRevId = currentHead;
                 }
@@ -157,7 +158,7 @@ public class CommitBuilder {
                 newCommit.setChanges(diff.toString());
                 newCommit.setRootNodeId(rootNodeId);
                 newCommit.setBranchRootId(null);
-                newRevId = store.putHeadCommit(newCommit);
+                newRevId = store.putHeadCommit(token, newCommit);
             } finally {
                 store.unlockHead();
             }
@@ -181,7 +182,7 @@ public class CommitBuilder {
             } else {
                 newCommit.setBranchRootId(baseCommit.getBranchRootId());
             }
-            newRevId = store.putCommit(newCommit);
+            newRevId = store.putCommit(token, newCommit);
         }
 
         // reset instance
@@ -198,8 +199,9 @@ public class CommitBuilder {
             throw new Exception("can only merge a private branch commit");
         }
 
+        RevisionStore.PutToken token = store.createPutToken();
         Id rootNodeId =
-                changeLog.isEmpty() ? branchCommit.getRootNodeId() : persistStagedNodes();
+                changeLog.isEmpty() ? branchCommit.getRootNodeId() : persistStagedNodes(token);
 
         Id newRevId;
 
@@ -211,7 +213,7 @@ public class CommitBuilder {
             StoredNode theirRoot = store.getRootNode(currentHead);
             StoredNode ourRoot = store.getNode(rootNodeId);
 
-            rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot);
+            rootNodeId = mergeTree(baseRoot, ourRoot, theirRoot, token);
 
             if (store.getCommit(currentHead).getRootNodeId().equals(rootNodeId)) {
                 // the merge didn't cause any changes,
@@ -227,7 +229,7 @@ public class CommitBuilder {
             newCommit.setChanges(diff);
             newCommit.setRootNodeId(rootNodeId);
             newCommit.setBranchRootId(null);
-            newRevId = store.putHeadCommit(newCommit);
+            newRevId = store.putHeadCommit(token, newCommit);
         } finally {
             store.unlockHead();
         }
@@ -304,7 +306,7 @@ public class CommitBuilder {
         }
     }
 
-    Id /* new id of root node */ persistStagedNodes() throws Exception {
+    Id /* new id of root node */ persistStagedNodes(RevisionStore.PutToken token) throws Exception {
         // sort paths in in depth-descending order
         ArrayList<String> orderedPaths = new ArrayList<String>(staged.keySet());
         Collections.sort(orderedPaths, new Comparator<String>() {
@@ -322,7 +324,7 @@ public class CommitBuilder {
         Id rootNodeId = null;
         for (String path : orderedPaths) {
             // persist node
-            Id id = store.putNode(staged.get(path));
+            Id id = store.putNode(token, staged.get(path));
             if (PathUtils.denotesRoot(path)) {
                 rootNodeId = id;
             } else {
@@ -345,7 +347,9 @@ public class CommitBuilder {
      * @return id of merged root node
      * @throws Exception
      */
-    Id /* id of merged root node */ mergeTree(StoredNode baseRoot, StoredNode ourRoot, StoredNode theirRoot) throws Exception {
+    Id /* id of merged root node */ mergeTree(StoredNode baseRoot, StoredNode ourRoot, StoredNode theirRoot,
+            RevisionStore.PutToken token) throws Exception {
+        
         // as we're going to use the staging area for the merge process,
         // we need to clear it first
         staged.clear();
@@ -353,7 +357,7 @@ public class CommitBuilder {
         // recursively merge 'our' changes with 'their' changes...
         mergeNode(baseRoot, ourRoot, theirRoot, "/");
 
-        return persistStagedNodes();
+        return persistStagedNodes(token);
     }
 
     void mergeNode(StoredNode baseNode, StoredNode ourNode, StoredNode theirNode, String path) throws Exception {

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/model/MutableNode.java Wed Apr 25 11:38:27 2012
@@ -69,15 +69,15 @@ public class MutableNode extends Abstrac
     //----------------------------------------------------------< PersistHook >
 
     @Override
-    public void prePersist(RevisionStore store) throws Exception {
+    public void prePersist(RevisionStore store, RevisionStore.PutToken token) throws Exception {
         if (!childEntries.inlined()) {
             // persist dirty buckets
-            ((ChildNodeEntriesTree) childEntries).persistDirtyBuckets(store);
+            ((ChildNodeEntriesTree) childEntries).persistDirtyBuckets(store, token);
         }
     }
 
     @Override
-    public void postPersist(RevisionStore store) throws Exception {
+    public void postPersist(RevisionStore store, RevisionStore.PutToken token) throws Exception {
         // there's nothing to do
     }
 

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java Wed Apr 25 11:38:27 2012
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.WeakHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -47,8 +48,8 @@ import org.apache.jackrabbit.mk.util.Sim
  * Default revision store implementation, passing calls to a {@code Persistence}
  * and a {@code BlobStore}, respectively and providing caching.
  */
-public class DefaultRevisionStore extends AbstractRevisionStore
-        implements Closeable {
+public class DefaultRevisionStore extends AbstractRevisionStore implements
+        Closeable {
 
     public static final String CACHE_SIZE = "mk.cacheSize";
     public static final int DEFAULT_CACHE_SIZE = 10000;
@@ -60,8 +61,8 @@ public class DefaultRevisionStore extend
     private final Persistence pm;
     protected final GCPersistence gcpm;
 
-    /* avoid synthetic accessor */ int initialCacheSize;
-    /* avoid synthetic accessor */ Map<Id, Object> cache;
+    /* avoid synthetic accessor */int initialCacheSize;
+    /* avoid synthetic accessor */Map<Id, Object> cache;
 
     /**
      * GC run state constants.
@@ -75,34 +76,46 @@ public class DefaultRevisionStore extend
      * GC run state.
      */
     private final AtomicInteger gcState = new AtomicInteger();
-    
+
     /**
      * GC executor.
      */
     private ScheduledExecutorService gcExecutor;
-    
+
+    /**
+     * Put tokens (Key: token, Value: null).
+     */
+    private final Map<PutTokenImpl, Object> putTokens = Collections.synchronizedMap(new WeakHashMap<PutTokenImpl, Object>());
+
+    /**
+     * Mark lock for put tokens.
+     */
+    private final ReentrantReadWriteLock markLock = new ReentrantReadWriteLock();
+
     public DefaultRevisionStore(Persistence pm) {
         this.pm = pm;
         this.gcpm = (pm instanceof GCPersistence) ? (GCPersistence) pm : null;
-        
+
         commitCounter = new AtomicLong();
     }
-    
+
     public void initialize() throws Exception {
         if (initialized) {
             throw new IllegalStateException("already initialized");
         }
 
         initialCacheSize = determineInitialCacheSize();
-        cache = Collections.synchronizedMap(SimpleLRUCache.<Id, Object>newInstance(initialCacheSize));
+        cache = Collections.synchronizedMap(SimpleLRUCache
+                .<Id, Object> newInstance(initialCacheSize));
 
         // make sure we've got a HEAD commit
         head = pm.readHead();
         if (head == null || head.getBytes().length == 0) {
             // assume virgin repository
-            byte[] rawHead = Id.fromLong(commitCounter.incrementAndGet()).getBytes();
+            byte[] rawHead = Id.fromLong(commitCounter.incrementAndGet())
+                    .getBytes();
             head = new Id(rawHead);
-            
+
             Id rootNodeId = pm.writeNode(new MutableNode(this, "/"));
             MutableCommit initialCommit = new MutableCommit();
             initialCommit.setCommitTS(System.currentTimeMillis());
@@ -124,13 +137,13 @@ public class DefaultRevisionStore extend
                 }
             }, 60, 60, TimeUnit.SECONDS);
         }
-        
+
         initialized = true;
     }
-    
+
     public void close() {
         verifyInitialized();
-        
+
         if (gcExecutor != null) {
             gcExecutor.shutdown();
         }
@@ -153,41 +166,96 @@ public class DefaultRevisionStore extend
         return (val != null) ? Integer.parseInt(val) : DEFAULT_CACHE_SIZE;
     }
 
-    //--------------------------------------------------------< RevisionStore >
+    // --------------------------------------------------------< RevisionStore >
+
+    /**
+     * Put token implementation.
+     */
+    static class PutTokenImpl extends PutToken {
+
+        private static int idCounter;
+        private int id;
+        private StoredNode lastModifiedNode;
+
+        public PutTokenImpl() {
+            this.id = ++idCounter;
+        }
+
+        @Override
+        public int hashCode() {
+            return id;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof PutTokenImpl) {
+                return ((PutTokenImpl) obj).id == id;
+            }
+            return super.equals(obj);
+        }
+
+        public void updateLastModifed(StoredNode lastModifiedNode) {
+            this.lastModifiedNode = lastModifiedNode;
+        }
+
+        public StoredNode getLastModified() {
+            return lastModifiedNode;
+        }
+    }
+
+    public RevisionStore.PutToken createPutToken() {
+        return new PutTokenImpl();
+    }
 
-    public Id putNode(MutableNode node) throws Exception {
+    public Id putNode(PutToken token, MutableNode node) throws Exception {
         verifyInitialized();
 
         PersistHook callback = null;
         if (node instanceof PersistHook) {
             callback = (PersistHook) node;
-            callback.prePersist(this);
+            callback.prePersist(this, token);
         }
 
-        Id id = pm.writeNode(node);
-        
-        if (callback != null)  {
-            callback.postPersist(this);
-        }
-        
-        cache.put(id, new StoredNode(id, node, this));
+        /*
+         * Make sure that a GC cycle can not sweep this newly persisted node
+         * before we have updated our token
+         */
+        markLock.readLock().lock();
 
-        return id;
+        try {
+            Id id = pm.writeNode(node);
+
+            if (callback != null) {
+                callback.postPersist(this, token);
+            }
+
+            StoredNode snode = new StoredNode(id, node, this);
+            cache.put(id, snode);
+
+            PutTokenImpl pti = (PutTokenImpl) token;
+            pti.updateLastModifed(snode);
+            putTokens.put(pti, null);
+            return id;
+
+        } finally {
+            markLock.readLock().unlock();
+        }
     }
 
-    public Id putCNEMap(ChildNodeEntriesMap map) throws Exception {
+    public Id putCNEMap(PutToken token, ChildNodeEntriesMap map)
+            throws Exception {
         verifyInitialized();
 
         PersistHook callback = null;
         if (map instanceof PersistHook) {
             callback = (PersistHook) map;
-            callback.prePersist(this);
+            callback.prePersist(this, token);
         }
 
-       Id id = pm.writeCNEMap(map);
+        Id id = pm.writeCNEMap(map);
 
-        if (callback != null)  {
-            callback.postPersist(this);
+        if (callback != null) {
+            callback.postPersist(this, token);
         }
 
         cache.put(id, map);
@@ -199,29 +267,35 @@ public class DefaultRevisionStore extend
         headLock.writeLock().lock();
     }
 
-    public Id putHeadCommit(MutableCommit commit) throws Exception {
+    public Id putHeadCommit(PutToken token, MutableCommit commit)
+            throws Exception {
         verifyInitialized();
         if (!headLock.writeLock().isHeldByCurrentThread()) {
-            throw new IllegalStateException("putHeadCommit called without holding write lock.");
+            throw new IllegalStateException(
+                    "putHeadCommit called without holding write lock.");
         }
 
-        Id id = writeCommit(commit);
+        Id id = writeCommit(token, commit);
         setHeadCommitId(id);
+        putTokens.remove(token);
 
         return id;
     }
 
-    public Id putCommit(MutableCommit commit) throws Exception {
+    public Id putCommit(PutToken token, MutableCommit commit) throws Exception {
         verifyInitialized();
 
-        return writeCommit(commit);
+        Id commitId = writeCommit(token, commit);
+        putTokens.remove(token);
+
+        return commitId;
     }
 
     public void unlockHead() {
         headLock.writeLock().unlock();
     }
 
-    //-----------------------------------------------------< RevisionProvider >
+    // -----------------------------------------------------< RevisionProvider >
 
     public StoredNode getNode(Id id) throws NotFoundException, Exception {
         verifyInitialized();
@@ -239,7 +313,8 @@ public class DefaultRevisionStore extend
         return node;
     }
 
-    public ChildNodeEntriesMap getCNEMap(Id id) throws NotFoundException, Exception {
+    public ChildNodeEntriesMap getCNEMap(Id id) throws NotFoundException,
+            Exception {
         verifyInitialized();
 
         ChildNodeEntriesMap map = (ChildNodeEntriesMap) cache.get(id);
@@ -268,7 +343,8 @@ public class DefaultRevisionStore extend
         return commit;
     }
 
-    public StoredNode getRootNode(Id commitId) throws NotFoundException, Exception {
+    public StoredNode getRootNode(Id commitId) throws NotFoundException,
+            Exception {
         return getNode(getCommit(commitId).getRootNodeId());
     }
 
@@ -287,13 +363,14 @@ public class DefaultRevisionStore extend
         }
     }
 
-    //-------------------------------------------------------< implementation >
+    // -------------------------------------------------------< implementation >
 
-    private Id writeCommit(MutableCommit commit) throws Exception {
+    private Id writeCommit(RevisionStore.PutToken token, MutableCommit commit)
+            throws Exception {
         PersistHook callback = null;
         if (commit instanceof PersistHook) {
             callback = (PersistHook) commit;
-            callback.prePersist(this);
+            callback.prePersist(this, token);
         }
 
         Id id = commit.getId();
@@ -302,8 +379,8 @@ public class DefaultRevisionStore extend
         }
         pm.writeCommit(id, commit);
 
-        if (callback != null)  {
-            callback.postPersist(this);
+        if (callback != null) {
+            callback.postPersist(this, token);
         }
         cache.put(id, new StoredCommit(id, commit));
         return id;
@@ -321,10 +398,11 @@ public class DefaultRevisionStore extend
         }
     }
 
-    //------------------------------------------------------------< overrides >
+    // ------------------------------------------------------------< overrides >
 
     @Override
-    public void compare(final NodeState before, final NodeState after, final NodeStateDiff diff) {
+    public void compare(final NodeState before, final NodeState after,
+            final NodeStateDiff diff) {
         // OAK-46: Efficient diffing of large child node lists
 
         Node beforeNode = ((StoredNodeAsState) before).unwrap();
@@ -337,8 +415,10 @@ public class DefaultRevisionStore extend
             }
 
             @Override
-            public void propChanged(String propName, String oldValue, String newValue) {
-                diff.propertyChanged(before.getProperty(propName), after.getProperty(propName));
+            public void propChanged(String propName, String oldValue,
+                    String newValue) {
+                diff.propertyChanged(before.getProperty(propName),
+                        after.getProperty(propName));
             }
 
             @Override
@@ -361,38 +441,55 @@ public class DefaultRevisionStore extend
             @Override
             public void childNodeChanged(ChildNode changed, Id newId) {
                 String name = changed.getName();
-                diff.childNodeChanged(name, before.getChildNode(name), after.getChildNode(name));
+                diff.childNodeChanged(name, before.getChildNode(name),
+                        after.getChildNode(name));
             }
         });
     }
 
-    //----------------------------------------------------------------------- GC
+    // -----------------------------------------------------------------------
+    // GC
 
     /**
      * Perform a garbage collection. If a garbage collection cycle is already
-     * running, this method returns immediately. 
+     * running, this method returns immediately.
      */
     public void gc() {
         if (gcpm == null || !gcState.compareAndSet(NOT_ACTIVE, STARTING)) {
             // already running
             return;
         }
-        
-        gcpm.start();
-        
-        gcState.set(MARKING);
-        
+
+        // Mark all nodes that belong to currently active puts
+        markLock.writeLock().lock();
+
         try {
-            doMark();
+            gcpm.start();
+            gcState.set(MARKING);
+
+            for (PutTokenImpl token : putTokens.keySet()) {
+                markNode(token.getLastModified());
+            }
+
         } catch (Exception e) {
             /* unable to perform GC */
             gcState.set(NOT_ACTIVE);
             e.printStackTrace();
             return;
+        } finally {
+            markLock.writeLock().unlock();
         }
 
+        try {
+            doMark();
+        } catch (Exception e) {
+            /* unable to perform GC */
+            gcState.set(NOT_ACTIVE);
+            e.printStackTrace();
+            return;
+        }
         gcState.set(SWEEPING);
-        
+
         try {
             gcpm.sweep();
             cache.clear();
@@ -402,18 +499,19 @@ public class DefaultRevisionStore extend
             gcState.set(NOT_ACTIVE);
         }
     }
-    
+
     /**
      * Mark all commits and nodes in a garbage collection cycle. Can be
-     * customized by subclasses. If this method throws an exception, the
-     * cycle will be stopped without sweeping.
+     * customized by subclasses. If this method throws an exception, the cycle
+     * will be stopped without sweeping.
      * 
-     * @throws Exception if an error occurs
+     * @throws Exception
+     *             if an error occurs
      */
     protected void doMark() throws Exception {
         StoredCommit commit = getHeadCommit();
         long tsLimit = commit.getCommitTS() - (60 * 60 * 1000);
-        
+
         for (;;) {
             markCommit(commit);
             Id id = commit.getParentId();
@@ -433,19 +531,26 @@ public class DefaultRevisionStore extend
         }
     }
 
-    protected void markCommit(StoredCommit commit) 
-            throws Exception {
-        
+    /**
+     * Mark a commit. This marks all nodes belonging to this commit as well.
+     * 
+     * @param commit commit
+     * @throws Exception if an error occurs
+     */
+    protected void markCommit(StoredCommit commit) throws Exception {
         if (!gcpm.markCommit(commit.getId())) {
             return;
         }
-        
         markNode(getNode(commit.getRootNodeId()));
     }
-    
-    private void markNode(StoredNode node) 
-            throws Exception {
-        
+
+    /**
+     * Mark a node. This marks all children as well.
+     * 
+     * @param node node
+     * @throws Exception if an error occurs
+     */
+    private void markNode(StoredNode node) throws Exception {
         if (!gcpm.markNode(node.getId())) {
             return;
         }

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/PersistHook.java Wed Apr 25 11:38:27 2012
@@ -21,6 +21,6 @@ package org.apache.jackrabbit.mk.store;
  */
 public interface PersistHook {
     
-    void prePersist(RevisionStore store) throws Exception;
-    void postPersist(RevisionStore store) throws Exception;
+    void prePersist(RevisionStore store, RevisionStore.PutToken token) throws Exception;
+    void postPersist(RevisionStore store, RevisionStore.PutToken token) throws Exception;
 }

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/RevisionStore.java Wed Apr 25 11:38:27 2012
@@ -26,8 +26,24 @@ import org.apache.jackrabbit.mk.model.Mu
  */
 public interface RevisionStore extends RevisionProvider {
 
-    Id /*id*/ putNode(MutableNode node) throws Exception;
-    Id /*id*/ putCNEMap(ChildNodeEntriesMap map) throws Exception;
+    /**
+     * Token that must be created first before invoking any put operation.
+     */
+    public abstract class PutToken {
+        
+        /* Prevent other implementations. */
+        PutToken() {}
+    }
+    
+    /**
+     * Create a put token.
+     * 
+     * @return put token
+     */
+    PutToken createPutToken();
+    
+    Id /*id*/ putNode(PutToken token, MutableNode node) throws Exception;
+    Id /*id*/ putCNEMap(PutToken token, ChildNodeEntriesMap map) throws Exception;
     
     /**
      * Lock the head. Must be called prior to putting a new head commit.
@@ -41,12 +57,13 @@ public interface RevisionStore extends R
      * Put a new head commit. Must be called while holding a
      * lock on the head.
      * 
+     * @param token put token
      * @param commit commit
      * @return head commit id
      * @throws Exception if an error occurs
      * @see #lockHead()
      */
-    Id /*id*/ putHeadCommit(MutableCommit commit) throws Exception;
+    Id /*id*/ putHeadCommit(PutToken token, MutableCommit commit) throws Exception;
     
     /**
      * Unlock the head.
@@ -62,9 +79,10 @@ public interface RevisionStore extends R
      * does not affect the current head commit and therefore doesn't
      * require a lock on the head.
      *
+     * @param token put token
      * @param commit commit
      * @return new commit id
      * @throws Exception if an error occurs
      */
-    Id /*id*/ putCommit(MutableCommit commit) throws Exception;
+    Id /*id*/ putCommit(PutToken token, MutableCommit commit) throws Exception;
 }

Modified: jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java?rev=1330214&r1=1330213&r2=1330214&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java Wed Apr 25 11:38:27 2012
@@ -18,6 +18,10 @@ package org.apache.jackrabbit.mk.store;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
 import org.apache.jackrabbit.mk.core.MicroKernelImpl;
@@ -25,6 +29,8 @@ import org.apache.jackrabbit.mk.core.Rep
 import org.apache.jackrabbit.mk.json.fast.Jsop;
 import org.apache.jackrabbit.mk.json.fast.JsopArray;
 import org.apache.jackrabbit.mk.model.MutableCommit;
+import org.apache.jackrabbit.mk.model.StoredCommit;
+import org.apache.jackrabbit.mk.persistence.GCPersistence;
 import org.apache.jackrabbit.mk.persistence.InMemPersistence;
 import org.junit.After;
 import org.junit.Before;
@@ -35,25 +41,32 @@ import org.junit.Test;
  */
 public class DefaultRevisionStoreTest {
 
-    private DefaultRevisionStore rs;
+    /* avoid synthetic accessor */  DefaultRevisionStore rs;
     private MicroKernel mk;
     
     @Before
     public void setup() throws Exception {
-        rs = new DefaultRevisionStore(new InMemPersistence()) {
+        rs = new DefaultRevisionStore(createPersistence()) {
             @Override
             protected void doMark() throws Exception {
+                StoredCommit commit = getHeadCommit();
+                
                 // Keep head commit only
-                markCommit(getHeadCommit());
+                markCommit(commit);
                 
-                MutableCommit headCommit = new MutableCommit(getHeadCommit());  
+                MutableCommit headCommit = new MutableCommit(commit);  
                 headCommit.setParentId(null);
                 gcpm.replaceCommit(headCommit.getId(), headCommit);
             }
         };
         rs.initialize();
+
         mk = new MicroKernelImpl(new Repository(rs, new MemoryBlobStore()));
     }
+    
+    protected GCPersistence createPersistence() throws Exception {
+        return new InMemPersistence();
+    }
 
     @After
     public void tearDown() throws Exception {
@@ -62,8 +75,13 @@ public class DefaultRevisionStoreTest {
         }
     }
     
+    /**
+     * Verify revision history works with garbage collection.
+     * 
+     * @throws Exception if an error occurs
+     */
     @Test
-    public void testGC() {
+    public void testRevisionHistory() {
         mk.commit("/", "+\"a\" : { \"c\":{}, \"d\":{} }", mk.getHeadRevision(), null);
         mk.commit("/", "+\"b\" : {}", mk.getHeadRevision(), null);
         mk.commit("/b", "+\"e\" : {}", mk.getHeadRevision(), null);
@@ -80,4 +98,35 @@ public class DefaultRevisionStoreTest {
         String history = mk.getRevisionHistory(Long.MIN_VALUE, Integer.MIN_VALUE);
         assertEquals(1, ((JsopArray) Jsop.parse(history)).size());
     }
+
+    /**
+     * Verify garbage collection can run concurrently with commits.
+     * 
+     * @throws Exception if an error occurs
+     */
+    @Test
+    public void testConcurrentGC() throws Exception {
+        ScheduledExecutorService gcExecutor = Executors.newScheduledThreadPool(1);
+        gcExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                rs.gc();
+            }
+        }, 100, 20, TimeUnit.MILLISECONDS);
+
+        mk.commit("/", "+\"a\" : { \"b\" : { \"c\" : { \"d\" : {} } } }",
+                mk.getHeadRevision(), null);
+
+        try {
+            for (int i = 0; i < 20; i++) {
+                mk.commit("/a/b/c/d", "+\"e\" : {}", mk.getHeadRevision(), null);
+                Thread.sleep(10);
+                mk.commit("/a/b/c/d/e", "+\"f\" : {}", mk.getHeadRevision(), null);
+                Thread.sleep(30);
+                mk.commit("/a/b/c/d", "-\"e\"", mk.getHeadRevision(), null);
+            }
+        } finally {
+            gcExecutor.shutdown();
+        }
+    }
 }