You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/09/25 12:41:36 UTC

svn commit: r1705268 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/blob/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: amitj
Date: Fri Sep 25 10:41:36 2015
New Revision: 1705268

URL: http://svn.apache.org/viewvc?rev=1705268&view=rev
Log:
OAK-3443: Track the start time of mark in GC

Added a test case that simulates this condition by delaying the blob identification (mark) phase and then adding blobs that should not be collected
Made some methods protected
Minor code rearrangement

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1705268&r1=1705267&r2=1705268&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java Fri Sep 25 10:41:36 2015
@@ -36,7 +36,7 @@ import org.apache.jackrabbit.oak.commons
  * Also, manages any temporary files needed as well as external sorting.
  * 
  */
-class GarbageCollectorFileState implements Closeable{
+public class GarbageCollectorFileState implements Closeable{
     /** The root of the gc file state directory. */
     private final File home;
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1705268&r1=1705267&r2=1705268&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Fri Sep 25 10:41:36 2015
@@ -51,7 +51,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
-
 import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
@@ -235,16 +234,17 @@ public class MarkSweepGarbageCollector i
      * @param markOnly whether to mark only
      * @throws Exception the exception
      */
-    private void markAndSweep(boolean markOnly) throws Exception {
+    protected void markAndSweep(boolean markOnly) throws Exception {
         boolean threw = true;
         GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
         try {
             Stopwatch sw = Stopwatch.createStarted();
             LOG.info("Starting Blob garbage collection");
-
+            
+            long markStart = System.currentTimeMillis();
             mark(fs);
             if (!markOnly) {
-                long deleteCount = sweep(fs);
+                long deleteCount = sweep(fs, markStart);
                 threw = false;
 
                 LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
@@ -261,7 +261,7 @@ public class MarkSweepGarbageCollector i
      * Mark phase of the GC.
      * @param fs the garbage collector file state
      */
-    private void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
+    protected void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
         LOG.debug("Starting mark phase of the garbage collector");
         
         // Create a time marker in the data store if applicable
@@ -351,8 +351,9 @@ public class MarkSweepGarbageCollector i
      * @return the number of blobs deleted
      * @throws Exception the exception
      * @param fs the garbage collector file state
+     * @param markStart the start time of mark to take as reference for deletion
      */
-    private long sweep(GarbageCollectorFileState fs) throws Exception {
+    protected long sweep(GarbageCollectorFileState fs, long markStart) throws Exception {
         long earliestRefAvailTime;
         // Merge all the blob references available from all the reference files in the data store meta store
         // Only go ahead if merge succeeded
@@ -360,6 +361,7 @@ public class MarkSweepGarbageCollector i
             earliestRefAvailTime =
                     GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
             LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime);
+            earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart);
         } catch (Exception e) {
             return 0;
         }
@@ -371,10 +373,11 @@ public class MarkSweepGarbageCollector i
         difference(fs);
         long count = 0;
         long deleted = 0;
-
+        
+        long lastMaxModifiedTime = getLastMaxModifiedTime(earliestRefAvailTime); 
         LOG.debug("Starting sweep phase of the garbage collector");
         LOG.debug("Sweeping blobs with modified time > than the configured max deleted time ({}). ",
-                timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+                timestampToString(lastMaxModifiedTime));
 
         ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
 
@@ -387,13 +390,13 @@ public class MarkSweepGarbageCollector i
 
             if (ids.size() >= getBatchCount()) {
                 count += ids.size();
-                deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+                deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
                 ids = newArrayList();
             }
         }
         if (!ids.isEmpty()) {
             count += ids.size();
-            deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+            deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
         }
 
         BufferedWriter writer = null;
@@ -416,7 +419,7 @@ public class MarkSweepGarbageCollector i
             LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates identified. This may happen if blob " 
                          + "modified time is > "
                          + "than the max deleted time ({})", deleted, count,
-                        timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+                        timestampToString(lastMaxModifiedTime));
         }
 
         // Remove all the merged marked references
@@ -456,7 +459,7 @@ public class MarkSweepGarbageCollector i
         long deleted = 0;
         try {
             LOG.trace("Blob ids to be deleted {}", ids);
-            deleted = blobStore.countDeleteChunks(ids, getLastMaxModifiedTime(maxModified));
+            deleted = blobStore.countDeleteChunks(ids, maxModified);
             if (deleted != ids.size()) {
                 // Only log and do not add to exception queue since some blobs may not match the
                 // lastMaxModifiedTime criteria.
@@ -473,7 +476,7 @@ public class MarkSweepGarbageCollector i
      * Iterates the complete node tree and collect all blob references
      * @param fs the garbage collector file state
      */
-    private void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
+    protected void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
         final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
         final AtomicInteger count = new AtomicInteger();
         try {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1705268&r1=1705267&r2=1705268&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Fri Sep 25 10:41:36 2015
@@ -21,23 +21,29 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBCollection;
 import junit.framework.Assert;
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
@@ -51,12 +57,17 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 /**
  * Tests for MongoMK GC
  */
 public class MongoBlobGCTest extends AbstractMongoConnectionTest {
     private Clock clock;
+    private static final Logger log = LoggerFactory.getLogger(MongoBlobGCTest.class);
 
     public DataStoreState setUp(boolean deleteDirect) throws Exception {
         DocumentNodeStore s = mk.getNodeStore();
@@ -122,7 +133,7 @@ public class MongoBlobGCTest extends Abs
         Set<String> blobsPresent = Sets.newHashSet();
     }
     
-    public HashSet<String> addInlined() throws Exception {
+    private HashSet<String> addInlined() throws Exception {
         HashSet<String> set = new HashSet<String>();
         DocumentNodeStore s = mk.getNodeStore();
         NodeBuilder a = s.getRoot().builder();
@@ -134,6 +145,7 @@ public class MongoBlobGCTest extends Abs
         s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
         return set;
     }
+
     private void deleteFromMongo(String nodeId) {
         DBCollection coll = mongoConnection.getDB().getCollection("nodes");
         BasicDBObject blobNodeObj = new BasicDBObject();
@@ -237,7 +249,33 @@ public class MongoBlobGCTest extends Abs
         Iterator<ReferencedBlob> blobs = mk.getNodeStore().getReferencedBlobsIterator();
         assertTrue(blobs instanceof MongoBlobReferenceIterator);
     }
-
+    
+    @Test
+    public void gcLongRunningBlobCollection() throws Exception {
+        DataStoreState state = setUp(true);
+        log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+        log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+        
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+        DocumentNodeStore store = mk.getNodeStore();
+        String repoId = null;
+        if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+            repoId = ClusterRepositoryInfo.createId(store);
+            ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+        }
+        TestGarbageCollector gc = new TestGarbageCollector(
+            new DocumentBlobReferenceRetriever(store),
+            (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+        gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate();
+        log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+    
+        assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+        assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+    }
+    
     private Set<String> gc(int blobGcMaxAgeInSecs) throws Exception {
         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gc = init(blobGcMaxAgeInSecs, executor);
@@ -287,4 +325,82 @@ public class MongoBlobGCTest extends Abs
         clock.waitUntil(Revision.getCurrentTimestamp());
         return clock;
     }
+    
+    /**
+     * Waits for some time and adds additional blobs after blob referenced identified to simulate
+     * long running blob id collection phase.
+     */
+    class TestGarbageCollector extends MarkSweepGarbageCollector {
+        long maxLastModifiedInterval;
+        String root;
+        GarbageCollectableBlobStore blobStore;
+        Set<String> additionalBlobs;
+        
+        public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+                                    Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+                                    @Nullable String repositoryId) throws IOException {
+            super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+            this.root = root;
+            this.blobStore = blobStore;
+            this.maxLastModifiedInterval = maxLastModifiedInterval;
+            this.additionalBlobs = Sets.newHashSet();
+        }
+        
+        @Override
+        protected void markAndSweep(boolean markOnly) throws Exception {
+            boolean threw = true;
+            GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+            try {
+                Stopwatch sw = Stopwatch.createStarted();
+                LOG.info("Starting Test Blob garbage collection");
+                
+                // Sleep a little more than the max interval to get over the interval for valid blobs
+                Thread.sleep(maxLastModifiedInterval + 100);
+                LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+                
+                long markStart = System.currentTimeMillis();
+                mark(fs);
+                LOG.info("Mark finished");
+                
+                additionalBlobs = createAdditional();
+    
+                if (!markOnly) {
+                    Thread.sleep(maxLastModifiedInterval + 100);
+                    LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+    
+                    long deleteCount = sweep(fs, markStart);
+                    threw = false;
+            
+                    LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+                        deleteCount, maxLastModifiedInterval);
+                }
+            } finally {
+                if (!LOG.isTraceEnabled()) {
+                    Closeables.close(fs, threw);
+                }
+            }
+        }
+    
+        public HashSet<String> createAdditional() throws Exception {
+            HashSet<String> blobSet = new HashSet<String>();
+            DocumentNodeStore s = mk.getNodeStore();
+            NodeBuilder a = s.getRoot().builder();
+            int number = 5;
+            for (int i = 0; i < number; i++) {
+                Blob b = s.createBlob(randomStream(100 + i, 16516));
+                a.child("cafter" + i).setProperty("x", b);
+                Iterator<String> idIter =
+                    ((GarbageCollectableBlobStore) s.getBlobStore())
+                        .resolveChunks(b.toString());
+                while (idIter.hasNext()) {
+                    String chunk = idIter.next();
+                    blobSet.add(chunk);
+                }                
+            }
+            log.info("{} Additional created {}", blobSet.size(), blobSet);
+    
+            s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return blobSet;
+        }
+    }    
 }