You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by dp...@apache.org on 2012/04/26 15:36:10 UTC

svn commit: r1330847 - in /jackrabbit/oak/trunk/oak-mk/src: main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java

Author: dpfister
Date: Thu Apr 26 13:36:10 2012
New Revision: 1330847

URL: http://svn.apache.org/viewvc?rev=1330847&view=rev
Log:
GC for revisions
- support branch & merge

Modified:
    jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
    jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java

Modified: jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java?rev=1330847&r1=1330846&r2=1330847&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/main/java/org/apache/jackrabbit/mk/store/DefaultRevisionStore.java Thu Apr 26 13:36:10 2012
@@ -20,6 +20,8 @@ import java.io.Closeable;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.WeakHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -82,17 +84,22 @@ public class DefaultRevisionStore extend
      * GC executor.
      */
     private ScheduledExecutorService gcExecutor;
-
+    
     /**
-     * Put tokens (Key: token, Value: null).
+     * Active put tokens (Key: token, Value: null).
      */
     private final Map<PutTokenImpl, Object> putTokens = Collections.synchronizedMap(new WeakHashMap<PutTokenImpl, Object>());
 
     /**
-     * Mark lock for put tokens.
+     * Read-write lock for put tokens.
      */
-    private final ReentrantReadWriteLock markLock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock tokensLock = new ReentrantReadWriteLock();
 
+    /**
+     * Active branches (Key: branch root id, Value: branch head).
+     */
+    private final Map<Id,Id> branches = Collections.synchronizedMap(new TreeMap<Id, Id>());
+    
     public DefaultRevisionStore(Persistence pm) {
         this.pm = pm;
         this.gcpm = (pm instanceof GCPersistence) ? (GCPersistence) pm : null;
@@ -227,7 +234,7 @@ public class DefaultRevisionStore extend
          * Make sure that a GC cycle can not sweep this newly persisted node
          * before we have updated our token
          */
-        markLock.readLock().lock();
+        tokensLock.readLock().lock();
 
         try {
             Id id = pm.writeNode(node);
@@ -245,7 +252,7 @@ public class DefaultRevisionStore extend
             return id;
 
         } finally {
-            markLock.readLock().unlock();
+            tokensLock.readLock().unlock();
         }
     }
 
@@ -284,7 +291,11 @@ public class DefaultRevisionStore extend
 
         Id id = writeCommit(token, commit);
         setHeadCommitId(id);
+        
         putTokens.remove(token);
+        if (branchRootId != null) {
+            branches.remove(branchRootId);
+        }
 
         return id;
     }
@@ -294,7 +305,12 @@ public class DefaultRevisionStore extend
 
         Id commitId = writeCommit(token, commit);
         putTokens.remove(token);
-
+        
+        Id branchRootId = commit.getBranchRootId();
+        if (branchRootId != null) {
+            branches.put(branchRootId, commitId);
+        }
+        
         return commitId;
     }
 
@@ -468,15 +484,27 @@ public class DefaultRevisionStore extend
         }
 
         try {
-            markUncommittedPuts();
-            markBranches();
-            markCommits();
+            markUncommittedNodes();
+            Id firstBranchRootId = markBranches();
+            Id firstCommitId = markCommits();
+
+            if (firstBranchRootId != null && firstBranchRootId.compareTo(firstCommitId) < 0) {
+                firstCommitId = firstBranchRootId;
+            }
+            StoredCommit commit = getCommit(firstCommitId);
+            if (commit.getParentId() != null) {
+                MutableCommit firstCommit = new MutableCommit(commit);
+                firstCommit.setParentId(null);
+                gcpm.replaceCommit(firstCommit.getId(), firstCommit);
+            }
+            
         } catch (Exception e) {
             /* unable to perform GC */
             gcState.set(NOT_ACTIVE);
             e.printStackTrace();
             return;
         }
+        
         gcState.set(SWEEPING);
 
         try {
@@ -490,13 +518,13 @@ public class DefaultRevisionStore extend
     }
     
     /**
-     * Mark uncommitted puts.
+     * Mark nodes that have already been put but not committed yet.
      * 
      * @throws Exception
      *             if an error occurs
      */
-    private void markUncommittedPuts() throws Exception {
-        markLock.writeLock().lock();
+    private void markUncommittedNodes() throws Exception {
+        tokensLock.writeLock().lock();
 
         try {
             gcpm.start();
@@ -506,18 +534,43 @@ public class DefaultRevisionStore extend
                 markNode(token.getLastModified());
             }
         } finally {
-            markLock.writeLock().unlock();
+            tokensLock.writeLock().unlock();
         }
     }
 
     /**
      * Mark branches.
      * 
+     * @return first branch root id that needs to be preserved, or {@code null} if there are no active branches
      * @throws Exception
      *             if an error occurs
      */
-    private void markBranches() throws Exception {
-        // TODO
+    private Id markBranches() throws Exception {
+        /* Mark all branch commits: walk each branch from its head back to (but excluding) its branch root */
+        for (Entry<Id, Id> entry : branches.entrySet()) { // NOTE(review): branches is a Collections.synchronizedMap and this traversal does not synchronize on it - confirm this is safe against concurrent put/remove
+            Id branchRootId = entry.getKey();
+            Id branchHeadId = entry.getValue();
+            while (!branchHeadId.equals(branchRootId)) {
+                StoredCommit commit = getCommit(branchHeadId);
+                markCommit(commit);
+                branchHeadId = commit.getParentId(); // NOTE(review): a null parent before the root is reached would NPE on the next equals() - confirm this cannot happen
+            }
+        }
+        /* Mark all master commits from the head commit back to the first branch root id (inclusive) */
+        if (!branches.isEmpty()) {
+            Id firstBranchRootId = branches.keySet().iterator().next(); // lowest key: branches is backed by a TreeMap
+            StoredCommit commit = getHeadCommit();
+
+            for (;;) {
+                markCommit(commit);
+                if (commit.getId().equals(firstBranchRootId)) {
+                    break; // the first branch root itself has just been marked
+                }
+                commit = getCommit(commit.getParentId());
+            }
+            return firstBranchRootId;
+        }
+        return null; // no active branches: nothing was marked here
+    }
     
     /**
@@ -525,10 +578,11 @@ public class DefaultRevisionStore extend
      * customized by subclasses. If this method throws an exception, the cycle
      * will be stopped without sweeping.
      * 
+     * @return first commit id that will be preserved
      * @throws Exception
      *             if an error occurs
      */
-    protected void markCommits() throws Exception {
+    protected Id markCommits() throws Exception {
         StoredCommit commit = getHeadCommit();
         long tsLimit = commit.getCommitTS() - (60 * 60 * 1000);
 
@@ -544,11 +598,7 @@ public class DefaultRevisionStore extend
             }
             commit = parentCommit;
         }
-        if (commit.getParentId() != null) {
-            MutableCommit firstCommit = new MutableCommit(commit);
-            firstCommit.setParentId(null);
-            gcpm.replaceCommit(firstCommit.getId(), firstCommit);
-        }
+        return commit.getId();
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java?rev=1330847&r1=1330846&r2=1330847&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-mk/src/test/java/org/apache/jackrabbit/mk/store/DefaultRevisionStoreTest.java Thu Apr 26 13:36:10 2012
@@ -28,13 +28,12 @@ import org.apache.jackrabbit.mk.core.Mic
 import org.apache.jackrabbit.mk.core.Repository;
 import org.apache.jackrabbit.mk.json.fast.Jsop;
 import org.apache.jackrabbit.mk.json.fast.JsopArray;
-import org.apache.jackrabbit.mk.model.MutableCommit;
+import org.apache.jackrabbit.mk.model.Id;
 import org.apache.jackrabbit.mk.model.StoredCommit;
 import org.apache.jackrabbit.mk.persistence.GCPersistence;
 import org.apache.jackrabbit.mk.persistence.InMemPersistence;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -49,15 +48,11 @@ public class DefaultRevisionStoreTest {
     public void setup() throws Exception {
         rs = new DefaultRevisionStore(createPersistence()) {
             @Override
-            protected void markCommits() throws Exception {
-                StoredCommit commit = getHeadCommit();
-                
+            protected Id markCommits() throws Exception {
                 // Keep head commit only
+                StoredCommit commit = getHeadCommit();
                 markCommit(commit);
-                
-                MutableCommit headCommit = new MutableCommit(commit);  
-                headCommit.setParentId(null);
-                gcpm.replaceCommit(headCommit.getId(), headCommit);
+                return commit.getId();
             }
         };
         rs.initialize();
@@ -105,7 +100,6 @@ public class DefaultRevisionStoreTest {
      * 
      * @throws Exception if an error occurs
      */
-    @Ignore
     @Test
     public void testBranchMerge() throws Exception {
         mk.commit("/", "+\"a\" : { \"b\":{}, \"c\":{} }", mk.getHeadRevision(), null);
@@ -118,6 +112,11 @@ public class DefaultRevisionStoreTest {
 
         branchRevisionId = mk.commit("/a", "+\"f\" : {}", branchRevisionId, null);
         mk.merge(branchRevisionId, null);
+
+        rs.gc();
+
+        String history = mk.getRevisionHistory(Long.MIN_VALUE, Integer.MIN_VALUE);
+        assertEquals(1, ((JsopArray) Jsop.parse(history)).size());
     }
     
     /**
@@ -150,4 +149,36 @@ public class DefaultRevisionStoreTest {
             gcExecutor.shutdown();
         }
     }
+
+    /**
+     * Verify garbage collection can run concurrently with branch commits and merges. The test makes no assertions; it fails only if a call in the test body throws.
+     * 
+     * @throws Exception if an error occurs
+     */
+    @Test
+    public void testConcurrentMergeGC() throws Exception {
+        ScheduledExecutorService gcExecutor = Executors.newScheduledThreadPool(1);
+        gcExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                rs.gc();
+            }
+        }, 100, 20, TimeUnit.MILLISECONDS); // first GC run after 100 ms, then 20 ms after each run finishes
+
+        mk.commit("/", "+\"a\" : { \"b\" : { \"c\" : { \"d\" : {} } } }",
+                mk.getHeadRevision(), null);
+
+        try {
+            for (int i = 0; i < 20; i++) { // each round: branch off head, add "e", remove "e", merge back
+                String branchId = mk.branch(mk.getHeadRevision());
+                branchId = mk.commit("/a/b/c/d", "+\"e\" : {}", branchId, null);
+                Thread.sleep(10);
+                branchId = mk.commit("/a/b/c/d", "-\"e\"", branchId, null);
+                Thread.sleep(30);
+                mk.merge(branchId, null);
+            }
+        } finally {
+            gcExecutor.shutdown(); // stop scheduling further GC runs
+        }
+    }
 }