You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/09/25 16:19:17 UTC

svn commit: r1705312 - in /jackrabbit/oak/branches/1.2: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: amitj
Date: Fri Sep 25 14:19:16 2015
New Revision: 1705312

URL: http://svn.apache.org/viewvc?rev=1705312&view=rev
Log:
OAK-3443: Track the start time of mark in GC

Merged revision(s) 1705268, 1705273 from trunk

Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 25 14:19:16 2015
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820
 ,1684868,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693209,1693421,1693525-1693526,1694007,1694393-1694394,1695050,1695122,1695280,1695299,1695457,1695482,1695507,1695521,1695540,1696194,1696242,1696285,1696578,1696759,1696916,1697363,1697373,1697410,1697
 582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701948,1701955,1701959,1701965,1701986,1702022,1702272,1702387,1702405,1702423,1702860,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705027,1705043,1705055
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820
 ,1684868,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693209,1693421,1693525-1693526,1694007,1694393-1694394,1695050,1695122,1695280,1695299,1695457,1695482,1695507,1695521,1695540,1696194,1696242,1696285,1696578,1696759,1696916,1697363,1697373,1697410,1697
 582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701948,1701955,1701959,1701965,1701986,1702022,1702272,1702387,1702405,1702423,1702860,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705027,1705043,1705055,1705268,1705273
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java Fri Sep 25 14:19:16 2015
@@ -36,7 +36,7 @@ import org.apache.jackrabbit.oak.commons
  * Also, manages any temporary files needed as well as external sorting.
  * 
  */
-class GarbageCollectorFileState implements Closeable{
+public class GarbageCollectorFileState implements Closeable{
     /** The root of the gc file state directory. */
     private final File home;
 

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Fri Sep 25 14:19:16 2015
@@ -49,7 +49,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
 import org.apache.jackrabbit.core.data.DataRecord;
@@ -229,16 +228,17 @@ public class MarkSweepGarbageCollector i
      * @param markOnly whether to mark only
      * @throws Exception the exception
      */
-    private void markAndSweep(boolean markOnly) throws Exception {
+    protected void markAndSweep(boolean markOnly) throws Exception {
         boolean threw = true;
         GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
         try {
             Stopwatch sw = Stopwatch.createStarted();
             LOG.info("Starting Blob garbage collection");
-
+            
+            long markStart = System.currentTimeMillis();
             mark(fs);
             if (!markOnly) {
-                long deleteCount = sweep(fs);
+                long deleteCount = sweep(fs, markStart);
                 threw = false;
 
                 LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
@@ -255,7 +255,7 @@ public class MarkSweepGarbageCollector i
      * Mark phase of the GC.
      * @param fs the garbage collector file state
      */
-    private void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
+    protected void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
         LOG.debug("Starting mark phase of the garbage collector");
         
         // Create a time marker in the data store if applicable
@@ -341,8 +341,9 @@ public class MarkSweepGarbageCollector i
      * @return the number of blobs deleted
      * @throws Exception the exception
      * @param fs the garbage collector file state
+     * @param markStart the start time of mark to take as reference for deletion
      */
-    private long sweep(GarbageCollectorFileState fs) throws Exception {
+    protected long sweep(GarbageCollectorFileState fs, long markStart) throws Exception {
         long earliestRefAvailTime;
         // Merge all the blob references available from all the reference files in the data store meta store
         // Only go ahead if merge succeeded
@@ -350,6 +351,7 @@ public class MarkSweepGarbageCollector i
             earliestRefAvailTime =
                     GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
             LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime);
+            earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart);
         } catch (Exception e) {
             return 0;
         }
@@ -361,10 +363,11 @@ public class MarkSweepGarbageCollector i
         difference(fs);
         long count = 0;
         long deleted = 0;
-
+        
+        long lastMaxModifiedTime = getLastMaxModifiedTime(earliestRefAvailTime); 
         LOG.debug("Starting sweep phase of the garbage collector");
         LOG.debug("Sweeping blobs with modified time > than the configured max deleted time ({}). " +
-                timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+                timestampToString(lastMaxModifiedTime));
 
         ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
 
@@ -377,13 +380,13 @@ public class MarkSweepGarbageCollector i
 
             if (ids.size() >= getBatchCount()) {
                 count += ids.size();
-                deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+                deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
                 ids = newArrayList();
             }
         }
         if (!ids.isEmpty()) {
             count += ids.size();
-            deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+            deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
         }
 
         BufferedWriter writer = null;
@@ -406,7 +409,7 @@ public class MarkSweepGarbageCollector i
             LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates identified. This may happen if blob " 
                          + "modified time is > "
                          + "than the max deleted time ({})", deleted, count,
-                        timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+                        timestampToString(lastMaxModifiedTime));
         }
 
         // Remove all the merged marked references
@@ -446,7 +449,7 @@ public class MarkSweepGarbageCollector i
         long deleted = 0;
         try {
             LOG.trace("Blob ids to be deleted {}", ids);
-            deleted = blobStore.countDeleteChunks(ids, getLastMaxModifiedTime(maxModified));
+            deleted = blobStore.countDeleteChunks(ids, maxModified);
             if (deleted != ids.size()) {
                 // Only log and do not add to exception queue since some blobs may not match the
                 // lastMaxModifiedTime criteria.
@@ -463,7 +466,7 @@ public class MarkSweepGarbageCollector i
      * Iterates the complete node tree and collect all blob references
      * @param fs the garbage collector file state
      */
-    private void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
+    protected void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
         final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
         final AtomicInteger count = new AtomicInteger();
         try {

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Fri Sep 25 14:19:16 2015
@@ -21,22 +21,29 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBCollection;
 import junit.framework.Assert;
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
@@ -48,12 +55,17 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 /**
  * Tests for MongoMK GC
  */
 public class MongoBlobGCTest extends AbstractMongoConnectionTest {
     private Clock clock;
+    private static final Logger log = LoggerFactory.getLogger(MongoBlobGCTest.class);
 
     public DataStoreState setUp(boolean deleteDirect) throws Exception {
         DocumentNodeStore s = mk.getNodeStore();
@@ -115,7 +127,7 @@ public class MongoBlobGCTest extends Abs
         Set<String> blobsPresent = Sets.newHashSet();
     }
     
-    public HashSet<String> addInlined() throws Exception {
+    private HashSet<String> addInlined() throws Exception {
         HashSet<String> set = new HashSet<String>();
         DocumentNodeStore s = mk.getNodeStore();
         NodeBuilder a = s.getRoot().builder();
@@ -127,6 +139,7 @@ public class MongoBlobGCTest extends Abs
         s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
         return set;
     }
+
     private void deleteFromMongo(String nodeId) {
         DBCollection coll = mongoConnection.getDB().getCollection("nodes");
         BasicDBObject blobNodeObj = new BasicDBObject();
@@ -169,10 +182,37 @@ public class MongoBlobGCTest extends Abs
         Set<String> existingAfterGC = gc(0);
         assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
     }
-
+    
+    @Test
+    public void gcLongRunningBlobCollection() throws Exception {
+        DataStoreState state = setUp(true);
+        log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+        log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+        
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+        DocumentNodeStore store = mk.getNodeStore();
+        String repoId = null;
+        if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+            repoId = ClusterRepositoryInfo.createId(store);
+            ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+        }
+        TestGarbageCollector gc = new TestGarbageCollector(
+            new DocumentBlobReferenceRetriever(store),
+            (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+        gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate();
+        log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+    
+        assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+        assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+    }
+    
     private Set<String> gc(int blobGcMaxAgeInSecs) throws Exception {
         DocumentNodeStore store = mk.getNodeStore();
         String repoId = null;
+
         if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
             repoId = ClusterRepositoryInfo.createId(store);
             ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
@@ -214,4 +254,82 @@ public class MongoBlobGCTest extends Abs
         clock.waitUntil(Revision.getCurrentTimestamp());
         return clock;
     }
+    
+    /**
+     * Waits for some time and adds additional blobs after blob referenced identified to simulate
+     * long running blob id collection phase.
+     */
+    class TestGarbageCollector extends MarkSweepGarbageCollector {
+        long maxLastModifiedInterval;
+        String root;
+        GarbageCollectableBlobStore blobStore;
+        Set<String> additionalBlobs;
+        
+        public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+                                    Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+                                    @Nullable String repositoryId) throws IOException {
+            super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+            this.root = root;
+            this.blobStore = blobStore;
+            this.maxLastModifiedInterval = maxLastModifiedInterval;
+            this.additionalBlobs = Sets.newHashSet();
+        }
+        
+        @Override
+        protected void markAndSweep(boolean markOnly) throws Exception {
+            boolean threw = true;
+            GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+            try {
+                Stopwatch sw = Stopwatch.createStarted();
+                LOG.info("Starting Test Blob garbage collection");
+                
+                // Sleep a little more than the max interval to get over the interval for valid blobs
+                Thread.sleep(maxLastModifiedInterval + 100);
+                LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+                
+                long markStart = System.currentTimeMillis();
+                mark(fs);
+                LOG.info("Mark finished");
+                
+                additionalBlobs = createAdditional();
+    
+                if (!markOnly) {
+                    Thread.sleep(maxLastModifiedInterval + 100);
+                    LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+    
+                    long deleteCount = sweep(fs, markStart);
+                    threw = false;
+            
+                    LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+                        deleteCount, maxLastModifiedInterval);
+                }
+            } finally {
+                if (!LOG.isTraceEnabled()) {
+                    Closeables.close(fs, threw);
+                }
+            }
+        }
+    
+        public HashSet<String> createAdditional() throws Exception {
+            HashSet<String> blobSet = new HashSet<String>();
+            DocumentNodeStore s = mk.getNodeStore();
+            NodeBuilder a = s.getRoot().builder();
+            int number = 5;
+            for (int i = 0; i < number; i++) {
+                Blob b = s.createBlob(randomStream(100 + i, 16516));
+                a.child("cafter" + i).setProperty("x", b);
+                Iterator<String> idIter =
+                    ((GarbageCollectableBlobStore) s.getBlobStore())
+                        .resolveChunks(b.toString());
+                while (idIter.hasNext()) {
+                    String chunk = idIter.next();
+                    blobSet.add(chunk);
+                }                
+            }
+            log.info("{} Additional created {}", blobSet.size(), blobSet);
+    
+            s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return blobSet;
+        }
+    }    
 }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Fri Sep 25 14:19:16 2015
@@ -22,6 +22,7 @@ import static org.apache.commons.io.File
 import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -34,14 +35,22 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -60,8 +69,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 /**
  * Tests for SegmentNodeStore DataStore GC
  */
@@ -98,7 +105,7 @@ public class SegmentDataStoreBlobGCTest
         blobStore = DataStoreUtils.getBlobStore();
         nodeStore = getNodeStore(blobStore);
         startDate = new Date();
-        
+
         NodeBuilder a = nodeStore.getRoot().builder();
 
         /* Create garbage by creating in-lined blobs (size < 16KB) */
@@ -136,7 +143,7 @@ public class SegmentDataStoreBlobGCTest
                 processed.add(n);
             }
         }
-    
+
         DataStoreState state = new DataStoreState();
         for (int i = 0; i < numBlobs; i++) {
             SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(i, 16516));
@@ -150,7 +157,7 @@ public class SegmentDataStoreBlobGCTest
             }
             a.child("c" + i).setProperty("x", b);
         }
-        
+
         nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
         log.info("Created blobs : {}", state.blobsAdded.size());
 
@@ -166,19 +173,19 @@ public class SegmentDataStoreBlobGCTest
 
         return state;
     }
-    
+
     private class DataStoreState {
         Set<String> blobsAdded = Sets.newHashSet();
         Set<String> blobsPresent = Sets.newHashSet();
     }
-    
+
     private void delete(String nodeId) throws CommitFailedException {
         NodeBuilder builder = nodeStore.getRoot().builder();
         builder.child(nodeId).remove();
 
         nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
-    
+
     @Test
     public void gc() throws Exception {
         DataStoreState state = setUp();
@@ -187,7 +194,7 @@ public class SegmentDataStoreBlobGCTest
         Set<String> existingAfterGC = gcInternal(0);
         assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
     }
-    
+
     @Test
     public void noGc() throws Exception {
         DataStoreState state = setUp();
@@ -196,7 +203,7 @@ public class SegmentDataStoreBlobGCTest
         Set<String> existingAfterGC = gcInternal(86400);
         assertTrue(Sets.symmetricDifference(state.blobsAdded, existingAfterGC).isEmpty());
     }
-    
+
     private Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
         String repoId = null;
         if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
@@ -205,11 +212,11 @@ public class SegmentDataStoreBlobGCTest
                 new ByteArrayInputStream(new byte[0]),
                 REPOSITORY.getNameFromId(repoId));
         }
-        
+
         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new SegmentBlobReferenceRetriever(store.getTracker()),
-                    (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 2048, maxBlobGcInSecs, 
+                    (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 2048, maxBlobGcInSecs,
                                                                         repoId);
         gc.collectGarbage(false);
 
@@ -219,6 +226,31 @@ public class SegmentDataStoreBlobGCTest
         return existingAfterGC;
     }
 
+    @Test
+    public void gcLongRunningBlobCollection() throws Exception {
+        DataStoreState state = setUp();
+        log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+        log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+        String repoId = null;
+        if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+            repoId = ClusterRepositoryInfo.createId(nodeStore);
+            ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+        }
+        TestGarbageCollector gc = new TestGarbageCollector(
+            new SegmentBlobReferenceRetriever(store.getTracker()),
+            (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+        gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate();
+        log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+
+        assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+        assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+    }
+
     protected Set<String> iterate() throws Exception {
         Iterator<String> cur = blobStore.getAllChunkIds(0);
 
@@ -245,5 +277,81 @@ public class SegmentDataStoreBlobGCTest
         r.nextBytes(data);
         return new ByteArrayInputStream(data);
     }
+
+    /**
+    * Waits for some time and adds additional blobs after blob referenced identified to simulate
+    * long running blob id collection phase.
+     */
+    class TestGarbageCollector extends MarkSweepGarbageCollector {
+        long maxLastModifiedInterval;
+        String root;
+        GarbageCollectableBlobStore blobStore;
+        Set<String> additionalBlobs;
+
+        public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+                                    Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+                                    @Nullable String repositoryId) throws IOException {
+            super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+            this.root = root;
+            this.blobStore = blobStore;
+            this.maxLastModifiedInterval = maxLastModifiedInterval;
+            this.additionalBlobs = Sets.newHashSet();
+        }
+
+        @Override
+        protected void markAndSweep(boolean markOnly) throws Exception {
+            boolean threw = true;
+            GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+            try {
+                Stopwatch sw = Stopwatch.createStarted();
+                LOG.info("Starting Test Blob garbage collection");
+
+                // Sleep a little more than the max interval to get over the interval for valid blobs
+                Thread.sleep(maxLastModifiedInterval + 100);
+                LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+
+                long markStart = System.currentTimeMillis();
+                mark(fs);
+                LOG.info("Mark finished");
+
+                additionalBlobs = createAdditional();
+
+                if (!markOnly) {
+                    Thread.sleep(maxLastModifiedInterval + 100);
+                    LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+
+                    long deleteCount = sweep(fs, markStart);
+                    threw = false;
+
+                    LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+                        deleteCount, maxLastModifiedInterval);
+                }
+            } finally {
+                if (!LOG.isTraceEnabled()) {
+                    Closeables.close(fs, threw);
+                }
+            }
+        }
+
+        public HashSet<String> createAdditional() throws Exception {
+            HashSet<String> blobSet = new HashSet<String>();
+            NodeBuilder a = nodeStore.getRoot().builder();
+            int number = 5;
+            for (int i = 0; i < number; i++) {
+                SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(100 + i, 16516));
+                a.child("cafter" + i).setProperty("x", b);
+                Iterator<String> idIter =
+                    ((GarbageCollectableBlobStore) blobStore).resolveChunks(b.getBlobId());
+                while (idIter.hasNext()) {
+                    String chunk = idIter.next();
+                    blobSet.add(chunk);
+                }
+            }
+            log.info("{} Additional created {}", blobSet.size(), blobSet);
+
+            nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return blobSet;
+        }
+    }
 }